aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-02-21 13:11:10 +0100
committerAlex Auvolat <alex@adnab.me>2021-02-21 13:11:10 +0100
commit80892df8cce74211960c4259f48cd8c1bd5290be (patch)
treee714d111de3959acba68aaeae445f766a107d429
parent3bcbbe1e31e528e8d9648b0431a43f85e9496e58 (diff)
downloadgarage-80892df8cce74211960c4259f48cd8c1bd5290be.tar.gz
garage-80892df8cce74211960c4259f48cd8c1bd5290be.zip
Some refactoring
-rw-r--r--src/garage/main.rs1
-rw-r--r--src/rpc/lib.rs6
-rw-r--r--src/rpc/membership.rs121
-rw-r--r--src/rpc/ring.rs122
-rw-r--r--src/table/table.rs3
-rw-r--r--src/table/table_fullcopy.rs3
-rw-r--r--src/table/table_sharded.rs3
-rw-r--r--src/table/table_sync.rs2
8 files changed, 141 insertions, 120 deletions
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 7c8899a0..2d13cd7c 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -21,6 +21,7 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::membership::*;
+use garage_rpc::ring::*;
use garage_rpc::rpc_client::*;
use admin_rpc::*;
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 639ece15..71d75395 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -2,7 +2,11 @@
extern crate log;
pub mod consul;
+pub(crate) mod tls_util;
+
+pub mod ring;
pub mod membership;
+
pub mod rpc_client;
pub mod rpc_server;
-pub(crate) mod tls_util;
+
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 6e573a61..4872899b 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -1,6 +1,4 @@
use std::collections::HashMap;
-use std::hash::Hash as StdHash;
-use std::hash::Hasher;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
@@ -24,6 +22,7 @@ use garage_util::error::Error;
use crate::consul::get_consul_nodes;
use crate::rpc_client::*;
use crate::rpc_server::*;
+use crate::ring::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
@@ -66,19 +65,6 @@ pub struct AdvertisedNode {
pub state_info: StateInfo,
}
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct NetworkConfig {
- pub members: HashMap<UUID, NetworkConfigEntry>,
- pub version: u64,
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct NetworkConfigEntry {
- pub datacenter: String,
- pub n_tokens: u32,
- pub tag: String,
-}
-
pub struct System {
pub id: UUID,
@@ -90,7 +76,7 @@ pub struct System {
rpc_http_client: Arc<RpcHttpClient>,
rpc_client: Arc<RpcClient<Message>>,
- pub status: watch::Receiver<Arc<Status>>,
+ pub(crate) status: watch::Receiver<Arc<Status>>,
pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
@@ -123,20 +109,6 @@ pub struct StateInfo {
pub hostname: String,
}
-#[derive(Clone)]
-pub struct Ring {
- pub config: NetworkConfig,
- pub ring: Vec<RingEntry>,
- pub n_datacenters: usize,
-}
-
-#[derive(Clone, Debug)]
-pub struct RingEntry {
- pub location: Hash,
- pub node: UUID,
- pub datacenter: u64,
-}
-
impl Status {
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
let addr = SocketAddr::new(ip, info.rpc_port);
@@ -175,86 +147,6 @@ impl Status {
}
}
-impl Ring {
- fn rebuild_ring(&mut self) {
- let mut new_ring = vec![];
- let mut datacenters = vec![];
-
- for (id, config) in self.config.members.iter() {
- let mut dc_hasher = std::collections::hash_map::DefaultHasher::new();
- config.datacenter.hash(&mut dc_hasher);
- let datacenter = dc_hasher.finish();
-
- if !datacenters.contains(&datacenter) {
- datacenters.push(datacenter);
- }
-
- for i in 0..config.n_tokens {
- let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());
-
- new_ring.push(RingEntry {
- location: location.into(),
- node: *id,
- datacenter,
- })
- }
- }
-
- new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
- self.ring = new_ring;
- self.n_datacenters = datacenters.len();
-
- // eprintln!("RING: --");
- // for e in self.ring.iter() {
- // eprintln!("{:?}", e);
- // }
- // eprintln!("END --");
- }
-
- pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
- if n >= self.config.members.len() {
- return self.config.members.keys().cloned().collect::<Vec<_>>();
- }
-
- let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
- Ok(i) => i,
- Err(i) => {
- if i == 0 {
- self.ring.len() - 1
- } else {
- i - 1
- }
- }
- };
-
- self.walk_ring_from_pos(start, n)
- }
-
- fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
- if n >= self.config.members.len() {
- return self.config.members.keys().cloned().collect::<Vec<_>>();
- }
-
- let mut ret = vec![];
- let mut datacenters = vec![];
-
- let mut delta = 0;
- while ret.len() < n {
- let i = (start + delta) % self.ring.len();
- delta += 1;
-
- if !datacenters.contains(&self.ring[i].datacenter) {
- ret.push(self.ring[i].node);
- datacenters.push(self.ring[i].datacenter);
- } else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
- ret.push(self.ring[i].node);
- }
- }
-
- ret
- }
-}
-
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
let mut id_file = metadata_dir.clone();
id_file.push("node_id");
@@ -312,10 +204,7 @@ impl System {
"No valid previous network configuration stored ({}), starting fresh.",
e
);
- NetworkConfig {
- members: HashMap::new(),
- version: 0,
- }
+ NetworkConfig::new()
}
};
let mut status = Status {
@@ -641,9 +530,11 @@ impl System {
adv: &NetworkConfig,
) -> Result<Message, Error> {
let update_lock = self.update_lock.lock().await;
- let mut ring: Ring = self.ring.borrow().as_ref().clone();
+ let ring: Arc<Ring> = self.ring.borrow().clone();
if adv.version > ring.config.version {
+ let mut ring = ring.as_ref().clone();
+
ring.config = adv.clone();
ring.rebuild_ring();
update_lock.1.broadcast(Arc::new(ring))?;
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
new file mode 100644
index 00000000..1646afbf
--- /dev/null
+++ b/src/rpc/ring.rs
@@ -0,0 +1,122 @@
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct NetworkConfig {
+ pub members: HashMap<UUID, NetworkConfigEntry>,
+ pub version: u64,
+}
+
+impl NetworkConfig {
+ pub(crate) fn new() -> Self {
+ Self{
+ members: HashMap::new(),
+ version: 0,
+ }
+ }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct NetworkConfigEntry {
+ pub datacenter: String,
+ pub n_tokens: u32,
+ pub tag: String,
+}
+
+
+#[derive(Clone)]
+pub struct Ring {
+ pub config: NetworkConfig,
+ pub ring: Vec<RingEntry>,
+ pub n_datacenters: usize,
+}
+
+#[derive(Clone, Debug)]
+pub struct RingEntry {
+ pub location: Hash,
+ pub node: UUID,
+ datacenter: usize,
+}
+
+impl Ring {
+ pub(crate) fn rebuild_ring(&mut self) {
+ let mut new_ring = vec![];
+ let mut datacenters = vec![];
+
+ for (id, config) in self.config.members.iter() {
+ let datacenter = &config.datacenter;
+
+ if !datacenters.contains(datacenter) {
+ datacenters.push(datacenter.to_string());
+ }
+ let datacenter_idx = datacenters.iter().enumerate().find(|(_, dc)| *dc == datacenter).unwrap().0;
+
+ for i in 0..config.n_tokens {
+ let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());
+
+ new_ring.push(RingEntry {
+ location: location.into(),
+ node: *id,
+ datacenter: datacenter_idx,
+ })
+ }
+ }
+
+ new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
+ self.ring = new_ring;
+ self.n_datacenters = datacenters.len();
+
+ // eprintln!("RING: --");
+ // for e in self.ring.iter() {
+ // eprintln!("{:?}", e);
+ // }
+ // eprintln!("END --");
+ }
+
+ pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
+ if n >= self.config.members.len() {
+ return self.config.members.keys().cloned().collect::<Vec<_>>();
+ }
+
+ let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
+ Ok(i) => i,
+ Err(i) => {
+ if i == 0 {
+ self.ring.len() - 1
+ } else {
+ i - 1
+ }
+ }
+ };
+
+ self.walk_ring_from_pos(start, n)
+ }
+
+ fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
+ if n >= self.config.members.len() {
+ return self.config.members.keys().cloned().collect::<Vec<_>>();
+ }
+
+ let mut ret = vec![];
+ let mut datacenters = vec![];
+
+ let mut delta = 0;
+ while ret.len() < n {
+ let i = (start + delta) % self.ring.len();
+ delta += 1;
+
+ if !datacenters.contains(&self.ring[i].datacenter) {
+ ret.push(self.ring[i].node);
+ datacenters.push(self.ring[i].datacenter);
+ } else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
+ ret.push(self.ring[i].node);
+ }
+ }
+
+ ret
+ }
+}
diff --git a/src/table/table.rs b/src/table/table.rs
index acb46325..300e400f 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -12,7 +12,8 @@ use serde_bytes::ByteBuf;
use garage_util::data::*;
use garage_util::error::Error;
-use garage_rpc::membership::{Ring, System};
+use garage_rpc::membership::System;
+use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs
index a6c78a63..4659cb2c 100644
--- a/src/table/table_fullcopy.rs
+++ b/src/table/table_fullcopy.rs
@@ -1,7 +1,8 @@
use arc_swap::ArcSwapOption;
use std::sync::Arc;
-use garage_rpc::membership::{Ring, System};
+use garage_rpc::membership::{System};
+use garage_rpc::ring::Ring;
use garage_util::data::*;
use crate::*;
diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs
index 88856542..4f98902d 100644
--- a/src/table/table_sharded.rs
+++ b/src/table/table_sharded.rs
@@ -1,4 +1,5 @@
-use garage_rpc::membership::{Ring, System};
+use garage_rpc::membership::{System};
+use garage_rpc::ring::Ring;
use garage_util::data::*;
use crate::*;
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index 6c0df15b..3c667985 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -12,7 +12,7 @@ use serde_bytes::ByteBuf;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch};
-use garage_rpc::membership::Ring;
+use garage_rpc::ring::Ring;
use garage_util::data::*;
use garage_util::error::Error;