aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/history.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r--src/rpc/layout/history.rs98
1 files changed, 72 insertions, 26 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index b6f0e495..dd38efa7 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -1,5 +1,7 @@
+use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
@@ -21,6 +23,11 @@ pub struct LayoutHelper {
trackers_hash: Hash,
staging_hash: Hash,
+
+ // ack lock: counts in-progress write operations for each
+ // layout version ; we don't increase the ack update tracker
+ // while this lock is nonzero
+ pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
}
impl Deref for LayoutHelper {
@@ -31,7 +38,7 @@ impl Deref for LayoutHelper {
}
impl LayoutHelper {
- pub fn new(mut layout: LayoutHistory) -> Self {
+ pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
layout.cleanup_old_versions();
let all_nongateway_nodes = layout.get_all_nongateway_nodes();
@@ -51,6 +58,11 @@ impl LayoutHelper {
let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash();
+ ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
+ ack_lock
+ .entry(layout.current().version)
+ .or_insert(AtomicUsize::new(0));
+
LayoutHelper {
layout: Some(layout),
ack_map_min,
@@ -59,6 +71,7 @@ impl LayoutHelper {
all_nongateway_nodes,
trackers_hash,
staging_hash,
+ ack_lock,
}
}
@@ -74,7 +87,10 @@ impl LayoutHelper {
{
let changed = f(&mut self.layout.as_mut().unwrap());
if changed {
- *self = Self::new(self.layout.take().unwrap());
+ *self = Self::new(
+ self.layout.take().unwrap(),
+ std::mem::take(&mut self.ack_lock),
+ );
}
changed
}
@@ -115,7 +131,7 @@ impl LayoutHelper {
.collect()
}
- pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
+ pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.layout()
.versions
.iter()
@@ -143,42 +159,72 @@ impl LayoutHelper {
// ------------------ helpers for update tracking ---------------
- pub(crate) fn sync_first(&mut self, node: Uuid) {
+ pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
+ // Ensure trackers for this node's values are up-to-date
+
+ // 1. Acknowledge the last layout version which is not currently
+ // locked by an in-progress write operation
+ self.ack_max_free(local_node_id);
+
+ // 2. Assume the data on this node is sync'ed up at least to
+ // the first layout version in the history
+ self.sync_first(local_node_id);
+
+ // 3. Acknowledge everyone has synced up to min(self.sync_map)
+ self.sync_ack(local_node_id);
+
+ info!("ack_map: {:?}", self.update_trackers.ack_map);
+ info!("sync_map: {:?}", self.update_trackers.sync_map);
+ info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ }
+
+ fn sync_first(&mut self, local_node_id: Uuid) {
let first_version = self.versions.first().as_ref().unwrap().version;
- self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version));
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_map
+ .set_max(local_node_id, first_version)
+ });
}
- pub(crate) fn sync_ack(&mut self, node: Uuid) {
+ fn sync_ack(&mut self, local_node_id: Uuid) {
let sync_map_min = self.sync_map_min;
self.update(|layout| {
layout
.update_trackers
.sync_ack_map
- .set_max(node, sync_map_min)
+ .set_max(local_node_id, sync_map_min)
});
}
- pub(crate) fn ack_last(&mut self, node: Uuid) {
- let last_version = self.current().version;
- self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version));
+ pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
+ let max_ack = self.max_free_ack();
+ let changed = self.update(|layout| {
+ layout
+ .update_trackers
+ .ack_map
+ .set_max(local_node_id, max_ack)
+ });
+ if changed {
+ info!("ack_until updated to {}", max_ack);
+ }
+ changed
}
- pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) {
- // Ensure trackers for this node's values are up-to-date
-
- // 1. Acknowledge the last layout version in the history
- self.ack_last(node_id);
-
- // 2. Assume the data on this node is sync'ed up at least to
- // the first layout version in the history
- self.sync_first(node_id);
-
- // 3. Acknowledge everyone has synced up to min(self.sync_map)
- self.sync_ack(node_id);
-
- info!("ack_map: {:?}", self.update_trackers.ack_map);
- info!("sync_map: {:?}", self.update_trackers.sync_map);
- info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ pub(crate) fn max_free_ack(&self) -> u64 {
+ self.layout()
+ .versions
+ .iter()
+ .map(|x| x.version)
+ .take_while(|v| {
+ self.ack_lock
+ .get(v)
+ .map(|x| x.load(Ordering::Relaxed) == 0)
+ .unwrap_or(true)
+ })
+ .max()
+ .unwrap_or(self.min_stored())
}
}