aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/ring.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-02-21 13:11:10 +0100
committerAlex Auvolat <alex@adnab.me>2021-02-21 13:11:10 +0100
commit80892df8cce74211960c4259f48cd8c1bd5290be (patch)
treee714d111de3959acba68aaeae445f766a107d429 /src/rpc/ring.rs
parent3bcbbe1e31e528e8d9648b0431a43f85e9496e58 (diff)
downloadgarage-80892df8cce74211960c4259f48cd8c1bd5290be.tar.gz
garage-80892df8cce74211960c4259f48cd8c1bd5290be.zip
Some refactoring
Diffstat (limited to 'src/rpc/ring.rs')
-rw-r--r--src/rpc/ring.rs122
1 files changed, 122 insertions, 0 deletions
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
new file mode 100644
index 00000000..1646afbf
--- /dev/null
+++ b/src/rpc/ring.rs
@@ -0,0 +1,122 @@
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct NetworkConfig {
+ pub members: HashMap<UUID, NetworkConfigEntry>,
+ pub version: u64,
+}
+
+impl NetworkConfig {
+ pub(crate) fn new() -> Self {
+ Self{
+ members: HashMap::new(),
+ version: 0,
+ }
+ }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct NetworkConfigEntry {
+ pub datacenter: String,
+ pub n_tokens: u32,
+ pub tag: String,
+}
+
+
+#[derive(Clone)]
+pub struct Ring {
+ pub config: NetworkConfig,
+ pub ring: Vec<RingEntry>,
+ pub n_datacenters: usize,
+}
+
+#[derive(Clone, Debug)]
+pub struct RingEntry {
+ pub location: Hash,
+ pub node: UUID,
+ datacenter: usize,
+}
+
+impl Ring {
+ pub(crate) fn rebuild_ring(&mut self) {
+ let mut new_ring = vec![];
+ let mut datacenters = vec![];
+
+ for (id, config) in self.config.members.iter() {
+ let datacenter = &config.datacenter;
+
+ if !datacenters.contains(datacenter) {
+ datacenters.push(datacenter.to_string());
+ }
+ let datacenter_idx = datacenters.iter().enumerate().find(|(_, dc)| *dc == datacenter).unwrap().0;
+
+ for i in 0..config.n_tokens {
+ let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());
+
+ new_ring.push(RingEntry {
+ location: location.into(),
+ node: *id,
+ datacenter: datacenter_idx,
+ })
+ }
+ }
+
+ new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
+ self.ring = new_ring;
+ self.n_datacenters = datacenters.len();
+
+ // eprintln!("RING: --");
+ // for e in self.ring.iter() {
+ // eprintln!("{:?}", e);
+ // }
+ // eprintln!("END --");
+ }
+
+ pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
+ if n >= self.config.members.len() {
+ return self.config.members.keys().cloned().collect::<Vec<_>>();
+ }
+
+ let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
+ Ok(i) => i,
+ Err(i) => {
+ if i == 0 {
+ self.ring.len() - 1
+ } else {
+ i - 1
+ }
+ }
+ };
+
+ self.walk_ring_from_pos(start, n)
+ }
+
+ fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
+ if n >= self.config.members.len() {
+ return self.config.members.keys().cloned().collect::<Vec<_>>();
+ }
+
+ let mut ret = vec![];
+ let mut datacenters = vec![];
+
+ let mut delta = 0;
+ while ret.len() < n {
+ let i = (start + delta) % self.ring.len();
+ delta += 1;
+
+ if !datacenters.contains(&self.ring[i].datacenter) {
+ ret.push(self.ring[i].node);
+ datacenters.push(self.ring[i].datacenter);
+ } else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
+ ret.push(self.ring[i].node);
+ }
+ }
+
+ ret
+ }
+}