diff options
Diffstat (limited to 'src/rpc/layout/manager.rs')
-rw-r--r-- | src/rpc/layout/manager.rs | 74 |
1 files changed, 68 insertions, 6 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index e270ad21..4e073d1f 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; +use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard}; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -74,8 +74,8 @@ impl LayoutManager { } }; - let mut cluster_layout = LayoutHelper::new(cluster_layout); - cluster_layout.update_trackers_of(node_id.into()); + let mut cluster_layout = LayoutHelper::new(cluster_layout, Default::default()); + cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); let change_notify = Arc::new(Notify::new()); @@ -139,13 +139,36 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) { - debug!("sync_until updated to {}", sync_until); + info!("sync_until updated to {}", sync_until); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( layout.update_trackers.clone(), )); } } + fn ack_new_version(self: &Arc<Self>) { + let mut layout = self.layout.write().unwrap(); + if layout.ack_max_free(self.node_id) { + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( + layout.update_trackers.clone(), + )); + } + } + + // ---- ACK LOCKING ---- + + pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> { + let layout = self.layout(); + let version = layout.current().version; + let nodes = layout.write_sets_of(position); + layout + .ack_lock + .get(&version) + .unwrap() + .fetch_add(1, Ordering::Relaxed); + WriteLock::new(version, self, nodes) + } + // ---- INTERNALS --- fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { @@ -154,7 +177,7 @@ impl LayoutManager { if !prev_layout_check || adv.check().is_ok() { if layout.update(|l| l.merge(adv)) { - layout.update_trackers_of(self.node_id); + layout.update_trackers(self.node_id); if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } @@ -168,7 +191,7 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update_trackers != *adv { if layout.update(|l| l.update_trackers.merge(adv)) { - layout.update_trackers_of(self.node_id); + layout.update_trackers(self.node_id); return Some(layout.update_trackers.clone()); } } @@ -317,3 +340,42 @@ impl LayoutManager { Ok(SystemRpc::Ok) } } + +// ---- ack lock ---- + +pub struct WriteLock<T> { + layout_version: u64, + layout_manager: Arc<LayoutManager>, + value: T, +} + +impl<T> WriteLock<T> { + fn new(version: u64, layout_manager: &Arc<LayoutManager>, value: T) -> Self { + Self { + layout_version: version, + layout_manager: layout_manager.clone(), + value, + } + } +} + +impl<T> AsRef<T> for WriteLock<T> { + fn as_ref(&self) -> &T { + &self.value + } +} + +impl<T> Drop for WriteLock<T> { + fn drop(&mut self) { + let layout = self.layout_manager.layout(); // acquire read lock + if let Some(counter) = layout.ack_lock.get(&self.layout_version) { + let prev_lock = counter.fetch_sub(1, Ordering::Relaxed); + if prev_lock == 1 && layout.current().version > self.layout_version { + drop(layout); // release read lock, write lock will be acquired + self.layout_manager.ack_new_version(); + } + } else { + error!("Could not find ack lock counter for layout version {}. This probably indicates a bug in Garage.", self.layout_version); + } + } +} |