diff options
Diffstat (limited to 'src/rpc/layout')
-rw-r--r-- | src/rpc/layout/history.rs | 98 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 74 | ||||
-rw-r--r-- | src/rpc/layout/mod.rs | 1 |
3 files changed, 141 insertions, 32 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b6f0e495..dd38efa7 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,7 @@ +use std::collections::HashMap; use std::collections::HashSet; use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -21,6 +23,11 @@ pub struct LayoutHelper { trackers_hash: Hash, staging_hash: Hash, + + // ack lock: counts in-progress write operations for each + // layout version ; we don't increase the ack update tracker + // while this lock is nonzero + pub(crate) ack_lock: HashMap<u64, AtomicUsize>, } impl Deref for LayoutHelper { @@ -31,7 +38,7 @@ impl Deref for LayoutHelper { } impl LayoutHelper { - pub fn new(mut layout: LayoutHistory) -> Self { + pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self { layout.cleanup_old_versions(); let all_nongateway_nodes = layout.get_all_nongateway_nodes(); @@ -51,6 +58,11 @@ impl LayoutHelper { let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); + ack_lock.retain(|_, cnt| *cnt.get_mut() > 0); + ack_lock + .entry(layout.current().version) + .or_insert(AtomicUsize::new(0)); + LayoutHelper { layout: Some(layout), ack_map_min, @@ -59,6 +71,7 @@ impl LayoutHelper { all_nongateway_nodes, trackers_hash, staging_hash, + ack_lock, } } @@ -74,7 +87,10 @@ impl LayoutHelper { { let changed = f(&mut self.layout.as_mut().unwrap()); if changed { - *self = Self::new(self.layout.take().unwrap()); + *self = Self::new( + self.layout.take().unwrap(), + std::mem::take(&mut self.ack_lock), + ); } changed } @@ -115,7 +131,7 @@ impl LayoutHelper { .collect() } - pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { + pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { self.layout() .versions .iter() @@ -143,42 +159,72 @@ impl LayoutHelper { // ------------------ helpers for update tracking --------------- - pub(crate) fn sync_first(&mut self, node: Uuid) { + pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version which is not currently + // locked by an in-progress write operation + self.ack_max_free(local_node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(local_node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(local_node_id); + + info!("ack_map: {:?}", self.update_trackers.ack_map); + info!("sync_map: {:?}", self.update_trackers.sync_map); + info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + } + + fn sync_first(&mut self, local_node_id: Uuid) { let first_version = self.versions.first().as_ref().unwrap().version; - self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version)); + self.update(|layout| { + layout + .update_trackers + .sync_map + .set_max(local_node_id, first_version) + }); } - pub(crate) fn sync_ack(&mut self, node: Uuid) { + fn sync_ack(&mut self, local_node_id: Uuid) { let sync_map_min = self.sync_map_min; self.update(|layout| { layout .update_trackers .sync_ack_map - .set_max(node, sync_map_min) + .set_max(local_node_id, sync_map_min) }); } - pub(crate) fn ack_last(&mut self, node: Uuid) { - let last_version = self.current().version; - self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version)); + pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { + let max_ack = self.max_free_ack(); + let changed = self.update(|layout| { + layout + .update_trackers + .ack_map + .set_max(local_node_id, max_ack) + }); + if changed { + info!("ack_until updated to {}", max_ack); + } + changed } - pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { - // Ensure trackers for this node's values are up-to-date - - // 1. Acknowledge the last layout version in the history - self.ack_last(node_id); - - // 2. Assume the data on this node is sync'ed up at least to - // the first layout version in the history - self.sync_first(node_id); - - // 3. Acknowledge everyone has synced up to min(self.sync_map) - self.sync_ack(node_id); - - info!("ack_map: {:?}", self.update_trackers.ack_map); - info!("sync_map: {:?}", self.update_trackers.sync_map); - info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + pub(crate) fn max_free_ack(&self) -> u64 { + self.layout() + .versions + .iter() + .map(|x| x.version) + .take_while(|v| { + self.ack_lock + .get(v) + .map(|x| x.load(Ordering::Relaxed) == 0) + .unwrap_or(true) + }) + .max() + .unwrap_or(self.min_stored()) } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index e270ad21..4e073d1f 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; +use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard}; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -74,8 +74,8 @@ impl LayoutManager { } }; - let mut cluster_layout = LayoutHelper::new(cluster_layout); - cluster_layout.update_trackers_of(node_id.into()); + let mut cluster_layout = LayoutHelper::new(cluster_layout, Default::default()); + cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); let change_notify = Arc::new(Notify::new()); @@ -139,13 +139,36 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) { - debug!("sync_until updated to {}", sync_until); + info!("sync_until updated to {}", sync_until); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( layout.update_trackers.clone(), )); } } + fn ack_new_version(self: &Arc<Self>) { + let mut layout = self.layout.write().unwrap(); + if layout.ack_max_free(self.node_id) { + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( + layout.update_trackers.clone(), + )); + } + } + + // ---- ACK LOCKING ---- + + pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> { + let layout = self.layout(); + let version = layout.current().version; + let nodes = layout.write_sets_of(position); + layout + .ack_lock + .get(&version) + .unwrap() + .fetch_add(1, Ordering::Relaxed); + WriteLock::new(version, self, nodes) + } + // ---- INTERNALS --- fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { @@ -154,7 +177,7 @@ impl LayoutManager { if !prev_layout_check || adv.check().is_ok() { if layout.update(|l| l.merge(adv)) { - layout.update_trackers_of(self.node_id); + layout.update_trackers(self.node_id); if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } @@ -168,7 +191,7 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update_trackers != *adv { if layout.update(|l| l.update_trackers.merge(adv)) { - layout.update_trackers_of(self.node_id); + layout.update_trackers(self.node_id); return Some(layout.update_trackers.clone()); } } @@ -317,3 +340,42 @@ impl LayoutManager { Ok(SystemRpc::Ok) } } + +// ---- ack lock ---- + +pub struct WriteLock<T> { + layout_version: u64, + layout_manager: Arc<LayoutManager>, + value: T, +} + +impl<T> WriteLock<T> { + fn new(version: u64, layout_manager: &Arc<LayoutManager>, value: T) -> Self { + Self { + layout_version: version, + layout_manager: layout_manager.clone(), + value, + } + } +} + +impl<T> AsRef<T> for WriteLock<T> { + fn as_ref(&self) -> &T { + &self.value + } +} + +impl<T> Drop for WriteLock<T> { + fn drop(&mut self) { + let layout = self.layout_manager.layout(); // acquire read lock + if let Some(counter) = layout.ack_lock.get(&self.layout_version) { + let prev_lock = counter.fetch_sub(1, Ordering::Relaxed); + if prev_lock == 1 && layout.current().version > self.layout_version { + drop(layout); // release read lock, write lock will be acquired + self.layout_manager.ack_new_version(); + } + } else { + error!("Could not find ack lock counter for layout version {}. This probably indicates a bug in Garage.", self.layout_version); + } + } +} diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 577b32fb..859287c8 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -11,6 +11,7 @@ pub mod manager; // ---- re-exports ---- pub use history::*; +pub use manager::WriteLock; pub use schema::*; pub use version::*; |