diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/admin/api.rs | 93 | ||||
-rw-r--r-- | src/api/admin/error.rs | 7 | ||||
-rw-r--r-- | src/api/admin/macros.rs | 12 | ||||
-rw-r--r-- | src/api/admin/router_v2.rs | 3 | ||||
-rw-r--r-- | src/api/admin/worker.rs | 74 | ||||
-rw-r--r-- | src/api/common/router_macros.rs | 3 |
6 files changed, 180 insertions, 12 deletions
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs index 89ddb286..1034f59c 100644 --- a/src/api/admin/api.rs +++ b/src/api/admin/api.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; use garage_rpc::*; use garage_model::garage::Garage; +use garage_util::error::Error as GarageError; use garage_api_common::common_error::CommonErrorDerivative; use garage_api_common::helpers::is_default; @@ -78,11 +79,46 @@ admin_endpoints![ RemoveBucketAlias, // Worker operations + ListWorkers, + GetWorkerInfo, GetWorkerVariable, SetWorkerVariable, ]; -local_admin_endpoints![GetWorkerVariable, SetWorkerVariable,]; +local_admin_endpoints![ + // Background workers + ListWorkers, + GetWorkerInfo, + GetWorkerVariable, + SetWorkerVariable, +]; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiRequest<RB> { + pub node: String, + pub body: RB, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiResponse<RB> { + pub success: HashMap<String, RB>, + pub error: HashMap<String, String>, +} + +impl<RB> MultiResponse<RB> { + pub fn into_single_response(self) -> Result<RB, GarageError> { + if let Some((_, e)) = self.error.into_iter().next() { + return Err(GarageError::Message(e)); + } + if self.success.len() != 1 { + return Err(GarageError::Message(format!( + "{} responses returned, expected 1", + self.success.len() + ))); + } + Ok(self.success.into_iter().next().unwrap().1) + } +} // ********************************************** // Special endpoints @@ -596,6 +632,61 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse); // Worker operations // ********************************************** +// ---- GetWorkerList ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalListWorkersRequest { + #[serde(default)] + pub busy_only: bool, + #[serde(default)] + pub error_only: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkerInfoResp { + pub id: u64, + pub name: String, + pub state: WorkerStateResp, + pub errors: u64, + pub consecutive_errors: u64, + pub last_error: Option<WorkerLastError>, + pub tranquility: Option<u32>, + pub progress: Option<String>, + pub queue_length: Option<u64>, + pub persistent_errors: Option<u64>, + pub freeform: Vec<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum WorkerStateResp { + Busy, + Throttled { duration_secs: f32 }, + Idle, + Done, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkerLastError { + pub message: String, + pub secs_ago: u64, +} + +// ---- GetWorkerList ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerInfoRequest { + pub id: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp); + // ---- GetWorkerVariable ---- #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 3712ee7d..354a3bab 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -25,6 +25,10 @@ pub enum Error { #[error(display = "Access key not found: {}", _0)] NoSuchAccessKey(String), + /// The requested worker does not exist + #[error(display = "Worker not found: {}", _0)] + NoSuchWorker(u64), + /// In Import key, the key already exists #[error( display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", @@ -53,6 +57,7 @@ impl Error { match self { Error::Common(c) => c.aws_code(), Error::NoSuchAccessKey(_) => "NoSuchAccessKey", + Error::NoSuchWorker(_) => "NoSuchWorker", Error::KeyAlreadyExists(_) => "KeyAlreadyExists", } } @@ -63,7 +68,7 @@ impl ApiError for Error { fn http_status_code(&self) -> StatusCode { match self { Error::Common(c) => c.http_status_code(), - Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND, + Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) => StatusCode::NOT_FOUND, Error::KeyAlreadyExists(_) => StatusCode::CONFLICT, } } diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs index bf7eede9..4b183bec 100644 --- a/src/api/admin/macros.rs +++ b/src/api/admin/macros.rs @@ -111,19 +111,11 @@ macro_rules! local_admin_endpoints { } $( - #[derive(Debug, Clone, Serialize, Deserialize)] - pub struct [< $endpoint Request >] { - pub node: String, - pub body: [< Local $endpoint Request >], - } + pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >; pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >]; - #[derive(Debug, Clone, Serialize, Deserialize)] - pub struct [< $endpoint Response >] { - pub success: HashMap<String, [< Local $endpoint Response >] >, - pub error: HashMap<String, String>, - } + pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >; impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest { fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest { diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs index e0ce5b93..6334b3b1 100644 --- a/src/api/admin/router_v2.rs +++ b/src/api/admin/router_v2.rs @@ -60,7 +60,10 @@ impl AdminApiRequest { POST AddBucketAlias (body), POST RemoveBucketAlias (body), // Worker APIs + POST ListWorkers (body_field, query::node), + POST GetWorkerInfo (body_field, query::node), POST GetWorkerVariable (body_field, query::node), + POST SetWorkerVariable (body_field, query::node), ]); if let Some(message) = query.nonempty_message() { diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs index 78508175..c7c75700 100644 --- a/src/api/admin/worker.rs +++ b/src/api/admin/worker.rs @@ -3,6 +3,9 @@ use std::sync::Arc; use async_trait::async_trait; +use garage_util::background::*; +use garage_util::time::now_msec; + use garage_model::garage::Garage; use crate::api::*; @@ -10,6 +13,50 @@ use crate::error::Error; use crate::{Admin, RequestHandler}; #[async_trait] +impl RequestHandler for LocalListWorkersRequest { + type Response = LocalListWorkersResponse; + + async fn handle( + self, + _garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<LocalListWorkersResponse, Error> { + let workers = admin.background.get_worker_info(); + let info = workers + .into_iter() + .filter(|(_, w)| { + (!self.busy_only + || matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_))) + && (!self.error_only || w.errors > 0) + }) + .map(|(id, w)| worker_info_to_api(id as u64, w)) + .collect::<Vec<_>>(); + Ok(LocalListWorkersResponse(info)) + } +} + +#[async_trait] +impl RequestHandler for LocalGetWorkerInfoRequest { + type Response = LocalGetWorkerInfoResponse; + + async fn handle( + self, + _garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<LocalGetWorkerInfoResponse, Error> { + let info = admin + .background + .get_worker_info() + .get(&(self.id as usize)) + .ok_or(Error::NoSuchWorker(self.id))? + .clone(); + Ok(LocalGetWorkerInfoResponse(worker_info_to_api( + self.id, info, + ))) + } +} + +#[async_trait] impl RequestHandler for LocalGetWorkerVariableRequest { type Response = LocalGetWorkerVariableResponse; @@ -48,3 +95,30 @@ impl RequestHandler for LocalSetWorkerVariableRequest { }) } } + +// ---- helper functions ---- + +fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp { + WorkerInfoResp { + id: id, + name: info.name, + state: match info.state { + WorkerState::Busy => WorkerStateResp::Busy, + WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t }, + WorkerState::Idle => WorkerStateResp::Idle, + WorkerState::Done => WorkerStateResp::Done, + }, + errors: info.errors as u64, + consecutive_errors: info.consecutive_errors as u64, + last_error: info.last_error.map(|(message, t)| WorkerLastError { + message, + secs_ago: (std::cmp::max(t, now_msec()) - t) / 1000, + }), + + tranquility: info.status.tranquility, + progress: info.status.progress, + queue_length: info.status.queue_length, + persistent_errors: info.status.persistent_errors, + freeform: info.status.freeform, + } +} diff --git a/src/api/common/router_macros.rs b/src/api/common/router_macros.rs index 299420f7..f4a93c67 100644 --- a/src/api/common/router_macros.rs +++ b/src/api/common/router_macros.rs @@ -141,6 +141,9 @@ macro_rules! router_match { } }}; + (@@parse_param $query:expr, default, $param:ident) => {{ + Default::default() + }}; (@@parse_param $query:expr, query_opt, $param:ident) => {{ // extract optional query parameter $query.$param.take().map(|param| param.into_owned()) |