diff options
author | Alex Auvolat <alex@adnab.me> | 2021-02-21 13:11:10 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-02-21 13:11:10 +0100 |
commit | 80892df8cce74211960c4259f48cd8c1bd5290be (patch) | |
tree | e714d111de3959acba68aaeae445f766a107d429 /src/rpc/membership.rs | |
parent | 3bcbbe1e31e528e8d9648b0431a43f85e9496e58 (diff) | |
download | garage-80892df8cce74211960c4259f48cd8c1bd5290be.tar.gz garage-80892df8cce74211960c4259f48cd8c1bd5290be.zip |
Some refactoring
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r-- | src/rpc/membership.rs | 121 |
1 files changed, 6 insertions, 115 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 6e573a61..4872899b 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -1,6 +1,4 @@ use std::collections::HashMap; -use std::hash::Hash as StdHash; -use std::hash::Hasher; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; @@ -24,6 +22,7 @@ use garage_util::error::Error; use crate::consul::get_consul_nodes; use crate::rpc_client::*; use crate::rpc_server::*; +use crate::ring::*; const PING_INTERVAL: Duration = Duration::from_secs(10); const CONSUL_INTERVAL: Duration = Duration::from_secs(60); @@ -66,19 +65,6 @@ pub struct AdvertisedNode { pub state_info: StateInfo, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct NetworkConfig { - pub members: HashMap<UUID, NetworkConfigEntry>, - pub version: u64, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct NetworkConfigEntry { - pub datacenter: String, - pub n_tokens: u32, - pub tag: String, -} - pub struct System { pub id: UUID, @@ -90,7 +76,7 @@ pub struct System { rpc_http_client: Arc<RpcHttpClient>, rpc_client: Arc<RpcClient<Message>>, - pub status: watch::Receiver<Arc<Status>>, + pub(crate) status: watch::Receiver<Arc<Status>>, pub ring: watch::Receiver<Arc<Ring>>, update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>, @@ -123,20 +109,6 @@ pub struct StateInfo { pub hostname: String, } -#[derive(Clone)] -pub struct Ring { - pub config: NetworkConfig, - pub ring: Vec<RingEntry>, - pub n_datacenters: usize, -} - -#[derive(Clone, Debug)] -pub struct RingEntry { - pub location: Hash, - pub node: UUID, - pub datacenter: u64, -} - impl Status { fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { let addr = SocketAddr::new(ip, info.rpc_port); @@ -175,86 +147,6 @@ impl Status { } } -impl Ring { - fn rebuild_ring(&mut self) { - let mut new_ring = vec![]; - let mut datacenters = vec![]; - - for (id, config) in self.config.members.iter() { - let mut dc_hasher = std::collections::hash_map::DefaultHasher::new(); - config.datacenter.hash(&mut dc_hasher); - let datacenter = dc_hasher.finish(); - - if !datacenters.contains(&datacenter) { - datacenters.push(datacenter); - } - - for i in 0..config.n_tokens { - let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes()); - - new_ring.push(RingEntry { - location: location.into(), - node: *id, - datacenter, - }) - } - } - - new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); - self.ring = new_ring; - self.n_datacenters = datacenters.len(); - - // eprintln!("RING: --"); - // for e in self.ring.iter() { - // eprintln!("{:?}", e); - // } - // eprintln!("END --"); - } - - pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { - if n >= self.config.members.len() { - return self.config.members.keys().cloned().collect::<Vec<_>>(); - } - - let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) { - Ok(i) => i, - Err(i) => { - if i == 0 { - self.ring.len() - 1 - } else { - i - 1 - } - } - }; - - self.walk_ring_from_pos(start, n) - } - - fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> { - if n >= self.config.members.len() { - return self.config.members.keys().cloned().collect::<Vec<_>>(); - } - - let mut ret = vec![]; - let mut datacenters = vec![]; - - let mut delta = 0; - while ret.len() < n { - let i = (start + delta) % self.ring.len(); - delta += 1; - - if !datacenters.contains(&self.ring[i].datacenter) { - ret.push(self.ring[i].node); - datacenters.push(self.ring[i].datacenter); - } else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { - ret.push(self.ring[i].node); - } - } - - ret - } -} - fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { let mut id_file = metadata_dir.clone(); id_file.push("node_id"); @@ -312,10 +204,7 @@ impl System { "No valid previous network configuration stored ({}), starting fresh.", e ); - NetworkConfig { - members: HashMap::new(), - version: 0, - } + NetworkConfig::new() } }; let mut status = Status { @@ -641,9 +530,11 @@ impl System { adv: &NetworkConfig, ) -> Result<Message, Error> { let update_lock = self.update_lock.lock().await; - let mut ring: Ring = self.ring.borrow().as_ref().clone(); + let ring: Arc<Ring> = self.ring.borrow().clone(); if adv.version > ring.config.version { + let mut ring = ring.as_ref().clone(); + ring.config = adv.clone(); ring.rebuild_ring(); update_lock.1.broadcast(Arc::new(ring))?; |