From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:51:16 +0100 Subject: Refactor background runner and get rid of job worker --- src/util/background/job_worker.rs | 48 ----------------------------- src/util/background/mod.rs | 64 ++++++++------------------------------- 2 files changed, 13 insertions(+), 99 deletions(-) delete mode 100644 src/util/background/job_worker.rs (limited to 'src/util') diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs deleted file mode 100644 index 2568ea11..00000000 --- a/src/util/background/job_worker.rs +++ /dev/null @@ -1,48 +0,0 @@ -//! Job worker: a generic worker that just processes incoming -//! jobs one by one - -use std::sync::Arc; - -use async_trait::async_trait; -use tokio::sync::{mpsc, Mutex}; - -use crate::background::worker::*; -use crate::background::*; - -pub(crate) struct JobWorker { - pub(crate) index: usize, - pub(crate) job_chan: Arc>>, - pub(crate) next_job: Option, -} - -#[async_trait] -impl Worker for JobWorker { - fn name(&self) -> String { - format!("Job worker #{}", self.index) - } - - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - match self.next_job.take() { - None => return Ok(WorkerState::Idle), - Some(job) => { - job.await?; - Ok(WorkerState::Busy) - } - } - } - - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - loop { - match self.job_chan.lock().await.recv().await { - Some((job, cancellable)) => { - if cancellable && *must_exit.borrow() { - continue; - } - self.next_job = Some(job); - return WorkerState::Busy; - } - None => return WorkerState::Done, - } - } - } -} diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index fd9258b8..0bb4fb67 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,27 +1,23 @@ //! Job runner for futures and async functions -pub mod job_worker; pub mod worker; use core::future::Future; use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, watch, Mutex}; +use tokio::sync::{mpsc, watch}; use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; pub(crate) type JobOutput = Result<(), Error>; -pub(crate) type Job = Pin + Send>>; /// Job runner for futures and async functions pub struct BackgroundRunner { - send_job: mpsc::UnboundedSender<(Job, bool)>, send_worker: mpsc::UnboundedSender>, worker_info: Arc>>, } @@ -49,10 +45,7 @@ pub struct WorkerStatus { impl BackgroundRunner { /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver, - ) -> (Arc, tokio::task::JoinHandle<()>) { + pub fn new(stop_signal: watch::Receiver) -> (Arc, tokio::task::JoinHandle<()>) { let (send_worker, worker_out) = mpsc::unbounded_channel::>(); let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); @@ -63,24 +56,7 @@ impl BackgroundRunner { worker_processor.run().await; }); - let (send_job, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - - send_worker - .send(Box::new(job_worker::JobWorker { - index: i, - job_chan: queue_out.clone(), - next_job: None, - })) - .ok() - .unwrap(); - } - let bgrunner = Arc::new(Self { - send_job, send_worker, worker_info, }); @@ -91,31 +67,6 @@ impl BackgroundRunner { self.worker_info.lock().unwrap().clone() } - /// Spawn a task to be run in background - pub fn spawn(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, false)) - .ok() - .expect("Could not put job in queue"); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, true)) - .ok() - .expect("Could not put job in queue"); - } - pub fn spawn_worker(&self, worker: W) where W: Worker + 'static, @@ -126,3 +77,14 @@ impl BackgroundRunner { .expect("Could not put worker in queue"); } } + +pub fn spawn(job: T) +where + T: Future + Send + 'static, +{ + tokio::spawn(async move { + if let Err(e) = job.await { + error!("{}", e); + } + }); +} -- cgit v1.2.3 From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 15:25:29 +0100 Subject: Simplified and more aggressive worker exit logic --- src/util/background/worker.rs | 73 ++++++++++++++----------------------------- 1 file changed, 23 insertions(+), 50 deletions(-) (limited to 'src/util') diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index 7e9da7f8..8165e2cb 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use async_trait::async_trait; use futures::future::*; @@ -14,6 +14,10 @@ use crate::background::{WorkerInfo, WorkerStatus}; use crate::error::Error; use crate::time::now_msec; +// All workers that haven't exited for this time after an exit signal was recieved +// will be interrupted in the middle of whatever they are doing. +const EXIT_DEADLINE: Duration = Duration::from_secs(8); + #[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] pub enum WorkerState { Busy, @@ -50,10 +54,8 @@ pub trait Worker: Send { async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result; /// Wait for work: await for some task to become available. This future can be interrupted in - /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we - /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows - /// it to check if we are exiting. - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState; + /// the middle for any reason, for example if an interrupt signal was recieved. + async fn wait_for_work(&mut self) -> WorkerState; } pub(crate) struct WorkerProcessor { @@ -93,11 +95,9 @@ impl WorkerProcessor { let task_id = next_task_id; next_task_id += 1; let stop_signal = self.stop_signal.clone(); - let stop_signal_worker = self.stop_signal.clone(); let mut worker = WorkerHandler { task_id, stop_signal, - stop_signal_worker, worker: new_worker, state: WorkerState::Busy, errors: 0, @@ -153,26 +153,14 @@ impl WorkerProcessor { } // We are exiting, drain everything - let drain_half_time = Instant::now() + Duration::from_secs(5); let drain_everything = async move { - while let Some(mut worker) = workers.next().await { - if worker.state == WorkerState::Done { - info!( - "Worker {} (TID {}) exited", - worker.worker.name(), - worker.task_id - ); - } else if Instant::now() > drain_half_time { - warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state); - } else { - workers.push( - async move { - worker.step().await; - worker - } - .boxed(), - ); - } + while let Some(worker) = workers.next().await { + info!( + "Worker {} (TID {}) exited (last state: {:?})", + worker.worker.name(), + worker.task_id, + worker.state + ); } }; @@ -180,7 +168,7 @@ impl WorkerProcessor { _ = drain_everything => { info!("All workers exited peacefully \\o/"); } - _ = tokio::time::sleep(Duration::from_secs(9)) => { + _ = tokio::time::sleep(EXIT_DEADLINE) => { error!("Some workers could not exit in time, we are cancelling some things in the middle"); } } @@ -190,7 +178,6 @@ impl WorkerProcessor { struct WorkerHandler { task_id: usize, stop_signal: watch::Receiver, - stop_signal_worker: watch::Receiver, worker: Box, state: WorkerState, errors: usize, @@ -225,33 +212,19 @@ impl WorkerHandler { }, WorkerState::Throttled(delay) => { // Sleep for given delay and go back to busy state - if !*self.stop_signal.borrow() { - select! { - _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (), - _ = self.stop_signal.changed() => (), + select! { + _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => { + self.state = WorkerState::Busy; } + _ = self.stop_signal.changed() => (), } - self.state = WorkerState::Busy; } WorkerState::Idle => { - if *self.stop_signal.borrow() { - select! { - new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.state = new_st; - } - _ = tokio::time::sleep(Duration::from_secs(1)) => { - // stay in Idle state - } - } - } else { - select! { - new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.state = new_st; - } - _ = self.stop_signal.changed() => { - // stay in Idle state - } + select! { + new_st = self.worker.wait_for_work() => { + self.state = new_st; } + _ = self.stop_signal.changed() => (), } } WorkerState::Done => unreachable!(), -- cgit v1.2.3 From 510b62010871e9133a98f625b85f07a7e50f6f23 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 16:08:05 +0100 Subject: Get rid of background::spawn --- src/util/background/mod.rs | 16 ---------------- 1 file changed, 16 deletions(-) (limited to 'src/util') diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 0bb4fb67..41b48e93 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -2,20 +2,15 @@ pub mod worker; -use core::future::Future; - use std::collections::HashMap; use std::sync::Arc; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; -use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; -pub(crate) type JobOutput = Result<(), Error>; - /// Job runner for futures and async functions pub struct BackgroundRunner { send_worker: mpsc::UnboundedSender>, @@ -77,14 +72,3 @@ impl BackgroundRunner { .expect("Could not put worker in queue"); } } - -pub fn spawn(job: T) -where - T: Future + Send + 'static, -{ - tokio::spawn(async move { - if let Err(e) = job.await { - error!("{}", e); - } - }); -} -- cgit v1.2.3