diff options
author | Alex Auvolat <alex@adnab.me> | 2021-02-21 13:11:10 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-02-21 13:11:10 +0100 |
commit | 80892df8cce74211960c4259f48cd8c1bd5290be (patch) | |
tree | e714d111de3959acba68aaeae445f766a107d429 | |
parent | 3bcbbe1e31e528e8d9648b0431a43f85e9496e58 (diff) | |
download | garage-80892df8cce74211960c4259f48cd8c1bd5290be.tar.gz garage-80892df8cce74211960c4259f48cd8c1bd5290be.zip |
Some refactoring
-rw-r--r-- | src/garage/main.rs | 1 | ||||
-rw-r--r-- | src/rpc/lib.rs | 6 | ||||
-rw-r--r-- | src/rpc/membership.rs | 121 | ||||
-rw-r--r-- | src/rpc/ring.rs | 122 | ||||
-rw-r--r-- | src/table/table.rs | 3 | ||||
-rw-r--r-- | src/table/table_fullcopy.rs | 3 | ||||
-rw-r--r-- | src/table/table_sharded.rs | 3 | ||||
-rw-r--r-- | src/table/table_sync.rs | 2 |
8 files changed, 141 insertions, 120 deletions
diff --git a/src/garage/main.rs b/src/garage/main.rs index 7c8899a0..2d13cd7c 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -21,6 +21,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_rpc::membership::*; +use garage_rpc::ring::*; use garage_rpc::rpc_client::*; use admin_rpc::*; diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 639ece15..71d75395 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -2,7 +2,11 @@ extern crate log; pub mod consul; +pub(crate) mod tls_util; + +pub mod ring; pub mod membership; + pub mod rpc_client; pub mod rpc_server; -pub(crate) mod tls_util; + diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 6e573a61..4872899b 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -1,6 +1,4 @@ use std::collections::HashMap; -use std::hash::Hash as StdHash; -use std::hash::Hasher; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; @@ -24,6 +22,7 @@ use garage_util::error::Error; use crate::consul::get_consul_nodes; use crate::rpc_client::*; use crate::rpc_server::*; +use crate::ring::*; const PING_INTERVAL: Duration = Duration::from_secs(10); const CONSUL_INTERVAL: Duration = Duration::from_secs(60); @@ -66,19 +65,6 @@ pub struct AdvertisedNode { pub state_info: StateInfo, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct NetworkConfig { - pub members: HashMap<UUID, NetworkConfigEntry>, - pub version: u64, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct NetworkConfigEntry { - pub datacenter: String, - pub n_tokens: u32, - pub tag: String, -} - pub struct System { pub id: UUID, @@ -90,7 +76,7 @@ pub struct System { rpc_http_client: Arc<RpcHttpClient>, rpc_client: Arc<RpcClient<Message>>, - pub status: watch::Receiver<Arc<Status>>, + pub(crate) status: watch::Receiver<Arc<Status>>, pub ring: watch::Receiver<Arc<Ring>>, update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>, @@ -123,20 +109,6 @@ pub struct StateInfo { pub hostname: String, } -#[derive(Clone)] -pub struct Ring { - pub config: NetworkConfig, - pub ring: Vec<RingEntry>, - pub n_datacenters: usize, -} - -#[derive(Clone, Debug)] -pub struct RingEntry { - pub location: Hash, - pub node: UUID, - pub datacenter: u64, -} - impl Status { fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { let addr = SocketAddr::new(ip, info.rpc_port); @@ -175,86 +147,6 @@ impl Status { } } -impl Ring { - fn rebuild_ring(&mut self) { - let mut new_ring = vec![]; - let mut datacenters = vec![]; - - for (id, config) in self.config.members.iter() { - let mut dc_hasher = std::collections::hash_map::DefaultHasher::new(); - config.datacenter.hash(&mut dc_hasher); - let datacenter = dc_hasher.finish(); - - if !datacenters.contains(&datacenter) { - datacenters.push(datacenter); - } - - for i in 0..config.n_tokens { - let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes()); - - new_ring.push(RingEntry { - location: location.into(), - node: *id, - datacenter, - }) - } - } - - new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); - self.ring = new_ring; - self.n_datacenters = datacenters.len(); - - // eprintln!("RING: --"); - // for e in self.ring.iter() { - // eprintln!("{:?}", e); - // } - // eprintln!("END --"); - } - - pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { - if n >= self.config.members.len() { - return self.config.members.keys().cloned().collect::<Vec<_>>(); - } - - let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) { - Ok(i) => i, - Err(i) => { - if i == 0 { - self.ring.len() - 1 - } else { - i - 1 - } - } - }; - - self.walk_ring_from_pos(start, n) - } - - fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> { - if n >= self.config.members.len() { - return self.config.members.keys().cloned().collect::<Vec<_>>(); - } - - let mut ret = vec![]; - let mut datacenters = vec![]; - - let mut delta = 0; - while ret.len() < n { - let i = (start + delta) % self.ring.len(); - delta += 1; - - if !datacenters.contains(&self.ring[i].datacenter) { - ret.push(self.ring[i].node); - datacenters.push(self.ring[i].datacenter); - } else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { - ret.push(self.ring[i].node); - } - } - - ret - } -} - fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { let mut id_file = metadata_dir.clone(); id_file.push("node_id"); @@ -312,10 +204,7 @@ impl System { "No valid previous network configuration stored ({}), starting fresh.", e ); - NetworkConfig { - members: HashMap::new(), - version: 0, - } + NetworkConfig::new() } }; let mut status = Status { @@ -641,9 +530,11 @@ impl System { adv: &NetworkConfig, ) -> Result<Message, Error> { let update_lock = self.update_lock.lock().await; - let mut ring: Ring = self.ring.borrow().as_ref().clone(); + let ring: Arc<Ring> = self.ring.borrow().clone(); if adv.version > ring.config.version { + let mut ring = ring.as_ref().clone(); + ring.config = adv.clone(); ring.rebuild_ring(); update_lock.1.broadcast(Arc::new(ring))?; diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs new file mode 100644 index 00000000..1646afbf --- /dev/null +++ b/src/rpc/ring.rs @@ -0,0 +1,122 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NetworkConfig { + pub members: HashMap<UUID, NetworkConfigEntry>, + pub version: u64, +} + +impl NetworkConfig { + pub(crate) fn new() -> Self { + Self{ + members: HashMap::new(), + version: 0, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NetworkConfigEntry { + pub datacenter: String, + pub n_tokens: u32, + pub tag: String, +} + + +#[derive(Clone)] +pub struct Ring { + pub config: NetworkConfig, + pub ring: Vec<RingEntry>, + pub n_datacenters: usize, +} + +#[derive(Clone, Debug)] +pub struct RingEntry { + pub location: Hash, + pub node: UUID, + datacenter: usize, +} + +impl Ring { + pub(crate) fn rebuild_ring(&mut self) { + let mut new_ring = vec![]; + let mut datacenters = vec![]; + + for (id, config) in self.config.members.iter() { + let datacenter = &config.datacenter; + + if !datacenters.contains(datacenter) { + datacenters.push(datacenter.to_string()); + } + let datacenter_idx = datacenters.iter().enumerate().find(|(_, dc)| *dc == datacenter).unwrap().0; + + for i in 0..config.n_tokens { + let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes()); + + new_ring.push(RingEntry { + location: location.into(), + node: *id, + datacenter: datacenter_idx, + }) + } + } + + new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); + self.ring = new_ring; + self.n_datacenters = datacenters.len(); + + // eprintln!("RING: --"); + // for e in self.ring.iter() { + // eprintln!("{:?}", e); + // } + // eprintln!("END --"); + } + + pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { + if n >= self.config.members.len() { + return self.config.members.keys().cloned().collect::<Vec<_>>(); + } + + let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) { + Ok(i) => i, + Err(i) => { + if i == 0 { + self.ring.len() - 1 + } else { + i - 1 + } + } + }; + + self.walk_ring_from_pos(start, n) + } + + fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> { + if n >= self.config.members.len() { + return self.config.members.keys().cloned().collect::<Vec<_>>(); + } + + let mut ret = vec![]; + let mut datacenters = vec![]; + + let mut delta = 0; + while ret.len() < n { + let i = (start + delta) % self.ring.len(); + delta += 1; + + if !datacenters.contains(&self.ring[i].datacenter) { + ret.push(self.ring[i].node); + datacenters.push(self.ring[i].datacenter); + } else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { + ret.push(self.ring[i].node); + } + } + + ret + } +} diff --git a/src/table/table.rs b/src/table/table.rs index acb46325..300e400f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -12,7 +12,8 @@ use serde_bytes::ByteBuf; use garage_util::data::*; use garage_util::error::Error; -use garage_rpc::membership::{Ring, System}; +use garage_rpc::membership::System; +use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs index a6c78a63..4659cb2c 100644 --- a/src/table/table_fullcopy.rs +++ b/src/table/table_fullcopy.rs @@ -1,7 +1,8 @@ use arc_swap::ArcSwapOption; use std::sync::Arc; -use garage_rpc::membership::{Ring, System}; +use garage_rpc::membership::{System}; +use garage_rpc::ring::Ring; use garage_util::data::*; use crate::*; diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs index 88856542..4f98902d 100644 --- a/src/table/table_sharded.rs +++ b/src/table/table_sharded.rs @@ -1,4 +1,5 @@ -use garage_rpc::membership::{Ring, System}; +use garage_rpc::membership::{System}; +use garage_rpc::ring::Ring; use garage_util::data::*; use crate::*; diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 6c0df15b..3c667985 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -12,7 +12,7 @@ use serde_bytes::ByteBuf; use tokio::sync::Mutex; use tokio::sync::{mpsc, watch}; -use garage_rpc::membership::Ring; +use garage_rpc::ring::Ring; use garage_util::data::*; use garage_util::error::Error; |