From 33c8a489b0a9c0e869282bfc19c548f5a3e02e8c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 15:40:44 +0100 Subject: layou: implement ack locking --- src/table/replication/fullcopy.rs | 4 +++- src/table/replication/parameters.rs | 4 +++- src/table/replication/sharded.rs | 6 ++++-- src/table/sync.rs | 9 ++------- src/table/table.rs | 2 +- 5 files changed, 13 insertions(+), 12 deletions(-) (limited to 'src/table') diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index cb5471af..df930224 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -27,6 +27,8 @@ pub struct TableFullReplication { } impl TableReplication for TableFullReplication { + type WriteSets = Vec>; + fn storage_nodes(&self, _hash: &Hash) -> Vec { let layout = self.system.cluster_layout(); layout.current().all_nodes().to_vec() @@ -39,7 +41,7 @@ impl TableReplication for TableFullReplication { 1 } - fn write_sets(&self, hash: &Hash) -> Vec> { + fn write_sets(&self, hash: &Hash) -> Self::WriteSets { vec![self.storage_nodes(hash)] } fn write_quorum(&self) -> usize { diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 2f842409..a4e701bb 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -3,6 +3,8 @@ use garage_util::data::*; /// Trait to describe how a table shall be replicated pub trait TableReplication: Send + Sync + 'static { + type WriteSets: AsRef>> + Send + Sync + 'static; + // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods @@ -15,7 +17,7 @@ pub trait TableReplication: Send + Sync + 'static { fn read_quorum(&self) -> usize; /// Which nodes to send writes to - fn write_sets(&self, hash: &Hash) -> Vec>; + fn write_sets(&self, hash: &Hash) -> Self::WriteSets; /// Responses needed to consider a write succesfull in each set fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 1320a189..2a16bc0c 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -25,6 +25,8 @@ pub struct TableShardedReplication { } impl TableReplication for TableShardedReplication { + type WriteSets = WriteLock>>; + fn storage_nodes(&self, hash: &Hash) -> Vec { self.system.cluster_layout().storage_nodes_of(hash) } @@ -36,8 +38,8 @@ impl TableReplication for TableShardedReplication { self.read_quorum } - fn write_sets(&self, hash: &Hash) -> Vec> { - self.system.cluster_layout().write_sets_of(hash) + fn write_sets(&self, hash: &Hash) -> Self::WriteSets { + self.system.layout_manager.write_sets_of(hash) } fn write_quorum(&self) -> usize { self.write_quorum diff --git a/src/table/sync.rs b/src/table/sync.rs index b67cdd79..efeac402 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -173,12 +173,7 @@ impl TableSyncer { } if !items.is_empty() { - let nodes = self - .data - .replication - .storage_nodes(begin) - .into_iter() - .collect::>(); + let nodes = self.data.replication.storage_nodes(begin); if nodes.contains(&self.system.id) { warn!( "({}) Interrupting offload as partitions seem to have changed", @@ -202,7 +197,7 @@ impl TableSyncer { end, counter ); - self.offload_items(&items, &nodes[..]).await?; + self.offload_items(&items, &nodes).await?; } else { break; } diff --git a/src/table/table.rs b/src/table/table.rs index c2efaeaf..5ec9eb0a 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -128,7 +128,7 @@ impl Table { .rpc_helper() .try_write_many_sets( &self.endpoint, - &who, + who.as_ref(), rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.write_quorum()), -- cgit v1.2.3