diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-11 23:53:32 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-11 23:53:32 +0200 |
commit | 9c931f5edacbaaab746ecf180fac2dd7062d0336 (patch) | |
tree | f29cfd82f573ac871408256a33e11f9153bae1da /src/membership.rs | |
parent | 5dd59e437d5af84dfa2cf5dcc2c15807b971002d (diff) | |
download | garage-9c931f5edacbaaab746ecf180fac2dd7062d0336.tar.gz garage-9c931f5edacbaaab746ecf180fac2dd7062d0336.zip |
Keep network status & ring in a tokio::sync::watch
advantages
- reads don't prevent preparing writes
- can be followed from other parts of the system by cloning the receiver
Diffstat (limited to 'src/membership.rs')
-rw-r--r-- | src/membership.rs | 185 |
1 files changed, 108 insertions, 77 deletions
diff --git a/src/membership.rs b/src/membership.rs index cb8dba99..22c13f64 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -13,7 +13,7 @@ use futures_util::future::*; use sha2::{Digest, Sha256}; use tokio::prelude::*; use tokio::sync::watch; -use tokio::sync::RwLock; +use tokio::sync::Mutex; use crate::background::BackgroundRunner; use crate::data::*; @@ -32,36 +32,44 @@ pub struct System { pub rpc_client: RpcClient, - pub members: RwLock<Members>, + pub status: watch::Receiver<Arc<Status>>, + pub ring: watch::Receiver<Arc<Ring>>, + + update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>, pub background: Arc<BackgroundRunner>, } -pub struct Members { - pub status: HashMap<UUID, NodeStatus>, - pub status_hash: Hash, - - pub config: NetworkConfig, - pub ring: Vec<RingEntry>, - pub n_datacenters: usize, +#[derive(Debug, Clone)] +pub struct Status { + pub nodes: HashMap<UUID, NodeStatus>, + pub hash: Hash, } +#[derive(Debug, Clone)] pub struct NodeStatus { pub addr: SocketAddr, pub remaining_ping_attempts: usize, } -#[derive(Debug)] +#[derive(Clone)] +pub struct Ring { + pub config: NetworkConfig, + pub ring: Vec<RingEntry>, + pub n_datacenters: usize, +} + +#[derive(Clone, Debug)] pub struct RingEntry { pub location: Hash, pub node: UUID, pub datacenter: u64, } -impl Members { +impl Status { fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { let addr = SocketAddr::new(ip, info.rpc_port); - let old_status = self.status.insert( + let old_status = self.nodes.insert( info.id.clone(), NodeStatus { addr: addr.clone(), @@ -77,8 +85,8 @@ impl Members { } } - fn recalculate_status_hash(&mut self) { - let mut nodes = self.status.iter().collect::<Vec<_>>(); + fn recalculate_hash(&mut self) { + let mut nodes = self.nodes.iter().collect::<Vec<_>>(); nodes.sort_unstable_by_key(|(id, _status)| *id); let mut hasher = Sha256::new(); @@ -88,11 +96,13 @@ impl Members { hasher.input(format!("{} {}\n", hex::encode(&id), status.addr)); } eprintln!("END --"); - self.status_hash + self.hash .as_slice_mut() .copy_from_slice(&hasher.result()[..]); } +} +impl Ring { fn rebuild_ring(&mut self) { let mut new_ring = vec![]; let mut datacenters = vec![]; @@ -201,20 +211,28 @@ impl System { } } }; - let mut members = Members { - status: HashMap::new(), - status_hash: Hash::default(), + let mut status = Status { + nodes: HashMap::new(), + hash: Hash::default(), + }; + status.recalculate_hash(); + let (update_status, status) = watch::channel(Arc::new(status)); + + let mut ring = Ring { config: net_config, ring: Vec::new(), n_datacenters: 0, }; - members.recalculate_status_hash(); - members.rebuild_ring(); + ring.rebuild_ring(); + let (update_ring, ring) = watch::channel(Arc::new(ring)); + System { config, id, rpc_client: RpcClient::new(), - members: RwLock::new(members), + status, + ring, + update_lock: Mutex::new((update_status, update_ring)), background, } } @@ -223,37 +241,33 @@ impl System { let mut path = self.config.metadata_dir.clone(); path.push("network_config"); - let members = self.members.read().await; - let data = - rmp_to_vec_all_named(&members.config)?; - drop(members); + let ring = self.ring.borrow().clone(); + let data = rmp_to_vec_all_named(&ring.config)?; - let mut f = tokio::fs::File::create(path.as_path()) - .await?; - f.write_all(&data[..]) - .await?; + let mut f = tokio::fs::File::create(path.as_path()).await?; + f.write_all(&data[..]).await?; Ok(()) } - pub async fn make_ping(&self) -> Message { - let members = self.members.read().await; + pub fn make_ping(&self) -> Message { + let status = self.status.borrow().clone(); + let ring = self.ring.borrow().clone(); Message::Ping(PingMessage { id: self.id.clone(), rpc_port: self.config.rpc_port, - status_hash: members.status_hash.clone(), - config_version: members.config.version, + status_hash: status.hash.clone(), + config_version: ring.config.version, }) } pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) { - let members = self.members.read().await; - let to = members - .status + let status = self.status.borrow().clone(); + let to = status + .nodes .keys() .filter(|x| **x != self.id) .cloned() .collect::<Vec<_>>(); - drop(members); rpc_call_many(self.clone(), &to[..], &msg, timeout).await; } @@ -273,7 +287,7 @@ impl System { } pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) { - let ping_msg = self.make_ping().await; + let ping_msg = self.make_ping(); let ping_resps = join_all(peers.iter().map(|(addr, id_option)| { let sys = self.clone(); let ping_msg_ref = &ping_msg; @@ -287,14 +301,16 @@ impl System { })) .await; - let mut members = self.members.write().await; + let update_locked = self.update_lock.lock().await; + let mut status: Status = self.status.borrow().as_ref().clone(); + let ring = self.ring.borrow().clone(); let mut has_changes = false; let mut to_advertise = vec![]; for (id_option, addr, ping_resp) in ping_resps { if let Ok(Message::Ping(info)) = ping_resp { - let is_new = members.handle_ping(addr.ip(), &info); + let is_new = status.handle_ping(addr.ip(), &info); if is_new { has_changes = true; to_advertise.push(AdvertisedNode { @@ -302,17 +318,17 @@ impl System { addr: addr.clone(), }); } - if is_new || members.status_hash != info.status_hash { + if is_new || status.hash != info.status_hash { self.background .spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok)); } - if is_new || members.config.version < info.config_version { + if is_new || ring.config.version < info.config_version { self.background .spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok)); } } else if let Some(id) = id_option { - let remaining_attempts = members - .status + let remaining_attempts = status + .nodes .get(id) .map(|x| x.remaining_ping_attempts) .unwrap_or(0); @@ -321,19 +337,22 @@ impl System { "Removing node {} after too many failed pings", hex::encode(&id) ); - members.status.remove(&id); + status.nodes.remove(&id); has_changes = true; } else { - if let Some(st) = members.status.get_mut(id) { + if let Some(st) = status.nodes.get_mut(id) { st.remaining_ping_attempts = remaining_attempts - 1; } } } } if has_changes { - members.recalculate_status_hash(); + status.recalculate_hash(); } - drop(members); + if let Err(e) = update_locked.0.broadcast(Arc::new(status)) { + eprintln!("In ping_nodes: could not save status update ({})", e); + } + drop(update_locked); if to_advertise.len() > 0 { self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT) @@ -346,29 +365,35 @@ impl System { from: &SocketAddr, ping: &PingMessage, ) -> Result<Message, Error> { - let mut members = self.members.write().await; - let is_new = members.handle_ping(from.ip(), ping); + let update_locked = self.update_lock.lock().await; + let mut status: Status = self.status.borrow().as_ref().clone(); + + let is_new = status.handle_ping(from.ip(), ping); if is_new { - members.recalculate_status_hash(); + status.recalculate_hash(); } - let status_hash = members.status_hash.clone(); - let config_version = members.config.version; - drop(members); + let status_hash = status.hash.clone(); + let config_version = self.ring.borrow().config.version; + + update_locked.0.broadcast(Arc::new(status))?; + drop(update_locked); if is_new || status_hash != ping.status_hash { - self.background.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok)); + self.background + .spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok)); } if is_new || config_version < ping.config_version { - self.background.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok)); + self.background + .spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok)); } - Ok(self.make_ping().await) + Ok(self.make_ping()) } - pub async fn handle_pull_status(&self) -> Result<Message, Error> { - let members = self.members.read().await; + pub fn handle_pull_status(&self) -> Result<Message, Error> { + let status = self.status.borrow().clone(); let mut mem = vec![]; - for (node, status) in members.status.iter() { + for (node, status) in status.nodes.iter() { mem.push(AdvertisedNode { id: node.clone(), addr: status.addr.clone(), @@ -377,9 +402,9 @@ impl System { Ok(Message::AdvertiseNodesUp(mem)) } - pub async fn handle_pull_config(&self) -> Result<Message, Error> { - let members = self.members.read().await; - Ok(Message::AdvertiseConfig(members.config.clone())) + pub fn handle_pull_config(&self) -> Result<Message, Error> { + let ring = self.ring.borrow().clone(); + Ok(Message::AdvertiseConfig(ring.config.clone())) } pub async fn handle_advertise_nodes_up( @@ -388,14 +413,15 @@ impl System { ) -> Result<Message, Error> { let mut to_ping = vec![]; - let mut members = self.members.write().await; + let update_lock = self.update_lock.lock().await; + let mut status: Status = self.status.borrow().as_ref().clone(); let mut has_changed = false; for node in adv.iter() { if node.id == self.id { // learn our own ip address let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port); - let old_self = members.status.insert( + let old_self = status.nodes.insert( node.id.clone(), NodeStatus { addr: self_addr, @@ -406,17 +432,19 @@ impl System { None => true, Some(x) => x.addr != self_addr, }; - } else if !members.status.contains_key(&node.id) { + } else if !status.nodes.contains_key(&node.id) { to_ping.push((node.addr.clone(), Some(node.id.clone()))); } } if has_changed { - members.recalculate_status_hash(); + status.recalculate_hash(); } - drop(members); + update_lock.0.broadcast(Arc::new(status))?; + drop(update_lock); if to_ping.len() > 0 { - self.background.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok)); + self.background + .spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok)); } Ok(Message::Ok) @@ -426,10 +454,14 @@ impl System { self: Arc<Self>, adv: &NetworkConfig, ) -> Result<Message, Error> { - let mut members = self.members.write().await; - if adv.version > members.config.version { - members.config = adv.clone(); - members.rebuild_ring(); + let mut ring: Ring = self.ring.borrow().as_ref().clone(); + let update_lock = self.update_lock.lock().await; + + if adv.version > ring.config.version { + ring.config = adv.clone(); + ring.rebuild_ring(); + update_lock.1.broadcast(Arc::new(ring))?; + drop(update_lock); self.background.spawn_cancellable( self.clone() @@ -446,14 +478,13 @@ impl System { loop { let restart_at = tokio::time::delay_for(PING_INTERVAL); - let members = self.members.read().await; - let ping_addrs = members - .status + let status = self.status.borrow().clone(); + let ping_addrs = status + .nodes .iter() .filter(|(id, _)| **id != self.id) .map(|(id, status)| (status.addr.clone(), Some(id.clone()))) .collect::<Vec<_>>(); - drop(members); self.clone().ping_nodes(ping_addrs).await; |