From 3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 14:28:16 +0100 Subject: layout: prepare for write sets --- src/table/replication/fullcopy.rs | 9 +++++++-- src/table/replication/parameters.rs | 8 +++++--- src/table/replication/sharded.rs | 24 ++++++++---------------- 3 files changed, 20 insertions(+), 21 deletions(-) (limited to 'src/table/replication') diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index beaacc2b..cb5471af 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -27,6 +27,11 @@ pub struct TableFullReplication { } impl TableReplication for TableFullReplication { + fn storage_nodes(&self, _hash: &Hash) -> Vec { + let layout = self.system.cluster_layout(); + layout.current().all_nodes().to_vec() + } + fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } @@ -34,8 +39,8 @@ impl TableReplication for TableFullReplication { 1 } - fn write_nodes(&self, _hash: &Hash) -> Vec { - self.system.cluster_layout().current().all_nodes().to_vec() + fn write_sets(&self, hash: &Hash) -> Vec> { + vec![self.storage_nodes(hash)] } fn write_quorum(&self) -> usize { let nmembers = self.system.cluster_layout().current().all_nodes().len(); diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 2a7d3585..2f842409 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -6,21 +6,23 @@ pub trait TableReplication: Send + Sync + 'static { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods + /// The entire list of all nodes that store a partition + fn storage_nodes(&self, hash: &Hash) -> Vec; + /// Which nodes to send read requests to fn read_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a read succesfull fn read_quorum(&self) -> usize; /// Which nodes to send writes to - fn write_nodes(&self, hash: &Hash) -> Vec; - /// Responses needed to consider a write succesfull + fn write_sets(&self, hash: &Hash) -> Vec>; + /// Responses needed to consider a write succesfull in each set fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; // Accessing partitions, for Merkle tree & sync /// Get partition for data with given hash fn partition_of(&self, hash: &Hash) -> Partition; - /// List of partitions and nodes to sync with in current layout fn sync_partitions(&self) -> SyncPartitions; } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index f02b1d66..1320a189 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -25,21 +25,19 @@ pub struct TableShardedReplication { } impl TableReplication for TableShardedReplication { + fn storage_nodes(&self, hash: &Hash) -> Vec { + self.system.cluster_layout().storage_nodes_of(hash) + } + fn read_nodes(&self, hash: &Hash) -> Vec { - self.system - .cluster_layout() - .current() - .nodes_of(hash, self.replication_factor) + self.system.cluster_layout().read_nodes_of(hash) } fn read_quorum(&self) -> usize { self.read_quorum } - fn write_nodes(&self, hash: &Hash) -> Vec { - self.system - .cluster_layout() - .current() - .nodes_of(hash, self.replication_factor) + fn write_sets(&self, hash: &Hash) -> Vec> { + self.system.cluster_layout().write_sets_of(hash) } fn write_quorum(&self) -> usize { self.write_quorum @@ -60,13 +58,7 @@ impl TableReplication for TableShardedReplication { .current() .partitions() .map(|(partition, first_hash)| { - let mut storage_nodes = layout - .write_sets_of(&first_hash) - .map(|x| x.into_iter()) - .flatten() - .collect::>(); - storage_nodes.sort(); - storage_nodes.dedup(); + let storage_nodes = layout.storage_nodes_of(&first_hash); SyncPartition { partition, first_hash, -- cgit v1.2.3