From 3119ea59b08e62ce14cddeb4809a397785b662bb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 21 Jun 2022 13:50:55 +0200 Subject: New worker semantics applied to garage_table --- src/table/gc.rs | 80 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 27 deletions(-) (limited to 'src/table/gc.rs') diff --git a/src/table/gc.rs b/src/table/gc.rs index e7fbbcb0..36124c2f 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use futures::future::join_all; -use futures::select; -use futures_util::future::*; use tokio::sync::watch; use garage_db::counted_tree_hack::CountedTree; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -69,35 +68,11 @@ where gc.endpoint.set_handler(gc.clone()); - let gc1 = gc.clone(); - system.background.spawn_worker( - format!("GC loop for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver| gc1.gc_loop(must_exit), - ); + system.background.spawn_worker(GcWorker::new(gc.clone())); gc } - async fn gc_loop(self: Arc, mut must_exit: watch::Receiver) { - while !*must_exit.borrow() { - match self.gc_loop_iter().await { - Ok(None) => { - // Stuff was done, loop immediately - } - Ok(Some(wait_delay)) => { - // Nothing was done, wait specified delay. - select! { - _ = tokio::time::sleep(wait_delay).fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!("({}) Error doing GC: {}", F::TABLE_NAME, e); - } - } - } - } - async fn gc_loop_iter(&self) -> Result, Error> { let now = now_msec(); @@ -328,6 +303,57 @@ where } } +struct GcWorker +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + gc: Arc>, + wait_delay: Duration, +} + +impl GcWorker +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn new(gc: Arc>) -> Self { + Self { + gc, + wait_delay: Duration::from_secs(0), + } + } +} + +#[async_trait] +impl Worker for GcWorker +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("Table GC: {}", F::TABLE_NAME) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver, + ) -> Result { + match self.gc.gc_loop_iter().await? { + None => Ok(WorkerStatus::Busy), + Some(delay) => { + self.wait_delay = delay; + Ok(WorkerStatus::Idle) + } + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerStatus { + tokio::time::sleep(self.wait_delay).await; + WorkerStatus::Busy + } +} + /// An entry stored in the gc_todo Sled tree associated with the table /// Contains helper function for parsing, saving, and removing /// such entry in Sled -- cgit v1.2.3