diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-21 13:50:55 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-21 13:50:55 +0200 |
commit | 3119ea59b08e62ce14cddeb4809a397785b662bb (patch) | |
tree | 08e54b210ba73988ed1ac56db7045f39f3791bdb /src/table/gc.rs | |
parent | e12bc3b5959c0aa5ae3c8a746c62bab2e7343a62 (diff) | |
download | garage-3119ea59b08e62ce14cddeb4809a397785b662bb.tar.gz garage-3119ea59b08e62ce14cddeb4809a397785b662bb.zip |
New worker semantics applied to garage_table
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r-- | src/table/gc.rs | 80 |
1 files changed, 53 insertions, 27 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index e7fbbcb0..36124c2f 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use futures::future::join_all; -use futures::select; -use futures_util::future::*; use tokio::sync::watch; use garage_db::counted_tree_hack::CountedTree; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -69,35 +68,11 @@ where gc.endpoint.set_handler(gc.clone()); - let gc1 = gc.clone(); - system.background.spawn_worker( - format!("GC loop for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit), - ); + system.background.spawn_worker(GcWorker::new(gc.clone())); gc } - async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - match self.gc_loop_iter().await { - Ok(None) => { - // Stuff was done, loop immediately - } - Ok(Some(wait_delay)) => { - // Nothing was done, wait specified delay. - select! { - _ = tokio::time::sleep(wait_delay).fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!("({}) Error doing GC: {}", F::TABLE_NAME, e); - } - } - } - } - async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); @@ -328,6 +303,57 @@ where } } +struct GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + gc: Arc<TableGc<F, R>>, + wait_delay: Duration, +} + +impl<F, R> GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn new(gc: Arc<TableGc<F, R>>) -> Self { + Self { + gc, + wait_delay: Duration::from_secs(0), + } + } +} + +#[async_trait] +impl<F, R> Worker for GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("Table GC: {}", F::TABLE_NAME) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + match self.gc.gc_loop_iter().await? { + None => Ok(WorkerStatus::Busy), + Some(delay) => { + self.wait_delay = delay; + Ok(WorkerStatus::Idle) + } + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + tokio::time::sleep(self.wait_delay).await; + WorkerStatus::Busy + } +} + /// An entry stored in the gc_todo Sled tree associated with the table /// Contains helper function for parsing, saving, and removing /// such entry in Sled |