aboutsummaryrefslogtreecommitdiff
path: root/src/table/replication/sharded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/replication/sharded.rs')
-rw-r--r--src/table/replication/sharded.rs51
1 files changed, 42 insertions, 9 deletions
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 1cf964af..8ba3700f 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
@@ -25,17 +25,21 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
+ type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
+
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ self.system.cluster_layout().storage_nodes_of(hash)
+ }
+
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ self.system.cluster_layout().read_nodes_of(hash)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ self.system.layout_manager.write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.write_quorum
@@ -45,9 +49,38 @@ impl TableReplication for TableShardedReplication {
}
fn partition_of(&self, hash: &Hash) -> Partition {
- self.system.ring.borrow().partition_of(hash)
+ self.system.cluster_layout().current().partition_of(hash)
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- self.system.ring.borrow().partitions()
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.ack_map_min();
+
+ let mut partitions = layout
+ .current()
+ .partitions()
+ .map(|(partition, first_hash)| {
+ let storage_sets = layout.storage_sets_of(&first_hash);
+ SyncPartition {
+ partition,
+ first_hash,
+ last_hash: [0u8; 32].into(), // filled in just after
+ storage_sets,
+ }
+ })
+ .collect::<Vec<_>>();
+
+ for i in 0..partitions.len() {
+ partitions[i].last_hash = if i + 1 < partitions.len() {
+ partitions[i + 1].first_hash
+ } else {
+ [0xFFu8; 32].into()
+ };
+ }
+
+ SyncPartitions {
+ layout_version,
+ partitions,
+ }
}
}