author    Alex Auvolat <alex@adnab.me>    2021-03-12 19:57:37 +0100
committer Alex Auvolat <alex@adnab.me>    2021-03-12 19:57:37 +0100
commit    c475471e7a8e7544f2be490898f4249cf27a17e9 (patch)
tree      41253eaaba21b7d38340f2eab0ada17eaae88e8a /src/table/gc.rs
parent    f4aad8fe6e36fe05e05c88c322b99fc87d896578 (diff)
Implement table gc, currently for block_ref and version only
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r--  src/table/gc.rs  212
1 file changed, 212 insertions, 0 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
new file mode 100644
index 00000000..afc8a473
--- /dev/null
+++ b/src/table/gc.rs
@@ -0,0 +1,212 @@
+use std::sync::Arc;
+use std::time::Duration;
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+
+use futures::future::join_all;
+use futures::select;
+use futures_util::future::*;
+use tokio::sync::watch;
+
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::rpc_client::*;
+use garage_rpc::rpc_server::*;
+
+use crate::data::*;
+use crate::table::*;
+use crate::schema::*;
+use crate::replication::*;
+
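+// Maximum number of items processed in a single GC iteration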
+const TABLE_GC_BATCH_SIZE: usize = 1024;
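+// Timeout applied to the GC RPCs (Update / DeleteIfEqualHash)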
+const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+
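+/// Garbage collector for a table. Entries queued in the `gc_todo` tree are
+/// first pushed to the other nodes storing them, then deleted everywhere
+/// once the push has been acknowledged.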
+pub struct TableGC<F: TableSchema, R: TableReplication> {
+ data: Arc<TableData<F>>,
+ aux: Arc<TableAux<R>>,
+
+ rpc_client: Arc<RpcClient<GcRPC>>,
+}
+
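+/// RPC messages exchanged between nodes during garbage collection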
+#[derive(Serialize, Deserialize)]
+enum GcRPC {
+ Update(Vec<ByteBuf>),
+ DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
+ Ok,
+}
+
+impl RpcMessage for GcRPC {}
+
+impl<F, R> TableGC<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
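+ /// Create the GC handler for a table and spawn its background worker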
+ pub(crate) fn launch(
+ data: Arc<TableData<F>>,
+ aux: Arc<TableAux<R>>,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
+ let rpc_path = format!("table_{}/gc", data.name);
+ let rpc_client = aux.system.rpc_client::<GcRPC>(&rpc_path);
+
+ let gc = Arc::new(Self {
+ data: data.clone(),
+ aux: aux.clone(),
+ rpc_client,
+ });
+
+ gc.register_handler(rpc_server, rpc_path);
+
+ let gc1 = gc.clone();
+ aux.system.background.spawn_worker(
+ format!("GC loop for {}", data.name),
+ move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
+ );
+
+ gc
+ }
+
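+ /// Background worker: run GC iterations, pausing 10 seconds between
+ /// iterations when the previous one found nothing to do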
+ async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ while !*must_exit.borrow() {
+ match self.gc_loop_iter().await {
+ Ok(true) => {
+ // Stuff was done, loop immediately
+ }
+ Ok(false) => {
+ select! {
+ _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (),
+ _ = must_exit.recv().fuse() => (),
+ }
+ }
+ Err(e) => {
+ warn!("({}) Error doing GC: {}", self.data.name, e);
+ }
+ }
+ }
+ Ok(())
+ }
+
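+ /// Run a single GC iteration. Returns Ok(true) if items were processed,
+ /// Ok(false) if there was nothing to do.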
+ async fn gc_loop_iter(&self) -> Result<bool, Error> {
+ let mut entries = vec![];
+ let mut excluded = vec![];
+
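+ // Collect a batch of items from the gc_todo queue; an item is only kept
+ // if its current value still matches the hash recorded in the queue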
+ for item in self.data.gc_todo.iter() {
+ let (k, vhash) = item?;
+
+ let vhash = Hash::try_from(&vhash[..]).unwrap();
+
+ let v_opt = self.data.store.get(&k[..])?
+ .filter(|v| blake2sum(&v[..]) == vhash);
+
+ if let Some(v) = v_opt {
+ entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec())));
+ if entries.len() >= TABLE_GC_BATCH_SIZE {
+ break;
+ }
+ } else {
+ excluded.push((k, vhash));
+ }
+ }
+
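+ // Items whose value has changed since they were queued are not GC'd:
+ // remove them from gc_todo if the queue still holds the same hash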
+ for (k, vhash) in excluded {
+ let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+ }
+
+ if entries.is_empty() {
+ // Nothing to do in this iteration
+ return Ok(false);
+ }
+
+ debug!("({}) GC: doing {} items", self.data.name, entries.len());
+
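+ // Group items by the set of other nodes that store their partition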
+ let mut partitions = HashMap::new();
+ for (k, vhash, v) in entries {
+ let pkh = Hash::try_from(&k[..32]).unwrap();
+ let mut nodes = self.aux.replication.write_nodes(&pkh, &self.aux.system);
+ nodes.retain(|x| *x != self.aux.system.id);
+ nodes.sort();
+
+ partitions.entry(nodes).or_insert_with(Vec::new).push((k, vhash, v));
+ }
+
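+ // Process all groups in parallel; on failure, items simply stay in
+ // gc_todo and will be retried on a later iteration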
+ let resps = join_all(partitions.into_iter()
+ .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await;
+ for resp in resps {
+ if let Err(e) = resp {
+ warn!("({}) Unable to send and delete for GC: {}", self.data.name, e);
+ }
+ }
+
+ Ok(true)
+ }
+
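+ /// Push the given items to the other responsible nodes, ask them to
+ /// delete the entries if they still match the hash, then delete them
+ /// locally and clear the corresponding gc_todo entries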
+ async fn try_send_and_delete(&self, nodes: Vec<UUID>, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> {
+ let n_items = items.len();
+
+ let mut updates = vec![];
+ let mut deletes = vec![];
+ for (k, vhash, v) in items {
+ updates.push(v);
+ deletes.push((k, vhash));
+ }
+
+ self.rpc_client.try_call_many(
+ &nodes[..],
+ GcRPC::Update(updates),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
+
+ info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items);
+
+ self.rpc_client.try_call_many(
+ &nodes[..],
+ GcRPC::DeleteIfEqualHash(deletes.clone()),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
+
+ for (k, vhash) in deletes {
+ self.data.delete_if_equal_hash(&k[..], vhash)?;
+ let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+ }
+
+ Ok(())
+ }
+
+ // ---- RPC HANDLER ----
+
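+ /// Register the GC RPC handler, both on the RPC server and as a local
+ /// handler for messages addressed to this node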
+ fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ let self2 = self.clone();
+ rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+
+ let self2 = self.clone();
+ self.rpc_client
+ .set_local_handler(self.aux.system.id, move |msg| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+ }
+
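+ /// Handle a GC RPC received from another node (or from ourselves)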
+ async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> {
+ match message {
+ GcRPC::Update(items) => {
+ self.data.update_many(items)?;
+ Ok(GcRPC::Ok)
+ }
+ GcRPC::DeleteIfEqualHash(items) => {
+ for (key, vhash) in items.iter() {
+ self.data.delete_if_equal_hash(&key[..], *vhash)?;
+ }
+ Ok(GcRPC::Ok)
+ }
+ _ => Err(Error::Message(format!("Unexpected GC RPC"))),
+ }
+ }
+}