From 3477864142ed09c36abea1111937b829fb41c8a4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 10 Apr 2020 22:01:48 +0200 Subject: Fix the Sync issue. Details: So the HTTP client future of Hyper is not Sync, thus the stream that read blocks wasn't either. However Hyper's default Body type requires a stream to be Sync for wrap_stream. Solution: reimplement a custom HTTP body type. --- src/membership.rs | 340 +++++++++++++++++++++++++++++------------------------- 1 file changed, 186 insertions(+), 154 deletions(-) (limited to 'src/membership.rs') diff --git a/src/membership.rs b/src/membership.rs index 3f7a84c4..b713c7a4 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -1,22 +1,22 @@ -use std::sync::Arc; +use std::collections::HashMap; use std::hash::Hash as StdHash; use std::hash::Hasher; +use std::io::Read; +use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; -use std::io::{Read}; -use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; -use std::net::{IpAddr, SocketAddr}; -use sha2::{Sha256, Digest}; -use tokio::prelude::*; use futures::future::join_all; +use sha2::{Digest, Sha256}; +use tokio::prelude::*; use tokio::sync::RwLock; -use crate::server::Config; -use crate::error::Error; use crate::data::*; +use crate::error::Error; use crate::proto::*; use crate::rpc_client::*; +use crate::server::Config; const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); @@ -36,8 +36,8 @@ pub struct Members { pub status_hash: Hash, pub config: NetworkConfig, - pub ring: Vec, - pub n_datacenters: usize, + pub ring: Vec, + pub n_datacenters: usize, } pub struct NodeStatus { @@ -47,19 +47,21 @@ pub struct NodeStatus { #[derive(Debug)] pub struct RingEntry { - pub location: Hash, - pub node: UUID, - pub datacenter: u64, + pub location: Hash, + pub node: UUID, + pub datacenter: u64, } impl Members { fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { let addr = SocketAddr::new(ip, info.rpc_port); - let old_status = self.status.insert(info.id.clone(), - NodeStatus{ + let old_status = self.status.insert( + info.id.clone(), + NodeStatus { addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, - }); + }, + ); match old_status { None => { eprintln!("Newly pingable node: {}", hex::encode(&info.id)); @@ -80,122 +82,129 @@ impl Members { hasher.input(format!("{} {}\n", hex::encode(&id), status.addr)); } eprintln!("END --"); - self.status_hash.as_slice_mut().copy_from_slice(&hasher.result()[..]); + self.status_hash + .as_slice_mut() + .copy_from_slice(&hasher.result()[..]); } - fn rebuild_ring(&mut self) { - let mut new_ring = vec![]; - let mut datacenters = vec![]; + fn rebuild_ring(&mut self) { + let mut new_ring = vec![]; + let mut datacenters = vec![]; - for (id, config) in self.config.members.iter() { - let mut dc_hasher = std::collections::hash_map::DefaultHasher::new(); - config.datacenter.hash(&mut dc_hasher); - let datacenter = dc_hasher.finish(); + for (id, config) in self.config.members.iter() { + let mut dc_hasher = std::collections::hash_map::DefaultHasher::new(); + config.datacenter.hash(&mut dc_hasher); + let datacenter = dc_hasher.finish(); - if !datacenters.contains(&datacenter) { - datacenters.push(datacenter); - } + if !datacenters.contains(&datacenter) { + datacenters.push(datacenter); + } - for i in 0..config.n_tokens { - let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes()); + for i in 0..config.n_tokens { + let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes()); - new_ring.push(RingEntry{ - location: location.into(), - node: id.clone(), - datacenter, - }) - } - } + new_ring.push(RingEntry { + location: location.into(), + node: id.clone(), + datacenter, + }) + } + } - new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); - self.ring = new_ring; - self.n_datacenters = datacenters.len(); + new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); + self.ring = new_ring; + self.n_datacenters = datacenters.len(); eprintln!("RING: --"); for e in self.ring.iter() { eprintln!("{:?}", e); } eprintln!("END --"); - } - - pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { - if n >= self.config.members.len() { - return self.config.members.keys().cloned().collect::>(); - } - - let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) { - Ok(i) => i, - Err(i) => if i == 0 { - self.ring.len() - 1 - } else { - i - 1 - } - }; + } + + pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { + if n >= self.config.members.len() { + return self.config.members.keys().cloned().collect::>(); + } + + let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) { + Ok(i) => i, + Err(i) => { + if i == 0 { + self.ring.len() - 1 + } else { + i - 1 + } + } + }; self.walk_ring_from_pos(start, n) - } + } fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec { - let mut ret = vec![]; - let mut datacenters = vec![]; + let mut ret = vec![]; + let mut datacenters = vec![]; - for delta in 0..self.ring.len() { - if ret.len() == n { - break; - } + for delta in 0..self.ring.len() { + if ret.len() == n { + break; + } - let i = (start + delta) % self.ring.len(); + let i = (start + delta) % self.ring.len(); - if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { - ret.push(self.ring[i].node.clone()); - } else if !datacenters.contains(&self.ring[i].datacenter) { - ret.push(self.ring[i].node.clone()); - datacenters.push(self.ring[i].datacenter); - } - } + if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { + ret.push(self.ring[i].node.clone()); + } else if !datacenters.contains(&self.ring[i].datacenter) { + ret.push(self.ring[i].node.clone()); + datacenters.push(self.ring[i].datacenter); + } + } - ret + ret } } fn read_network_config(metadata_dir: &PathBuf) -> Result { - let mut path = metadata_dir.clone(); - path.push("network_config"); + let mut path = metadata_dir.clone(); + path.push("network_config"); let mut file = std::fs::OpenOptions::new() .read(true) .open(path.as_path())?; - + let mut net_config_bytes = vec![]; file.read_to_end(&mut net_config_bytes)?; - let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?; + let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?; Ok(net_config) } impl System { pub fn new(config: Config, id: UUID) -> Self { - let net_config = match read_network_config(&config.metadata_dir) { - Ok(x) => x, - Err(e) => { - println!("No valid previous network configuration stored ({}), starting fresh.", e); - NetworkConfig{ + let net_config = match read_network_config(&config.metadata_dir) { + Ok(x) => x, + Err(e) => { + println!( + "No valid previous network configuration stored ({}), starting fresh.", + e + ); + NetworkConfig { members: HashMap::new(), version: 0, } - }, - }; - let mut members = Members{ - status: HashMap::new(), - status_hash: Hash::default(), - config: net_config, - ring: Vec::new(), - n_datacenters: 0, - }; + } + }; + let mut members = Members { + status: HashMap::new(), + status_hash: Hash::default(), + config: net_config, + ring: Vec::new(), + n_datacenters: 0, + }; members.recalculate_status_hash(); - members.rebuild_ring(); - System{ + members.rebuild_ring(); + System { config, id, rpc_client: RpcClient::new(), @@ -203,24 +212,26 @@ impl System { } } - async fn save_network_config(self: Arc) { - let mut path = self.config.metadata_dir.clone(); - path.push("network_config"); + async fn save_network_config(self: Arc) { + let mut path = self.config.metadata_dir.clone(); + path.push("network_config"); - let members = self.members.read().await; - let data = rmp_to_vec_all_named(&members.config) - .expect("Error while encoding network config"); - drop(members); + let members = self.members.read().await; + let data = + rmp_to_vec_all_named(&members.config).expect("Error while encoding network config"); + drop(members); - let mut f = tokio::fs::File::create(path.as_path()).await - .expect("Could not create network_config"); - f.write_all(&data[..]).await - .expect("Could not write network_config"); - } + let mut f = tokio::fs::File::create(path.as_path()) + .await + .expect("Could not create network_config"); + f.write_all(&data[..]) + .await + .expect("Could not write network_config"); + } pub async fn make_ping(&self) -> Message { let members = self.members.read().await; - Message::Ping(PingMessage{ + Message::Ping(PingMessage { id: self.id.clone(), rpc_port: self.config.rpc_port, status_hash: members.status_hash.clone(), @@ -230,13 +241,20 @@ impl System { pub async fn broadcast(self: Arc, msg: Message, timeout: Duration) { let members = self.members.read().await; - let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::>(); + let to = members + .status + .keys() + .filter(|x| **x != self.id) + .cloned() + .collect::>(); drop(members); rpc_call_many(self.clone(), &to[..], &msg, timeout).await; } pub async fn bootstrap(self: Arc) { - let bootstrap_peers = self.config.bootstrap_peers + let bootstrap_peers = self + .config + .bootstrap_peers .iter() .map(|ip| (ip.clone(), None)) .collect::>(); @@ -247,16 +265,19 @@ impl System { pub async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { let ping_msg = self.make_ping().await; - let ping_resps = join_all( - peers.iter() - .map(|(addr, id_option)| { - let sys = self.clone(); - let ping_msg_ref = &ping_msg; - async move { - (id_option, addr.clone(), sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await) - } - })).await; - + let ping_resps = join_all(peers.iter().map(|(addr, id_option)| { + let sys = self.clone(); + let ping_msg_ref = &ping_msg; + async move { + ( + id_option, + addr.clone(), + sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await, + ) + } + })) + .await; + let mut members = self.members.write().await; let mut has_changes = false; @@ -267,7 +288,7 @@ impl System { let is_new = members.handle_ping(addr.ip(), &info); if is_new { has_changes = true; - to_advertise.push(AdvertisedNode{ + to_advertise.push(AdvertisedNode { id: info.id.clone(), addr: addr.clone(), }); @@ -279,9 +300,16 @@ impl System { tokio::spawn(self.clone().pull_config(info.id.clone())); } } else if let Some(id) = id_option { - let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0); + let remaining_attempts = members + .status + .get(id) + .map(|x| x.remaining_ping_attempts) + .unwrap_or(0); if remaining_attempts == 0 { - eprintln!("Removing node {} after too many failed pings", hex::encode(&id)); + eprintln!( + "Removing node {} after too many failed pings", + hex::encode(&id) + ); members.status.remove(&id); has_changes = true; } else { @@ -297,15 +325,16 @@ impl System { drop(members); if to_advertise.len() > 0 { - self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT).await; + self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT) + .await; } } - pub async fn handle_ping(self: Arc, - from: &SocketAddr, - ping: &PingMessage) - -> Result - { + pub async fn handle_ping( + self: Arc, + from: &SocketAddr, + ping: &PingMessage, + ) -> Result { let mut members = self.members.write().await; let is_new = members.handle_ping(from.ip(), ping); if is_new { @@ -329,7 +358,7 @@ impl System { let members = self.members.read().await; let mut mem = vec![]; for (node, status) in members.status.iter() { - mem.push(AdvertisedNode{ + mem.push(AdvertisedNode { id: node.clone(), addr: status.addr.clone(), }); @@ -342,10 +371,10 @@ impl System { Ok(Message::AdvertiseConfig(members.config.clone())) } - pub async fn handle_advertise_nodes_up(self: Arc, - adv: &[AdvertisedNode]) - -> Result - { + pub async fn handle_advertise_nodes_up( + self: Arc, + adv: &[AdvertisedNode], + ) -> Result { let mut to_ping = vec![]; let mut members = self.members.write().await; @@ -355,11 +384,13 @@ impl System { if node.id == self.id { // learn our own ip address let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port); - let old_self = members.status.insert(node.id.clone(), - NodeStatus{ + let old_self = members.status.insert( + node.id.clone(), + NodeStatus { addr: self_addr, remaining_ping_attempts: MAX_FAILED_PINGS, - }); + }, + ); has_changed = match old_self { None => true, Some(x) => x.addr != self_addr, @@ -380,18 +411,20 @@ impl System { Ok(Message::Ok) } - pub async fn handle_advertise_config(self: Arc, - adv: &NetworkConfig) - -> Result - { + pub async fn handle_advertise_config( + self: Arc, + adv: &NetworkConfig, + ) -> Result { let mut members = self.members.write().await; if adv.version > members.config.version { - members.config = adv.clone(); - members.rebuild_ring(); + members.rebuild_ring(); - tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)); - tokio::spawn(self.clone().save_network_config()); + tokio::spawn( + self.clone() + .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT), + ); + tokio::spawn(self.clone().save_network_config()); } Ok(Message::Ok) @@ -400,12 +433,14 @@ impl System { pub async fn ping_loop(self: Arc) { loop { let restart_at = tokio::time::delay_for(PING_INTERVAL); - + let members = self.members.read().await; - let ping_addrs = members.status.iter() - .filter(|(id, _)| **id != self.id) - .map(|(id, status)| (status.addr.clone(), Some(id.clone()))) - .collect::>(); + let ping_addrs = members + .status + .iter() + .filter(|(id, _)| **id != self.id) + .map(|(id, status)| (status.addr.clone(), Some(id.clone()))) + .collect::>(); drop(members); self.clone().ping_nodes(ping_addrs).await; @@ -414,12 +449,12 @@ impl System { } } - pub fn pull_status(self: Arc, peer: UUID) -> impl futures::future::Future + Send + 'static { + pub fn pull_status( + self: Arc, + peer: UUID, + ) -> impl futures::future::Future + Send + 'static { async move { - let resp = rpc_call(self.clone(), - &peer, - &Message::PullStatus, - PING_TIMEOUT).await; + let resp = rpc_call(self.clone(), &peer, &Message::PullStatus, PING_TIMEOUT).await; if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await; } @@ -427,10 +462,7 @@ impl System { } pub async fn pull_config(self: Arc, peer: UUID) { - let resp = rpc_call(self.clone(), - &peer, - &Message::PullConfig, - PING_TIMEOUT).await; + let resp = rpc_call(self.clone(), &peer, &Message::PullConfig, PING_TIMEOUT).await; if let Ok(Message::AdvertiseConfig(config)) = resp { let _: Result<_, _> = self.handle_advertise_config(&config).await; } -- cgit v1.2.3