diff options
Diffstat (limited to 'src/table/replication/sharded.rs')
-rw-r--r-- | src/table/replication/sharded.rs | 39 |
1 files changed, 37 insertions, 2 deletions
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 793d87fd..f02b1d66 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -51,7 +51,42 @@ impl TableReplication for TableShardedReplication { fn partition_of(&self, hash: &Hash) -> Partition { self.system.cluster_layout().current().partition_of(hash) } - fn partitions(&self) -> Vec<(Partition, Hash)> { - self.system.cluster_layout().current().partitions() + + fn sync_partitions(&self) -> SyncPartitions { + let layout = self.system.cluster_layout(); + let layout_version = layout.all_ack(); + + let mut partitions = layout + .current() + .partitions() + .map(|(partition, first_hash)| { + let mut storage_nodes = layout + .write_sets_of(&first_hash) + .map(|x| x.into_iter()) + .flatten() + .collect::<Vec<_>>(); + storage_nodes.sort(); + storage_nodes.dedup(); + SyncPartition { + partition, + first_hash, + last_hash: [0u8; 32].into(), // filled in just after + storage_nodes, + } + }) + .collect::<Vec<_>>(); + + for i in 0..partitions.len() { + partitions[i].last_hash = if i + 1 < partitions.len() { + partitions[i + 1].first_hash + } else { + [0xFFu8; 32].into() + }; + } + + SyncPartitions { + layout_version, + partitions, + } } } |