Diffstat (limited to 'src/table/replication/fullcopy.rs')
-rw-r--r--  src/table/replication/fullcopy.rs  52
1 file changed, 37 insertions(+), 15 deletions(-)
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 18682ace..1e52bb47 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,24 +1,36 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
use crate::replication::*;
+// TODO: find a way to track layout changes for this as well
+// The hard part is that this data is also stored on gateway nodes,
+// whereas sharded data is stored only on non-gateway (storage) nodes.
+// We also want to be more tolerant to gateway failures, so we don't
+// want to hold back data too much when the progress of gateway nodes
+// is not reported in the layout history's ack/sync/sync_ack maps.
+
/// Full replication schema: all nodes store everything
-/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
+/// Inconvenient: if some writes fail, nodes will read outdated data
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
pub system: Arc<System>,
- /// Max number of faults allowed while replicating a record
- pub max_faults: usize,
}
impl TableReplication for TableFullReplication {
+ type WriteSets = Vec<Vec<Uuid>>;
+
+ fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ let layout = self.system.cluster_layout();
+ layout.current().all_nodes().to_vec()
+ }
+
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
@@ -26,26 +38,36 @@ impl TableReplication for TableFullReplication {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.layout.node_ids().to_vec()
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
- let nmembers = self.system.ring.borrow().layout.node_ids().len();
- if nmembers > self.max_faults {
- nmembers - self.max_faults
+ let nmembers = self.system.cluster_layout().current().all_nodes().len();
+
+ let max_faults = if nmembers > 1 { 1 } else { 0 };
+
+ if nmembers > max_faults {
+ nmembers - max_faults
} else {
1
}
}
- fn max_write_errors(&self) -> usize {
- self.max_faults
- }
fn partition_of(&self, _hash: &Hash) -> Partition {
0u16
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- vec![(0u16, [0u8; 32].into())]
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.current().version;
+ SyncPartitions {
+ layout_version,
+ partitions: vec![SyncPartition {
+ partition: 0u16,
+ first_hash: [0u8; 32].into(),
+ last_hash: [0xff; 32].into(),
+ storage_sets: vec![layout.current().all_nodes().to_vec()],
+ }],
+ }
}
}
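
For reference, the hard-coded quorum rule introduced above (replacing the previously configurable max_faults field) tolerates exactly one failed write once the cluster has more than one member. A minimal standalone sketch of that arithmetic follows; quorum_for is a hypothetical helper written for illustration, not part of the patch:

// Hypothetical helper mirroring the new write_quorum logic:
// with more than one node, tolerate exactly one write failure;
// with a single node, require that node itself.
fn quorum_for(nmembers: usize) -> usize {
    let max_faults = if nmembers > 1 { 1 } else { 0 };
    if nmembers > max_faults {
        nmembers - max_faults
    } else {
        1
    }
}

fn main() {
    assert_eq!(quorum_for(1), 1); // single node: only itself
    assert_eq!(quorum_for(2), 1); // two nodes: one ack suffices
    assert_eq!(quorum_for(3), 2); // three nodes: two acks needed
    assert_eq!(quorum_for(5), 4); // in general: n - 1 acks
}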
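Likewise, partitions() is replaced by sync_partitions(), which for full replication describes a single partition spanning the whole hash space, with one storage set containing every node of the current layout. Below is a self-contained sketch using simplified stand-in types (the real Uuid, Hash, SyncPartition and SyncPartitions come from the Garage crates); it only illustrates the shape of the returned data, not Garage's actual implementation:

// Simplified stand-ins for the real Garage types, for illustration only.
type Uuid = [u8; 32];
type Hash = [u8; 32];
type Partition = u16;

struct SyncPartition {
    partition: Partition,
    first_hash: Hash,
    last_hash: Hash,
    storage_sets: Vec<Vec<Uuid>>,
}

struct SyncPartitions {
    layout_version: u64,
    partitions: Vec<SyncPartition>,
}

fn full_copy_sync_partitions(layout_version: u64, all_nodes: Vec<Uuid>) -> SyncPartitions {
    // Full replication exposes exactly one partition covering the entire
    // hash range, replicated on every node of the current layout.
    SyncPartitions {
        layout_version,
        partitions: vec![SyncPartition {
            partition: 0u16,
            first_hash: [0u8; 32],
            last_hash: [0xff; 32],
            storage_sets: vec![all_nodes],
        }],
    }
}

fn main() {
    let nodes = vec![[1u8; 32], [2u8; 32], [3u8; 32]];
    let sp = full_copy_sync_partitions(12, nodes);
    assert_eq!(sp.layout_version, 12);
    assert_eq!(sp.partitions.len(), 1);
    assert_eq!(sp.partitions[0].partition, 0u16);
    assert_eq!(sp.partitions[0].first_hash, [0u8; 32]);
    assert_eq!(sp.partitions[0].last_hash, [0xff; 32]);
    assert_eq!(sp.partitions[0].storage_sets[0].len(), 3);
}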