diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-06 22:54:03 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-06 22:54:03 +0200 |
commit | a09f019cc5370cedffa5adf146a3899a8fa6cd90 (patch) | |
tree | 2f8b263031798103bfd7cc2abc90c41db03d4599 /src | |
parent | 87f2b4d2fc8835f8adda69adb6b51fd73ffb20a5 (diff) | |
download | garage-a09f019cc5370cedffa5adf146a3899a8fa6cd90.tar.gz garage-a09f019cc5370cedffa5adf146a3899a8fa6cd90.zip |
Seems to be fixed
Diffstat (limited to 'src')
-rw-r--r-- | src/membership.rs | 42 | ||||
-rw-r--r-- | src/rpc.rs | 23 |
2 files changed, 29 insertions, 36 deletions
diff --git a/src/membership.rs b/src/membership.rs index b025c2bb..8b067686 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -4,7 +4,6 @@ use std::time::Duration; use std::net::{IpAddr, SocketAddr}; use futures::future::join_all; -use futures::stream::StreamExt; use hyper::client::Client; use tokio::sync::RwLock; use sha2::{Sha256, Digest}; @@ -37,15 +36,24 @@ pub struct Members { impl Members { fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { - self.status.insert(info.id.clone(), + let addr = SocketAddr::new(ip, info.rpc_port); + let old_status = self.status.insert(info.id.clone(), NodeStatus{ - addr: SocketAddr::new(ip, info.rpc_port), + addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, - }).is_none() + }); + match old_status { + None => { + eprintln!("Discovered new node (ping): {}", hex::encode(info.id)); + true + } + Some(x) => x.addr != addr, + } } fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool { if !self.status.contains_key(id) { + eprintln!("Discovered new node (advertisment): {}", hex::encode(id)); self.status.insert(id.clone(), NodeStatus{ addr: addr.clone(), @@ -104,7 +112,7 @@ impl System { pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) { let members = self.members.read().await; - let to = members.status.keys().cloned().collect::<Vec<_>>(); + let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::<Vec<_>>(); drop(members); rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await; } @@ -140,7 +148,9 @@ impl System { { let mut members = self.members.write().await; let is_new = members.handle_ping(from.ip(), ping); - members.recalculate_status_hash(); + if is_new { + members.recalculate_status_hash(); + } let status_hash = members.status_hash.clone(); let config_version = members.config.version; drop(members); @@ -187,6 +197,7 @@ impl System { propagate.push(node.clone()); } } + drop(members); if propagate.len() > 0 { tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT)); @@ -214,6 +225,7 @@ impl System { let members = self.members.read().await; let ping_addrs = members.status.iter() + .filter(|(id, _)| **id != self.id) .map(|(id, status)| (id.clone(), status.addr.clone())) .collect::<Vec<_>>(); drop(members); @@ -257,13 +269,15 @@ impl System { } } - pub async fn pull_status(self: Arc<Self>, peer: UUID) { - let resp = rpc_call(self.clone(), - &peer, - &Message::PullStatus, - PING_TIMEOUT).await; - if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { - self.handle_advertise_nodes_up(&nodes).await; + pub fn pull_status(self: Arc<Self>, peer: UUID) -> impl futures::future::Future<Output=()> + Send + 'static { + async move { + let resp = rpc_call(self.clone(), + &peer, + &Message::PullStatus, + PING_TIMEOUT).await; + if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { + let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await; + } } } @@ -273,7 +287,7 @@ impl System { &Message::PullConfig, PING_TIMEOUT).await; if let Ok(Message::AdvertiseConfig(config)) = resp { - self.handle_advertise_config(&config).await; + let _: Result<_, _> = self.handle_advertise_config(&config).await; } } } @@ -24,31 +24,10 @@ pub async fn rpc_call_many(sys: Arc<System>, timeout: Duration) -> Vec<Result<Message, Error>> { - let resp_stream = to.iter() + let mut resp_stream = to.iter() .map(|to| rpc_call(sys.clone(), to, msg, timeout)) .collect::<FuturesUnordered<_>>(); - collect_rpc_results(resp_stream, stop_after).await -} - -pub async fn rpc_call_many_addr(sys: Arc<System>, - to: &[SocketAddr], - msg: &Message, - stop_after: Option<usize>, - timeout: Duration) - -> Vec<Result<Message, Error>> -{ - let resp_stream = to.iter() - .map(|to| rpc_call_addr(sys.clone(), to, msg, timeout)) - .collect::<FuturesUnordered<_>>(); - - collect_rpc_results(resp_stream, stop_after).await -} - -async fn collect_rpc_results(mut resp_stream: FuturesUnordered<impl Future<Output=Result<Message, Error>>>, - stop_after: Option<usize>) - -> Vec<Result<Message, Error>> -{ let mut results = vec![]; let mut n_ok = 0; while let Some(resp) = resp_stream.next().await { |