diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-15 15:40:44 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-15 15:40:44 +0100 |
commit | 33c8a489b0a9c0e869282bfc19c548f5a3e02e8c (patch) | |
tree | 5bfe599b2ce2c0e558d9fb244647eccda9164f88 /src/rpc/layout/history.rs | |
parent | 393c4d4515e0cdadadc8de8ae2df12e4371cff88 (diff) | |
download | garage-33c8a489b0a9c0e869282bfc19c548f5a3e02e8c.tar.gz garage-33c8a489b0a9c0e869282bfc19c548f5a3e02e8c.zip |
layou: implement ack locking
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r-- | src/rpc/layout/history.rs | 98 |
1 files changed, 72 insertions, 26 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b6f0e495..dd38efa7 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,7 @@ +use std::collections::HashMap; use std::collections::HashSet; use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -21,6 +23,11 @@ pub struct LayoutHelper { trackers_hash: Hash, staging_hash: Hash, + + // ack lock: counts in-progress write operations for each + // layout version ; we don't increase the ack update tracker + // while this lock is nonzero + pub(crate) ack_lock: HashMap<u64, AtomicUsize>, } impl Deref for LayoutHelper { @@ -31,7 +38,7 @@ impl Deref for LayoutHelper { } impl LayoutHelper { - pub fn new(mut layout: LayoutHistory) -> Self { + pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self { layout.cleanup_old_versions(); let all_nongateway_nodes = layout.get_all_nongateway_nodes(); @@ -51,6 +58,11 @@ impl LayoutHelper { let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); + ack_lock.retain(|_, cnt| *cnt.get_mut() > 0); + ack_lock + .entry(layout.current().version) + .or_insert(AtomicUsize::new(0)); + LayoutHelper { layout: Some(layout), ack_map_min, @@ -59,6 +71,7 @@ impl LayoutHelper { all_nongateway_nodes, trackers_hash, staging_hash, + ack_lock, } } @@ -74,7 +87,10 @@ impl LayoutHelper { { let changed = f(&mut self.layout.as_mut().unwrap()); if changed { - *self = Self::new(self.layout.take().unwrap()); + *self = Self::new( + self.layout.take().unwrap(), + std::mem::take(&mut self.ack_lock), + ); } changed } @@ -115,7 +131,7 @@ impl LayoutHelper { .collect() } - pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { + pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { self.layout() .versions .iter() @@ -143,42 +159,72 @@ impl LayoutHelper { // ------------------ helpers for update tracking --------------- - pub(crate) fn sync_first(&mut self, node: Uuid) { + pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version which is not currently + // locked by an in-progress write operation + self.ack_max_free(local_node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(local_node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(local_node_id); + + info!("ack_map: {:?}", self.update_trackers.ack_map); + info!("sync_map: {:?}", self.update_trackers.sync_map); + info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + } + + fn sync_first(&mut self, local_node_id: Uuid) { let first_version = self.versions.first().as_ref().unwrap().version; - self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version)); + self.update(|layout| { + layout + .update_trackers + .sync_map + .set_max(local_node_id, first_version) + }); } - pub(crate) fn sync_ack(&mut self, node: Uuid) { + fn sync_ack(&mut self, local_node_id: Uuid) { let sync_map_min = self.sync_map_min; self.update(|layout| { layout .update_trackers .sync_ack_map - .set_max(node, sync_map_min) + .set_max(local_node_id, sync_map_min) }); } - pub(crate) fn ack_last(&mut self, node: Uuid) { - let last_version = self.current().version; - self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version)); + pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { + let max_ack = self.max_free_ack(); + let changed = self.update(|layout| { + layout + .update_trackers + .ack_map + .set_max(local_node_id, max_ack) + }); + if changed { + info!("ack_until updated to {}", max_ack); + } + changed } - pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { - // Ensure trackers for this node's values are up-to-date - - // 1. Acknowledge the last layout version in the history - self.ack_last(node_id); - - // 2. Assume the data on this node is sync'ed up at least to - // the first layout version in the history - self.sync_first(node_id); - - // 3. Acknowledge everyone has synced up to min(self.sync_map) - self.sync_ack(node_id); - - info!("ack_map: {:?}", self.update_trackers.ack_map); - info!("sync_map: {:?}", self.update_trackers.sync_map); - info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + pub(crate) fn max_free_ack(&self) -> u64 { + self.layout() + .versions + .iter() + .map(|x| x.version) + .take_while(|v| { + self.ack_lock + .get(v) + .map(|x| x.load(Ordering::Relaxed) == 0) + .unwrap_or(true) + }) + .max() + .unwrap_or(self.min_stored()) } } |