aboutsummaryrefslogtreecommitdiff
path: root/src/table/gc.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-21 13:50:55 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-21 13:50:55 +0200
commit3119ea59b08e62ce14cddeb4809a397785b662bb (patch)
tree08e54b210ba73988ed1ac56db7045f39f3791bdb /src/table/gc.rs
parente12bc3b5959c0aa5ae3c8a746c62bab2e7343a62 (diff)
downloadgarage-3119ea59b08e62ce14cddeb4809a397785b662bb.tar.gz
garage-3119ea59b08e62ce14cddeb4809a397785b662bb.zip
New worker semantics applied to garage_table
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r--src/table/gc.rs80
1 files changed, 53 insertions, 27 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index e7fbbcb0..36124c2f 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
-use futures::select;
-use futures_util::future::*;
use tokio::sync::watch;
use garage_db::counted_tree_hack::CountedTree;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -69,35 +68,11 @@ where
gc.endpoint.set_handler(gc.clone());
- let gc1 = gc.clone();
- system.background.spawn_worker(
- format!("GC loop for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
- );
+ system.background.spawn_worker(GcWorker::new(gc.clone()));
gc
}
- async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.gc_loop_iter().await {
- Ok(None) => {
- // Stuff was done, loop immediately
- }
- Ok(Some(wait_delay)) => {
- // Nothing was done, wait specified delay.
- select! {
- _ = tokio::time::sleep(wait_delay).fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
- }
- }
- }
- }
-
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
@@ -328,6 +303,57 @@ where
}
}
+struct GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ gc: Arc<TableGc<F, R>>,
+ wait_delay: Duration,
+}
+
+impl<F, R> GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn new(gc: Arc<TableGc<F, R>>) -> Self {
+ Self {
+ gc,
+ wait_delay: Duration::from_secs(0),
+ }
+ }
+}
+
+#[async_trait]
+impl<F, R> Worker for GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("Table GC: {}", F::TABLE_NAME)
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ match self.gc.gc_loop_iter().await? {
+ None => Ok(WorkerStatus::Busy),
+ Some(delay) => {
+ self.wait_delay = delay;
+ Ok(WorkerStatus::Idle)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ tokio::time::sleep(self.wait_delay).await;
+ WorkerStatus::Busy
+ }
+}
+
/// An entry stored in the gc_todo Sled tree associated with the table
/// Contains helper function for parsing, saving, and removing
/// such entry in Sled