aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-15 14:20:50 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-15 14:20:50 +0100
commit393c4d4515e0cdadadc8de8ae2df12e4371cff88 (patch)
tree1805db8a9697a723abd76f26f2cd421d9f5960a5
parent65066c70640371cc318faddfb4c05c96de18e86d (diff)
downloadgarage-393c4d4515e0cdadadc8de8ae2df12e4371cff88.tar.gz
garage-393c4d4515e0cdadadc8de8ae2df12e4371cff88.zip
layout: add helper for cached/external values to centralize recomputation
-rw-r--r--src/api/admin/cluster.rs1
-rw-r--r--src/api/k2v/index.rs2
-rw-r--r--src/garage/cli/layout.rs3
-rw-r--r--src/model/helper/bucket.rs2
-rw-r--r--src/model/index_counter.rs4
-rw-r--r--src/rpc/layout/history.rs311
-rw-r--r--src/rpc/layout/manager.rs22
-rw-r--r--src/rpc/layout/schema.rs48
-rw-r--r--src/rpc/rpc_helper.rs6
-rw-r--r--src/rpc/system.rs4
10 files changed, 222 insertions, 181 deletions
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index d912b58f..593bd778 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -240,7 +240,6 @@ pub async fn handle_update_cluster_layout(
.merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
}
- layout.update_hashes();
garage
.system
.layout_manager
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index c189232a..e8cd1fba 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -29,7 +29,7 @@ pub async fn handle_read_index(
.system
.cluster_layout()
.all_nongateway_nodes()
- .into_owned();
+ .to_vec();
let (partition_keys, more, next_start) = read_range(
&garage.k2v.counter_table.table,
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 0f01a37a..51774314 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -49,7 +49,7 @@ pub async fn cmd_assign_role(
};
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
- let all_nodes = layout.all_nodes().into_owned();
+ let all_nodes = layout.get_all_nodes();
let added_nodes = args
.node_ids
@@ -331,7 +331,6 @@ pub async fn send_layout(
rpc_host: NodeID,
mut layout: LayoutHistory,
) -> Result<(), Error> {
- layout.update_hashes();
rpc_cli
.call(
&rpc_host,
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 2cb53424..efa3e27b 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -455,7 +455,7 @@ impl<'a> BucketHelper<'a> {
.system
.cluster_layout()
.all_nongateway_nodes()
- .into_owned();
+ .to_vec();
let k2vindexes = self
.0
.k2v
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 2d968733..e8702bf1 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_db as db;
-use garage_rpc::layout::LayoutHistory;
+use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -83,7 +83,7 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
}
impl<T: CountedItem> CounterEntry<T> {
- pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap<String, i64> {
+ pub fn filtered_values(&self, layout: &LayoutHelper) -> HashMap<String, i64> {
let nodes = layout.all_nongateway_nodes();
self.filtered_values_with_nodes(&nodes)
}
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 1684918e..b6f0e495 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -1,5 +1,5 @@
-use std::borrow::Cow;
use std::collections::HashSet;
+use std::ops::Deref;
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
@@ -9,95 +9,106 @@ use garage_util::error::*;
use super::schema::*;
use super::*;
+pub struct LayoutHelper {
+ layout: Option<LayoutHistory>,
-impl LayoutHistory {
- pub fn new(replication_factor: usize) -> Self {
- let version = LayoutVersion::new(replication_factor);
+ // cached values
+ ack_map_min: u64,
+ sync_map_min: u64,
- let staging = LayoutStaging {
- parameters: Lww::<LayoutParameters>::new(version.parameters),
- roles: LwwMap::new(),
- };
+ all_nodes: Vec<Uuid>,
+ all_nongateway_nodes: Vec<Uuid>,
- let mut ret = LayoutHistory {
- versions: vec![version],
- update_trackers: Default::default(),
- trackers_hash: [0u8; 32].into(),
- staging: Lww::raw(0, staging),
- staging_hash: [0u8; 32].into(),
- };
- ret.update_hashes();
- ret
- }
+ trackers_hash: Hash,
+ staging_hash: Hash,
+}
- pub fn current(&self) -> &LayoutVersion {
- self.versions.last().as_ref().unwrap()
+impl Deref for LayoutHelper {
+ type Target = LayoutHistory;
+ fn deref(&self) -> &LayoutHistory {
+ self.layout()
}
+}
- pub fn update_hashes(&mut self) {
- self.trackers_hash = self.calculate_trackers_hash();
- self.staging_hash = self.calculate_staging_hash();
+impl LayoutHelper {
+ pub fn new(mut layout: LayoutHistory) -> Self {
+ layout.cleanup_old_versions();
+
+ let all_nongateway_nodes = layout.get_all_nongateway_nodes();
+ layout.clamp_update_trackers(&all_nongateway_nodes);
+
+ let min_version = layout.min_stored();
+ let ack_map_min = layout
+ .update_trackers
+ .ack_map
+ .min(&all_nongateway_nodes, min_version);
+ let sync_map_min = layout
+ .update_trackers
+ .sync_map
+ .min(&all_nongateway_nodes, min_version);
+
+ let all_nodes = layout.get_all_nodes();
+ let trackers_hash = layout.calculate_trackers_hash();
+ let staging_hash = layout.calculate_staging_hash();
+
+ LayoutHelper {
+ layout: Some(layout),
+ ack_map_min,
+ sync_map_min,
+ all_nodes,
+ all_nongateway_nodes,
+ trackers_hash,
+ staging_hash,
+ }
}
- pub(crate) fn calculate_trackers_hash(&self) -> Hash {
- blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
- }
+ // ------------------ single updating function --------------
- pub(crate) fn calculate_staging_hash(&self) -> Hash {
- blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
+ fn layout(&self) -> &LayoutHistory {
+ self.layout.as_ref().unwrap()
}
- // ------------------ who stores what now? ---------------
-
- pub fn all_ack(&self) -> u64 {
- self.update_trackers.ack_map.current_min
+ pub(crate) fn update<F>(&mut self, f: F) -> bool
+ where
+ F: FnOnce(&mut LayoutHistory) -> bool,
+ {
+ let changed = f(&mut self.layout.as_mut().unwrap());
+ if changed {
+ *self = Self::new(self.layout.take().unwrap());
+ }
+ changed
}
- pub fn min_stored(&self) -> u64 {
- self.versions.first().as_ref().unwrap().version
+ // ------------------ read helpers ---------------
+
+ pub fn all_nodes(&self) -> &[Uuid] {
+ &self.all_nodes
}
- pub fn sync_versions(&self) -> (u64, u64, u64) {
- (self.current().version, self.all_ack(), self.min_stored())
+ pub fn all_nongateway_nodes(&self) -> &[Uuid] {
+ &self.all_nongateway_nodes
}
- pub fn all_nodes(&self) -> Cow<'_, [Uuid]> {
- // TODO: cache this
- if self.versions.len() == 1 {
- self.versions[0].all_nodes().into()
- } else {
- let set = self
- .versions
- .iter()
- .map(|x| x.all_nodes())
- .flatten()
- .collect::<HashSet<_>>();
- set.into_iter().copied().collect::<Vec<_>>().into()
- }
+ pub fn all_ack(&self) -> u64 {
+ self.ack_map_min
}
- pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> {
- // TODO: cache this
- if self.versions.len() == 1 {
- self.versions[0].nongateway_nodes().into()
- } else {
- let set = self
- .versions
- .iter()
- .map(|x| x.nongateway_nodes())
- .flatten()
- .collect::<HashSet<_>>();
- set.into_iter().copied().collect::<Vec<_>>().into()
- }
+ pub fn sync_versions(&self) -> (u64, u64, u64) {
+ (
+ self.layout().current().version,
+ self.all_ack(),
+ self.layout().min_stored(),
+ )
}
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
- let sync_min = self.update_trackers.sync_map.current_min;
+ let sync_min = self.sync_map_min;
let version = self
+ .layout()
.versions
.iter()
.find(|x| x.version == sync_min)
- .or(self.versions.last())
+ .or(self.layout().versions.last())
.unwrap();
version
.nodes_of(position, version.replication_factor)
@@ -105,7 +116,8 @@ impl LayoutHistory {
}
pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
- self.versions
+ self.layout()
+ .versions
.iter()
.map(|x| x.nodes_of(position, x.replication_factor).collect())
.collect()
@@ -113,7 +125,7 @@ impl LayoutHistory {
pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let mut ret = vec![];
- for version in self.versions.iter() {
+ for version in self.layout().versions.iter() {
ret.extend(version.nodes_of(position, version.replication_factor));
}
ret.sort();
@@ -121,7 +133,35 @@ impl LayoutHistory {
ret
}
- // ------------------ update tracking ---------------
+ pub fn trackers_hash(&self) -> Hash {
+ self.trackers_hash
+ }
+
+ pub fn staging_hash(&self) -> Hash {
+ self.staging_hash
+ }
+
+ // ------------------ helpers for update tracking ---------------
+
+ pub(crate) fn sync_first(&mut self, node: Uuid) {
+ let first_version = self.versions.first().as_ref().unwrap().version;
+ self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version));
+ }
+
+ pub(crate) fn sync_ack(&mut self, node: Uuid) {
+ let sync_map_min = self.sync_map_min;
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_ack_map
+ .set_max(node, sync_map_min)
+ });
+ }
+
+ pub(crate) fn ack_last(&mut self, node: Uuid) {
+ let last_version = self.current().version;
+ self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version));
+ }
pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) {
// Ensure trackers for this node's values are up-to-date
@@ -136,55 +176,104 @@ impl LayoutHistory {
// 3. Acknowledge everyone has synced up to min(self.sync_map)
self.sync_ack(node_id);
- // 4. Cleanup layout versions that are not needed anymore
- self.cleanup_old_versions();
-
- // 5. Recalculate global minima
- self.update_trackers_min();
-
info!("ack_map: {:?}", self.update_trackers.ack_map);
info!("sync_map: {:?}", self.update_trackers.sync_map);
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ }
+}
- // Finally, update hashes
- self.update_hashes();
+// ----
+
+impl LayoutHistory {
+ pub fn new(replication_factor: usize) -> Self {
+ let version = LayoutVersion::new(replication_factor);
+
+ let staging = LayoutStaging {
+ parameters: Lww::<LayoutParameters>::new(version.parameters),
+ roles: LwwMap::new(),
+ };
+
+ LayoutHistory {
+ versions: vec![version],
+ update_trackers: Default::default(),
+ staging: Lww::raw(0, staging),
+ }
}
- fn update_trackers_min(&mut self) {
- // TODO: for TableFullReplication, counting gateway nodes might be
- // necessary? Think about this more.
- let storage_nodes = self.all_nongateway_nodes().into_owned();
- let min_version = self.versions.first().unwrap().version;
- self.update_trackers.update_min(&storage_nodes, min_version);
+ // ------------------ who stores what now? ---------------
+
+ pub fn current(&self) -> &LayoutVersion {
+ self.versions.last().as_ref().unwrap()
}
- pub(crate) fn ack_last(&mut self, node: Uuid) {
- let last_version = self.current().version;
- self.update_trackers.ack_map.set_max(node, last_version);
- self.update_trackers_min();
+ pub fn min_stored(&self) -> u64 {
+ self.versions.first().as_ref().unwrap().version
}
- pub(crate) fn sync_first(&mut self, node: Uuid) {
- let first_version = self.versions.first().as_ref().unwrap().version;
- self.update_trackers.sync_map.set_max(node, first_version);
- self.update_trackers_min();
+ pub fn get_all_nodes(&self) -> Vec<Uuid> {
+ if self.versions.len() == 1 {
+ self.versions[0].all_nodes().to_vec()
+ } else {
+ let set = self
+ .versions
+ .iter()
+ .map(|x| x.all_nodes())
+ .flatten()
+ .collect::<HashSet<_>>();
+ set.into_iter().copied().collect::<Vec<_>>()
+ }
}
- pub(crate) fn sync_ack(&mut self, node: Uuid) {
- self.update_trackers
- .sync_ack_map
- .set_max(node, self.update_trackers.sync_map.current_min);
- self.update_trackers_min();
+ fn get_all_nongateway_nodes(&self) -> Vec<Uuid> {
+ if self.versions.len() == 1 {
+ self.versions[0].nongateway_nodes().to_vec()
+ } else {
+ let set = self
+ .versions
+ .iter()
+ .map(|x| x.nongateway_nodes())
+ .flatten()
+ .collect::<HashSet<_>>();
+ set.into_iter().copied().collect::<Vec<_>>()
+ }
}
- pub(crate) fn cleanup_old_versions(&mut self) {
- let min_sync_ack = self.update_trackers.sync_ack_map.current_min;
- while self.versions.first().as_ref().unwrap().version < min_sync_ack {
- let removed = self.versions.remove(0);
- info!("Layout history: pruning old version {}", removed.version);
+ // ---- housekeeping (all invoked by LayoutHelper) ----
+
+ fn cleanup_old_versions(&mut self) {
+ loop {
+ let all_nongateway_nodes = self.get_all_nongateway_nodes();
+ let min_version = self.min_stored();
+ let sync_ack_map_min = self
+ .update_trackers
+ .sync_ack_map
+ .min(&all_nongateway_nodes, min_version);
+ if self.min_stored() < sync_ack_map_min {
+ let removed = self.versions.remove(0);
+ info!("Layout history: pruning old version {}", removed.version);
+ } else {
+ break;
+ }
}
}
+ fn clamp_update_trackers(&mut self, nodes: &[Uuid]) {
+ let min_v = self.min_stored();
+ for node in nodes {
+ self.update_trackers.ack_map.set_max(*node, min_v);
+ self.update_trackers.sync_map.set_max(*node, min_v);
+ self.update_trackers.sync_ack_map.set_max(*node, min_v);
+ }
+ }
+
+ fn calculate_trackers_hash(&self) -> Hash {
+ blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
+ }
+
+ fn calculate_staging_hash(&self) -> Hash {
+ blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
+ }
+
// ================== updates to layout, public interface ===================
pub fn merge(&mut self, other: &LayoutHistory) -> bool {
@@ -221,20 +310,6 @@ impl LayoutHistory {
self.versions.remove(0);
changed = true;
}
- if changed {
- let min_v = self.versions.first().unwrap().version;
- let nodes = self.all_nongateway_nodes().into_owned();
- for node in nodes {
- self.update_trackers.ack_map.set_max(node, min_v);
- self.update_trackers.sync_map.set_max(node, min_v);
- self.update_trackers.sync_ack_map.set_max(node, min_v);
- }
- }
- }
-
- // Update the current_min value in trackers if anything changed
- if changed {
- self.update_trackers_min();
}
// Merge staged layout changes
@@ -280,7 +355,6 @@ To know the correct value of the new layout version, invoke `garage layout show`
parameters: self.staging.get().parameters.clone(),
roles: LwwMap::new(),
});
- self.update_hashes();
Ok((self, msg))
}
@@ -290,20 +364,11 @@ To know the correct value of the new layout version, invoke `garage layout show`
parameters: Lww::new(self.current().parameters.clone()),
roles: LwwMap::new(),
});
- self.update_hashes();
Ok(self)
}
pub fn check(&self) -> Result<(), String> {
- // Check that the hash of the staging data is correct
- if self.trackers_hash != self.calculate_trackers_hash() {
- return Err("trackers_hash is incorrect".into());
- }
- if self.staging_hash != self.calculate_staging_hash() {
- return Err("staging_hash is incorrect".into());
- }
-
for version in self.versions.iter() {
version.check()?;
}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 21ec2d8d..e270ad21 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -24,7 +24,7 @@ pub struct LayoutManager {
replication_factor: usize,
persist_cluster_layout: Persister<LayoutHistory>,
- layout: Arc<RwLock<LayoutHistory>>,
+ layout: Arc<RwLock<LayoutHelper>>,
pub(crate) change_notify: Arc<Notify>,
table_sync_version: Mutex<HashMap<String, u64>>,
@@ -54,7 +54,7 @@ impl LayoutManager {
let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
- let mut cluster_layout = match persist_cluster_layout.load() {
+ let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
if x.current().replication_factor != replication_factor {
return Err(Error::Message(format!(
@@ -74,6 +74,7 @@ impl LayoutManager {
}
};
+ let mut cluster_layout = LayoutHelper::new(cluster_layout);
cluster_layout.update_trackers_of(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout));
@@ -100,7 +101,7 @@ impl LayoutManager {
// ---- PUBLIC INTERFACE ----
- pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
+ pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHelper> {
self.layout.read().unwrap()
}
@@ -108,8 +109,8 @@ impl LayoutManager {
let layout = self.layout();
LayoutStatus {
cluster_layout_version: layout.current().version,
- cluster_layout_trackers_hash: layout.trackers_hash,
- cluster_layout_staging_hash: layout.staging_hash,
+ cluster_layout_trackers_hash: layout.trackers_hash(),
+ cluster_layout_staging_hash: layout.staging_hash(),
}
}
@@ -137,13 +138,8 @@ impl LayoutManager {
drop(table_sync_version);
let mut layout = self.layout.write().unwrap();
- if layout
- .update_trackers
- .sync_map
- .set_max(self.node_id, sync_until)
- {
+ if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) {
debug!("sync_until updated to {}", sync_until);
- layout.update_hashes();
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
layout.update_trackers.clone(),
));
@@ -157,7 +153,7 @@ impl LayoutManager {
let prev_layout_check = layout.check().is_ok();
if !prev_layout_check || adv.check().is_ok() {
- if layout.merge(adv) {
+ if layout.update(|l| l.merge(adv)) {
layout.update_trackers_of(self.node_id);
if prev_layout_check && layout.check().is_err() {
panic!("Merged two correct layouts and got an incorrect layout.");
@@ -171,7 +167,7 @@ impl LayoutManager {
fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
let mut layout = self.layout.write().unwrap();
if layout.update_trackers != *adv {
- if layout.update_trackers.merge(adv) {
+ if layout.update(|l| l.update_trackers.merge(adv)) {
layout.update_trackers_of(self.node_id);
return Some(layout.update_trackers.clone());
}
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
index 969f5a0b..00a2c017 100644
--- a/src/rpc/layout/schema.rs
+++ b/src/rpc/layout/schema.rs
@@ -188,7 +188,7 @@ mod v010 {
use super::v09;
use crate::layout::CompactNodeType;
use garage_util::crdt::{Lww, LwwMap};
- use garage_util::data::{Hash, Uuid};
+ use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
@@ -202,13 +202,9 @@ mod v010 {
/// Update trackers
pub update_trackers: UpdateTrackers,
- /// Hash of the update trackers
- pub trackers_hash: Hash,
/// Staged changes for the next version
pub staging: Lww<LayoutStaging>,
- /// Hash of the serialized staging_parameters + staging_roles
- pub staging_hash: Hash,
}
/// A version of the layout of the cluster, i.e. the list of roles
@@ -260,10 +256,7 @@ mod v010 {
/// The history of cluster layouts
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
- pub struct UpdateTracker {
- pub values: BTreeMap<Uuid, u64>,
- pub current_min: u64,
- }
+ pub struct UpdateTracker(pub BTreeMap<Uuid, u64>);
impl garage_util::migrate::Migrate for LayoutHistory {
const VERSION_MARKER: &'static [u8] = b"G010lh";
@@ -293,32 +286,27 @@ mod v010 {
nongateway_node_count,
ring_assignment_data: previous.ring_assignment_data,
};
- let update_tracker = UpdateTracker {
- values: version
+ let update_tracker = UpdateTracker(
+ version
.nongateway_nodes()
.iter()
.copied()
.map(|x| (x, version.version))
.collect::<BTreeMap<Uuid, u64>>(),
- current_min: 0,
- };
+ );
let staging = LayoutStaging {
parameters: previous.staging_parameters,
roles: previous.staging_roles,
};
- let mut ret = Self {
+ Self {
versions: vec![version],
update_trackers: UpdateTrackers {
ack_map: update_tracker.clone(),
sync_map: update_tracker.clone(),
sync_ack_map: update_tracker.clone(),
},
- trackers_hash: [0u8; 32].into(),
staging: Lww::raw(previous.version, staging),
- staging_hash: [0u8; 32].into(),
- };
- ret.update_hashes();
- ret
+ }
}
}
}
@@ -382,14 +370,14 @@ impl core::str::FromStr for ZoneRedundancy {
impl UpdateTracker {
fn merge(&mut self, other: &UpdateTracker) -> bool {
let mut changed = false;
- for (k, v) in other.values.iter() {
- if let Some(v_mut) = self.values.get_mut(k) {
+ for (k, v) in other.0.iter() {
+ if let Some(v_mut) = self.0.get_mut(k) {
if *v > *v_mut {
*v_mut = *v;
changed = true;
}
} else {
- self.values.insert(*k, *v);
+ self.0.insert(*k, *v);
changed = true;
}
}
@@ -397,23 +385,23 @@ impl UpdateTracker {
}
pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
- match self.values.get_mut(&peer) {
+ match self.0.get_mut(&peer) {
Some(e) if *e < value => {
*e = value;
true
}
None => {
- self.values.insert(peer, value);
+ self.0.insert(peer, value);
true
}
_ => false,
}
}
- fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) {
- self.current_min = storage_nodes
+ pub(crate) fn min(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
+ storage_nodes
.iter()
- .map(|x| self.values.get(x).copied().unwrap_or(min_version))
+ .map(|x| self.0.get(x).copied().unwrap_or(min_version))
.min()
.unwrap_or(min_version)
}
@@ -426,10 +414,4 @@ impl UpdateTrackers {
let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
c1 || c2 || c3
}
-
- pub(crate) fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) {
- self.ack_map.update_min(&storage_nodes, min_version);
- self.sync_map.update_min(&storage_nodes, min_version);
- self.sync_ack_map.update_min(&storage_nodes, min_version);
- }
}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 1bad495b..e269ddaa 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -26,7 +26,7 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
-use crate::layout::LayoutHistory;
+use crate::layout::LayoutHelper;
use crate::metrics::RpcMetrics;
// Default RPC timeout = 5 minutes
@@ -90,7 +90,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- layout: Arc<RwLock<LayoutHistory>>,
+ layout: Arc<RwLock<LayoutHelper>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
}
@@ -99,7 +99,7 @@ impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- layout: Arc<RwLock<LayoutHistory>>,
+ layout: Arc<RwLock<LayoutHelper>>,
rpc_timeout: Option<Duration>,
) -> Self {
let metrics = RpcMetrics::new();
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 31d78bf6..d74dc2a1 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -34,7 +34,7 @@ use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::manager::{LayoutManager, LayoutStatus};
-use crate::layout::{self, LayoutHistory, NodeRoleV};
+use crate::layout::{self, LayoutHelper, LayoutHistory, NodeRoleV};
use crate::replication_mode::*;
use crate::rpc_helper::*;
@@ -350,7 +350,7 @@ impl System {
// ---- Public utilities / accessors ----
- pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
+ pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHelper> {
self.layout_manager.layout()
}