diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-12 19:57:37 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-12 19:57:37 +0100 |
commit | c475471e7a8e7544f2be490898f4249cf27a17e9 (patch) | |
tree | 41253eaaba21b7d38340f2eab0ada17eaae88e8a /src/table/sync.rs | |
parent | f4aad8fe6e36fe05e05c88c322b99fc87d896578 (diff) | |
download | garage-c475471e7a8e7544f2be490898f4249cf27a17e9.tar.gz garage-c475471e7a8e7544f2be490898f4249cf27a17e9.zip |
Implement table gc, currently for block_ref and version only
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 18 |
1 files changed, 6 insertions, 12 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index 4be8cd10..aae65852 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -3,7 +3,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use futures::future::join_all; use futures::{pin_mut, select}; use futures_util::future::*; use futures_util::stream::*; @@ -347,16 +346,11 @@ where nodes: &[UUID], ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); - let update_msg = Arc::new(SyncRPC::Items(values)); - - for res in join_all(nodes.iter().map(|to| { - self.rpc_client - .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) - })) - .await - { - res?; - } + + self.rpc_client.try_call_many( + &nodes[..], + SyncRPC::Items(values), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT)).await?; // All remote nodes have written those items, now we can delete them locally let mut not_removed = 0; @@ -577,7 +571,7 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== - pub(crate) async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> { + async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> { match message { SyncRPC::RootCkHash(range, h) => { let root_ck = self.get_root_ck(*range)?; |