aboutsummaryrefslogtreecommitdiff
path: root/src/table/gc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r--src/table/gc.rs69
1 files changed, 49 insertions, 20 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index afc8a473..594044b8 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -1,6 +1,6 @@
+use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
-use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -17,9 +17,9 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::data::*;
-use crate::table::*;
-use crate::schema::*;
use crate::replication::*;
+use crate::schema::*;
+use crate::table::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
@@ -99,7 +99,10 @@ where
let vhash = Hash::try_from(&vhash[..]).unwrap();
- let v_opt = self.data.store.get(&k[..])?
+ let v_opt = self
+ .data
+ .store
+ .get(&k[..])?
.filter(|v| blake2sum(&v[..]) == vhash);
if let Some(v) = v_opt {
@@ -113,7 +116,10 @@ where
}
for (k, vhash) in excluded {
- let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+ let _ = self
+ .data
+ .gc_todo
+ .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
}
if entries.len() == 0 {
@@ -136,18 +142,29 @@ where
partitions.get_mut(&nodes).unwrap().push((k, vhash, v));
}
- let resps = join_all(partitions.into_iter()
- .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await;
+ let resps = join_all(
+ partitions
+ .into_iter()
+ .map(|(nodes, items)| self.try_send_and_delete(nodes, items)),
+ )
+ .await;
for resp in resps {
if let Err(e) = resp {
- warn!("({}) Unable to send and delete for GC: {}", self.data.name, e);
+ warn!(
+ "({}) Unable to send and delete for GC: {}",
+ self.data.name, e
+ );
}
}
Ok(true)
}
- async fn try_send_and_delete(&self, nodes: Vec<UUID>, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> {
+ async fn try_send_and_delete(
+ &self,
+ nodes: Vec<UUID>,
+ items: Vec<(ByteBuf, Hash, ByteBuf)>,
+ ) -> Result<(), Error> {
let n_items = items.len();
let mut updates = vec![];
@@ -157,21 +174,33 @@ where
deletes.push((k, vhash));
}
- self.rpc_client.try_call_many(
- &nodes[..],
- GcRPC::Update(updates),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ GcRPC::Update(updates),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ )
+ .await?;
+
+ info!(
+ "({}) GC: {} items successfully pushed, will try to delete.",
+ self.data.name, n_items
+ );
- info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items);
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ GcRPC::DeleteIfEqualHash(deletes.clone()),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ )
+ .await?;
- self.rpc_client.try_call_many(
- &nodes[..],
- GcRPC::DeleteIfEqualHash(deletes.clone()),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
-
for (k, vhash) in deletes {
self.data.delete_if_equal_hash(&k[..], vhash)?;
- let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+ let _ = self
+ .data
+ .gc_todo
+ .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
}
Ok(())