From de9d6cddf709e686ada3d1e71de7b31d7704b8b5 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 12 Dec 2022 17:16:49 +0100 Subject: Prettier worker list table; remove useless CLI log messages --- src/util/background/mod.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'src/util/background/mod.rs') diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 619f5068..fd9258b8 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -29,13 +29,24 @@ pub struct BackgroundRunner { #[derive(Clone, Serialize, Deserialize, Debug)] pub struct WorkerInfo { pub name: String, - pub info: Option, + pub status: WorkerStatus, pub state: WorkerState, pub errors: usize, pub consecutive_errors: usize, pub last_error: Option<(String, u64)>, } +/// WorkerStatus is a struct returned by the worker with a bunch of canonical +/// fields to indicate their status to CLI users. All fields are optional. +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct WorkerStatus { + pub tranquility: Option, + pub progress: Option, + pub queue_length: Option, + pub persistent_errors: Option, + pub freeform: Vec, +} + impl BackgroundRunner { /// Create a new BackgroundRunner pub fn new( -- cgit v1.2.3 From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:51:16 +0100 Subject: Refactor background runner and get rid of job worker --- src/util/background/mod.rs | 64 ++++++++++------------------------------------ 1 file changed, 13 insertions(+), 51 deletions(-) (limited to 'src/util/background/mod.rs') diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index fd9258b8..0bb4fb67 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,27 +1,23 @@ //! Job runner for futures and async functions -pub mod job_worker; pub mod worker; use core::future::Future; use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, watch, Mutex}; +use tokio::sync::{mpsc, watch}; use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; pub(crate) type JobOutput = Result<(), Error>; -pub(crate) type Job = Pin + Send>>; /// Job runner for futures and async functions pub struct BackgroundRunner { - send_job: mpsc::UnboundedSender<(Job, bool)>, send_worker: mpsc::UnboundedSender>, worker_info: Arc>>, } @@ -49,10 +45,7 @@ pub struct WorkerStatus { impl BackgroundRunner { /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver, - ) -> (Arc, tokio::task::JoinHandle<()>) { + pub fn new(stop_signal: watch::Receiver) -> (Arc, tokio::task::JoinHandle<()>) { let (send_worker, worker_out) = mpsc::unbounded_channel::>(); let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); @@ -63,24 +56,7 @@ impl BackgroundRunner { worker_processor.run().await; }); - let (send_job, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - - send_worker - .send(Box::new(job_worker::JobWorker { - index: i, - job_chan: queue_out.clone(), - next_job: None, - })) - .ok() - .unwrap(); - } - let bgrunner = Arc::new(Self { - send_job, send_worker, worker_info, }); @@ -91,31 +67,6 @@ impl BackgroundRunner { self.worker_info.lock().unwrap().clone() } - /// Spawn a task to be run in background - pub fn spawn(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, false)) - .ok() - .expect("Could not put job in queue"); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, true)) - .ok() - .expect("Could not put job in queue"); - } - pub fn spawn_worker(&self, worker: W) where W: Worker + 'static, @@ -126,3 +77,14 @@ impl BackgroundRunner { .expect("Could not put worker in queue"); } } + +pub fn spawn(job: T) +where + T: Future + Send + 'static, +{ + tokio::spawn(async move { + if let Err(e) = job.await { + error!("{}", e); + } + }); +} -- cgit v1.2.3 From 510b62010871e9133a98f625b85f07a7e50f6f23 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 16:08:05 +0100 Subject: Get rid of background::spawn --- src/util/background/mod.rs | 16 ---------------- 1 file changed, 16 deletions(-) (limited to 'src/util/background/mod.rs') diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 0bb4fb67..41b48e93 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -2,20 +2,15 @@ pub mod worker; -use core::future::Future; - use std::collections::HashMap; use std::sync::Arc; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; -use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; -pub(crate) type JobOutput = Result<(), Error>; - /// Job runner for futures and async functions pub struct BackgroundRunner { send_worker: mpsc::UnboundedSender>, @@ -77,14 +72,3 @@ impl BackgroundRunner { .expect("Could not put worker in queue"); } } - -pub fn spawn(job: T) -where - T: Future + Send + 'static, -{ - tokio::spawn(async move { - if let Err(e) = job.await { - error!("{}", e); - } - }); -} -- cgit v1.2.3