diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 25d7561989..a8c0aea950 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -569,10 +569,11 @@ def __init__(self, options_map): def make_token_replica_map(self, token_to_host_owner, ring): replica_map = {} - for i in range(len(ring)): + ring_len = len(ring) + for i in range(ring_len): j, hosts = 0, list() - while len(hosts) < self.replication_factor and j < len(ring): - token = ring[(i + j) % len(ring)] + while len(hosts) < self.replication_factor and j < ring_len: + token = ring[(i + j) % ring_len] host = token_to_host_owner[token] if host not in hosts: hosts.append(host) @@ -629,10 +630,14 @@ def make_token_replica_map(self, token_to_host_owner, ring): hosts_per_dc = defaultdict(set) for i, token in enumerate(ring): host = token_to_host_owner[token] - dc_to_token_offset[host.datacenter].append(i) - if host.datacenter and host.rack: - dc_racks[host.datacenter].add(host.rack) - hosts_per_dc[host.datacenter].add(host) + host_dc = host.datacenter + if host_dc in dc_rf_map: + # if the host is in a DC that has a replication factor, add it + # to the list of token offsets for that DC + dc_to_token_offset[host_dc].append(i) + if host.rack: + dc_racks[host_dc].add(host.rack) + hosts_per_dc[host_dc].add(host) # A map of DCs to an index into the dc_to_token_offset value for that dc. # This is how we keep track of advancing around the ring for each DC. @@ -644,8 +649,6 @@ def make_token_replica_map(self, token_to_host_owner, ring): # go through each DC and find the replicas in that DC for dc in dc_to_token_offset.keys(): - if dc not in dc_rf_map: - continue # advance our per-DC index until we're up to at least the # current token in the ring @@ -657,34 +660,34 @@ def make_token_replica_map(self, token_to_host_owner, ring): dc_to_current_index[dc] = index replicas_remaining = dc_rf_map[dc] - replicas_this_dc = 0 + num_replicas_this_dc = 0 skipped_hosts = [] racks_placed = set() - racks_this_dc = dc_racks[dc] - hosts_this_dc = len(hosts_per_dc[dc]) + num_racks_this_dc = len(dc_racks[dc]) + num_hosts_this_dc = len(hosts_per_dc[dc]) for token_offset_index in range(index, index+num_tokens): - if token_offset_index >= len(token_offsets): - token_offset_index = token_offset_index - len(token_offsets) + if replicas_remaining == 0 or num_replicas_this_dc == num_hosts_this_dc: + break + + if token_offset_index >= num_tokens: + token_offset_index = token_offset_index - num_tokens token_offset = token_offsets[token_offset_index] host = token_to_host_owner[ring[token_offset]] - if replicas_remaining == 0 or replicas_this_dc == hosts_this_dc: - break - if host in replicas: continue - if host.rack in racks_placed and len(racks_placed) < len(racks_this_dc): + if host.rack in racks_placed and len(racks_placed) < num_racks_this_dc: skipped_hosts.append(host) continue replicas.append(host) - replicas_this_dc += 1 + num_replicas_this_dc += 1 replicas_remaining -= 1 racks_placed.add(host.rack) - if len(racks_placed) == len(racks_this_dc): + if len(racks_placed) == num_racks_this_dc: for host in skipped_hosts: if replicas_remaining == 0: break