diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-21 13:50:55 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-21 13:50:55 +0200 |
commit | 3119ea59b08e62ce14cddeb4809a397785b662bb (patch) | |
tree | 08e54b210ba73988ed1ac56db7045f39f3791bdb /src/table/merkle.rs | |
parent | e12bc3b5959c0aa5ae3c8a746c62bab2e7343a62 (diff) | |
download | garage-3119ea59b08e62ce14cddeb4809a397785b662bb.tar.gz garage-3119ea59b08e62ce14cddeb4809a397785b662bb.zip |
New worker semantics applied to garage_table
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r-- | src/table/merkle.rs | 67 |
1 files changed, 34 insertions, 33 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 7685b193..d4d2717f 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -1,14 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use futures::select; -use futures_util::future::*; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use garage_db as db; -use garage_util::background::BackgroundRunner; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -78,43 +77,17 @@ where empty_node_hash, }); - let ret2 = ret.clone(); - background.spawn_worker( - format!("Merkle tree updater for {}", F::TABLE_NAME), - |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), - ); + background.spawn_worker(MerkleWorker(ret.clone())); ret } - async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - match self.updater_loop_iter() { - Ok(true) => (), - Ok(false) => { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } - } - - fn updater_loop_iter(&self) -> Result<bool, Error> { + fn updater_loop_iter(&self) -> Result<WorkerStatus, Error> { if let Some((key, valhash)) = self.data.merkle_todo.first()? { self.update_item(&key, &valhash)?; - Ok(true) + Ok(WorkerStatus::Busy) } else { - Ok(false) + Ok(WorkerStatus::Idle) } } @@ -325,6 +298,34 @@ where } } +struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; + +#[async_trait] +impl<F, R> Worker for MerkleWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("Merkle tree updater: {}", F::TABLE_NAME) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + self.0.updater_loop_iter() + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + tokio::time::sleep(Duration::from_secs(10)).await; + WorkerStatus::Busy + } +} + impl MerkleNodeKey { fn encode(&self) -> Vec<u8> { let mut ret = Vec::with_capacity(2 + self.prefix.len()); |