diff options
author | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
commit | 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 (patch) | |
tree | 256ee885e93cf1c41dc2869fe13a648aa91ab9b5 /src/table/merkle.rs | |
parent | aab34bfe5415e9584432bf32e29a151dc5af9ebd (diff) | |
download | garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.tar.gz garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.zip |
Background task manager (#332)
- [x] New background worker trait
- [x] Adapt all current workers to use new API
- [x] Command to list currently running workers, and whether they are active, idle, or dead
- [x] Error reporting
- Optimizations
- [x] Merkle updater: several items per iteration
- [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on
- scrub:
- [x] have only one worker with a channel to start/pause/cancel
- [x] automatic scrub
- [x] ability to view and change tranquility from CLI
- [x] persistence of a few info
- [ ] Testing
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r-- | src/table/merkle.rs | 87 |
1 files changed, 54 insertions, 33 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 7685b193..a5c29723 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -1,14 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use futures::select; -use futures_util::future::*; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use garage_db as db; -use garage_util::background::BackgroundRunner; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -78,43 +77,17 @@ where empty_node_hash, }); - let ret2 = ret.clone(); - background.spawn_worker( - format!("Merkle tree updater for {}", F::TABLE_NAME), - |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), - ); + background.spawn_worker(MerkleWorker(ret.clone())); ret } - async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - match self.updater_loop_iter() { - Ok(true) => (), - Ok(false) => { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } - } - - fn updater_loop_iter(&self) -> Result<bool, Error> { + fn updater_loop_iter(&self) -> Result<WorkerState, Error> { if let Some((key, valhash)) = self.data.merkle_todo.first()? { self.update_item(&key, &valhash)?; - Ok(true) + Ok(WorkerState::Busy) } else { - Ok(false) + Ok(WorkerState::Idle) } } @@ -325,6 +298,54 @@ where } } +struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; + +#[async_trait] +impl<F, R> Worker for MerkleWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} Merkle tree updater", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.0.todo_len().unwrap_or(0); + if l > 0 { + Some(format!("{} items in queue", l)) + } else { + None + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let updater = self.0.clone(); + tokio::task::spawn_blocking(move || { + for _i in 0..100 { + let s = updater.updater_loop_iter(); + if !matches!(s, Ok(WorkerState::Busy)) { + return s; + } + } + Ok(WorkerState::Busy) + }) + .await + .unwrap() + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + tokio::time::sleep(Duration::from_secs(10)).await; + WorkerState::Busy + } +} + impl MerkleNodeKey { fn encode(&self) -> Vec<u8> { let mut ret = Vec::with_capacity(2 + self.prefix.len()); |