diff options
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r-- | src/table/gc.rs | 89 |
1 files changed, 62 insertions, 27 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index e7fbbcb0..12218d97 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use futures::future::join_all; -use futures::select; -use futures_util::future::*; use tokio::sync::watch; use garage_db::counted_tree_hack::CountedTree; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -69,35 +68,11 @@ where gc.endpoint.set_handler(gc.clone()); - let gc1 = gc.clone(); - system.background.spawn_worker( - format!("GC loop for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit), - ); + system.background.spawn_worker(GcWorker::new(gc.clone())); gc } - async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - match self.gc_loop_iter().await { - Ok(None) => { - // Stuff was done, loop immediately - } - Ok(Some(wait_delay)) => { - // Nothing was done, wait specified delay. - select! { - _ = tokio::time::sleep(wait_delay).fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!("({}) Error doing GC: {}", F::TABLE_NAME, e); - } - } - } - } - async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); @@ -328,6 +303,66 @@ where } } +struct GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + gc: Arc<TableGc<F, R>>, + wait_delay: Duration, +} + +impl<F, R> GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn new(gc: Arc<TableGc<F, R>>) -> Self { + Self { + gc, + wait_delay: Duration::from_secs(0), + } + } +} + +#[async_trait] +impl<F, R> Worker for GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} GC", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.gc.data.gc_todo_len().unwrap_or(0); + if l > 0 { + Some(format!("{} items in queue", l)) + } else { + None + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + match self.gc.gc_loop_iter().await? { + None => Ok(WorkerState::Busy), + Some(delay) => { + self.wait_delay = delay; + Ok(WorkerState::Idle) + } + } + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + tokio::time::sleep(self.wait_delay).await; + WorkerState::Busy + } +} + /// An entry stored in the gc_todo Sled tree associated with the table /// Contains helper function for parsing, saving, and removing /// such entry in Sled |