aboutsummaryrefslogtreecommitdiff
path: root/src/util/background/job_worker.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-07-08 13:30:26 +0200
committerAlex <alex@adnab.me>2022-07-08 13:30:26 +0200
commit4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 (patch)
tree256ee885e93cf1c41dc2869fe13a648aa91ab9b5 /src/util/background/job_worker.rs
parentaab34bfe5415e9584432bf32e29a151dc5af9ebd (diff)
downloadgarage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.tar.gz
garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.zip
Background task manager (#332)
- [x] New background worker trait - [x] Adapt all current workers to use new API - [x] Command to list currently running workers, and whether they are active, idle, or dead - [x] Error reporting - Optimizations - [x] Merkle updater: several items per iteration - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on - scrub: - [x] have only one worker with a channel to start/pause/cancel - [x] automatic scrub - [x] ability to view and change tranquility from CLI - [x] persistence of a few info - [ ] Testing Co-authored-by: Alex Auvolat <alex@adnab.me> Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332 Co-authored-by: Alex <alex@adnab.me> Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/util/background/job_worker.rs')
-rw-r--r--src/util/background/job_worker.rs48
1 files changed, 48 insertions, 0 deletions
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
new file mode 100644
index 00000000..2568ea11
--- /dev/null
+++ b/src/util/background/job_worker.rs
@@ -0,0 +1,48 @@
+//! Job worker: a generic worker that just processes incoming
+//! jobs one by one
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use tokio::sync::{mpsc, Mutex};
+
+use crate::background::worker::*;
+use crate::background::*;
+
+pub(crate) struct JobWorker {
+ pub(crate) index: usize,
+ pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
+ pub(crate) next_job: Option<Job>,
+}
+
+#[async_trait]
+impl Worker for JobWorker {
+ fn name(&self) -> String {
+ format!("Job worker #{}", self.index)
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match self.next_job.take() {
+ None => return Ok(WorkerState::Idle),
+ Some(job) => {
+ job.await?;
+ Ok(WorkerState::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ loop {
+ match self.job_chan.lock().await.recv().await {
+ Some((job, cancellable)) => {
+ if cancellable && *must_exit.borrow() {
+ continue;
+ }
+ self.next_job = Some(job);
+ return WorkerState::Busy;
+ }
+ None => return WorkerState::Done,
+ }
+ }
+ }
+}