aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/background/mod.rs25
-rw-r--r--src/util/background/worker.rs27
2 files changed, 49 insertions, 3 deletions
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index c06e2225..92090a1a 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -4,9 +4,12 @@ pub mod job_worker;
pub mod worker;
use core::future::Future;
+
+use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
+use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
@@ -20,6 +23,14 @@ pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
pub struct BackgroundRunner {
send_job: mpsc::UnboundedSender<(Job, bool)>,
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+}
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub struct WorkerInfo {
+ pub name: String,
+ pub info: Option<String>,
+ pub status: WorkerStatus,
}
impl BackgroundRunner {
@@ -30,8 +41,13 @@ impl BackgroundRunner {
) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
- let await_all_done =
- tokio::spawn(async move { WorkerProcessor::new(worker_out, stop_signal).run().await });
+ let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
+ let mut worker_processor =
+ WorkerProcessor::new(worker_out, stop_signal, worker_info.clone());
+
+ let await_all_done = tokio::spawn(async move {
+ worker_processor.run().await;
+ });
let (send_job, queue_out) = mpsc::unbounded_channel();
let queue_out = Arc::new(Mutex::new(queue_out));
@@ -52,10 +68,15 @@ impl BackgroundRunner {
let bgrunner = Arc::new(Self {
send_job,
send_worker,
+ worker_info,
});
(bgrunner, await_all_done)
}
+ pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> {
+ self.worker_info.lock().unwrap().clone()
+ }
+
/// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T)
where
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index f916692d..1d4eb3ea 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -1,16 +1,20 @@
+use std::collections::HashMap;
+use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::future::*;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
+use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
use tracing::*;
+use crate::background::WorkerInfo;
use crate::error::Error;
-#[derive(PartialEq, Copy, Clone, Debug)]
+#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)]
pub enum WorkerStatus {
Busy,
Idle,
@@ -21,6 +25,10 @@ pub enum WorkerStatus {
pub trait Worker: Send {
fn name(&self) -> String;
+ fn info(&self) -> Option<String> {
+ None
+ }
+
/// Work: do a basic unit of work, if one is available (otherwise, should return
/// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the
/// middle of processing, it will only be interrupted at the last minute when Garage is trying
@@ -39,16 +47,19 @@ pub trait Worker: Send {
pub(crate) struct WorkerProcessor {
stop_signal: watch::Receiver<bool>,
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
impl WorkerProcessor {
pub(crate) fn new(
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
stop_signal: watch::Receiver<bool>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
) -> Self {
Self {
stop_signal,
worker_chan,
+ worker_info,
}
}
@@ -87,6 +98,20 @@ impl WorkerProcessor {
worker = await_next_worker => {
if let Some(mut worker) = worker {
trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status);
+ let mut wi = self.worker_info.lock().unwrap();
+ match wi.get_mut(&worker.task_id) {
+ Some(i) => {
+ i.status = worker.status;
+ i.info = worker.worker.info();
+ }
+ None => {
+ wi.insert(worker.task_id, WorkerInfo {
+ name: worker.worker.name(),
+ status: worker.status,
+ info: worker.worker.info(),
+ });
+ }
+ }
// TODO save new worker status somewhere
if worker.status == WorkerStatus::Done {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);