diff options
author | Mendes <mendes.oulamara@pm.me> | 2022-10-04 18:14:49 +0200 |
---|---|---|
committer | Mendes <mendes.oulamara@pm.me> | 2022-10-04 18:14:49 +0200 |
commit | 829f815a897b04986559910bbcbf53625adcdf20 (patch) | |
tree | 6db3c27cff2aded754a641d1f2b05c83be701267 /src/util/background/job_worker.rs | |
parent | 99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff) | |
parent | a096ced35562bd0a8877a1ee2f755be1edafe343 (diff) | |
download | garage-829f815a897b04986559910bbcbf53625adcdf20.tar.gz garage-829f815a897b04986559910bbcbf53625adcdf20.zip |
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/util/background/job_worker.rs')
-rw-r--r-- | src/util/background/job_worker.rs | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs new file mode 100644 index 00000000..2568ea11 --- /dev/null +++ b/src/util/background/job_worker.rs @@ -0,0 +1,48 @@ +//! Job worker: a generic worker that just processes incoming +//! jobs one by one + +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::{mpsc, Mutex}; + +use crate::background::worker::*; +use crate::background::*; + +pub(crate) struct JobWorker { + pub(crate) index: usize, + pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>, + pub(crate) next_job: Option<Job>, +} + +#[async_trait] +impl Worker for JobWorker { + fn name(&self) -> String { + format!("Job worker #{}", self.index) + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + match self.next_job.take() { + None => return Ok(WorkerState::Idle), + Some(job) => { + job.await?; + Ok(WorkerState::Busy) + } + } + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + loop { + match self.job_chan.lock().await.recv().await { + Some((job, cancellable)) => { + if cancellable && *must_exit.borrow() { + continue; + } + self.next_job = Some(job); + return WorkerState::Busy; + } + None => return WorkerState::Done, + } + } + } +} |