diff options
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r-- | src/table/gc.rs | 69 |
1 files changed, 49 insertions, 20 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index afc8a473..594044b8 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -1,6 +1,6 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use std::collections::HashMap; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -17,9 +17,9 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::data::*; -use crate::table::*; -use crate::schema::*; use crate::replication::*; +use crate::schema::*; +use crate::table::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); @@ -99,7 +99,10 @@ where let vhash = Hash::try_from(&vhash[..]).unwrap(); - let v_opt = self.data.store.get(&k[..])? + let v_opt = self + .data + .store + .get(&k[..])? .filter(|v| blake2sum(&v[..]) == vhash); if let Some(v) = v_opt { @@ -113,7 +116,10 @@ where } for (k, vhash) in excluded { - let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; + let _ = self + .data + .gc_todo + .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; } if entries.len() == 0 { @@ -136,18 +142,29 @@ where partitions.get_mut(&nodes).unwrap().push((k, vhash, v)); } - let resps = join_all(partitions.into_iter() - .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await; + let resps = join_all( + partitions + .into_iter() + .map(|(nodes, items)| self.try_send_and_delete(nodes, items)), + ) + .await; for resp in resps { if let Err(e) = resp { - warn!("({}) Unable to send and delete for GC: {}", self.data.name, e); + warn!( + "({}) Unable to send and delete for GC: {}", + self.data.name, e + ); } } Ok(true) } - async fn try_send_and_delete(&self, nodes: Vec<UUID>, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> { + async fn try_send_and_delete( + &self, + nodes: Vec<UUID>, + items: Vec<(ByteBuf, Hash, ByteBuf)>, + ) -> Result<(), Error> { let n_items = items.len(); let mut updates = vec![]; @@ -157,21 +174,33 @@ where deletes.push((k, vhash)); } - self.rpc_client.try_call_many( - &nodes[..], - GcRPC::Update(updates), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; + self.rpc_client + .try_call_many( + &nodes[..], + GcRPC::Update(updates), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + ) + .await?; + + info!( + "({}) GC: {} items successfully pushed, will try to delete.", + self.data.name, n_items + ); - info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items); + self.rpc_client + .try_call_many( + &nodes[..], + GcRPC::DeleteIfEqualHash(deletes.clone()), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + ) + .await?; - self.rpc_client.try_call_many( - &nodes[..], - GcRPC::DeleteIfEqualHash(deletes.clone()), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; - for (k, vhash) in deletes { self.data.delete_if_equal_hash(&k[..], vhash)?; - let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; + let _ = self + .data + .gc_todo + .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; } Ok(()) |