aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/history.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r--src/rpc/layout/history.rs311
1 files changed, 188 insertions, 123 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 1684918e..b6f0e495 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -1,5 +1,5 @@
-use std::borrow::Cow;
use std::collections::HashSet;
+use std::ops::Deref;
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
@@ -9,95 +9,106 @@ use garage_util::error::*;
use super::schema::*;
use super::*;
+pub struct LayoutHelper {
+ layout: Option<LayoutHistory>,
-impl LayoutHistory {
- pub fn new(replication_factor: usize) -> Self {
- let version = LayoutVersion::new(replication_factor);
+ // cached values
+ ack_map_min: u64,
+ sync_map_min: u64,
- let staging = LayoutStaging {
- parameters: Lww::<LayoutParameters>::new(version.parameters),
- roles: LwwMap::new(),
- };
+ all_nodes: Vec<Uuid>,
+ all_nongateway_nodes: Vec<Uuid>,
- let mut ret = LayoutHistory {
- versions: vec![version],
- update_trackers: Default::default(),
- trackers_hash: [0u8; 32].into(),
- staging: Lww::raw(0, staging),
- staging_hash: [0u8; 32].into(),
- };
- ret.update_hashes();
- ret
- }
+ trackers_hash: Hash,
+ staging_hash: Hash,
+}
- pub fn current(&self) -> &LayoutVersion {
- self.versions.last().as_ref().unwrap()
+impl Deref for LayoutHelper {
+ type Target = LayoutHistory;
+ fn deref(&self) -> &LayoutHistory {
+ self.layout()
}
+}
- pub fn update_hashes(&mut self) {
- self.trackers_hash = self.calculate_trackers_hash();
- self.staging_hash = self.calculate_staging_hash();
+impl LayoutHelper {
+ pub fn new(mut layout: LayoutHistory) -> Self {
+ layout.cleanup_old_versions();
+
+ let all_nongateway_nodes = layout.get_all_nongateway_nodes();
+ layout.clamp_update_trackers(&all_nongateway_nodes);
+
+ let min_version = layout.min_stored();
+ let ack_map_min = layout
+ .update_trackers
+ .ack_map
+ .min(&all_nongateway_nodes, min_version);
+ let sync_map_min = layout
+ .update_trackers
+ .sync_map
+ .min(&all_nongateway_nodes, min_version);
+
+ let all_nodes = layout.get_all_nodes();
+ let trackers_hash = layout.calculate_trackers_hash();
+ let staging_hash = layout.calculate_staging_hash();
+
+ LayoutHelper {
+ layout: Some(layout),
+ ack_map_min,
+ sync_map_min,
+ all_nodes,
+ all_nongateway_nodes,
+ trackers_hash,
+ staging_hash,
+ }
}
- pub(crate) fn calculate_trackers_hash(&self) -> Hash {
- blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
- }
+ // ------------------ single updating function --------------
- pub(crate) fn calculate_staging_hash(&self) -> Hash {
- blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
+ fn layout(&self) -> &LayoutHistory {
+ self.layout.as_ref().unwrap()
}
- // ------------------ who stores what now? ---------------
-
- pub fn all_ack(&self) -> u64 {
- self.update_trackers.ack_map.current_min
+ pub(crate) fn update<F>(&mut self, f: F) -> bool
+ where
+ F: FnOnce(&mut LayoutHistory) -> bool,
+ {
+ let changed = f(&mut self.layout.as_mut().unwrap());
+ if changed {
+ *self = Self::new(self.layout.take().unwrap());
+ }
+ changed
}
- pub fn min_stored(&self) -> u64 {
- self.versions.first().as_ref().unwrap().version
+ // ------------------ read helpers ---------------
+
+ pub fn all_nodes(&self) -> &[Uuid] {
+ &self.all_nodes
}
- pub fn sync_versions(&self) -> (u64, u64, u64) {
- (self.current().version, self.all_ack(), self.min_stored())
+ pub fn all_nongateway_nodes(&self) -> &[Uuid] {
+ &self.all_nongateway_nodes
}
- pub fn all_nodes(&self) -> Cow<'_, [Uuid]> {
- // TODO: cache this
- if self.versions.len() == 1 {
- self.versions[0].all_nodes().into()
- } else {
- let set = self
- .versions
- .iter()
- .map(|x| x.all_nodes())
- .flatten()
- .collect::<HashSet<_>>();
- set.into_iter().copied().collect::<Vec<_>>().into()
- }
+ pub fn all_ack(&self) -> u64 {
+ self.ack_map_min
}
- pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> {
- // TODO: cache this
- if self.versions.len() == 1 {
- self.versions[0].nongateway_nodes().into()
- } else {
- let set = self
- .versions
- .iter()
- .map(|x| x.nongateway_nodes())
- .flatten()
- .collect::<HashSet<_>>();
- set.into_iter().copied().collect::<Vec<_>>().into()
- }
+ pub fn sync_versions(&self) -> (u64, u64, u64) {
+ (
+ self.layout().current().version,
+ self.all_ack(),
+ self.layout().min_stored(),
+ )
}
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
- let sync_min = self.update_trackers.sync_map.current_min;
+ let sync_min = self.sync_map_min;
let version = self
+ .layout()
.versions
.iter()
.find(|x| x.version == sync_min)
- .or(self.versions.last())
+ .or(self.layout().versions.last())
.unwrap();
version
.nodes_of(position, version.replication_factor)
@@ -105,7 +116,8 @@ impl LayoutHistory {
}
pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
- self.versions
+ self.layout()
+ .versions
.iter()
.map(|x| x.nodes_of(position, x.replication_factor).collect())
.collect()
@@ -113,7 +125,7 @@ impl LayoutHistory {
pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let mut ret = vec![];
- for version in self.versions.iter() {
+ for version in self.layout().versions.iter() {
ret.extend(version.nodes_of(position, version.replication_factor));
}
ret.sort();
@@ -121,7 +133,35 @@ impl LayoutHistory {
ret
}
- // ------------------ update tracking ---------------
+ pub fn trackers_hash(&self) -> Hash {
+ self.trackers_hash
+ }
+
+ pub fn staging_hash(&self) -> Hash {
+ self.staging_hash
+ }
+
+ // ------------------ helpers for update tracking ---------------
+
+ pub(crate) fn sync_first(&mut self, node: Uuid) {
+ let first_version = self.versions.first().as_ref().unwrap().version;
+ self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version));
+ }
+
+ pub(crate) fn sync_ack(&mut self, node: Uuid) {
+ let sync_map_min = self.sync_map_min;
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_ack_map
+ .set_max(node, sync_map_min)
+ });
+ }
+
+ pub(crate) fn ack_last(&mut self, node: Uuid) {
+ let last_version = self.current().version;
+ self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version));
+ }
pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) {
// Ensure trackers for this node's values are up-to-date
@@ -136,55 +176,104 @@ impl LayoutHistory {
// 3. Acknowledge everyone has synced up to min(self.sync_map)
self.sync_ack(node_id);
- // 4. Cleanup layout versions that are not needed anymore
- self.cleanup_old_versions();
-
- // 5. Recalculate global minima
- self.update_trackers_min();
-
info!("ack_map: {:?}", self.update_trackers.ack_map);
info!("sync_map: {:?}", self.update_trackers.sync_map);
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ }
+}
- // Finally, update hashes
- self.update_hashes();
+// ----
+
+impl LayoutHistory {
+ pub fn new(replication_factor: usize) -> Self {
+ let version = LayoutVersion::new(replication_factor);
+
+ let staging = LayoutStaging {
+ parameters: Lww::<LayoutParameters>::new(version.parameters),
+ roles: LwwMap::new(),
+ };
+
+ LayoutHistory {
+ versions: vec![version],
+ update_trackers: Default::default(),
+ staging: Lww::raw(0, staging),
+ }
}
- fn update_trackers_min(&mut self) {
- // TODO: for TableFullReplication, counting gateway nodes might be
- // necessary? Think about this more.
- let storage_nodes = self.all_nongateway_nodes().into_owned();
- let min_version = self.versions.first().unwrap().version;
- self.update_trackers.update_min(&storage_nodes, min_version);
+ // ------------------ who stores what now? ---------------
+
+ pub fn current(&self) -> &LayoutVersion {
+ self.versions.last().as_ref().unwrap()
}
- pub(crate) fn ack_last(&mut self, node: Uuid) {
- let last_version = self.current().version;
- self.update_trackers.ack_map.set_max(node, last_version);
- self.update_trackers_min();
+ pub fn min_stored(&self) -> u64 {
+ self.versions.first().as_ref().unwrap().version
}
- pub(crate) fn sync_first(&mut self, node: Uuid) {
- let first_version = self.versions.first().as_ref().unwrap().version;
- self.update_trackers.sync_map.set_max(node, first_version);
- self.update_trackers_min();
+ pub fn get_all_nodes(&self) -> Vec<Uuid> {
+ if self.versions.len() == 1 {
+ self.versions[0].all_nodes().to_vec()
+ } else {
+ let set = self
+ .versions
+ .iter()
+ .map(|x| x.all_nodes())
+ .flatten()
+ .collect::<HashSet<_>>();
+ set.into_iter().copied().collect::<Vec<_>>()
+ }
}
- pub(crate) fn sync_ack(&mut self, node: Uuid) {
- self.update_trackers
- .sync_ack_map
- .set_max(node, self.update_trackers.sync_map.current_min);
- self.update_trackers_min();
+ fn get_all_nongateway_nodes(&self) -> Vec<Uuid> {
+ if self.versions.len() == 1 {
+ self.versions[0].nongateway_nodes().to_vec()
+ } else {
+ let set = self
+ .versions
+ .iter()
+ .map(|x| x.nongateway_nodes())
+ .flatten()
+ .collect::<HashSet<_>>();
+ set.into_iter().copied().collect::<Vec<_>>()
+ }
}
- pub(crate) fn cleanup_old_versions(&mut self) {
- let min_sync_ack = self.update_trackers.sync_ack_map.current_min;
- while self.versions.first().as_ref().unwrap().version < min_sync_ack {
- let removed = self.versions.remove(0);
- info!("Layout history: pruning old version {}", removed.version);
+ // ---- housekeeping (all invoked by LayoutHelper) ----
+
+ fn cleanup_old_versions(&mut self) {
+ loop {
+ let all_nongateway_nodes = self.get_all_nongateway_nodes();
+ let min_version = self.min_stored();
+ let sync_ack_map_min = self
+ .update_trackers
+ .sync_ack_map
+ .min(&all_nongateway_nodes, min_version);
+ if self.min_stored() < sync_ack_map_min {
+ let removed = self.versions.remove(0);
+ info!("Layout history: pruning old version {}", removed.version);
+ } else {
+ break;
+ }
}
}
+ fn clamp_update_trackers(&mut self, nodes: &[Uuid]) {
+ let min_v = self.min_stored();
+ for node in nodes {
+ self.update_trackers.ack_map.set_max(*node, min_v);
+ self.update_trackers.sync_map.set_max(*node, min_v);
+ self.update_trackers.sync_ack_map.set_max(*node, min_v);
+ }
+ }
+
+ fn calculate_trackers_hash(&self) -> Hash {
+ blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
+ }
+
+ fn calculate_staging_hash(&self) -> Hash {
+ blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
+ }
+
// ================== updates to layout, public interface ===================
pub fn merge(&mut self, other: &LayoutHistory) -> bool {
@@ -221,20 +310,6 @@ impl LayoutHistory {
self.versions.remove(0);
changed = true;
}
- if changed {
- let min_v = self.versions.first().unwrap().version;
- let nodes = self.all_nongateway_nodes().into_owned();
- for node in nodes {
- self.update_trackers.ack_map.set_max(node, min_v);
- self.update_trackers.sync_map.set_max(node, min_v);
- self.update_trackers.sync_ack_map.set_max(node, min_v);
- }
- }
- }
-
- // Update the current_min value in trackers if anything changed
- if changed {
- self.update_trackers_min();
}
// Merge staged layout changes
@@ -280,7 +355,6 @@ To know the correct value of the new layout version, invoke `garage layout show`
parameters: self.staging.get().parameters.clone(),
roles: LwwMap::new(),
});
- self.update_hashes();
Ok((self, msg))
}
@@ -290,20 +364,11 @@ To know the correct value of the new layout version, invoke `garage layout show`
parameters: Lww::new(self.current().parameters.clone()),
roles: LwwMap::new(),
});
- self.update_hashes();
Ok(self)
}
pub fn check(&self) -> Result<(), String> {
- // Check that the hash of the staging data is correct
- if self.trackers_hash != self.calculate_trackers_hash() {
- return Err("trackers_hash is incorrect".into());
- }
- if self.staging_hash != self.calculate_staging_hash() {
- return Err("staging_hash is incorrect".into());
- }
-
for version in self.versions.iter() {
version.check()?;
}