aboutsummaryrefslogtreecommitdiff
path: root/src/table/replication/sharded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/replication/sharded.rs')
-rw-r--r--src/table/replication/sharded.rs39
1 files changed, 37 insertions, 2 deletions
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 793d87fd..f02b1d66 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -51,7 +51,42 @@ impl TableReplication for TableShardedReplication {
fn partition_of(&self, hash: &Hash) -> Partition {
self.system.cluster_layout().current().partition_of(hash)
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- self.system.cluster_layout().current().partitions()
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.all_ack();
+
+ let mut partitions = layout
+ .current()
+ .partitions()
+ .map(|(partition, first_hash)| {
+ let mut storage_nodes = layout
+ .write_sets_of(&first_hash)
+ .map(|x| x.into_iter())
+ .flatten()
+ .collect::<Vec<_>>();
+ storage_nodes.sort();
+ storage_nodes.dedup();
+ SyncPartition {
+ partition,
+ first_hash,
+ last_hash: [0u8; 32].into(), // filled in just after
+ storage_nodes,
+ }
+ })
+ .collect::<Vec<_>>();
+
+ for i in 0..partitions.len() {
+ partitions[i].last_hash = if i + 1 < partitions.len() {
+ partitions[i + 1].first_hash
+ } else {
+ [0xFFu8; 32].into()
+ };
+ }
+
+ SyncPartitions {
+ layout_version,
+ partitions,
+ }
}
}