aboutsummaryrefslogtreecommitdiff
path: root/src/table/replication
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2024-01-11 10:58:08 +0000
committerAlex <alex@adnab.me>2024-01-11 10:58:08 +0000
commit8a6ec1d6111a60e602c90ade2200b2dab5733fe3 (patch)
treeb8daac4f41050339c87106d72ce7224f7eef38aa /src/table/replication
parent723e56b37f13f078a15e067343191fb1bf96e8b2 (diff)
parent0041b013a473e3ae72f50209d8f79db75a72848b (diff)
downloadgarage-8a6ec1d6111a60e602c90ade2200b2dab5733fe3.tar.gz
garage-8a6ec1d6111a60e602c90ade2200b2dab5733fe3.zip
Merge pull request 'NLnet task 3' (#667) from nlnet-task3 into next-0.10
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/667
Diffstat (limited to 'src/table/replication')
-rw-r--r--src/table/replication/fullcopy.rs40
-rw-r--r--src/table/replication/parameters.rs29
-rw-r--r--src/table/replication/sharded.rs51
3 files changed, 98 insertions, 22 deletions
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 18682ace..30122f39 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,15 +1,22 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
use crate::replication::*;
+// TODO: find a way to track layout changes for this as well
+// The hard thing is that this data is stored also on gateway nodes,
+// whereas sharded data is stored only on non-Gateway nodes (storage nodes)
+// Also we want to be more tolerant to failures of gateways so we don't
+// want to do too much holding back of data when progress of gateway
+// nodes is not reported in the layout history's ack/sync/sync_ack maps.
+
/// Full replication schema: all nodes store everything
-/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
+/// Inconvenient: if some writes fail, nodes will read outdated data
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
@@ -19,6 +26,13 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
+ type WriteSets = Vec<Vec<Uuid>>;
+
+ fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ let layout = self.system.cluster_layout();
+ layout.current().all_nodes().to_vec()
+ }
+
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
@@ -26,12 +40,11 @@ impl TableReplication for TableFullReplication {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.layout.node_ids().to_vec()
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
- let nmembers = self.system.ring.borrow().layout.node_ids().len();
+ let nmembers = self.system.cluster_layout().current().all_nodes().len();
if nmembers > self.max_faults {
nmembers - self.max_faults
} else {
@@ -45,7 +58,18 @@ impl TableReplication for TableFullReplication {
fn partition_of(&self, _hash: &Hash) -> Partition {
0u16
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- vec![(0u16, [0u8; 32].into())]
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.current().version;
+ SyncPartitions {
+ layout_version,
+ partitions: vec![SyncPartition {
+ partition: 0u16,
+ first_hash: [0u8; 32].into(),
+ last_hash: [0xff; 32].into(),
+ storage_sets: vec![layout.current().all_nodes().to_vec()],
+ }],
+ }
}
}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index f00815a2..78470f35 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,25 +1,44 @@
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync + 'static {
+ type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;
+
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
+ /// The entire list of all nodes that store a partition
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
- /// Responses needed to consider a write succesfull
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
+ /// Responses needed to consider a write succesfull in each set
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
- /// List of existing partitions
- fn partitions(&self) -> Vec<(Partition, Hash)>;
+ /// List of partitions and nodes to sync with in current layout
+ fn sync_partitions(&self) -> SyncPartitions;
+}
+
+#[derive(Debug)]
+pub struct SyncPartitions {
+ pub layout_version: u64,
+ pub partitions: Vec<SyncPartition>,
+}
+
+#[derive(Debug)]
+pub struct SyncPartition {
+ pub partition: Partition,
+ pub first_hash: Hash,
+ pub last_hash: Hash,
+ pub storage_sets: Vec<Vec<Uuid>>,
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 1cf964af..8ba3700f 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
@@ -25,17 +25,21 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
+ type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
+
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ self.system.cluster_layout().storage_nodes_of(hash)
+ }
+
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ self.system.cluster_layout().read_nodes_of(hash)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ self.system.layout_manager.write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.write_quorum
@@ -45,9 +49,38 @@ impl TableReplication for TableShardedReplication {
}
fn partition_of(&self, hash: &Hash) -> Partition {
- self.system.ring.borrow().partition_of(hash)
+ self.system.cluster_layout().current().partition_of(hash)
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- self.system.ring.borrow().partitions()
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.ack_map_min();
+
+ let mut partitions = layout
+ .current()
+ .partitions()
+ .map(|(partition, first_hash)| {
+ let storage_sets = layout.storage_sets_of(&first_hash);
+ SyncPartition {
+ partition,
+ first_hash,
+ last_hash: [0u8; 32].into(), // filled in just after
+ storage_sets,
+ }
+ })
+ .collect::<Vec<_>>();
+
+ for i in 0..partitions.len() {
+ partitions[i].last_hash = if i + 1 < partitions.len() {
+ partitions[i + 1].first_hash
+ } else {
+ [0xFFu8; 32].into()
+ };
+ }
+
+ SyncPartitions {
+ layout_version,
+ partitions,
+ }
}
}