diff options
Diffstat (limited to 'src/membership.rs')
-rw-r--r-- | src/membership.rs | 99 |
1 files changed, 87 insertions, 12 deletions
diff --git a/src/membership.rs b/src/membership.rs index aa51e0fa..1ce567a7 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -1,4 +1,6 @@ use std::sync::Arc; +use std::hash::Hash as StdHash; +use std::hash::Hasher; use std::path::PathBuf; use std::io::{Read}; use std::collections::HashMap; @@ -34,6 +36,19 @@ pub struct Members { pub status_hash: Hash, pub config: NetworkConfig, + pub ring: Vec<RingEntry>, + pub n_datacenters: usize, +} + +pub struct NodeStatus { + pub addr: SocketAddr, + pub remaining_ping_attempts: usize, +} + +pub struct RingEntry { + pub location: Hash, + pub node: UUID, + pub datacenter: u64, } impl Members { @@ -43,7 +58,6 @@ impl Members { NodeStatus{ addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, - datacenter: info.datacenter.clone(), }); match old_status { None => { @@ -61,18 +75,78 @@ impl Members { let mut hasher = Sha256::new(); eprintln!("Current set of pingable nodes: --"); for (id, status) in nodes { - eprintln!("{} {} ({})", hex::encode(id), status.addr, status.datacenter); + eprintln!("{} {}", hex::encode(id), status.addr); hasher.input(format!("{} {}\n", hex::encode(id), status.addr)); } eprintln!("END --"); self.status_hash.copy_from_slice(&hasher.result()[..]); } -} -pub struct NodeStatus { - pub addr: SocketAddr, - pub datacenter: String, - pub remaining_ping_attempts: usize, + fn rebuild_ring(&mut self) { + let mut new_ring = vec![]; + let mut datacenters = vec![]; + + for (id, config) in self.config.members.iter() { + let mut dc_hasher = std::collections::hash_map::DefaultHasher::new(); + config.datacenter.hash(&mut dc_hasher); + let datacenter = dc_hasher.finish(); + + if !datacenters.contains(&datacenter) { + datacenters.push(datacenter); + } + + for i in 0..config.n_tokens { + let mut location_hasher = Sha256::new(); + location_hasher.input(format!("{} {}", hex::encode(id), i)); + let mut location = [0u8; 32]; + location.copy_from_slice(&location_hasher.result()[..]); + + new_ring.push(RingEntry{ + location, + node: id.clone(), + datacenter, + }) + } + } + + new_ring.sort_by_key(|x| x.location); + self.ring = new_ring; + self.n_datacenters = datacenters.len(); + } + + fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { + if n >= self.config.members.len() { + return self.config.members.keys().cloned().collect::<Vec<_>>(); + } + + let start = match self.ring.binary_search_by_key(from, |x| x.location) { + Ok(i) => i, + Err(i) => if i == 0 { + self.ring.len() - 1 + } else { + i - 1 + } + }; + let mut ret = vec![]; + let mut datacenters = vec![]; + + for delta in 0..self.ring.len() { + if ret.len() == n { + break; + } + + let i = (start + delta) % self.ring.len(); + + if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { + ret.push(self.ring[i].node.clone()); + } else if !datacenters.contains(&self.ring[i].datacenter) { + ret.push(self.ring[i].node.clone()); + datacenters.push(self.ring[i].datacenter); + } + } + + ret + } } fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> { @@ -106,8 +180,11 @@ impl System { status: HashMap::new(), status_hash: [0u8; 32], config: net_config, + ring: Vec::new(), + n_datacenters: 0, }; members.recalculate_status_hash(); + members.rebuild_ring(); System{ config, id, @@ -135,7 +212,6 @@ impl System { let members = self.members.read().await; Message::Ping(PingMessage{ id: self.id, - datacenter: self.config.datacenter.clone(), rpc_port: self.config.rpc_port, status_hash: members.status_hash.clone(), config_version: members.config.version, @@ -184,7 +260,6 @@ impl System { to_advertise.push(AdvertisedNode{ id: info.id.clone(), addr: addr.clone(), - datacenter: info.datacenter.clone(), }); } if is_new || members.status_hash != info.status_hash { @@ -247,7 +322,6 @@ impl System { mem.push(AdvertisedNode{ id: node.clone(), addr: status.addr.clone(), - datacenter: status.datacenter.clone(), }); } Ok(Message::AdvertiseNodesUp(mem)) @@ -274,7 +348,6 @@ impl System { let old_self = members.status.insert(node.id.clone(), NodeStatus{ addr: self_addr, - datacenter: self.config.datacenter.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, }); has_changed = match old_self { @@ -303,9 +376,11 @@ impl System { { let mut members = self.members.write().await; if adv.version > members.config.version { - members.config = adv.clone(); tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)); + + members.config = adv.clone(); self.save_network_config().await; + members.rebuild_ring(); } Ok(Message::Ok) |