aboutsummaryrefslogtreecommitdiff
path: root/src/util/background/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/background/mod.rs')
-rw-r--r--src/util/background/mod.rs71
1 files changed, 14 insertions, 57 deletions
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 619f5068..41b48e93 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -1,27 +1,18 @@
//! Job runner for futures and async functions
-pub mod job_worker;
pub mod worker;
-use core::future::Future;
-
use std::collections::HashMap;
-use std::pin::Pin;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
-use tokio::sync::{mpsc, watch, Mutex};
+use tokio::sync::{mpsc, watch};
-use crate::error::Error;
use worker::WorkerProcessor;
pub use worker::{Worker, WorkerState};
-pub(crate) type JobOutput = Result<(), Error>;
-pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
-
/// Job runner for futures and async functions
pub struct BackgroundRunner {
- send_job: mpsc::UnboundedSender<(Job, bool)>,
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
@@ -29,19 +20,27 @@ pub struct BackgroundRunner {
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerInfo {
pub name: String,
- pub info: Option<String>,
+ pub status: WorkerStatus,
pub state: WorkerState,
pub errors: usize,
pub consecutive_errors: usize,
pub last_error: Option<(String, u64)>,
}
+/// WorkerStatus is a struct returned by the worker with a bunch of canonical
+/// fields to indicate their status to CLI users. All fields are optional.
+#[derive(Clone, Serialize, Deserialize, Debug, Default)]
+pub struct WorkerStatus {
+ pub tranquility: Option<u32>,
+ pub progress: Option<String>,
+ pub queue_length: Option<u64>,
+ pub persistent_errors: Option<u64>,
+ pub freeform: Vec<String>,
+}
+
impl BackgroundRunner {
/// Create a new BackgroundRunner
- pub fn new(
- n_runners: usize,
- stop_signal: watch::Receiver<bool>,
- ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ pub fn new(stop_signal: watch::Receiver<bool>) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
@@ -52,24 +51,7 @@ impl BackgroundRunner {
worker_processor.run().await;
});
- let (send_job, queue_out) = mpsc::unbounded_channel();
- let queue_out = Arc::new(Mutex::new(queue_out));
-
- for i in 0..n_runners {
- let queue_out = queue_out.clone();
-
- send_worker
- .send(Box::new(job_worker::JobWorker {
- index: i,
- job_chan: queue_out.clone(),
- next_job: None,
- }))
- .ok()
- .unwrap();
- }
-
let bgrunner = Arc::new(Self {
- send_job,
send_worker,
worker_info,
});
@@ -80,31 +62,6 @@ impl BackgroundRunner {
self.worker_info.lock().unwrap().clone()
}
- /// Spawn a task to be run in background
- pub fn spawn<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.send_job
- .send((boxed, false))
- .ok()
- .expect("Could not put job in queue");
- }
-
- /// Spawn a task to be run in background. It may get discarded before running if spawned while
- /// the runner is stopping
- pub fn spawn_cancellable<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.send_job
- .send((boxed, true))
- .ok()
- .expect("Could not put job in queue");
- }
-
pub fn spawn_worker<W>(&self, worker: W)
where
W: Worker + 'static,