aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/helper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout/helper.rs')
-rw-r--r--src/rpc/layout/helper.rs224
1 files changed, 224 insertions, 0 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
new file mode 100644
index 00000000..ed3da498
--- /dev/null
+++ b/src/rpc/layout/helper.rs
@@ -0,0 +1,224 @@
+use std::collections::HashMap;
+use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use garage_util::data::*;
+
+use super::schema::*;
+
+pub struct LayoutHelper {
+ layout: Option<LayoutHistory>,
+
+ // cached values
+ ack_map_min: u64,
+ sync_map_min: u64,
+
+ all_nodes: Vec<Uuid>,
+ all_nongateway_nodes: Vec<Uuid>,
+
+ pub(crate) trackers_hash: Hash,
+ pub(crate) staging_hash: Hash,
+
+ // ack lock: counts in-progress write operations for each
+ // layout version ; we don't increase the ack update tracker
+ // while this lock is nonzero
+ pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
+}
+
+impl Deref for LayoutHelper {
+ type Target = LayoutHistory;
+ fn deref(&self) -> &LayoutHistory {
+ self.layout()
+ }
+}
+
+impl LayoutHelper {
+ pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
+ layout.cleanup_old_versions();
+
+ let all_nongateway_nodes = layout.get_all_nongateway_nodes();
+ layout.clamp_update_trackers(&all_nongateway_nodes);
+
+ let min_version = layout.min_stored();
+ let ack_map_min = layout
+ .update_trackers
+ .ack_map
+ .min(&all_nongateway_nodes, min_version);
+ let sync_map_min = layout
+ .update_trackers
+ .sync_map
+ .min(&all_nongateway_nodes, min_version);
+
+ let all_nodes = layout.get_all_nodes();
+ let trackers_hash = layout.calculate_trackers_hash();
+ let staging_hash = layout.calculate_staging_hash();
+
+ ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
+ ack_lock
+ .entry(layout.current().version)
+ .or_insert(AtomicUsize::new(0));
+
+ LayoutHelper {
+ layout: Some(layout),
+ ack_map_min,
+ sync_map_min,
+ all_nodes,
+ all_nongateway_nodes,
+ trackers_hash,
+ staging_hash,
+ ack_lock,
+ }
+ }
+
+ // ------------------ single updating function --------------
+
+ fn layout(&self) -> &LayoutHistory {
+ self.layout.as_ref().unwrap()
+ }
+
+ pub(crate) fn update<F>(&mut self, f: F) -> bool
+ where
+ F: FnOnce(&mut LayoutHistory) -> bool,
+ {
+ let changed = f(&mut self.layout.as_mut().unwrap());
+ if changed {
+ *self = Self::new(
+ self.layout.take().unwrap(),
+ std::mem::take(&mut self.ack_lock),
+ );
+ }
+ changed
+ }
+
+ // ------------------ read helpers ---------------
+
+ pub fn all_nodes(&self) -> &[Uuid] {
+ &self.all_nodes
+ }
+
+ pub fn all_nongateway_nodes(&self) -> &[Uuid] {
+ &self.all_nongateway_nodes
+ }
+
+ pub fn all_ack(&self) -> u64 {
+ self.ack_map_min
+ }
+
+ pub fn sync_versions(&self) -> (u64, u64, u64) {
+ (
+ self.layout().current().version,
+ self.all_ack(),
+ self.layout().min_stored(),
+ )
+ }
+
+ pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let sync_min = self.sync_map_min;
+ let version = self
+ .layout()
+ .versions
+ .iter()
+ .find(|x| x.version == sync_min)
+ .or(self.layout().versions.last())
+ .unwrap();
+ version
+ .nodes_of(position, version.replication_factor)
+ .collect()
+ }
+
+ pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
+ self.layout()
+ .versions
+ .iter()
+ .map(|x| x.nodes_of(position, x.replication_factor).collect())
+ .collect()
+ }
+
+ pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let mut ret = vec![];
+ for version in self.layout().versions.iter() {
+ ret.extend(version.nodes_of(position, version.replication_factor));
+ }
+ ret.sort();
+ ret.dedup();
+ ret
+ }
+
+ pub fn trackers_hash(&self) -> Hash {
+ self.trackers_hash
+ }
+
+ pub fn staging_hash(&self) -> Hash {
+ self.staging_hash
+ }
+
+ // ------------------ helpers for update tracking ---------------
+
+ pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
+ // Ensure trackers for this node's values are up-to-date
+
+ // 1. Acknowledge the last layout version which is not currently
+ // locked by an in-progress write operation
+ self.ack_max_free(local_node_id);
+
+ // 2. Assume the data on this node is sync'ed up at least to
+ // the first layout version in the history
+ self.sync_first(local_node_id);
+
+ // 3. Acknowledge everyone has synced up to min(self.sync_map)
+ self.sync_ack(local_node_id);
+
+ info!("ack_map: {:?}", self.update_trackers.ack_map);
+ info!("sync_map: {:?}", self.update_trackers.sync_map);
+ info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ }
+
+ fn sync_first(&mut self, local_node_id: Uuid) {
+ let first_version = self.versions.first().as_ref().unwrap().version;
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_map
+ .set_max(local_node_id, first_version)
+ });
+ }
+
+ fn sync_ack(&mut self, local_node_id: Uuid) {
+ let sync_map_min = self.sync_map_min;
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_ack_map
+ .set_max(local_node_id, sync_map_min)
+ });
+ }
+
+ pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
+ let max_ack = self.max_free_ack();
+ let changed = self.update(|layout| {
+ layout
+ .update_trackers
+ .ack_map
+ .set_max(local_node_id, max_ack)
+ });
+ if changed {
+ info!("ack_until updated to {}", max_ack);
+ }
+ changed
+ }
+
+ pub(crate) fn max_free_ack(&self) -> u64 {
+ self.layout()
+ .versions
+ .iter()
+ .map(|x| x.version)
+ .take_while(|v| {
+ self.ack_lock
+ .get(v)
+ .map(|x| x.load(Ordering::Relaxed) == 0)
+ .unwrap_or(true)
+ })
+ .max()
+ .unwrap_or(self.min_stored())
+ }
+}