diff options
-rw-r--r-- | src/garage/admin.rs | 23 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 4 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 21 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 24 | ||||
-rw-r--r-- | src/table/gc.rs | 11 | ||||
-rw-r--r-- | src/table/merkle.rs | 11 | ||||
-rw-r--r-- | src/table/sync.rs | 11 | ||||
-rw-r--r-- | src/util/background/mod.rs | 25 | ||||
-rw-r--r-- | src/util/background/worker.rs | 27 |
9 files changed, 151 insertions, 6 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 8a984cfb..f307bea1 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_util::background::*; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -36,6 +37,7 @@ pub enum AdminRpc { LaunchRepair(RepairOpt), Migrate(MigrateOpt), Stats(StatsOpt), + Worker(WorkerOpt), // Replies Ok(String), @@ -47,6 +49,7 @@ pub enum AdminRpc { }, KeyList(Vec<(String, String)>), KeyInfo(Key, HashMap<Uuid, Bucket>), + WorkerList(HashMap<usize, garage_util::background::WorkerInfo>), } impl Rpc for AdminRpc { @@ -822,6 +825,25 @@ impl AdminRpcHandler { Ok(()) } + + // ---- + + async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> { + match opt.cmd { + WorkerCmd::List { busy } => { + let workers = self.garage.background.get_worker_info(); + let workers = if busy { + workers + .into_iter() + .filter(|(_, w)| w.status == WorkerStatus::Busy) + .collect() + } else { + workers + }; + Ok(AdminRpc::WorkerList(workers)) + } + } + } } #[async_trait] @@ -837,6 +859,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler { AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await, AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, + AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await, m => Err(GarageError::unexpected_rpc_message(m).into()), } } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 3a0bd956..38a58b76 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -39,6 +39,7 @@ pub async fn cli_command_dispatch( cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await } Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, + Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await, _ => unreachable!(), } } @@ -182,6 +183,9 @@ pub async fn cmd_admin( AdminRpc::KeyInfo(key, rb) => { print_key_info(&key, &rb); } + AdminRpc::WorkerList(wi) => { + print_worker_info(wi); + } r => { error!("Unexpected response: {:?}", r); } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 4f2efe19..f6b2d197 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -45,6 +45,10 @@ pub enum Command { /// Gather node statistics #[structopt(name = "stats")] Stats(StatsOpt), + + /// Manage background workers + #[structopt(name = "worker")] + Worker(WorkerOpt), } #[derive(StructOpt, Debug)] @@ -460,3 +464,20 @@ pub struct StatsOpt { #[structopt(short = "d", long = "detailed")] pub detailed: bool, } + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct WorkerOpt { + #[structopt(subcommand)] + pub cmd: WorkerCmd, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum WorkerCmd { + /// List all workers on Garage node + #[structopt(name = "list")] + List { + /// Show only busy workers + #[structopt(short = "b", long = "busy")] + busy: bool, + }, +} diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 329e8a3e..81361864 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use garage_util::background::*; use garage_util::crdt::*; use garage_util::data::Uuid; use garage_util::error::*; @@ -235,3 +236,26 @@ pub fn find_matching_node( Ok(candidates[0]) } } + +pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) { + let mut wi = wi.into_iter().collect::<Vec<_>>(); + wi.sort_by_key(|(tid, info)| { + ( + match info.status { + WorkerStatus::Busy => 0, + WorkerStatus::Idle => 1, + WorkerStatus::Done => 2, + }, + *tid, + ) + }); + + let mut table = vec![]; + for (tid, info) in wi.iter() { + table.push(format!("{}\t{:?}\t{}", tid, info.status, info.name)); + if let Some(i) = &info.info { + table.push(format!("\t\t{}", i)); + } + } + format_table(table); +} diff --git a/src/table/gc.rs b/src/table/gc.rs index 36124c2f..d088a11c 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -332,7 +332,16 @@ where R: TableReplication + 'static, { fn name(&self) -> String { - format!("Table GC: {}", F::TABLE_NAME) + format!("{} GC", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.gc.data.gc_todo_len().unwrap_or(0); + if l > 0 { + Some(format!("{} items in queue", l)) + } else { + None + } } async fn work( diff --git a/src/table/merkle.rs b/src/table/merkle.rs index d4d2717f..06d131cb 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -310,7 +310,16 @@ where R: TableReplication + 'static, { fn name(&self) -> String { - format!("Merkle tree updater: {}", F::TABLE_NAME) + format!("{} Merkle tree updater", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.0.todo_len().unwrap_or(0); + if l > 0 { + Some(format!("{} items in queue", l)) + } else { + None + } } async fn work( diff --git a/src/table/sync.rs b/src/table/sync.rs index be081d96..bdf88782 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -574,7 +574,16 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { #[async_trait] impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { fn name(&self) -> String { - format!("Table sync worker for {}", F::TABLE_NAME) + format!("{} sync", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.todo.len(); + if l > 0 { + Some(format!("{} partitions remaining", l)) + } else { + None + } } async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index c06e2225..92090a1a 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -4,9 +4,12 @@ pub mod job_worker; pub mod worker; use core::future::Future; + +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; +use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch, Mutex}; use crate::error::Error; @@ -20,6 +23,14 @@ pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; pub struct BackgroundRunner { send_job: mpsc::UnboundedSender<(Job, bool)>, send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct WorkerInfo { + pub name: String, + pub info: Option<String>, + pub status: WorkerStatus, } impl BackgroundRunner { @@ -30,8 +41,13 @@ impl BackgroundRunner { ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); - let await_all_done = - tokio::spawn(async move { WorkerProcessor::new(worker_out, stop_signal).run().await }); + let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); + let mut worker_processor = + WorkerProcessor::new(worker_out, stop_signal, worker_info.clone()); + + let await_all_done = tokio::spawn(async move { + worker_processor.run().await; + }); let (send_job, queue_out) = mpsc::unbounded_channel(); let queue_out = Arc::new(Mutex::new(queue_out)); @@ -52,10 +68,15 @@ impl BackgroundRunner { let bgrunner = Arc::new(Self { send_job, send_worker, + worker_info, }); (bgrunner, await_all_done) } + pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> { + self.worker_info.lock().unwrap().clone() + } + /// Spawn a task to be run in background pub fn spawn<T>(&self, job: T) where diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index f916692d..1d4eb3ea 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,16 +1,20 @@ +use std::collections::HashMap; +use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::future::*; use futures::stream::FuturesUnordered; use futures::StreamExt; +use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::{mpsc, watch}; use tracing::*; +use crate::background::WorkerInfo; use crate::error::Error; -#[derive(PartialEq, Copy, Clone, Debug)] +#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] pub enum WorkerStatus { Busy, Idle, @@ -21,6 +25,10 @@ pub enum WorkerStatus { pub trait Worker: Send { fn name(&self) -> String; + fn info(&self) -> Option<String> { + None + } + /// Work: do a basic unit of work, if one is available (otherwise, should return /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the /// middle of processing, it will only be interrupted at the last minute when Garage is trying @@ -39,16 +47,19 @@ pub trait Worker: Send { pub(crate) struct WorkerProcessor { stop_signal: watch::Receiver<bool>, worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, } impl WorkerProcessor { pub(crate) fn new( worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, stop_signal: watch::Receiver<bool>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, ) -> Self { Self { stop_signal, worker_chan, + worker_info, } } @@ -87,6 +98,20 @@ impl WorkerProcessor { worker = await_next_worker => { if let Some(mut worker) = worker { trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status); + let mut wi = self.worker_info.lock().unwrap(); + match wi.get_mut(&worker.task_id) { + Some(i) => { + i.status = worker.status; + i.info = worker.worker.info(); + } + None => { + wi.insert(worker.task_id, WorkerInfo { + name: worker.worker.name(), + status: worker.status, + info: worker.worker.info(), + }); + } + } // TODO save new worker status somewhere if worker.status == WorkerStatus::Done { info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); |