diff options
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r-- | src/table/gc.rs | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs new file mode 100644 index 00000000..afc8a473 --- /dev/null +++ b/src/table/gc.rs @@ -0,0 +1,212 @@ +use std::sync::Arc; +use std::time::Duration; +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; + +use futures::future::join_all; +use futures::select; +use futures_util::future::*; +use tokio::sync::watch; + +use garage_util::data::*; +use garage_util::error::Error; + +use garage_rpc::rpc_client::*; +use garage_rpc::rpc_server::*; + +use crate::data::*; +use crate::table::*; +use crate::schema::*; +use crate::replication::*; + +const TABLE_GC_BATCH_SIZE: usize = 1024; +const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); + +pub struct TableGC<F: TableSchema, R: TableReplication> { + data: Arc<TableData<F>>, + aux: Arc<TableAux<R>>, + + rpc_client: Arc<RpcClient<GcRPC>>, +} + +#[derive(Serialize, Deserialize)] +enum GcRPC { + Update(Vec<ByteBuf>), + DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), + Ok, +} + +impl RpcMessage for GcRPC {} + +impl<F, R> TableGC<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub(crate) fn launch( + data: Arc<TableData<F>>, + aux: Arc<TableAux<R>>, + rpc_server: &mut RpcServer, + ) -> Arc<Self> { + let rpc_path = format!("table_{}/gc", data.name); + let rpc_client = aux.system.rpc_client::<GcRPC>(&rpc_path); + + let gc = Arc::new(Self { + data: data.clone(), + aux: aux.clone(), + rpc_client, + }); + + gc.register_handler(rpc_server, rpc_path); + + let gc1 = gc.clone(); + aux.system.background.spawn_worker( + format!("GC loop for {}", data.name), + move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit), + ); + + gc + } + + async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> { + while !*must_exit.borrow() { + match self.gc_loop_iter().await { + Ok(true) => { + // Stuff was done, loop imediately + } + Ok(false) => { + select! { + _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), + _ = must_exit.recv().fuse() => (), + } + } + Err(e) => { + warn!("({}) Error doing GC: {}", self.data.name, e); + } + } + } + Ok(()) + } + + async fn gc_loop_iter(&self) -> Result<bool, Error> { + let mut entries = vec![]; + let mut excluded = vec![]; + + for item in self.data.gc_todo.iter() { + let (k, vhash) = item?; + + let vhash = Hash::try_from(&vhash[..]).unwrap(); + + let v_opt = self.data.store.get(&k[..])? + .filter(|v| blake2sum(&v[..]) == vhash); + + if let Some(v) = v_opt { + entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec()))); + if entries.len() >= TABLE_GC_BATCH_SIZE { + break; + } + } else { + excluded.push((k, vhash)); + } + } + + for (k, vhash) in excluded { + let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; + } + + if entries.len() == 0 { + // Nothing to do in this iteration + return Ok(false); + } + + debug!("({}) GC: doing {} items", self.data.name, entries.len()); + + let mut partitions = HashMap::new(); + for (k, vhash, v) in entries { + let pkh = Hash::try_from(&k[..32]).unwrap(); + let mut nodes = self.aux.replication.write_nodes(&pkh, &self.aux.system); + nodes.retain(|x| *x != self.aux.system.id); + nodes.sort(); + + if !partitions.contains_key(&nodes) { + partitions.insert(nodes.clone(), vec![]); + } + partitions.get_mut(&nodes).unwrap().push((k, vhash, v)); + } + + let resps = join_all(partitions.into_iter() + .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await; + for resp in resps { + if let Err(e) = resp { + warn!("({}) Unable to send and delete for GC: {}", self.data.name, e); + } + } + + Ok(true) + } + + async fn try_send_and_delete(&self, nodes: Vec<UUID>, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> { + let n_items = items.len(); + + let mut updates = vec![]; + let mut deletes = vec![]; + for (k, vhash, v) in items { + updates.push(v); + deletes.push((k, vhash)); + } + + self.rpc_client.try_call_many( + &nodes[..], + GcRPC::Update(updates), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; + + info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items); + + self.rpc_client.try_call_many( + &nodes[..], + GcRPC::DeleteIfEqualHash(deletes.clone()), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; + + for (k, vhash) in deletes { + self.data.delete_if_equal_hash(&k[..], vhash)?; + let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; + } + + Ok(()) + } + + // ---- RPC HANDLER ---- + + fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) { + let self2 = self.clone(); + rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| { + let self2 = self2.clone(); + async move { self2.handle_rpc(&msg).await } + }); + + let self2 = self.clone(); + self.rpc_client + .set_local_handler(self.aux.system.id, move |msg| { + let self2 = self2.clone(); + async move { self2.handle_rpc(&msg).await } + }); + } + + async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> { + match message { + GcRPC::Update(items) => { + self.data.update_many(items)?; + Ok(GcRPC::Ok) + } + GcRPC::DeleteIfEqualHash(items) => { + for (key, vhash) in items.iter() { + self.data.delete_if_equal_hash(&key[..], *vhash)?; + } + Ok(GcRPC::Ok) + } + _ => Err(Error::Message(format!("Unexpected GC RPC"))), + } + } +} |