diff options
Diffstat (limited to 'src/table/replication')
-rw-r--r-- | src/table/replication/fullcopy.rs | 25 | ||||
-rw-r--r-- | src/table/replication/parameters.rs | 19 | ||||
-rw-r--r-- | src/table/replication/sharded.rs | 39 |
3 files changed, 76 insertions, 7 deletions
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index a5c83d0f..5653a229 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -1,3 +1,4 @@ +use std::iter::FromIterator; use std::sync::Arc; use garage_rpc::layout::*; @@ -6,10 +7,17 @@ use garage_util::data::*; use crate::replication::*; +// TODO: find a way to track layout changes for this as well +// The hard thing is that this data is stored also on gateway nodes, +// whereas sharded data is stored only on non-Gateway nodes (storage nodes) +// Also we want to be more tolerant to failures of gateways so we don't +// want to do too much holding back of data when progress of gateway +// nodes is not reported in the layout history's ack/sync/sync_ack maps. + /// Full replication schema: all nodes store everything -/// Writes are disseminated in an epidemic manner in the network /// Advantage: do all reads locally, extremely fast /// Inconvenient: only suitable to reasonably small tables +/// Inconvenient: if some writes fail, nodes will read outdated data #[derive(Clone)] pub struct TableFullReplication { /// The membership manager of this node @@ -44,7 +52,18 @@ impl TableReplication for TableFullReplication { fn partition_of(&self, _hash: &Hash) -> Partition { 0u16 } - fn partitions(&self) -> Vec<(Partition, Hash)> { - vec![(0u16, [0u8; 32].into())] + + fn sync_partitions(&self) -> SyncPartitions { + let layout = self.system.cluster_layout(); + let layout_version = layout.current().version; + SyncPartitions { + layout_version, + partitions: vec![SyncPartition { + partition: 0u16, + first_hash: [0u8; 32].into(), + last_hash: [0xff; 32].into(), + storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()), + }], + } } } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 19b306f2..2a7d3585 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -20,6 +20,21 @@ pub trait TableReplication: Send + Sync + 'static { // Accessing partitions, for Merkle tree & sync /// Get partition for data with given hash fn partition_of(&self, hash: &Hash) -> Partition; - /// List of existing partitions - fn partitions(&self) -> Vec<(Partition, Hash)>; + + /// List of partitions and nodes to sync with in current layout + fn sync_partitions(&self) -> SyncPartitions; +} + +#[derive(Debug)] +pub struct SyncPartitions { + pub layout_version: u64, + pub partitions: Vec<SyncPartition>, +} + +#[derive(Debug)] +pub struct SyncPartition { + pub partition: Partition, + pub first_hash: Hash, + pub last_hash: Hash, + pub storage_nodes: Vec<Uuid>, } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 793d87fd..f02b1d66 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -51,7 +51,42 @@ impl TableReplication for TableShardedReplication { fn partition_of(&self, hash: &Hash) -> Partition { self.system.cluster_layout().current().partition_of(hash) } - fn partitions(&self) -> Vec<(Partition, Hash)> { - self.system.cluster_layout().current().partitions() + + fn sync_partitions(&self) -> SyncPartitions { + let layout = self.system.cluster_layout(); + let layout_version = layout.all_ack(); + + let mut partitions = layout + .current() + .partitions() + .map(|(partition, first_hash)| { + let mut storage_nodes = layout + .write_sets_of(&first_hash) + .map(|x| x.into_iter()) + .flatten() + .collect::<Vec<_>>(); + storage_nodes.sort(); + storage_nodes.dedup(); + SyncPartition { + partition, + first_hash, + last_hash: [0u8; 32].into(), // filled in just after + storage_nodes, + } + }) + .collect::<Vec<_>>(); + + for i in 0..partitions.len() { + partitions[i].last_hash = if i + 1 < partitions.len() { + partitions[i + 1].first_hash + } else { + [0xFFu8; 32].into() + }; + } + + SyncPartitions { + layout_version, + partitions, + } } } |