aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/admin/api.rs93
-rw-r--r--src/api/admin/error.rs7
-rw-r--r--src/api/admin/macros.rs12
-rw-r--r--src/api/admin/router_v2.rs3
-rw-r--r--src/api/admin/worker.rs74
-rw-r--r--src/api/common/router_macros.rs3
6 files changed, 180 insertions, 12 deletions
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs
index 89ddb286..1034f59c 100644
--- a/src/api/admin/api.rs
+++ b/src/api/admin/api.rs
@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use garage_rpc::*;
use garage_model::garage::Garage;
+use garage_util::error::Error as GarageError;
use garage_api_common::common_error::CommonErrorDerivative;
use garage_api_common::helpers::is_default;
@@ -78,11 +79,46 @@ admin_endpoints![
RemoveBucketAlias,
// Worker operations
+ ListWorkers,
+ GetWorkerInfo,
GetWorkerVariable,
SetWorkerVariable,
];
-local_admin_endpoints![GetWorkerVariable, SetWorkerVariable,];
+local_admin_endpoints![
+ // Background workers
+ ListWorkers,
+ GetWorkerInfo,
+ GetWorkerVariable,
+ SetWorkerVariable,
+];
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiRequest<RB> {
+ pub node: String,
+ pub body: RB,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiResponse<RB> {
+ pub success: HashMap<String, RB>,
+ pub error: HashMap<String, String>,
+}
+
+impl<RB> MultiResponse<RB> {
+ pub fn into_single_response(self) -> Result<RB, GarageError> {
+ if let Some((_, e)) = self.error.into_iter().next() {
+ return Err(GarageError::Message(e));
+ }
+ if self.success.len() != 1 {
+ return Err(GarageError::Message(format!(
+ "{} responses returned, expected 1",
+ self.success.len()
+ )));
+ }
+ Ok(self.success.into_iter().next().unwrap().1)
+ }
+}
// **********************************************
// Special endpoints
@@ -596,6 +632,61 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
// Worker operations
// **********************************************
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalListWorkersRequest {
+ #[serde(default)]
+ pub busy_only: bool,
+ #[serde(default)]
+ pub error_only: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerInfoResp {
+ pub id: u64,
+ pub name: String,
+ pub state: WorkerStateResp,
+ pub errors: u64,
+ pub consecutive_errors: u64,
+ pub last_error: Option<WorkerLastError>,
+ pub tranquility: Option<u32>,
+ pub progress: Option<String>,
+ pub queue_length: Option<u64>,
+ pub persistent_errors: Option<u64>,
+ pub freeform: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum WorkerStateResp {
+ Busy,
+ Throttled { duration_secs: f32 },
+ Idle,
+ Done,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerLastError {
+ pub message: String,
+ pub secs_ago: u64,
+}
+
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoRequest {
+ pub id: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp);
+
// ---- GetWorkerVariable ----
#[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
index 3712ee7d..354a3bab 100644
--- a/src/api/admin/error.rs
+++ b/src/api/admin/error.rs
@@ -25,6 +25,10 @@ pub enum Error {
#[error(display = "Access key not found: {}", _0)]
NoSuchAccessKey(String),
+ /// The requested worker does not exist
+ #[error(display = "Worker not found: {}", _0)]
+ NoSuchWorker(u64),
+
/// In Import key, the key already exists
#[error(
display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
@@ -53,6 +57,7 @@ impl Error {
match self {
Error::Common(c) => c.aws_code(),
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
+ Error::NoSuchWorker(_) => "NoSuchWorker",
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
}
}
@@ -63,7 +68,7 @@ impl ApiError for Error {
fn http_status_code(&self) -> StatusCode {
match self {
Error::Common(c) => c.http_status_code(),
- Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
+ Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) => StatusCode::NOT_FOUND,
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
}
}
diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs
index bf7eede9..4b183bec 100644
--- a/src/api/admin/macros.rs
+++ b/src/api/admin/macros.rs
@@ -111,19 +111,11 @@ macro_rules! local_admin_endpoints {
}
$(
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct [< $endpoint Request >] {
- pub node: String,
- pub body: [< Local $endpoint Request >],
- }
+ pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >;
pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct [< $endpoint Response >] {
- pub success: HashMap<String, [< Local $endpoint Response >] >,
- pub error: HashMap<String, String>,
- }
+ pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >;
impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs
index e0ce5b93..6334b3b1 100644
--- a/src/api/admin/router_v2.rs
+++ b/src/api/admin/router_v2.rs
@@ -60,7 +60,10 @@ impl AdminApiRequest {
POST AddBucketAlias (body),
POST RemoveBucketAlias (body),
// Worker APIs
+ POST ListWorkers (body_field, query::node),
+ POST GetWorkerInfo (body_field, query::node),
POST GetWorkerVariable (body_field, query::node),
+ POST SetWorkerVariable (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {
diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs
index 78508175..c7c75700 100644
--- a/src/api/admin/worker.rs
+++ b/src/api/admin/worker.rs
@@ -3,6 +3,9 @@ use std::sync::Arc;
use async_trait::async_trait;
+use garage_util::background::*;
+use garage_util::time::now_msec;
+
use garage_model::garage::Garage;
use crate::api::*;
@@ -10,6 +13,50 @@ use crate::error::Error;
use crate::{Admin, RequestHandler};
#[async_trait]
+impl RequestHandler for LocalListWorkersRequest {
+ type Response = LocalListWorkersResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalListWorkersResponse, Error> {
+ let workers = admin.background.get_worker_info();
+ let info = workers
+ .into_iter()
+ .filter(|(_, w)| {
+ (!self.busy_only
+ || matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_)))
+ && (!self.error_only || w.errors > 0)
+ })
+ .map(|(id, w)| worker_info_to_api(id as u64, w))
+ .collect::<Vec<_>>();
+ Ok(LocalListWorkersResponse(info))
+ }
+}
+
+#[async_trait]
+impl RequestHandler for LocalGetWorkerInfoRequest {
+ type Response = LocalGetWorkerInfoResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalGetWorkerInfoResponse, Error> {
+ let info = admin
+ .background
+ .get_worker_info()
+ .get(&(self.id as usize))
+ .ok_or(Error::NoSuchWorker(self.id))?
+ .clone();
+ Ok(LocalGetWorkerInfoResponse(worker_info_to_api(
+ self.id, info,
+ )))
+ }
+}
+
+#[async_trait]
impl RequestHandler for LocalGetWorkerVariableRequest {
type Response = LocalGetWorkerVariableResponse;
@@ -48,3 +95,30 @@ impl RequestHandler for LocalSetWorkerVariableRequest {
})
}
}
+
+// ---- helper functions ----
+
+fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
+ WorkerInfoResp {
+ id: id,
+ name: info.name,
+ state: match info.state {
+ WorkerState::Busy => WorkerStateResp::Busy,
+ WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t },
+ WorkerState::Idle => WorkerStateResp::Idle,
+ WorkerState::Done => WorkerStateResp::Done,
+ },
+ errors: info.errors as u64,
+ consecutive_errors: info.consecutive_errors as u64,
+ last_error: info.last_error.map(|(message, t)| WorkerLastError {
+ message,
+ secs_ago: (std::cmp::max(t, now_msec()) - t) / 1000,
+ }),
+
+ tranquility: info.status.tranquility,
+ progress: info.status.progress,
+ queue_length: info.status.queue_length,
+ persistent_errors: info.status.persistent_errors,
+ freeform: info.status.freeform,
+ }
+}
diff --git a/src/api/common/router_macros.rs b/src/api/common/router_macros.rs
index 299420f7..f4a93c67 100644
--- a/src/api/common/router_macros.rs
+++ b/src/api/common/router_macros.rs
@@ -141,6 +141,9 @@ macro_rules! router_match {
}
}};
+ (@@parse_param $query:expr, default, $param:ident) => {{
+ Default::default()
+ }};
(@@parse_param $query:expr, query_opt, $param:ident) => {{
// extract optional query parameter
$query.$param.take().map(|param| param.into_owned())