aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-15 15:40:44 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-15 15:40:44 +0100
commit33c8a489b0a9c0e869282bfc19c548f5a3e02e8c (patch)
tree5bfe599b2ce2c0e558d9fb244647eccda9164f88
parent393c4d4515e0cdadadc8de8ae2df12e4371cff88 (diff)
downloadgarage-33c8a489b0a9c0e869282bfc19c548f5a3e02e8c.tar.gz
garage-33c8a489b0a9c0e869282bfc19c548f5a3e02e8c.zip
layou: implement ack locking
-rw-r--r--src/block/manager.rs2
-rw-r--r--src/garage/cli/layout.rs2
-rw-r--r--src/rpc/layout/history.rs98
-rw-r--r--src/rpc/layout/manager.rs74
-rw-r--r--src/rpc/layout/mod.rs1
-rw-r--r--src/table/replication/fullcopy.rs4
-rw-r--r--src/table/replication/parameters.rs4
-rw-r--r--src/table/replication/sharded.rs6
-rw-r--r--src/table/sync.rs9
-rw-r--r--src/table/table.rs2
10 files changed, 156 insertions, 46 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 0ca8bc31..be2e4951 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -366,7 +366,7 @@ impl BlockManager {
.rpc_helper()
.try_write_many_sets(
&self.endpoint,
- &who,
+ who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum()),
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 51774314..0be8278f 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -329,7 +329,7 @@ pub async fn fetch_layout(
pub async fn send_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
- mut layout: LayoutHistory,
+ layout: LayoutHistory,
) -> Result<(), Error> {
rpc_cli
.call(
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index b6f0e495..dd38efa7 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -1,5 +1,7 @@
+use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
@@ -21,6 +23,11 @@ pub struct LayoutHelper {
trackers_hash: Hash,
staging_hash: Hash,
+
+ // ack lock: counts in-progress write operations for each
+ // layout version ; we don't increase the ack update tracker
+ // while this lock is nonzero
+ pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
}
impl Deref for LayoutHelper {
@@ -31,7 +38,7 @@ impl Deref for LayoutHelper {
}
impl LayoutHelper {
- pub fn new(mut layout: LayoutHistory) -> Self {
+ pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
layout.cleanup_old_versions();
let all_nongateway_nodes = layout.get_all_nongateway_nodes();
@@ -51,6 +58,11 @@ impl LayoutHelper {
let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash();
+ ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
+ ack_lock
+ .entry(layout.current().version)
+ .or_insert(AtomicUsize::new(0));
+
LayoutHelper {
layout: Some(layout),
ack_map_min,
@@ -59,6 +71,7 @@ impl LayoutHelper {
all_nongateway_nodes,
trackers_hash,
staging_hash,
+ ack_lock,
}
}
@@ -74,7 +87,10 @@ impl LayoutHelper {
{
let changed = f(&mut self.layout.as_mut().unwrap());
if changed {
- *self = Self::new(self.layout.take().unwrap());
+ *self = Self::new(
+ self.layout.take().unwrap(),
+ std::mem::take(&mut self.ack_lock),
+ );
}
changed
}
@@ -115,7 +131,7 @@ impl LayoutHelper {
.collect()
}
- pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
+ pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.layout()
.versions
.iter()
@@ -143,42 +159,72 @@ impl LayoutHelper {
// ------------------ helpers for update tracking ---------------
- pub(crate) fn sync_first(&mut self, node: Uuid) {
+ pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
+ // Ensure trackers for this node's values are up-to-date
+
+ // 1. Acknowledge the last layout version which is not currently
+ // locked by an in-progress write operation
+ self.ack_max_free(local_node_id);
+
+ // 2. Assume the data on this node is sync'ed up at least to
+ // the first layout version in the history
+ self.sync_first(local_node_id);
+
+ // 3. Acknowledge everyone has synced up to min(self.sync_map)
+ self.sync_ack(local_node_id);
+
+ info!("ack_map: {:?}", self.update_trackers.ack_map);
+ info!("sync_map: {:?}", self.update_trackers.sync_map);
+ info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ }
+
+ fn sync_first(&mut self, local_node_id: Uuid) {
let first_version = self.versions.first().as_ref().unwrap().version;
- self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version));
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_map
+ .set_max(local_node_id, first_version)
+ });
}
- pub(crate) fn sync_ack(&mut self, node: Uuid) {
+ fn sync_ack(&mut self, local_node_id: Uuid) {
let sync_map_min = self.sync_map_min;
self.update(|layout| {
layout
.update_trackers
.sync_ack_map
- .set_max(node, sync_map_min)
+ .set_max(local_node_id, sync_map_min)
});
}
- pub(crate) fn ack_last(&mut self, node: Uuid) {
- let last_version = self.current().version;
- self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version));
+ pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
+ let max_ack = self.max_free_ack();
+ let changed = self.update(|layout| {
+ layout
+ .update_trackers
+ .ack_map
+ .set_max(local_node_id, max_ack)
+ });
+ if changed {
+ info!("ack_until updated to {}", max_ack);
+ }
+ changed
}
- pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) {
- // Ensure trackers for this node's values are up-to-date
-
- // 1. Acknowledge the last layout version in the history
- self.ack_last(node_id);
-
- // 2. Assume the data on this node is sync'ed up at least to
- // the first layout version in the history
- self.sync_first(node_id);
-
- // 3. Acknowledge everyone has synced up to min(self.sync_map)
- self.sync_ack(node_id);
-
- info!("ack_map: {:?}", self.update_trackers.ack_map);
- info!("sync_map: {:?}", self.update_trackers.sync_map);
- info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ pub(crate) fn max_free_ack(&self) -> u64 {
+ self.layout()
+ .versions
+ .iter()
+ .map(|x| x.version)
+ .take_while(|v| {
+ self.ack_lock
+ .get(v)
+ .map(|x| x.load(Ordering::Relaxed) == 0)
+ .unwrap_or(true)
+ })
+ .max()
+ .unwrap_or(self.min_stored())
}
}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index e270ad21..4e073d1f 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -1,5 +1,5 @@
use std::collections::HashMap;
-use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
+use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard};
use std::time::Duration;
use serde::{Deserialize, Serialize};
@@ -74,8 +74,8 @@ impl LayoutManager {
}
};
- let mut cluster_layout = LayoutHelper::new(cluster_layout);
- cluster_layout.update_trackers_of(node_id.into());
+ let mut cluster_layout = LayoutHelper::new(cluster_layout, Default::default());
+ cluster_layout.update_trackers(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout));
let change_notify = Arc::new(Notify::new());
@@ -139,13 +139,36 @@ impl LayoutManager {
let mut layout = self.layout.write().unwrap();
if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) {
- debug!("sync_until updated to {}", sync_until);
+ info!("sync_until updated to {}", sync_until);
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
layout.update_trackers.clone(),
));
}
}
+ fn ack_new_version(self: &Arc<Self>) {
+ let mut layout = self.layout.write().unwrap();
+ if layout.ack_max_free(self.node_id) {
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
+ layout.update_trackers.clone(),
+ ));
+ }
+ }
+
+ // ---- ACK LOCKING ----
+
+ pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
+ let layout = self.layout();
+ let version = layout.current().version;
+ let nodes = layout.write_sets_of(position);
+ layout
+ .ack_lock
+ .get(&version)
+ .unwrap()
+ .fetch_add(1, Ordering::Relaxed);
+ WriteLock::new(version, self, nodes)
+ }
+
// ---- INTERNALS ---
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
@@ -154,7 +177,7 @@ impl LayoutManager {
if !prev_layout_check || adv.check().is_ok() {
if layout.update(|l| l.merge(adv)) {
- layout.update_trackers_of(self.node_id);
+ layout.update_trackers(self.node_id);
if prev_layout_check && layout.check().is_err() {
panic!("Merged two correct layouts and got an incorrect layout.");
}
@@ -168,7 +191,7 @@ impl LayoutManager {
let mut layout = self.layout.write().unwrap();
if layout.update_trackers != *adv {
if layout.update(|l| l.update_trackers.merge(adv)) {
- layout.update_trackers_of(self.node_id);
+ layout.update_trackers(self.node_id);
return Some(layout.update_trackers.clone());
}
}
@@ -317,3 +340,42 @@ impl LayoutManager {
Ok(SystemRpc::Ok)
}
}
+
+// ---- ack lock ----
+
+pub struct WriteLock<T> {
+ layout_version: u64,
+ layout_manager: Arc<LayoutManager>,
+ value: T,
+}
+
+impl<T> WriteLock<T> {
+ fn new(version: u64, layout_manager: &Arc<LayoutManager>, value: T) -> Self {
+ Self {
+ layout_version: version,
+ layout_manager: layout_manager.clone(),
+ value,
+ }
+ }
+}
+
+impl<T> AsRef<T> for WriteLock<T> {
+ fn as_ref(&self) -> &T {
+ &self.value
+ }
+}
+
+impl<T> Drop for WriteLock<T> {
+ fn drop(&mut self) {
+ let layout = self.layout_manager.layout(); // acquire read lock
+ if let Some(counter) = layout.ack_lock.get(&self.layout_version) {
+ let prev_lock = counter.fetch_sub(1, Ordering::Relaxed);
+ if prev_lock == 1 && layout.current().version > self.layout_version {
+ drop(layout); // release read lock, write lock will be acquired
+ self.layout_manager.ack_new_version();
+ }
+ } else {
+ error!("Could not find ack lock counter for layout version {}. This probably indicates a bug in Garage.", self.layout_version);
+ }
+ }
+}
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
index 577b32fb..859287c8 100644
--- a/src/rpc/layout/mod.rs
+++ b/src/rpc/layout/mod.rs
@@ -11,6 +11,7 @@ pub mod manager;
// ---- re-exports ----
pub use history::*;
+pub use manager::WriteLock;
pub use schema::*;
pub use version::*;
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index cb5471af..df930224 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -27,6 +27,8 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
+ type WriteSets = Vec<Vec<Uuid>>;
+
fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let layout = self.system.cluster_layout();
layout.current().all_nodes().to_vec()
@@ -39,7 +41,7 @@ impl TableReplication for TableFullReplication {
1
}
- fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 2f842409..a4e701bb 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -3,6 +3,8 @@ use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync + 'static {
+ type WriteSets: AsRef<Vec<Vec<Uuid>>> + Send + Sync + 'static;
+
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
@@ -15,7 +17,7 @@ pub trait TableReplication: Send + Sync + 'static {
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>>;
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
/// Responses needed to consider a write succesfull in each set
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 1320a189..2a16bc0c 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -25,6 +25,8 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
+ type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
+
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
self.system.cluster_layout().storage_nodes_of(hash)
}
@@ -36,8 +38,8 @@ impl TableReplication for TableShardedReplication {
self.read_quorum
}
- fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
- self.system.cluster_layout().write_sets_of(hash)
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ self.system.layout_manager.write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.write_quorum
diff --git a/src/table/sync.rs b/src/table/sync.rs
index b67cdd79..efeac402 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -173,12 +173,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
if !items.is_empty() {
- let nodes = self
- .data
- .replication
- .storage_nodes(begin)
- .into_iter()
- .collect::<Vec<_>>();
+ let nodes = self.data.replication.storage_nodes(begin);
if nodes.contains(&self.system.id) {
warn!(
"({}) Interrupting offload as partitions seem to have changed",
@@ -202,7 +197,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
end,
counter
);
- self.offload_items(&items, &nodes[..]).await?;
+ self.offload_items(&items, &nodes).await?;
} else {
break;
}
diff --git a/src/table/table.rs b/src/table/table.rs
index c2efaeaf..5ec9eb0a 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -128,7 +128,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
.rpc_helper()
.try_write_many_sets(
&self.endpoint,
- &who,
+ who.as_ref(),
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.write_quorum()),