Diffstat (limited to 'src/table/replication')
-rw-r--r--  src/table/replication/fullcopy.rs     25
-rw-r--r--  src/table/replication/parameters.rs   19
-rw-r--r--  src/table/replication/sharded.rs      39
3 files changed, 76 insertions, 7 deletions
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index a5c83d0f..5653a229 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,3 +1,4 @@
+use std::iter::FromIterator;
use std::sync::Arc;
use garage_rpc::layout::*;
@@ -6,10 +7,17 @@ use garage_util::data::*;
use crate::replication::*;
+// TODO: find a way to track layout changes for this as well
+// The hard thing is that this data is stored also on gateway nodes,
+// whereas sharded data is stored only on non-Gateway nodes (storage nodes)
+// Also we want to be more tolerant to failures of gateways so we don't
+// want to do too much holding back of data when progress of gateway
+// nodes is not reported in the layout history's ack/sync/sync_ack maps.
+
/// Full replication schema: all nodes store everything
-/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
+/// Inconvenient: if some writes fail, nodes will read outdated data
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
@@ -44,7 +52,18 @@ impl TableReplication for TableFullReplication {
    fn partition_of(&self, _hash: &Hash) -> Partition {
        0u16
    }
-    fn partitions(&self) -> Vec<(Partition, Hash)> {
-        vec![(0u16, [0u8; 32].into())]
+
+    fn sync_partitions(&self) -> SyncPartitions {
+        let layout = self.system.cluster_layout();
+        let layout_version = layout.current().version;
+        SyncPartitions {
+            layout_version,
+            partitions: vec![SyncPartition {
+                partition: 0u16,
+                first_hash: [0u8; 32].into(),
+                last_hash: [0xff; 32].into(),
+                storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()),
+            }],
+        }
    }
}
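
For reference, a minimal self-contained sketch of what the fullcopy implementation above produces: a single partition covering the whole 256-bit keyspace, held by every node of the current layout. The [u8; 32] aliases and the constructor below are illustrative stand-ins only, not the garage_util types nor any function from this diff.

// Simplified stand-ins for Hash, Uuid and Partition (illustration only).
type Hash = [u8; 32];
type Uuid = [u8; 32];
type Partition = u16;

struct SyncPartition {
    partition: Partition,
    first_hash: Hash,
    last_hash: Hash,
    storage_nodes: Vec<Uuid>,
}

// Full replication: one partition spanning [0x00; 32] ..= [0xff; 32],
// stored on every node known to the current layout version.
fn full_copy_partition(all_nodes: Vec<Uuid>) -> SyncPartition {
    SyncPartition {
        partition: 0u16,
        first_hash: [0u8; 32],
        last_hash: [0xff; 32],
        storage_nodes: all_nodes,
    }
}

fn main() {
    let p = full_copy_partition(vec![[1u8; 32], [2u8; 32]]);
    // Byte arrays compare lexicographically, so every possible hash falls
    // inside this single range; that is also why partition_of() can always
    // answer 0u16 for this replication mode.
    let some_hash: Hash = [0xab; 32];
    assert!(p.first_hash <= some_hash && some_hash <= p.last_hash);
    assert_eq!(p.storage_nodes.len(), 2);
}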
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 19b306f2..2a7d3585 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -20,6 +20,21 @@ pub trait TableReplication: Send + Sync + 'static {
    // Accessing partitions, for Merkle tree & sync
    /// Get partition for data with given hash
    fn partition_of(&self, hash: &Hash) -> Partition;
-    /// List of existing partitions
-    fn partitions(&self) -> Vec<(Partition, Hash)>;
+
+    /// List of partitions and nodes to sync with in current layout
+    fn sync_partitions(&self) -> SyncPartitions;
+}
+
+#[derive(Debug)]
+pub struct SyncPartitions {
+    pub layout_version: u64,
+    pub partitions: Vec<SyncPartition>,
+}
+
+#[derive(Debug)]
+pub struct SyncPartition {
+    pub partition: Partition,
+    pub first_hash: Hash,
+    pub last_hash: Hash,
+    pub storage_nodes: Vec<Uuid>,
+}
}
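
These two structs read as a partition map: contiguous hash ranges [first_hash, last_hash], each with the nodes that store it, tagged with the layout version they were computed from. A hedged sketch of a consumer-side lookup follows; the helper and the [u8; 32] aliases are hypothetical and only illustrate the range semantics, they are not part of this diff.

type Hash = [u8; 32];
type Uuid = [u8; 32];

pub struct SyncPartition {
    pub partition: u16,
    pub first_hash: Hash,
    pub last_hash: Hash,
    pub storage_nodes: Vec<Uuid>,
}

// Hypothetical helper: find the partition whose range contains `hash`.
// Byte arrays compare lexicographically, so first_hash <= h <= last_hash
// expresses the range check. Adjacent ranges share their boundary hash
// (each last_hash equals the next first_hash), so a boundary hash can
// match two entries; a sync planner that only needs a covering range
// does not care.
pub fn partition_containing<'a>(
    partitions: &'a [SyncPartition],
    hash: &Hash,
) -> Option<&'a SyncPartition> {
    partitions
        .iter()
        .find(|p| p.first_hash <= *hash && *hash <= p.last_hash)
}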
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 793d87fd..f02b1d66 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -51,7 +51,42 @@ impl TableReplication for TableShardedReplication {
    fn partition_of(&self, hash: &Hash) -> Partition {
        self.system.cluster_layout().current().partition_of(hash)
    }
-    fn partitions(&self) -> Vec<(Partition, Hash)> {
-        self.system.cluster_layout().current().partitions()
+
+    fn sync_partitions(&self) -> SyncPartitions {
+        let layout = self.system.cluster_layout();
+        let layout_version = layout.all_ack();
+
+        let mut partitions = layout
+            .current()
+            .partitions()
+            .map(|(partition, first_hash)| {
+                let mut storage_nodes = layout
+                    .write_sets_of(&first_hash)
+                    .map(|x| x.into_iter())
+                    .flatten()
+                    .collect::<Vec<_>>();
+                storage_nodes.sort();
+                storage_nodes.dedup();
+                SyncPartition {
+                    partition,
+                    first_hash,
+                    last_hash: [0u8; 32].into(), // filled in just after
+                    storage_nodes,
+                }
+            })
+            .collect::<Vec<_>>();
+
+        for i in 0..partitions.len() {
+            partitions[i].last_hash = if i + 1 < partitions.len() {
+                partitions[i + 1].first_hash
+            } else {
+                [0xFFu8; 32].into()
+            };
+        }
+
+        SyncPartitions {
+            layout_version,
+            partitions,
+        }
    }
}
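
The loop that fills last_hash is worth a worked example: each partition's upper bound is simply the next partition's first_hash, and the final partition is capped at 0xff..ff, so the ranges jointly cover the whole keyspace. (The preceding sort()/dedup() presumably collapses the flattened write sets of the still-active layout versions into one node list per range.) A toy self-contained illustration with 4-byte hashes, not the real 32-byte garage Hash:

struct Range {
    first_hash: [u8; 4],
    last_hash: [u8; 4],
}

fn main() {
    // Pretend the layout yielded four partitions starting at these hashes.
    let mut ranges: Vec<Range> = [0x00u8, 0x40, 0x80, 0xc0]
        .iter()
        .map(|&b| Range {
            first_hash: [b, 0, 0, 0],
            last_hash: [0; 4], // placeholder, filled in just after
        })
        .collect();

    // Same pattern as the diff: each range ends where the next one starts,
    // and the last range is capped at the maximum hash.
    for i in 0..ranges.len() {
        ranges[i].last_hash = if i + 1 < ranges.len() {
            ranges[i + 1].first_hash
        } else {
            [0xff; 4]
        };
    }

    for r in &ranges {
        println!("{:02x?} .. {:02x?}", r.first_hash, r.last_hash);
    }
    // [00, 00, 00, 00] .. [40, 00, 00, 00]
    // [40, 00, 00, 00] .. [80, 00, 00, 00]
    // [80, 00, 00, 00] .. [c0, 00, 00, 00]
    // [c0, 00, 00, 00] .. [ff, ff, ff, ff]
}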