aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <lx@deuxfleurs.fr>2025-01-31 15:39:31 +0100
committerAlex Auvolat <lx@deuxfleurs.fr>2025-02-03 18:54:51 +0100
commit10bbb26b303e7bd58ca3396009a66b70a1673c0f (patch)
tree82b718338b1f6e2693d1345ca872bc2fbb652dcb
parent89ff9f5576f91dc127ba3cc1fae96543e27b9468 (diff)
downloadgarage-10bbb26b303e7bd58ca3396009a66b70a1673c0f.tar.gz
garage-10bbb26b303e7bd58ca3396009a66b70a1673c0f.zip
cli_v2: implement ListWorkers and GetWorkerInfo
-rw-r--r--src/api/admin/api.rs93
-rw-r--r--src/api/admin/error.rs7
-rw-r--r--src/api/admin/macros.rs12
-rw-r--r--src/api/admin/router_v2.rs3
-rw-r--r--src/api/admin/worker.rs74
-rw-r--r--src/api/common/router_macros.rs3
-rw-r--r--src/garage/admin/mod.rs30
-rw-r--r--src/garage/cli/cmd.rs6
-rw-r--r--src/garage/cli/util.rs117
-rw-r--r--src/garage/cli_v2/worker.rs145
-rw-r--r--src/garage/server.rs3
-rw-r--r--src/util/background/mod.rs5
-rw-r--r--src/util/background/worker.rs14
13 files changed, 324 insertions, 188 deletions
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs
index 89ddb286..1034f59c 100644
--- a/src/api/admin/api.rs
+++ b/src/api/admin/api.rs
@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use garage_rpc::*;
use garage_model::garage::Garage;
+use garage_util::error::Error as GarageError;
use garage_api_common::common_error::CommonErrorDerivative;
use garage_api_common::helpers::is_default;
@@ -78,11 +79,46 @@ admin_endpoints![
RemoveBucketAlias,
// Worker operations
+ ListWorkers,
+ GetWorkerInfo,
GetWorkerVariable,
SetWorkerVariable,
];
-local_admin_endpoints![GetWorkerVariable, SetWorkerVariable,];
+local_admin_endpoints![
+ // Background workers
+ ListWorkers,
+ GetWorkerInfo,
+ GetWorkerVariable,
+ SetWorkerVariable,
+];
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiRequest<RB> {
+ pub node: String,
+ pub body: RB,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiResponse<RB> {
+ pub success: HashMap<String, RB>,
+ pub error: HashMap<String, String>,
+}
+
+impl<RB> MultiResponse<RB> {
+ pub fn into_single_response(self) -> Result<RB, GarageError> {
+ if let Some((_, e)) = self.error.into_iter().next() {
+ return Err(GarageError::Message(e));
+ }
+ if self.success.len() != 1 {
+ return Err(GarageError::Message(format!(
+ "{} responses returned, expected 1",
+ self.success.len()
+ )));
+ }
+ Ok(self.success.into_iter().next().unwrap().1)
+ }
+}
// **********************************************
// Special endpoints
@@ -596,6 +632,61 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
// Worker operations
// **********************************************
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalListWorkersRequest {
+ #[serde(default)]
+ pub busy_only: bool,
+ #[serde(default)]
+ pub error_only: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerInfoResp {
+ pub id: u64,
+ pub name: String,
+ pub state: WorkerStateResp,
+ pub errors: u64,
+ pub consecutive_errors: u64,
+ pub last_error: Option<WorkerLastError>,
+ pub tranquility: Option<u32>,
+ pub progress: Option<String>,
+ pub queue_length: Option<u64>,
+ pub persistent_errors: Option<u64>,
+ pub freeform: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum WorkerStateResp {
+ Busy,
+ Throttled { duration_secs: f32 },
+ Idle,
+ Done,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerLastError {
+ pub message: String,
+ pub secs_ago: u64,
+}
+
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoRequest {
+ pub id: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp);
+
// ---- GetWorkerVariable ----
#[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
index 3712ee7d..354a3bab 100644
--- a/src/api/admin/error.rs
+++ b/src/api/admin/error.rs
@@ -25,6 +25,10 @@ pub enum Error {
#[error(display = "Access key not found: {}", _0)]
NoSuchAccessKey(String),
+ /// The requested worker does not exist
+ #[error(display = "Worker not found: {}", _0)]
+ NoSuchWorker(u64),
+
/// In Import key, the key already exists
#[error(
display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
@@ -53,6 +57,7 @@ impl Error {
match self {
Error::Common(c) => c.aws_code(),
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
+ Error::NoSuchWorker(_) => "NoSuchWorker",
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
}
}
@@ -63,7 +68,7 @@ impl ApiError for Error {
fn http_status_code(&self) -> StatusCode {
match self {
Error::Common(c) => c.http_status_code(),
- Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
+ Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) => StatusCode::NOT_FOUND,
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
}
}
diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs
index bf7eede9..4b183bec 100644
--- a/src/api/admin/macros.rs
+++ b/src/api/admin/macros.rs
@@ -111,19 +111,11 @@ macro_rules! local_admin_endpoints {
}
$(
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct [< $endpoint Request >] {
- pub node: String,
- pub body: [< Local $endpoint Request >],
- }
+ pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >;
pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct [< $endpoint Response >] {
- pub success: HashMap<String, [< Local $endpoint Response >] >,
- pub error: HashMap<String, String>,
- }
+ pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >;
impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs
index e0ce5b93..6334b3b1 100644
--- a/src/api/admin/router_v2.rs
+++ b/src/api/admin/router_v2.rs
@@ -60,7 +60,10 @@ impl AdminApiRequest {
POST AddBucketAlias (body),
POST RemoveBucketAlias (body),
// Worker APIs
+ POST ListWorkers (body_field, query::node),
+ POST GetWorkerInfo (body_field, query::node),
POST GetWorkerVariable (body_field, query::node),
+ POST SetWorkerVariable (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {
diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs
index 78508175..c7c75700 100644
--- a/src/api/admin/worker.rs
+++ b/src/api/admin/worker.rs
@@ -3,6 +3,9 @@ use std::sync::Arc;
use async_trait::async_trait;
+use garage_util::background::*;
+use garage_util::time::now_msec;
+
use garage_model::garage::Garage;
use crate::api::*;
@@ -10,6 +13,50 @@ use crate::error::Error;
use crate::{Admin, RequestHandler};
#[async_trait]
+impl RequestHandler for LocalListWorkersRequest {
+ type Response = LocalListWorkersResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalListWorkersResponse, Error> {
+ let workers = admin.background.get_worker_info();
+ let info = workers
+ .into_iter()
+ .filter(|(_, w)| {
+ (!self.busy_only
+ || matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_)))
+ && (!self.error_only || w.errors > 0)
+ })
+ .map(|(id, w)| worker_info_to_api(id as u64, w))
+ .collect::<Vec<_>>();
+ Ok(LocalListWorkersResponse(info))
+ }
+}
+
+#[async_trait]
+impl RequestHandler for LocalGetWorkerInfoRequest {
+ type Response = LocalGetWorkerInfoResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalGetWorkerInfoResponse, Error> {
+ let info = admin
+ .background
+ .get_worker_info()
+ .get(&(self.id as usize))
+ .ok_or(Error::NoSuchWorker(self.id))?
+ .clone();
+ Ok(LocalGetWorkerInfoResponse(worker_info_to_api(
+ self.id, info,
+ )))
+ }
+}
+
+#[async_trait]
impl RequestHandler for LocalGetWorkerVariableRequest {
type Response = LocalGetWorkerVariableResponse;
@@ -48,3 +95,30 @@ impl RequestHandler for LocalSetWorkerVariableRequest {
})
}
}
+
+// ---- helper functions ----
+
+fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
+ WorkerInfoResp {
+ id: id,
+ name: info.name,
+ state: match info.state {
+ WorkerState::Busy => WorkerStateResp::Busy,
+ WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t },
+ WorkerState::Idle => WorkerStateResp::Idle,
+ WorkerState::Done => WorkerStateResp::Done,
+ },
+ errors: info.errors as u64,
+ consecutive_errors: info.consecutive_errors as u64,
+ last_error: info.last_error.map(|(message, t)| WorkerLastError {
+ message,
+ secs_ago: (std::cmp::max(t, now_msec()) - t) / 1000,
+ }),
+
+ tranquility: info.status.tranquility,
+ progress: info.status.progress,
+ queue_length: info.status.queue_length,
+ persistent_errors: info.status.persistent_errors,
+ freeform: info.status.freeform,
+ }
+}
diff --git a/src/api/common/router_macros.rs b/src/api/common/router_macros.rs
index 299420f7..f4a93c67 100644
--- a/src/api/common/router_macros.rs
+++ b/src/api/common/router_macros.rs
@@ -141,6 +141,9 @@ macro_rules! router_match {
}
}};
+ (@@parse_param $query:expr, default, $param:ident) => {{
+ Default::default()
+ }};
(@@parse_param $query:expr, query_opt, $param:ident) => {{
// extract optional query parameter
$query.$param.take().map(|param| param.into_owned())
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index f493d0c5..c0e63524 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -22,7 +22,7 @@ use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::garage::Garage;
-use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::helper::error::Error;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version;
@@ -40,17 +40,11 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
pub enum AdminRpc {
LaunchRepair(RepairOpt),
Stats(StatsOpt),
- Worker(WorkerOperation),
BlockOperation(BlockOperation),
MetaOperation(MetaOperation),
// Replies
Ok(String),
- WorkerList(
- HashMap<usize, garage_util::background::WorkerInfo>,
- WorkerListOpt,
- ),
- WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
hash: Hash,
@@ -340,27 +334,6 @@ impl AdminRpcHandler {
))
}
- // ================ WORKER COMMANDS ====================
-
- async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
- match cmd {
- WorkerOperation::List { opt } => {
- let workers = self.background.get_worker_info();
- Ok(AdminRpc::WorkerList(workers, *opt))
- }
- WorkerOperation::Info { tid } => {
- let info = self
- .background
- .get_worker_info()
- .get(tid)
- .ok_or_bad_request(format!("No worker with TID {}", tid))?
- .clone();
- Ok(AdminRpc::WorkerInfo(*tid, info))
- }
- _ => unreachable!(),
- }
- }
-
// ================ META DB COMMANDS ====================
async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
@@ -409,7 +382,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
match message {
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 6f1b0681..bc34d014 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -17,12 +17,6 @@ pub async fn cmd_admin(
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
- AdminRpc::WorkerList(wi, wlo) => {
- print_worker_list(wi, wlo);
- }
- AdminRpc::WorkerInfo(tid, wi) => {
- print_worker_info(tid, wi);
- }
AdminRpc::BlockErrorList(el) => {
print_block_error_list(el);
}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 8261fb3e..43b28623 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,8 +1,6 @@
-use std::collections::HashMap;
use std::time::Duration;
use format_table::format_table;
-use garage_util::background::*;
use garage_util::data::*;
use garage_util::time::*;
@@ -11,121 +9,6 @@ use garage_block::manager::BlockResyncErrorInfo;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::*;
-use crate::cli::structs::WorkerListOpt;
-
-pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
- let mut wi = wi.into_iter().collect::<Vec<_>>();
- wi.sort_by_key(|(tid, info)| {
- (
- match info.state {
- WorkerState::Busy | WorkerState::Throttled(_) => 0,
- WorkerState::Idle => 1,
- WorkerState::Done => 2,
- },
- *tid,
- )
- });
-
- let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
- for (tid, info) in wi.iter() {
- if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
- continue;
- }
- if wlo.errors && info.errors == 0 {
- continue;
- }
-
- let tf = timeago::Formatter::new();
- let err_ago = info
- .last_error
- .as_ref()
- .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
- .unwrap_or_default();
- let (total_err, consec_err) = if info.errors > 0 {
- (info.errors.to_string(), info.consecutive_errors.to_string())
- } else {
- ("-".into(), "-".into())
- };
-
- table.push(format!(
- "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
- tid,
- info.state,
- info.name,
- info.status
- .tranquility
- .as_ref()
- .map(ToString::to_string)
- .unwrap_or_else(|| "-".into()),
- info.status.progress.as_deref().unwrap_or("-"),
- info.status
- .queue_length
- .as_ref()
- .map(ToString::to_string)
- .unwrap_or_else(|| "-".into()),
- total_err,
- consec_err,
- err_ago,
- ));
- }
- format_table(table);
-}
-
-pub fn print_worker_info(tid: usize, info: WorkerInfo) {
- let mut table = vec![];
- table.push(format!("Task id:\t{}", tid));
- table.push(format!("Worker name:\t{}", info.name));
- match info.state {
- WorkerState::Throttled(t) => {
- table.push(format!(
- "Worker state:\tBusy (throttled, paused for {:.3}s)",
- t
- ));
- }
- s => {
- table.push(format!("Worker state:\t{}", s));
- }
- };
- if let Some(tql) = info.status.tranquility {
- table.push(format!("Tranquility:\t{}", tql));
- }
-
- table.push("".into());
- table.push(format!("Total errors:\t{}", info.errors));
- table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
- if let Some((s, t)) = info.last_error {
- table.push(format!("Last error:\t{}", s));
- let tf = timeago::Formatter::new();
- table.push(format!(
- "Last error time:\t{}",
- tf.convert(Duration::from_millis(now_msec() - t))
- ));
- }
-
- table.push("".into());
- if let Some(p) = info.status.progress {
- table.push(format!("Progress:\t{}", p));
- }
- if let Some(ql) = info.status.queue_length {
- table.push(format!("Queue length:\t{}", ql));
- }
- if let Some(pe) = info.status.persistent_errors {
- table.push(format!("Persistent errors:\t{}", pe));
- }
-
- for (i, s) in info.status.freeform.iter().enumerate() {
- if i == 0 {
- if table.last() != Some(&"".into()) {
- table.push("".into());
- }
- table.push(format!("Message:\t{}", s));
- } else {
- table.push(format!("\t{}", s));
- }
- }
- format_table(table);
-}
-
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();
diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs
index 0dfe3e96..9db729ec 100644
--- a/src/garage/cli_v2/worker.rs
+++ b/src/garage/cli_v2/worker.rs
@@ -11,6 +11,8 @@ use crate::cli_v2::*;
impl Cli {
pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
match cmd {
+ WorkerOperation::List { opt } => self.cmd_list_workers(opt).await,
+ WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await,
WorkerOperation::Get {
all_nodes,
variable,
@@ -20,14 +22,136 @@ impl Cli {
variable,
value,
} => self.cmd_set_var(all_nodes, variable, value).await,
- wo => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::Worker(wo),
+ }
+ }
+
+ pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> {
+ let mut list = self
+ .api_request(ListWorkersRequest {
+ node: hex::encode(self.rpc_host),
+ body: LocalListWorkersRequest {
+ busy_only: opt.busy,
+ error_only: opt.errors,
+ },
+ })
+ .await?
+ .into_single_response()?
+ .0;
+
+ list.sort_by_key(|info| {
+ (
+ match info.state {
+ WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0,
+ WorkerStateResp::Idle => 1,
+ WorkerStateResp::Done => 2,
+ },
+ info.id,
)
- .await
- .ok_or_message("cli_v1"),
+ });
+
+ let mut table =
+ vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
+ let tf = timeago::Formatter::new();
+ for info in list.iter() {
+ let err_ago = info
+ .last_error
+ .as_ref()
+ .map(|x| tf.convert(Duration::from_secs(x.secs_ago)))
+ .unwrap_or_default();
+ let (total_err, consec_err) = if info.errors > 0 {
+ (info.errors.to_string(), info.consecutive_errors.to_string())
+ } else {
+ ("-".into(), "-".into())
+ };
+
+ table.push(format!(
+ "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
+ info.id,
+ format_worker_state(&info.state),
+ info.name,
+ info.tranquility
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ info.progress.as_deref().unwrap_or("-"),
+ info.queue_length
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ total_err,
+ consec_err,
+ err_ago,
+ ));
}
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> {
+ let info = self
+ .api_request(GetWorkerInfoRequest {
+ node: hex::encode(self.rpc_host),
+ body: LocalGetWorkerInfoRequest { id: tid as u64 },
+ })
+ .await?
+ .into_single_response()?
+ .0;
+
+ let mut table = vec![];
+ table.push(format!("Task id:\t{}", info.id));
+ table.push(format!("Worker name:\t{}", info.name));
+ match &info.state {
+ WorkerStateResp::Throttled { duration_secs } => {
+ table.push(format!(
+ "Worker state:\tBusy (throttled, paused for {:.3}s)",
+ duration_secs
+ ));
+ }
+ s => {
+ table.push(format!("Worker state:\t{}", format_worker_state(s)));
+ }
+ };
+ if let Some(tql) = info.tranquility {
+ table.push(format!("Tranquility:\t{}", tql));
+ }
+
+ table.push("".into());
+ table.push(format!("Total errors:\t{}", info.errors));
+ table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
+ if let Some(err) = info.last_error {
+ table.push(format!("Last error:\t{}", err.message));
+ let tf = timeago::Formatter::new();
+ table.push(format!(
+ "Last error time:\t{}",
+ tf.convert(Duration::from_secs(err.secs_ago))
+ ));
+ }
+
+ table.push("".into());
+ if let Some(p) = info.progress {
+ table.push(format!("Progress:\t{}", p));
+ }
+ if let Some(ql) = info.queue_length {
+ table.push(format!("Queue length:\t{}", ql));
+ }
+ if let Some(pe) = info.persistent_errors {
+ table.push(format!("Persistent errors:\t{}", pe));
+ }
+
+ for (i, s) in info.freeform.iter().enumerate() {
+ if i == 0 {
+ if table.last() != Some(&"".into()) {
+ table.push("".into());
+ }
+ table.push(format!("Message:\t{}", s));
+ } else {
+ table.push(format!("\t{}", s));
+ }
+ }
+ format_table(table);
+
+ Ok(())
}
pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
@@ -87,3 +211,12 @@ impl Cli {
Ok(())
}
}
+
+fn format_worker_state(s: &WorkerStateResp) -> &'static str {
+ match s {
+ WorkerStateResp::Busy => "Busy",
+ WorkerStateResp::Throttled { .. } => "Busy*",
+ WorkerStateResp::Idle => "Idle",
+ WorkerStateResp::Done => "Done",
+ }
+}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index f17f641b..e629041c 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -1,5 +1,4 @@
use std::path::PathBuf;
-use std::sync::Arc;
use tokio::sync::watch;
@@ -65,7 +64,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
}
info!("Initialize Admin API server and metrics collector...");
- let admin_server: Arc<AdminApiServer> = AdminApiServer::new(
+ let admin_server = AdminApiServer::new(
garage.clone(),
background.clone(),
#[cfg(feature = "metrics")]
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 607cd7a3..cae3a462 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -6,7 +6,6 @@ pub mod worker;
use std::collections::HashMap;
use std::sync::Arc;
-use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
use worker::WorkerProcessor;
@@ -18,7 +17,7 @@ pub struct BackgroundRunner {
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
-#[derive(Clone, Serialize, Deserialize, Debug)]
+#[derive(Clone, Debug)]
pub struct WorkerInfo {
pub name: String,
pub status: WorkerStatus,
@@ -30,7 +29,7 @@ pub struct WorkerInfo {
/// WorkerStatus is a struct returned by the worker with a bunch of canonical
/// fields to indicate their status to CLI users. All fields are optional.
-#[derive(Clone, Serialize, Deserialize, Debug, Default)]
+#[derive(Clone, Debug, Default)]
pub struct WorkerStatus {
pub tranquility: Option<u32>,
pub progress: Option<String>,
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index 76fb14e8..9028a052 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -6,7 +6,6 @@ use async_trait::async_trait;
use futures::future::*;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
-use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
@@ -18,7 +17,7 @@ use crate::time::now_msec;
// will be interrupted in the middle of whatever they are doing.
const EXIT_DEADLINE: Duration = Duration::from_secs(8);
-#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
+#[derive(PartialEq, Copy, Clone, Debug)]
pub enum WorkerState {
Busy,
Throttled(f32),
@@ -26,17 +25,6 @@ pub enum WorkerState {
Done,
}
-impl std::fmt::Display for WorkerState {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- WorkerState::Busy => write!(f, "Busy"),
- WorkerState::Throttled(_) => write!(f, "Busy*"),
- WorkerState::Idle => write!(f, "Idle"),
- WorkerState::Done => write!(f, "Done"),
- }
- }
-}
-
#[async_trait]
pub trait Worker: Send {
fn name(&self) -> String;