aboutsummaryrefslogtreecommitdiff
path: root/src/table/gc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r--src/table/gc.rs139
1 files changed, 88 insertions, 51 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 2a05b6ae..83e7eeff 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -8,13 +8,13 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
-use futures::select;
-use futures_util::future::*;
use tokio::sync::watch;
+use garage_db::counted_tree_hack::CountedTree;
+
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_rpc::system::System;
@@ -25,7 +25,6 @@ use crate::replication::*;
use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
-const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
// GC delay for table entries: 1 day (24 hours)
// (the delay before the entry is added in the GC todo list
@@ -68,50 +67,24 @@ where
gc.endpoint.set_handler(gc.clone());
- let gc1 = gc.clone();
- system.background.spawn_worker(
- format!("GC loop for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
- );
+ system.background.spawn_worker(GcWorker::new(gc.clone()));
gc
}
- async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.gc_loop_iter().await {
- Ok(None) => {
- // Stuff was done, loop immediately
- }
- Ok(Some(wait_delay)) => {
- // Nothing was done, wait specified delay.
- select! {
- _ = tokio::time::sleep(wait_delay).fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
- }
- }
- }
- }
-
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
- let mut entries = vec![];
- let mut excluded = vec![];
-
// List entries in the GC todo list
// These entries are put there when a tombstone is inserted in the table
// (see update_entry in data.rs)
- for entry_kv in self.data.gc_todo.iter() {
+ let mut candidates = vec![];
+ for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?;
- let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
+ let todo_entry = GcTodoEntry::parse(&k, &vhash);
if todo_entry.deletion_time() > now {
- if entries.is_empty() && excluded.is_empty() {
+ if candidates.is_empty() {
// If the earliest entry in the todo list shouldn't yet be processed,
// return a duration to wait in the loop
return Ok(Some(Duration::from_millis(
@@ -123,15 +96,23 @@ where
}
}
- let vhash = Hash::try_from(&vhash[..]).unwrap();
+ candidates.push(todo_entry);
+ if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE {
+ break;
+ }
+ }
+ let mut entries = vec![];
+ let mut excluded = vec![];
+ for mut todo_entry in candidates {
// Check if the tombstone is still the current value of the entry.
// If not, we don't actually want to GC it, and we will remove it
// from the gc_todo table later (below).
+ let vhash = todo_entry.value_hash;
todo_entry.value = self
.data
.store
- .get(&k[..])?
+ .get(&todo_entry.key[..])?
.filter(|v| blake2sum(&v[..]) == vhash)
.map(|v| v.to_vec());
@@ -254,9 +235,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::Update(updates),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: send tombstones")?;
@@ -277,9 +256,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::DeleteIfEqualHash(deletes),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: remote delete tombstones")?;
@@ -321,6 +298,66 @@ where
}
}
+struct GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ gc: Arc<TableGc<F, R>>,
+ wait_delay: Duration,
+}
+
+impl<F, R> GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn new(gc: Arc<TableGc<F, R>>) -> Self {
+ Self {
+ gc,
+ wait_delay: Duration::from_secs(0),
+ }
+ }
+}
+
+#[async_trait]
+impl<F, R> Worker for GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} GC", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.gc.data.gc_todo_len().unwrap_or(0);
+ if l > 0 {
+ Some(format!("{} items in queue", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match self.gc.gc_loop_iter().await? {
+ None => Ok(WorkerState::Busy),
+ Some(delay) => {
+ self.wait_delay = delay;
+ Ok(WorkerState::Idle)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ tokio::time::sleep(self.wait_delay).await;
+ WorkerState::Busy
+ }
+}
+
/// An entry stored in the gc_todo Sled tree associated with the table
/// Contains helper function for parsing, saving, and removing
/// such entry in Sled
@@ -353,17 +390,17 @@ impl GcTodoEntry {
}
/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
- pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self {
+ pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self {
Self {
- tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()),
- key: sled_k[8..].to_vec(),
- value_hash: Hash::try_from(sled_v).unwrap(),
+ tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()),
+ key: db_k[8..].to_vec(),
+ value_hash: Hash::try_from(db_v).unwrap(),
value: None,
}
}
/// Saves the GcTodoEntry in the gc_todo tree
- pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
+ pub(crate) fn save(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
@@ -373,9 +410,9 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
- pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
- let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
- &self.todo_table_key()[..],
+ pub(crate) fn remove_if_equal(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
+ gc_todo_tree.compare_and_swap::<_, _, &[u8]>(
+ &self.todo_table_key(),
Some(self.value_hash),
None,
)?;