aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-12-07 10:55:15 +0100
committerAlex Auvolat <alex@adnab.me>2023-12-07 10:57:21 +0100
commit95eb13eb08d517d328e3c8aeb222440a27211ee9 (patch)
tree4216b2172cb973f404b56b51546824cbfa966a7a /src/table/table.rs
parentc8356a91d9bf1d1488ec288099f2a55a1019918f (diff)
downloadgarage-95eb13eb08d517d328e3c8aeb222440a27211ee9.tar.gz
garage-95eb13eb08d517d328e3c8aeb222440a27211ee9.zip
rpc: refactor result tracking for quorum sets
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs54
1 files changed, 11 insertions, 43 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 7d1ff31c..6508cf5d 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -20,6 +20,7 @@ use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use garage_util::migrate::Migrate;
+use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -180,10 +181,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
// a quorum of nodes has answered OK, then the insert has succeeded and
// consistency properties (read-after-write) are preserved.
- // Some code here might feel redundant with RpcHelper::try_write_many_sets,
- // but I think deduplicating could lead to more spaghetti instead of
- // improving the readability, so I'm leaving as is.
-
let quorum = self.data.replication.write_quorum();
// Serialize all entries and compute the write sets for each of them.
@@ -197,7 +194,10 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
for entry in entries.into_iter() {
let entry = entry.borrow();
let hash = entry.partition_key().hash();
- let write_sets = self.data.replication.write_sets(&hash);
+ let mut write_sets = self.data.replication.write_sets(&hash);
+ for set in write_sets.as_mut().iter_mut() {
+ set.sort();
+ }
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
entries_vec.push((write_sets, e_enc));
}
@@ -212,12 +212,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
.collect::<Vec<&[Uuid]>>();
write_sets.sort();
write_sets.dedup();
- let mut write_set_index = HashMap::<&Uuid, Vec<usize>>::new();
- for (i, write_set) in write_sets.iter().enumerate() {
- for node in write_set.iter() {
- write_set_index.entry(node).or_default().push(i);
- }
- }
+
+ let mut result_tracker = QuorumSetResultTracker::new(&write_sets, quorum);
// Build a map of all nodes to the entries that must be sent to that node.
let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
@@ -230,7 +226,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
}
// Build futures to actually perform each of the corresponding RPC calls
- let call_count = call_list.len();
let call_futures = call_list.into_iter().map(|(node, entries)| {
let this = self.clone();
let tracer = opentelemetry::global::tracer("garage");
@@ -254,27 +249,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
// Run all requests in parallel thanks to FuturesUnordered, and collect results.
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
- let mut set_counters = vec![(0, 0); write_sets.len()];
- let mut successes = 0;
- let mut errors = vec![];
while let Some((node, resp)) = resps.next().await {
- match resp {
- Ok(_) => {
- successes += 1;
- for set in write_set_index.get(&node).unwrap().iter() {
- set_counters[*set].0 += 1;
- }
- }
- Err(e) => {
- errors.push(e);
- for set in write_set_index.get(&node).unwrap().iter() {
- set_counters[*set].1 += 1;
- }
- }
- }
+ result_tracker.register_result(node, resp.map(|_| ()));
- if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
+ if result_tracker.all_quorums_ok() {
// Success
// Continue all other requests in background
@@ -285,25 +264,14 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
return Ok(());
}
- if set_counters
- .iter()
- .enumerate()
- .any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len())
- {
+ if result_tracker.too_many_failures() {
// Too many errors in this set, we know we won't get a quorum
break;
}
}
// Failure, could not get quorum within at least one set
- let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::Quorum(
- quorum,
- Some(write_sets.len()),
- successes,
- call_count,
- errors,
- ))
+ Err(result_tracker.quorum_error())
}
pub async fn get(