aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/admin/api.rs41
-rw-r--r--src/api/admin/api_server.rs129
-rw-r--r--src/api/admin/bucket.rs82
-rw-r--r--src/api/admin/cluster.rs58
-rw-r--r--src/api/admin/key.rs46
-rw-r--r--src/api/admin/lib.rs12
-rw-r--r--src/api/admin/macros.rs142
-rw-r--r--src/api/admin/router_v2.rs3
-rw-r--r--src/api/admin/special.rs26
-rw-r--r--src/api/admin/worker.rs50
10 files changed, 506 insertions, 83 deletions
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs
index 44fc9fca..89ddb286 100644
--- a/src/api/admin/api.rs
+++ b/src/api/admin/api.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -6,13 +7,17 @@ use async_trait::async_trait;
use paste::paste;
use serde::{Deserialize, Serialize};
+use garage_rpc::*;
+
use garage_model::garage::Garage;
+use garage_api_common::common_error::CommonErrorDerivative;
use garage_api_common::helpers::is_default;
+use crate::api_server::{AdminRpc, AdminRpcResponse};
use crate::error::Error;
use crate::macros::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
// This generates the following:
//
@@ -71,8 +76,14 @@ admin_endpoints![
// Operations on bucket aliases
AddBucketAlias,
RemoveBucketAlias,
+
+ // Worker operations
+ GetWorkerVariable,
+ SetWorkerVariable,
];
+local_admin_endpoints![GetWorkerVariable, SetWorkerVariable,];
+
// **********************************************
// Special endpoints
//
@@ -580,3 +591,31 @@ pub struct RemoveBucketAliasRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
+
+// **********************************************
+// Worker operations
+// **********************************************
+
+// ---- GetWorkerVariable ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerVariableRequest {
+ pub variable: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>);
+
+// ---- SetWorkerVariable ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalSetWorkerVariableRequest {
+ pub variable: String,
+ pub value: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalSetWorkerVariableResponse {
+ pub variable: String,
+ pub value: String,
+}
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index be29e617..e865d199 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -6,6 +6,7 @@ use async_trait::async_trait;
use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION};
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use opentelemetry::trace::SpanRef;
@@ -16,6 +17,8 @@ use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
+use garage_rpc::{Endpoint as RpcEndpoint, *};
+use garage_util::background::BackgroundRunner;
use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
@@ -27,7 +30,70 @@ use crate::error::*;
use crate::router_v0;
use crate::router_v1;
use crate::Authorization;
-use crate::EndpointHandler;
+use crate::RequestHandler;
+
+// ---- FOR RPC ----
+
+pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc";
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AdminRpc {
+ Proxy(AdminApiRequest),
+ Internal(LocalAdminApiRequest),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AdminRpcResponse {
+ ProxyApiOkResponse(TaggedAdminApiResponse),
+ InternalApiOkResponse(LocalAdminApiResponse),
+ ApiErrorResponse {
+ http_code: u16,
+ error_code: String,
+ message: String,
+ },
+}
+
+impl Rpc for AdminRpc {
+ type Response = Result<AdminRpcResponse, GarageError>;
+}
+
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminApiServer {
+ async fn handle(
+ self: &Arc<Self>,
+ message: &AdminRpc,
+ _from: NodeID,
+ ) -> Result<AdminRpcResponse, GarageError> {
+ match message {
+ AdminRpc::Proxy(req) => {
+ info!("Proxied admin API request: {}", req.name());
+ let res = req.clone().handle(&self.garage, &self).await;
+ match res {
+ Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())),
+ Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
+ http_code: e.http_status_code().as_u16(),
+ error_code: e.code().to_string(),
+ message: e.to_string(),
+ }),
+ }
+ }
+ AdminRpc::Internal(req) => {
+ info!("Internal admin API request: {}", req.name());
+ let res = req.clone().handle(&self.garage, &self).await;
+ match res {
+ Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)),
+ Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
+ http_code: e.http_status_code().as_u16(),
+ error_code: e.code().to_string(),
+ message: e.to_string(),
+ }),
+ }
+ }
+ }
+ }
+}
+
+// ---- FOR HTTP ----
pub type ResBody = BoxBody<Error>;
@@ -37,37 +103,48 @@ pub struct AdminApiServer {
exporter: PrometheusExporter,
metrics_token: Option<String>,
admin_token: Option<String>,
+ pub(crate) background: Arc<BackgroundRunner>,
+ pub(crate) endpoint: Arc<RpcEndpoint<AdminRpc, Self>>,
}
-pub enum Endpoint {
+pub enum HttpEndpoint {
Old(router_v1::Endpoint),
New(String),
}
+struct ArcAdminApiServer(Arc<AdminApiServer>);
+
impl AdminApiServer {
pub fn new(
garage: Arc<Garage>,
+ background: Arc<BackgroundRunner>,
#[cfg(feature = "metrics")] exporter: PrometheusExporter,
- ) -> Self {
+ ) -> Arc<Self> {
let cfg = &garage.config.admin;
let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
- Self {
+
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self {
garage,
#[cfg(feature = "metrics")]
exporter,
metrics_token,
admin_token,
- }
+ background,
+ endpoint,
+ });
+ admin.endpoint.set_handler(admin.clone());
+ admin
}
pub async fn run(
- self,
+ self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();
- ApiServer::new(region, self)
+ ApiServer::new(region, ArcAdminApiServer(self))
.run_server(bind_addr, Some(0o220), must_exit)
.await
}
@@ -102,36 +179,46 @@ impl AdminApiServer {
}
#[async_trait]
-impl ApiHandler for AdminApiServer {
+impl ApiHandler for ArcAdminApiServer {
const API_NAME: &'static str = "admin";
const API_NAME_DISPLAY: &'static str = "Admin";
- type Endpoint = Endpoint;
+ type Endpoint = HttpEndpoint;
type Error = Error;
- fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> {
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<HttpEndpoint, Error> {
if req.uri().path().starts_with("/v0/") {
let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
- Ok(Endpoint::Old(endpoint_v1))
+ Ok(HttpEndpoint::Old(endpoint_v1))
} else if req.uri().path().starts_with("/v1/") {
let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
- Ok(Endpoint::Old(endpoint_v1))
+ Ok(HttpEndpoint::Old(endpoint_v1))
} else {
- Ok(Endpoint::New(req.uri().path().to_string()))
+ Ok(HttpEndpoint::New(req.uri().path().to_string()))
}
}
async fn handle(
&self,
req: Request<IncomingBody>,
- endpoint: Endpoint,
+ endpoint: HttpEndpoint,
+ ) -> Result<Response<ResBody>, Error> {
+ self.0.handle_http_api(req, endpoint).await
+ }
+}
+
+impl AdminApiServer {
+ async fn handle_http_api(
+ &self,
+ req: Request<IncomingBody>,
+ endpoint: HttpEndpoint,
) -> Result<Response<ResBody>, Error> {
let auth_header = req.headers().get(AUTHORIZATION).cloned();
let request = match endpoint {
- Endpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
- Endpoint::New(_) => AdminApiRequest::from_request(req).await?,
+ HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
+ HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?,
};
let required_auth_hash =
@@ -156,12 +243,12 @@ impl ApiHandler for AdminApiServer {
}
match request {
- AdminApiRequest::Options(req) => req.handle(&self.garage).await,
- AdminApiRequest::CheckDomain(req) => req.handle(&self.garage).await,
- AdminApiRequest::Health(req) => req.handle(&self.garage).await,
+ AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await,
AdminApiRequest::Metrics(_req) => self.handle_metrics(),
req => {
- let res = req.handle(&self.garage).await?;
+ let res = req.handle(&self.garage, &self).await?;
let mut res = json_ok_response(&res)?;
res.headers_mut()
.insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
@@ -171,7 +258,7 @@ impl ApiHandler for AdminApiServer {
}
}
-impl ApiEndpoint for Endpoint {
+impl ApiEndpoint for HttpEndpoint {
fn name(&self) -> Cow<'static, str> {
match self {
Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()),
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 7b7c09e7..73e63df0 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -21,13 +21,17 @@ use garage_api_common::common_error::CommonError;
use crate::api::*;
use crate::error::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
#[async_trait]
-impl EndpointHandler for ListBucketsRequest {
+impl RequestHandler for ListBucketsRequest {
type Response = ListBucketsResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ListBucketsResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ListBucketsResponse, Error> {
let buckets = garage
.bucket_table
.get_range(
@@ -71,10 +75,14 @@ impl EndpointHandler for ListBucketsRequest {
}
#[async_trait]
-impl EndpointHandler for GetBucketInfoRequest {
+impl RequestHandler for GetBucketInfoRequest {
type Response = GetBucketInfoResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetBucketInfoResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetBucketInfoResponse, Error> {
let bucket_id = match (self.id, self.global_alias, self.search) {
(Some(id), None, None) => parse_bucket_id(&id)?,
(None, Some(ga), None) => garage
@@ -223,10 +231,14 @@ async fn bucket_info_results(
}
#[async_trait]
-impl EndpointHandler for CreateBucketRequest {
+impl RequestHandler for CreateBucketRequest {
type Response = CreateBucketResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<CreateBucketResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CreateBucketResponse, Error> {
let helper = garage.locked_helper().await;
if let Some(ga) = &self.global_alias {
@@ -294,10 +306,14 @@ impl EndpointHandler for CreateBucketRequest {
}
#[async_trait]
-impl EndpointHandler for DeleteBucketRequest {
+impl RequestHandler for DeleteBucketRequest {
type Response = DeleteBucketResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteBucketResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DeleteBucketResponse, Error> {
let helper = garage.locked_helper().await;
let bucket_id = parse_bucket_id(&self.id)?;
@@ -343,10 +359,14 @@ impl EndpointHandler for DeleteBucketRequest {
}
#[async_trait]
-impl EndpointHandler for UpdateBucketRequest {
+impl RequestHandler for UpdateBucketRequest {
type Response = UpdateBucketResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateBucketResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateBucketResponse, Error> {
let bucket_id = parse_bucket_id(&self.id)?;
let mut bucket = garage
@@ -390,10 +410,14 @@ impl EndpointHandler for UpdateBucketRequest {
}
#[async_trait]
-impl EndpointHandler for CleanupIncompleteUploadsRequest {
+impl RequestHandler for CleanupIncompleteUploadsRequest {
type Response = CleanupIncompleteUploadsResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<CleanupIncompleteUploadsResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CleanupIncompleteUploadsResponse, Error> {
let duration = Duration::from_secs(self.older_than_secs);
let bucket_id = parse_bucket_id(&self.bucket_id)?;
@@ -412,20 +436,28 @@ impl EndpointHandler for CleanupIncompleteUploadsRequest {
// ---- BUCKET/KEY PERMISSIONS ----
#[async_trait]
-impl EndpointHandler for AllowBucketKeyRequest {
+impl RequestHandler for AllowBucketKeyRequest {
type Response = AllowBucketKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<AllowBucketKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<AllowBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, true).await?;
Ok(AllowBucketKeyResponse(res))
}
}
#[async_trait]
-impl EndpointHandler for DenyBucketKeyRequest {
+impl RequestHandler for DenyBucketKeyRequest {
type Response = DenyBucketKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<DenyBucketKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DenyBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, false).await?;
Ok(DenyBucketKeyResponse(res))
}
@@ -471,10 +503,14 @@ pub async fn handle_bucket_change_key_perm(
// ---- BUCKET ALIASES ----
#[async_trait]
-impl EndpointHandler for AddBucketAliasRequest {
+impl RequestHandler for AddBucketAliasRequest {
type Response = AddBucketAliasResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<AddBucketAliasResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<AddBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await;
@@ -502,10 +538,14 @@ impl EndpointHandler for AddBucketAliasRequest {
}
#[async_trait]
-impl EndpointHandler for RemoveBucketAliasRequest {
+impl RequestHandler for RemoveBucketAliasRequest {
type Response = RemoveBucketAliasResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<RemoveBucketAliasResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<RemoveBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await;
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index dc16bd50..6a7a3d69 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -12,13 +12,17 @@ use garage_model::garage::Garage;
use crate::api::*;
use crate::error::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
#[async_trait]
-impl EndpointHandler for GetClusterStatusRequest {
+impl RequestHandler for GetClusterStatusRequest {
type Response = GetClusterStatusResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterStatusResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterStatusResponse, Error> {
let layout = garage.system.cluster_layout();
let mut nodes = garage
.system
@@ -117,10 +121,14 @@ impl EndpointHandler for GetClusterStatusRequest {
}
#[async_trait]
-impl EndpointHandler for GetClusterHealthRequest {
+impl RequestHandler for GetClusterHealthRequest {
type Response = GetClusterHealthResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterHealthResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterHealthResponse, Error> {
use garage_rpc::system::ClusterHealthStatus;
let health = garage.system.health();
let health = GetClusterHealthResponse {
@@ -143,10 +151,14 @@ impl EndpointHandler for GetClusterHealthRequest {
}
#[async_trait]
-impl EndpointHandler for ConnectClusterNodesRequest {
+impl RequestHandler for ConnectClusterNodesRequest {
type Response = ConnectClusterNodesResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ConnectClusterNodesResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ConnectClusterNodesResponse, Error> {
let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node)))
.await
.into_iter()
@@ -166,10 +178,14 @@ impl EndpointHandler for ConnectClusterNodesRequest {
}
#[async_trait]
-impl EndpointHandler for GetClusterLayoutRequest {
+impl RequestHandler for GetClusterLayoutRequest {
type Response = GetClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterLayoutResponse, Error> {
Ok(format_cluster_layout(
garage.system.cluster_layout().inner(),
))
@@ -226,10 +242,14 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
// ---- update functions ----
#[async_trait]
-impl EndpointHandler for UpdateClusterLayoutRequest {
+impl RequestHandler for UpdateClusterLayoutRequest {
type Response = UpdateClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateClusterLayoutResponse, Error> {
let mut layout = garage.system.cluster_layout().inner().clone();
let mut roles = layout.current().roles.clone();
@@ -272,10 +292,14 @@ impl EndpointHandler for UpdateClusterLayoutRequest {
}
#[async_trait]
-impl EndpointHandler for ApplyClusterLayoutRequest {
+impl RequestHandler for ApplyClusterLayoutRequest {
type Response = ApplyClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ApplyClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ApplyClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let (layout, msg) = layout.apply_staged_changes(Some(self.version))?;
@@ -293,10 +317,14 @@ impl EndpointHandler for ApplyClusterLayoutRequest {
}
#[async_trait]
-impl EndpointHandler for RevertClusterLayoutRequest {
+impl RequestHandler for RevertClusterLayoutRequest {
type Response = RevertClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<RevertClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<RevertClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let layout = layout.revert_staged_changes()?;
garage
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
index 5b7de075..440a8322 100644
--- a/src/api/admin/key.rs
+++ b/src/api/admin/key.rs
@@ -10,13 +10,13 @@ use garage_model::key_table::*;
use crate::api::*;
use crate::error::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
#[async_trait]
-impl EndpointHandler for ListKeysRequest {
+impl RequestHandler for ListKeysRequest {
type Response = ListKeysResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ListKeysResponse, Error> {
+ async fn handle(self, garage: &Arc<Garage>, _admin: &Admin) -> Result<ListKeysResponse, Error> {
let res = garage
.key_table
.get_range(
@@ -39,10 +39,14 @@ impl EndpointHandler for ListKeysRequest {
}
#[async_trait]
-impl EndpointHandler for GetKeyInfoRequest {
+impl RequestHandler for GetKeyInfoRequest {
type Response = GetKeyInfoResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetKeyInfoResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetKeyInfoResponse, Error> {
let key = match (self.id, self.search) {
(Some(id), None) => garage.key_helper().get_existing_key(&id).await?,
(None, Some(search)) => {
@@ -63,10 +67,14 @@ impl EndpointHandler for GetKeyInfoRequest {
}
#[async_trait]
-impl EndpointHandler for CreateKeyRequest {
+impl RequestHandler for CreateKeyRequest {
type Response = CreateKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<CreateKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CreateKeyResponse, Error> {
let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?;
@@ -77,10 +85,14 @@ impl EndpointHandler for CreateKeyRequest {
}
#[async_trait]
-impl EndpointHandler for ImportKeyRequest {
+impl RequestHandler for ImportKeyRequest {
type Response = ImportKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ImportKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ImportKeyResponse, Error> {
let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?;
if prev_key.is_some() {
return Err(Error::KeyAlreadyExists(self.access_key_id.to_string()));
@@ -101,10 +113,14 @@ impl EndpointHandler for ImportKeyRequest {
}
#[async_trait]
-impl EndpointHandler for UpdateKeyRequest {
+impl RequestHandler for UpdateKeyRequest {
type Response = UpdateKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateKeyResponse, Error> {
let mut key = garage.key_helper().get_existing_key(&self.id).await?;
let key_state = key.state.as_option_mut().unwrap();
@@ -132,10 +148,14 @@ impl EndpointHandler for UpdateKeyRequest {
}
#[async_trait]
-impl EndpointHandler for DeleteKeyRequest {
+impl RequestHandler for DeleteKeyRequest {
type Response = DeleteKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DeleteKeyResponse, Error> {
let helper = garage.locked_helper().await;
let mut key = helper.key().get_existing_key(&self.id).await?;
diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs
index 31b3874d..4ad10532 100644
--- a/src/api/admin/lib.rs
+++ b/src/api/admin/lib.rs
@@ -15,12 +15,16 @@ mod cluster;
mod key;
mod special;
+mod worker;
+
use std::sync::Arc;
use async_trait::async_trait;
use garage_model::garage::Garage;
+pub use api_server::AdminApiServer as Admin;
+
pub enum Authorization {
None,
MetricsToken,
@@ -28,8 +32,12 @@ pub enum Authorization {
}
#[async_trait]
-pub trait EndpointHandler {
+pub trait RequestHandler {
type Response;
- async fn handle(self, garage: &Arc<Garage>) -> Result<Self::Response, error::Error>;
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<Self::Response, error::Error>;
}
diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs
index 9521616e..bf7eede9 100644
--- a/src/api/admin/macros.rs
+++ b/src/api/admin/macros.rs
@@ -71,10 +71,10 @@ macro_rules! admin_endpoints {
)*
#[async_trait]
- impl EndpointHandler for AdminApiRequest {
+ impl RequestHandler for AdminApiRequest {
type Response = AdminApiResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<AdminApiResponse, Error> {
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<AdminApiResponse, Error> {
Ok(match self {
$(
AdminApiRequest::$special_endpoint(_) => panic!(
@@ -82,7 +82,142 @@ macro_rules! admin_endpoints {
),
)*
$(
- AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage).await?),
+ AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?),
+ )*
+ })
+ }
+ }
+ }
+ };
+}
+
+macro_rules! local_admin_endpoints {
+ [
+ $($endpoint:ident,)*
+ ] => {
+ paste! {
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum LocalAdminApiRequest {
+ $(
+ $endpoint( [<Local $endpoint Request>] ),
+ )*
+ }
+
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum LocalAdminApiResponse {
+ $(
+ $endpoint( [<Local $endpoint Response>] ),
+ )*
+ }
+
+ $(
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub struct [< $endpoint Request >] {
+ pub node: String,
+ pub body: [< Local $endpoint Request >],
+ }
+
+ pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
+
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub struct [< $endpoint Response >] {
+ pub success: HashMap<String, [< Local $endpoint Response >] >,
+ pub error: HashMap<String, String>,
+ }
+
+ impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
+ fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
+ LocalAdminApiRequest::$endpoint(req)
+ }
+ }
+
+ impl TryFrom<LocalAdminApiResponse> for [< Local $endpoint Response >] {
+ type Error = LocalAdminApiResponse;
+ fn try_from(resp: LocalAdminApiResponse) -> Result< [< Local $endpoint Response >], LocalAdminApiResponse> {
+ match resp {
+ LocalAdminApiResponse::$endpoint(v) => Ok(v),
+ x => Err(x),
+ }
+ }
+ }
+
+ #[async_trait]
+ impl RequestHandler for [< $endpoint Request >] {
+ type Response = [< $endpoint Response >];
+
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> {
+ let to = match self.node.as_str() {
+ "*" => garage.system.cluster_layout().all_nodes().to_vec(),
+ id => {
+ let nodes = garage.system.cluster_layout().all_nodes()
+ .iter()
+ .filter(|x| hex::encode(x).starts_with(id))
+ .cloned()
+ .collect::<Vec<_>>();
+ if nodes.len() != 1 {
+ return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes)));
+ }
+ nodes
+ }
+ };
+
+ let resps = garage.system.rpc_helper().call_many(&admin.endpoint,
+ &to,
+ AdminRpc::Internal(self.body.into()),
+ RequestStrategy::with_priority(PRIO_NORMAL),
+ ).await?;
+
+ let mut ret = [< $endpoint Response >] {
+ success: HashMap::new(),
+ error: HashMap::new(),
+ };
+ for (node, resp) in resps {
+ match resp {
+ Ok(AdminRpcResponse::InternalApiOkResponse(r)) => {
+ match [< Local $endpoint Response >]::try_from(r) {
+ Ok(r) => {
+ ret.success.insert(hex::encode(node), r);
+ }
+ Err(_) => {
+ ret.error.insert(hex::encode(node), "returned invalid value".to_string());
+ }
+ }
+ }
+ Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => {
+ ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message));
+ }
+ Ok(_) => {
+ ret.error.insert(hex::encode(node), "returned invalid value".to_string());
+ }
+ Err(e) => {
+ ret.error.insert(hex::encode(node), e.to_string());
+ }
+ }
+ }
+
+ Ok(ret)
+ }
+ }
+ )*
+
+ impl LocalAdminApiRequest {
+ pub fn name(&self) -> &'static str {
+ match self {
+ $(
+ Self::$endpoint(_) => stringify!($endpoint),
+ )*
+ }
+ }
+ }
+
+ #[async_trait]
+ impl RequestHandler for LocalAdminApiRequest {
+ type Response = LocalAdminApiResponse;
+
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<LocalAdminApiResponse, Error> {
+ Ok(match self {
+ $(
+ LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?),
)*
})
}
@@ -92,3 +227,4 @@ macro_rules! admin_endpoints {
}
pub(crate) use admin_endpoints;
+pub(crate) use local_admin_endpoints;
diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs
index d1ccceb8..e0ce5b93 100644
--- a/src/api/admin/router_v2.rs
+++ b/src/api/admin/router_v2.rs
@@ -59,6 +59,8 @@ impl AdminApiRequest {
// Bucket aliases
POST AddBucketAlias (body),
POST RemoveBucketAlias (body),
+ // Worker APIs
+ POST GetWorkerVariable (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {
@@ -240,6 +242,7 @@ impl AdminApiRequest {
generateQueryParameters! {
keywords: [],
fields: [
+ "node" => node,
"domain" => domain,
"format" => format,
"id" => id,
diff --git a/src/api/admin/special.rs b/src/api/admin/special.rs
index 0b26fe32..4717238d 100644
--- a/src/api/admin/special.rs
+++ b/src/api/admin/special.rs
@@ -15,13 +15,17 @@ use garage_api_common::helpers::*;
use crate::api::{CheckDomainRequest, HealthRequest, OptionsRequest};
use crate::api_server::ResBody;
use crate::error::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
#[async_trait]
-impl EndpointHandler for OptionsRequest {
+impl RequestHandler for OptionsRequest {
type Response = Response<ResBody>;
- async fn handle(self, _garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
Ok(Response::builder()
.status(StatusCode::OK)
.header(ALLOW, "OPTIONS,GET,POST")
@@ -33,10 +37,14 @@ impl EndpointHandler for OptionsRequest {
}
#[async_trait]
-impl EndpointHandler for CheckDomainRequest {
+impl RequestHandler for CheckDomainRequest {
type Response = Response<ResBody>;
- async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
if check_domain(garage, &self.domain).await? {
Ok(Response::builder()
.status(StatusCode::OK)
@@ -103,10 +111,14 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
}
#[async_trait]
-impl EndpointHandler for HealthRequest {
+impl RequestHandler for HealthRequest {
type Response = Response<ResBody>;
- async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
let health = garage.system.health();
let (status, status_str) = match health.status {
diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs
new file mode 100644
index 00000000..78508175
--- /dev/null
+++ b/src/api/admin/worker.rs
@@ -0,0 +1,50 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use garage_model::garage::Garage;
+
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
+
+#[async_trait]
+impl RequestHandler for LocalGetWorkerVariableRequest {
+ type Response = LocalGetWorkerVariableResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetWorkerVariableResponse, Error> {
+ let mut res = HashMap::new();
+ if let Some(k) = self.variable {
+ res.insert(k.clone(), garage.bg_vars.get(&k)?);
+ } else {
+ let vars = garage.bg_vars.get_all();
+ for (k, v) in vars.iter() {
+ res.insert(k.to_string(), v.to_string());
+ }
+ }
+ Ok(LocalGetWorkerVariableResponse(res))
+ }
+}
+
+#[async_trait]
+impl RequestHandler for LocalSetWorkerVariableRequest {
+ type Response = LocalSetWorkerVariableResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalSetWorkerVariableResponse, Error> {
+ garage.bg_vars.set(&self.variable, &self.value)?;
+
+ Ok(LocalSetWorkerVariableResponse {
+ variable: self.variable,
+ value: self.value,
+ })
+ }
+}