aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-12 19:57:37 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-12 19:57:37 +0100
commitc475471e7a8e7544f2be490898f4249cf27a17e9 (patch)
tree41253eaaba21b7d38340f2eab0ada17eaae88e8a /src/table/sync.rs
parentf4aad8fe6e36fe05e05c88c322b99fc87d896578 (diff)
downloadgarage-c475471e7a8e7544f2be490898f4249cf27a17e9.tar.gz
garage-c475471e7a8e7544f2be490898f4249cf27a17e9.zip
Implement table gc, currently for block_ref and version only
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs18
1 files changed, 6 insertions, 12 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 4be8cd10..aae65852 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
-use futures::future::join_all;
use futures::{pin_mut, select};
use futures_util::future::*;
use futures_util::stream::*;
@@ -347,16 +346,11 @@ where
nodes: &[UUID],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- let update_msg = Arc::new(SyncRPC::Items(values));
-
- for res in join_all(nodes.iter().map(|to| {
- self.rpc_client
- .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
- }))
- .await
- {
- res?;
- }
+
+ self.rpc_client.try_call_many(
+ &nodes[..],
+ SyncRPC::Items(values),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT)).await?;
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0;
@@ -577,7 +571,7 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- pub(crate) async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
+ async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
match message {
SyncRPC::RootCkHash(range, h) => {
let root_ck = self.get_root_ck(*range)?;