diff options
-rw-r--r-- | src/main.rs | 2 | ||||
-rw-r--r-- | src/membership.rs | 156 | ||||
-rw-r--r-- | src/proto.rs | 1 |
3 files changed, 83 insertions, 76 deletions
diff --git a/src/main.rs b/src/main.rs index 0fcda4e0..05b0a73a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,6 +27,8 @@ pub struct Opt { #[derive(Deserialize, Debug)] pub struct Config { + datacenter: String, + metadata_dir: PathBuf, data_dir: PathBuf, diff --git a/src/membership.rs b/src/membership.rs index e468c9b0..7aaa0759 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -41,30 +41,17 @@ impl Members { NodeStatus{ addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, + datacenter: info.datacenter.clone(), }); match old_status { None => { - eprintln!("Discovered new node (ping): {}", hex::encode(info.id)); + eprintln!("Newly pingable node: {}", hex::encode(info.id)); true } Some(x) => x.addr != addr, } } - fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool { - if !self.status.contains_key(id) { - eprintln!("Discovered new node (advertisment): {}", hex::encode(id)); - self.status.insert(id.clone(), - NodeStatus{ - addr: addr.clone(), - remaining_ping_attempts: MAX_FAILED_PINGS, - }); - true - } else { - false - } - } - fn recalculate_status_hash(&mut self) { let mut nodes = self.status.iter().collect::<Vec<_>>(); nodes.sort_by_key(|(id, _status)| *id); @@ -72,7 +59,7 @@ impl Members { let mut hasher = Sha256::new(); eprintln!("Current set of pingable nodes: --"); for (id, status) in nodes { - eprintln!("{} {}", hex::encode(id), status.addr); + eprintln!("{} {} ({})", hex::encode(id), status.addr, status.datacenter); hasher.input(format!("{} {}\n", hex::encode(id), status.addr)); } eprintln!("END --"); @@ -82,6 +69,7 @@ impl Members { pub struct NodeStatus { pub addr: SocketAddr, + pub datacenter: String, pub remaining_ping_attempts: usize, } @@ -109,6 +97,7 @@ impl System { let members = self.members.read().await; Message::Ping(PingMessage{ id: self.id, + datacenter: self.config.datacenter.clone(), rpc_port: self.config.rpc_port, status_hash: members.status_hash.clone(), config_version: members.config.version, @@ -123,27 +112,69 @@ impl System { } pub async fn bootstrap(self: Arc<Self>) { + let bootstrap_peers = self.config.bootstrap_peers + .iter() + .map(|ip| (ip.clone(), None)) + .collect::<Vec<_>>(); + self.clone().ping_nodes(bootstrap_peers).await; + + tokio::spawn(self.ping_loop()); + } + + pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) { let ping_msg = self.make_ping().await; let ping_resps = join_all( - self.config.bootstrap_peers.iter().cloned() - .map(|to| { + peers.iter() + .map(|(addr, id_option)| { let sys = self.clone(); let ping_msg_ref = &ping_msg; async move { - (to.clone(), rpc_call_addr(sys, &to, ping_msg_ref, PING_TIMEOUT).await) + (id_option, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await) } })).await; let mut members = self.members.write().await; - for (addr, ping_resp) in ping_resps { + + let mut has_changes = false; + let mut to_advertise = vec![]; + + for (id_option, addr, ping_resp) in ping_resps { if let Ok(Message::Ping(info)) = ping_resp { - members.handle_ping(addr.ip(), &info); + let is_new = members.handle_ping(addr.ip(), &info); + if is_new { + has_changes = true; + to_advertise.push(AdvertisedNode{ + id: info.id.clone(), + addr: addr.clone(), + }); + } + if is_new || members.status_hash != info.status_hash { + tokio::spawn(self.clone().pull_status(info.id.clone())); + } + if is_new || members.config.version < info.config_version { + tokio::spawn(self.clone().pull_config(info.id.clone())); + } + } else if let Some(id) = id_option { + let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0); + if remaining_attempts == 0 { + eprintln!("Removing node {} after too many failed pings", hex::encode(id)); + members.status.remove(id); + has_changes = true; + } else { + if let Some(st) = members.status.get_mut(id) { + st.remaining_ping_attempts = remaining_attempts - 1; + } + } } } - members.recalculate_status_hash(); + if has_changes { + members.recalculate_status_hash(); + } drop(members); - tokio::spawn(self.ping_loop()); + if to_advertise.len() > 0 { + self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT).await; + } } pub async fn handle_ping(self: Arc<Self>, @@ -191,21 +222,36 @@ impl System { adv: &[AdvertisedNode]) -> Result<Message, Error> { - let mut propagate = vec![]; + let mut to_ping = vec![]; let mut members = self.members.write().await; + let mut has_changed = false; + for node in adv.iter() { - let is_new = members.handle_advertise_node(&node.id, &node.addr); - if is_new { - tokio::spawn(self.clone().pull_status(node.id.clone())); - tokio::spawn(self.clone().pull_config(node.id.clone())); - propagate.push(node.clone()); + if node.id == self.id { + // learn our own ip address + let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port); + let old_self = members.status.insert(node.id.clone(), + NodeStatus{ + addr: self_addr, + datacenter: self.config.datacenter.clone(), + remaining_ping_attempts: MAX_FAILED_PINGS, + }); + has_changed = match old_self { + None => true, + Some(x) => x.addr != self_addr, + }; + } else if !members.status.contains_key(&node.id) { + to_ping.push((node.addr.clone(), Some(node.id.clone()))); } } - - if propagate.len() > 0 { + if has_changed { members.recalculate_status_hash(); - tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT)); + } + drop(members); + + if to_ping.len() > 0 { + tokio::spawn(self.clone().ping_nodes(to_ping)); } Ok(Message::Ok) @@ -231,53 +277,11 @@ impl System { let members = self.members.read().await; let ping_addrs = members.status.iter() .filter(|(id, _)| **id != self.id) - .map(|(id, status)| (id.clone(), status.addr.clone())) + .map(|(id, status)| (status.addr.clone(), Some(id.clone()))) .collect::<Vec<_>>(); drop(members); - let ping_msg = self.make_ping().await; - let ping_resps = join_all( - ping_addrs.iter() - .map(|(id, addr)| { - let sys = self.clone(); - let ping_msg_ref = &ping_msg; - async move { - (id, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await) - } - })).await; - - let mut members = self.members.write().await; - let mut has_changes = false; - - for (id, addr, ping_resp) in ping_resps { - if let Ok(Message::Ping(ping)) = ping_resp { - let is_new = members.handle_ping(addr.ip(), &ping); - if is_new { - has_changes = true; - } - if is_new || members.status_hash != ping.status_hash { - tokio::spawn(self.clone().pull_status(ping.id.clone())); - } - if is_new || members.config.version < ping.config_version { - tokio::spawn(self.clone().pull_config(ping.id.clone())); - } - } else { - let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0); - if remaining_attempts == 0 { - eprintln!("Removing node {} after too many failed pings", hex::encode(id)); - members.status.remove(id); - has_changes = true; - } else { - if let Some(st) = members.status.get_mut(id) { - st.remaining_ping_attempts = remaining_attempts - 1; - } - } - } - } - if has_changes { - members.recalculate_status_hash(); - } - drop(members); + self.clone().ping_nodes(ping_addrs).await; restart_at.await } diff --git a/src/proto.rs b/src/proto.rs index 8b60784e..3a950c6c 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -18,6 +18,7 @@ pub enum Message { #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { pub id: UUID, + pub datacenter: String, pub rpc_port: u16, pub status_hash: Hash, |