aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout/manager.rs')
-rw-r--r--src/rpc/layout/manager.rs74
1 files changed, 68 insertions, 6 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index e270ad21..4e073d1f 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -1,5 +1,5 @@
use std::collections::HashMap;
-use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
+use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard};
use std::time::Duration;
use serde::{Deserialize, Serialize};
@@ -74,8 +74,8 @@ impl LayoutManager {
}
};
- let mut cluster_layout = LayoutHelper::new(cluster_layout);
- cluster_layout.update_trackers_of(node_id.into());
+ let mut cluster_layout = LayoutHelper::new(cluster_layout, Default::default());
+ cluster_layout.update_trackers(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout));
let change_notify = Arc::new(Notify::new());
@@ -139,13 +139,36 @@ impl LayoutManager {
let mut layout = self.layout.write().unwrap();
if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) {
- debug!("sync_until updated to {}", sync_until);
+ info!("sync_until updated to {}", sync_until);
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
layout.update_trackers.clone(),
));
}
}
+ fn ack_new_version(self: &Arc<Self>) {
+ let mut layout = self.layout.write().unwrap();
+ if layout.ack_max_free(self.node_id) {
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
+ layout.update_trackers.clone(),
+ ));
+ }
+ }
+
+ // ---- ACK LOCKING ----
+
+ pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
+ let layout = self.layout();
+ let version = layout.current().version;
+ let nodes = layout.write_sets_of(position);
+ layout
+ .ack_lock
+ .get(&version)
+ .unwrap()
+ .fetch_add(1, Ordering::Relaxed);
+ WriteLock::new(version, self, nodes)
+ }
+
// ---- INTERNALS ---
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
@@ -154,7 +177,7 @@ impl LayoutManager {
if !prev_layout_check || adv.check().is_ok() {
if layout.update(|l| l.merge(adv)) {
- layout.update_trackers_of(self.node_id);
+ layout.update_trackers(self.node_id);
if prev_layout_check && layout.check().is_err() {
panic!("Merged two correct layouts and got an incorrect layout.");
}
@@ -168,7 +191,7 @@ impl LayoutManager {
let mut layout = self.layout.write().unwrap();
if layout.update_trackers != *adv {
if layout.update(|l| l.update_trackers.merge(adv)) {
- layout.update_trackers_of(self.node_id);
+ layout.update_trackers(self.node_id);
return Some(layout.update_trackers.clone());
}
}
@@ -317,3 +340,42 @@ impl LayoutManager {
Ok(SystemRpc::Ok)
}
}
+
+// ---- ack lock ----
+
+pub struct WriteLock<T> {
+ layout_version: u64,
+ layout_manager: Arc<LayoutManager>,
+ value: T,
+}
+
+impl<T> WriteLock<T> {
+ fn new(version: u64, layout_manager: &Arc<LayoutManager>, value: T) -> Self {
+ Self {
+ layout_version: version,
+ layout_manager: layout_manager.clone(),
+ value,
+ }
+ }
+}
+
+impl<T> AsRef<T> for WriteLock<T> {
+ fn as_ref(&self) -> &T {
+ &self.value
+ }
+}
+
+impl<T> Drop for WriteLock<T> {
+ fn drop(&mut self) {
+ let layout = self.layout_manager.layout(); // acquire read lock
+ if let Some(counter) = layout.ack_lock.get(&self.layout_version) {
+ let prev_lock = counter.fetch_sub(1, Ordering::Relaxed);
+ if prev_lock == 1 && layout.current().version > self.layout_version {
+ drop(layout); // release read lock, write lock will be acquired
+ self.layout_manager.ack_new_version();
+ }
+ } else {
+ error!("Could not find ack lock counter for layout version {}. This probably indicates a bug in Garage.", self.layout_version);
+ }
+ }
+}