aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rpc/layout/helper.rs2
-rw-r--r--src/rpc/layout/manager.rs2
-rw-r--r--src/table/replication/fullcopy.rs3
-rw-r--r--src/table/replication/parameters.rs2
-rw-r--r--src/table/replication/sharded.rs4
-rw-r--r--src/table/sync.rs51
6 files changed, 36 insertions, 28 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 881a039e..0aa7c6aa 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -180,7 +180,7 @@ impl LayoutHelper {
ret
}
- pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
+ pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.layout()
.versions
.iter()
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 17465019..dc963ba0 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -139,7 +139,7 @@ impl LayoutManager {
pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
let layout = self.layout();
let version = layout.current().version;
- let nodes = layout.write_sets_of(position);
+ let nodes = layout.storage_sets_of(position);
layout
.ack_lock
.get(&version)
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index df930224..30122f39 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,4 +1,3 @@
-use std::iter::FromIterator;
use std::sync::Arc;
use garage_rpc::layout::*;
@@ -69,7 +68,7 @@ impl TableReplication for TableFullReplication {
partition: 0u16,
first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(),
- storage_nodes: Vec::from_iter(layout.current().all_nodes().to_vec()),
+ storage_sets: vec![layout.current().all_nodes().to_vec()],
}],
}
}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index db11ff5f..78470f35 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -40,5 +40,5 @@ pub struct SyncPartition {
pub partition: Partition,
pub first_hash: Hash,
pub last_hash: Hash,
- pub storage_nodes: Vec<Uuid>,
+ pub storage_sets: Vec<Vec<Uuid>>,
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 2a16bc0c..55d0029d 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -60,12 +60,12 @@ impl TableReplication for TableShardedReplication {
.current()
.partitions()
.map(|(partition, first_hash)| {
- let storage_nodes = layout.storage_nodes_of(&first_hash);
+ let storage_sets = layout.storage_sets_of(&first_hash);
SyncPartition {
partition,
first_hash,
last_hash: [0u8; 32].into(), // filled in just after
- storage_nodes,
+ storage_sets,
}
})
.collect::<Vec<_>>();
diff --git a/src/table/sync.rs b/src/table/sync.rs
index efeac402..cfcbc4b5 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -18,6 +18,7 @@ use garage_util::encode::{debug_serialize, nonversioned_encode};
use garage_util::error::{Error, OkOrMessage};
use garage_rpc::layout::*;
+use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -106,44 +107,52 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
let my_id = self.system.id;
- let retain = partition.storage_nodes.contains(&my_id);
+ let retain = partition.storage_sets.iter().any(|x| x.contains(&my_id));
if retain {
debug!(
"({}) Syncing {:?} with {:?}...",
F::TABLE_NAME,
partition,
- partition.storage_nodes
+ partition.storage_sets
);
- let mut sync_futures = partition
- .storage_nodes
+ let mut result_tracker = QuorumSetResultTracker::new(
+ &partition.storage_sets,
+ self.data.replication.write_quorum(),
+ );
+
+ let mut sync_futures = result_tracker
+ .nodes
.iter()
- .filter(|node| **node != my_id)
+ .map(|(node, _)| *node)
.map(|node| {
- self.clone()
- .do_sync_with(&partition, *node, must_exit.clone())
+ let must_exit = must_exit.clone();
+ async move {
+ if node == my_id {
+ (node, Ok(()))
+ } else {
+ (node, self.do_sync_with(&partition, node, must_exit).await)
+ }
+ }
})
.collect::<FuturesUnordered<_>>();
- let mut n_errors = 0;
- while let Some(r) = sync_futures.next().await {
- if let Err(e) = r {
- n_errors += 1;
- warn!("({}) Sync error: {}", F::TABLE_NAME, e);
+ while let Some((node, res)) = sync_futures.next().await {
+ if let Err(e) = &res {
+ warn!("({}) Sync error with {:?}: {}", F::TABLE_NAME, node, e);
}
+ result_tracker.register_result(node, res);
}
- if n_errors > 0 {
- return Err(Error::Message(format!(
- "Sync failed with {} nodes.",
- n_errors
- )));
+
+ if result_tracker.too_many_failures() {
+ return Err(result_tracker.quorum_error());
+ } else {
+ Ok(())
}
} else {
self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
- .await?;
+ .await
}
-
- Ok(())
}
// Offload partition: this partition is not something we are storing,
@@ -264,7 +273,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
async fn do_sync_with(
- self: Arc<Self>,
+ self: &Arc<Self>,
partition: &SyncPartition,
who: Uuid,
must_exit: watch::Receiver<bool>,