diff options
Diffstat (limited to 'src/util/background/worker.rs')
-rw-r--r-- | src/util/background/worker.rs | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index f916692d..1d4eb3ea 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,16 +1,20 @@ +use std::collections::HashMap; +use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::future::*; use futures::stream::FuturesUnordered; use futures::StreamExt; +use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::{mpsc, watch}; use tracing::*; +use crate::background::WorkerInfo; use crate::error::Error; -#[derive(PartialEq, Copy, Clone, Debug)] +#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] pub enum WorkerStatus { Busy, Idle, @@ -21,6 +25,10 @@ pub enum WorkerStatus { pub trait Worker: Send { fn name(&self) -> String; + fn info(&self) -> Option<String> { + None + } + /// Work: do a basic unit of work, if one is available (otherwise, should return /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the /// middle of processing, it will only be interrupted at the last minute when Garage is trying @@ -39,16 +47,19 @@ pub trait Worker: Send { pub(crate) struct WorkerProcessor { stop_signal: watch::Receiver<bool>, worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, } impl WorkerProcessor { pub(crate) fn new( worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, stop_signal: watch::Receiver<bool>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, ) -> Self { Self { stop_signal, worker_chan, + worker_info, } } @@ -87,6 +98,20 @@ impl WorkerProcessor { worker = await_next_worker => { if let Some(mut worker) = worker { trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status); + let mut wi = self.worker_info.lock().unwrap(); + match wi.get_mut(&worker.task_id) { + Some(i) => { + i.status = worker.status; + i.info = worker.worker.info(); + } + None => { + wi.insert(worker.task_id, WorkerInfo { + name: worker.worker.name(), + status: worker.status, + info: worker.worker.info(), + }); + } + } // TODO save new worker status somewhere if worker.status == WorkerStatus::Done { info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); |