author    Alex <alex@adnab.me>  2024-04-10 15:23:12 +0000
committer Alex <alex@adnab.me>  2024-04-10 15:23:12 +0000
commit    1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e (patch)
tree      47e42c4e6ae47590fbb5c8f94e90a23bf04c1674 /src/table/replication
parent    b47706809cc9d28d1328bafdf9756e96388cca24 (diff)
parent    ff093ddbb8485409f389abe7b5e569cb38d222d2 (diff)
Merge pull request 'Garage v1.0' (#683) from next-0.10 into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/683
Diffstat (limited to 'src/table/replication')
-rw-r--r--  src/table/replication/fullcopy.rs    | 52
-rw-r--r--  src/table/replication/parameters.rs  | 30
-rw-r--r--  src/table/replication/sharded.rs     | 54
3 files changed, 103 insertions(+), 33 deletions(-)
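
The central API change in this merge, visible in all three files below, is that write_nodes (one flat list of nodes) becomes write_sets (a list of node sets), so that while a layout change is in progress a write can be required to reach a quorum in every concerned set, old and new. A minimal sketch of how a caller might enforce that rule; the trait method semantics come from the diff below, but write_committed, the stand-in Uuid type, and the acks input are illustrative, not code from the commit:

// Hedged sketch of the new write rule: a write commits only when it
// reaches the quorum in *every* write set (e.g. in both the old and
// the new layout during a transition). `Uuid` here is a stand-in for
// the garage_util type, and `acks` models the responses collected by
// the RPC layer; neither is code from this commit.

use std::collections::HashSet;

type Uuid = [u8; 32];

fn write_committed(write_sets: &[Vec<Uuid>], quorum: usize, acks: &HashSet<Uuid>) -> bool {
    write_sets
        .iter()
        .all(|set| set.iter().filter(|n| acks.contains(*n)).count() >= quorum)
}

fn main() {
    let (a, b, c) = ([1u8; 32], [2u8; 32], [3u8; 32]);
    // Two write sets, as during a layout change from {a, b} to {b, c}.
    let sets = vec![vec![a, b], vec![b, c]];
    let acks: HashSet<Uuid> = [a, b].into_iter().collect();
    // Quorum 2 is met in the first set but not in the second (c is missing).
    assert!(!write_committed(&sets, 2, &acks));
    // Quorum 1 is met in both sets.
    assert!(write_committed(&sets, 1, &acks));
}
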
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 18682ace..1e52bb47 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,24 +1,36 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
use crate::replication::*;
+// TODO: find a way to track layout changes for this as well.
+// The hard part is that this data is also stored on gateway nodes,
+// whereas sharded data is stored only on non-gateway (storage) nodes.
+// We also want to be more tolerant of gateway failures, so we don't
+// want to hold back data too much when gateway nodes' progress is not
+// reported in the layout history's ack/sync/sync_ack maps.
+
/// Full replication scheme: all nodes store everything
-/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Drawback: only suitable for reasonably small tables
+/// Drawback: if some writes fail, nodes will read outdated data
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
pub system: Arc<System>,
- /// Max number of faults allowed while replicating a record
- pub max_faults: usize,
}
impl TableReplication for TableFullReplication {
+ type WriteSets = Vec<Vec<Uuid>>;
+
+ fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ let layout = self.system.cluster_layout();
+ layout.current().all_nodes().to_vec()
+ }
+
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
@@ -26,26 +38,36 @@ impl TableReplication for TableFullReplication {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.layout.node_ids().to_vec()
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
- let nmembers = self.system.ring.borrow().layout.node_ids().len();
- if nmembers > self.max_faults {
- nmembers - self.max_faults
+ let nmembers = self.system.cluster_layout().current().all_nodes().len();
+
+ let max_faults = if nmembers > 1 { 1 } else { 0 };
+
+ if nmembers > max_faults {
+ nmembers - max_faults
} else {
1
}
}
- fn max_write_errors(&self) -> usize {
- self.max_faults
- }
fn partition_of(&self, _hash: &Hash) -> Partition {
0u16
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- vec![(0u16, [0u8; 32].into())]
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.current().version;
+ SyncPartitions {
+ layout_version,
+ partitions: vec![SyncPartition {
+ partition: 0u16,
+ first_hash: [0u8; 32].into(),
+ last_hash: [0xff; 32].into(),
+ storage_sets: vec![layout.current().all_nodes().to_vec()],
+ }],
+ }
}
}
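
Since max_faults is no longer configurable, the write quorum for full replication is now fixed by cluster size: tolerate one failed node whenever there is more than one member. A small standalone check of the arithmetic above (a sketch, not code from the commit):

// Sketch of the quorum rule from the hunk above: with more than one
// node, tolerate exactly one fault; with a single node, require it.
fn full_copy_write_quorum(nmembers: usize) -> usize {
    let max_faults = if nmembers > 1 { 1 } else { 0 };
    if nmembers > max_faults {
        nmembers - max_faults
    } else {
        1
    }
}

fn main() {
    assert_eq!(full_copy_write_quorum(1), 1); // lone node acks its own writes
    assert_eq!(full_copy_write_quorum(2), 1); // one of the two may be down
    assert_eq!(full_copy_write_quorum(5), 4); // n nodes: quorum is n - 1
}
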
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index f00815a2..682c1ea6 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,25 +1,43 @@
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync + 'static {
+ type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;
+
// See examples in sharded.rs and fullcopy.rs
// to understand the various replication methods
+ /// The entire list of all nodes that store a partition
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read successful
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
- /// Responses needed to consider a write succesfull
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
+ /// Responses needed to consider a write successful in each set
fn write_quorum(&self) -> usize;
- fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
- /// List of existing partitions
- fn partitions(&self) -> Vec<(Partition, Hash)>;
+ /// List of partitions and the nodes to sync with, in the current layout
+ fn sync_partitions(&self) -> SyncPartitions;
+}
+
+#[derive(Debug)]
+pub struct SyncPartitions {
+ pub layout_version: u64,
+ pub partitions: Vec<SyncPartition>,
+}
+
+#[derive(Debug)]
+pub struct SyncPartition {
+ pub partition: Partition,
+ pub first_hash: Hash,
+ pub last_hash: Hash,
+ pub storage_sets: Vec<Vec<Uuid>>,
}
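
The reworked trait also makes sync ranges explicit: each SyncPartition covers the hash range from first_hash to last_hash, and storage_sets lists the node sets that must hold that range. A hypothetical minimal implementor, one node storing everything in one partition, just to show the shape of the contract; the LocalOnly type is illustrative, the trait bound is simplified, and the garage types are replaced by local stand-ins:

// Hypothetical minimal implementor of the new trait, with local
// stand-ins for the garage_util / garage_rpc types, to illustrate
// the contract; none of this is code from the commit.

type Uuid = [u8; 32];
type Hash = [u8; 32];
type Partition = u16;

pub struct SyncPartitions {
    pub layout_version: u64,
    pub partitions: Vec<SyncPartition>,
}

pub struct SyncPartition {
    pub partition: Partition,
    pub first_hash: Hash,
    pub last_hash: Hash,
    pub storage_sets: Vec<Vec<Uuid>>,
}

// Simplified bound: the real trait also requires AsMut + Send + Sync.
pub trait TableReplication {
    type WriteSets: AsRef<Vec<Vec<Uuid>>>;
    fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid>;
    fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
    fn read_quorum(&self) -> usize;
    fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
    fn write_quorum(&self) -> usize;
    fn partition_of(&self, hash: &Hash) -> Partition;
    fn sync_partitions(&self) -> SyncPartitions;
}

/// Toy scheme: a single node stores everything in one partition.
struct LocalOnly {
    id: Uuid,
}

impl TableReplication for LocalOnly {
    type WriteSets = Vec<Vec<Uuid>>;
    fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> { vec![self.id] }
    fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> { vec![self.id] }
    fn read_quorum(&self) -> usize { 1 }
    fn write_sets(&self, hash: &Hash) -> Self::WriteSets { vec![self.storage_nodes(hash)] }
    fn write_quorum(&self) -> usize { 1 }
    fn partition_of(&self, _hash: &Hash) -> Partition { 0 }
    fn sync_partitions(&self) -> SyncPartitions {
        // A single partition covering the whole hash space.
        SyncPartitions {
            layout_version: 0,
            partitions: vec![SyncPartition {
                partition: 0,
                first_hash: [0u8; 32],
                last_hash: [0xff; 32],
                storage_sets: vec![vec![self.id]],
            }],
        }
    }
}

fn main() {
    let node = LocalOnly { id: [42u8; 32] };
    let h: Hash = [0u8; 32];
    assert_eq!(node.write_sets(&h).len(), 1);
    assert_eq!(node.sync_partitions().partitions.len(), 1);
}
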
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 1cf964af..e0245949 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
@@ -25,29 +25,59 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
+ type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
+
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ self.system.cluster_layout().storage_nodes_of(hash)
+ }
+
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ self.system.cluster_layout().read_nodes_of(hash)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ self.system.layout_manager.write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.write_quorum
}
- fn max_write_errors(&self) -> usize {
- self.replication_factor - self.write_quorum
- }
fn partition_of(&self, hash: &Hash) -> Partition {
- self.system.ring.borrow().partition_of(hash)
+ self.system.cluster_layout().current().partition_of(hash)
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- self.system.ring.borrow().partitions()
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.ack_map_min();
+
+ let mut partitions = layout
+ .current()
+ .partitions()
+ .map(|(partition, first_hash)| {
+ let storage_sets = layout.storage_sets_of(&first_hash);
+ SyncPartition {
+ partition,
+ first_hash,
+ last_hash: [0u8; 32].into(), // filled in just after
+ storage_sets,
+ }
+ })
+ .collect::<Vec<_>>();
+
+ for i in 0..partitions.len() {
+ partitions[i].last_hash = if i + 1 < partitions.len() {
+ partitions[i + 1].first_hash
+ } else {
+ [0xFFu8; 32].into()
+ };
+ }
+
+ SyncPartitions {
+ layout_version,
+ partitions,
+ }
}
}
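
The fix-up loop at the end of sync_partitions turns the list of partition start hashes into contiguous ranges: each partition's last_hash is the next partition's first_hash, and the final partition is padded out to 0xff...ff so the ranges tile the whole keyspace with no gaps. A standalone sketch of that chaining logic with simplified types (not the commit's code):

type Hash = [u8; 32];

struct Range {
    first_hash: Hash,
    last_hash: Hash,
}

/// Given sorted partition start hashes, build ranges that tile the keyspace.
fn chain_ranges(starts: &[Hash]) -> Vec<Range> {
    let mut ranges: Vec<Range> = starts
        .iter()
        .map(|&first_hash| Range {
            first_hash,
            last_hash: [0u8; 32], // filled in just after, as in the commit
        })
        .collect();
    for i in 0..ranges.len() {
        ranges[i].last_hash = if i + 1 < ranges.len() {
            ranges[i + 1].first_hash
        } else {
            [0xff; 32] // last range runs to the end of the keyspace
        };
    }
    ranges
}

fn main() {
    let a = [0u8; 32];
    let mut b = [0u8; 32];
    b[0] = 0x80; // second partition starts at 0x8000...
    let ranges = chain_ranges(&[a, b]);
    assert_eq!(ranges[0].last_hash, ranges[1].first_hash); // ranges are contiguous
    assert_eq!(ranges[1].last_hash, [0xff; 32]); // final range is padded to the end
}
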