aboutsummaryrefslogtreecommitdiff
path: root/src/api/admin
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/admin')
-rw-r--r--src/api/admin/api_server.rs199
-rw-r--r--src/api/admin/bucket.rs549
-rw-r--r--src/api/admin/cluster.rs198
-rw-r--r--src/api/admin/error.rs96
-rw-r--r--src/api/admin/key.rs264
-rw-r--r--src/api/admin/mod.rs7
-rw-r--r--src/api/admin/router.rs149
7 files changed, 1462 insertions, 0 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
new file mode 100644
index 00000000..57e3e5cf
--- /dev/null
+++ b/src/api/admin/api_server.rs
@@ -0,0 +1,199 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use http::header::{
+ ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW, CONTENT_TYPE,
+};
+use hyper::{Body, Request, Response};
+
+use opentelemetry::trace::{SpanRef, Tracer};
+use opentelemetry_prometheus::PrometheusExporter;
+use prometheus::{Encoder, TextEncoder};
+
+use garage_model::garage::Garage;
+use garage_util::error::Error as GarageError;
+
+use crate::generic_server::*;
+
+use crate::admin::bucket::*;
+use crate::admin::cluster::*;
+use crate::admin::error::*;
+use crate::admin::key::*;
+use crate::admin::router::{Authorization, Endpoint};
+
+pub struct AdminApiServer {
+ garage: Arc<Garage>,
+ exporter: PrometheusExporter,
+ metrics_token: Option<String>,
+ admin_token: Option<String>,
+}
+
+impl AdminApiServer {
+ pub fn new(garage: Arc<Garage>) -> Self {
+ let exporter = opentelemetry_prometheus::exporter().init();
+ let cfg = &garage.config.admin;
+ let metrics_token = cfg
+ .metrics_token
+ .as_ref()
+ .map(|tok| format!("Bearer {}", tok));
+ let admin_token = cfg
+ .admin_token
+ .as_ref()
+ .map(|tok| format!("Bearer {}", tok));
+ Self {
+ garage,
+ exporter,
+ metrics_token,
+ admin_token,
+ }
+ }
+
+ pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> {
+ if let Some(bind_addr) = self.garage.config.admin.api_bind_addr {
+ let region = self.garage.config.s3_api.s3_region.clone();
+ ApiServer::new(region, self)
+ .run_server(bind_addr, shutdown_signal)
+ .await
+ } else {
+ Ok(())
+ }
+ }
+
+ fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
+ Ok(Response::builder()
+ .status(204)
+ .header(ALLOW, "OPTIONS, GET, POST")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .body(Body::empty())?)
+ }
+
+ fn handle_metrics(&self) -> Result<Response<Body>, Error> {
+ let mut buffer = vec![];
+ let encoder = TextEncoder::new();
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let metric_families = tracer.in_span("admin/gather_metrics", |_| {
+ self.exporter.registry().gather()
+ });
+
+ encoder
+ .encode(&metric_families, &mut buffer)
+ .ok_or_internal_error("Could not serialize metrics")?;
+
+ Ok(Response::builder()
+ .status(200)
+ .header(CONTENT_TYPE, encoder.format_type())
+ .body(Body::from(buffer))?)
+ }
+}
+
+#[async_trait]
+impl ApiHandler for AdminApiServer {
+ const API_NAME: &'static str = "admin";
+ const API_NAME_DISPLAY: &'static str = "Admin";
+
+ type Endpoint = Endpoint;
+ type Error = Error;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> {
+ Endpoint::from_request(req)
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: Endpoint,
+ ) -> Result<Response<Body>, Error> {
+ let expected_auth_header =
+ match endpoint.authorization_type() {
+ Authorization::MetricsToken => self.metrics_token.as_ref(),
+ Authorization::AdminToken => match &self.admin_token {
+ None => return Err(Error::forbidden(
+ "Admin token isn't configured, admin API access is disabled for security.",
+ )),
+ Some(t) => Some(t),
+ },
+ };
+
+ if let Some(h) = expected_auth_header {
+ match req.headers().get("Authorization") {
+ None => return Err(Error::forbidden("Authorization token must be provided")),
+ Some(v) => {
+ let authorized = v.to_str().map(|hv| hv.trim() == h).unwrap_or(false);
+ if !authorized {
+ return Err(Error::forbidden("Invalid authorization token provided"));
+ }
+ }
+ }
+ }
+
+ match endpoint {
+ Endpoint::Options => self.handle_options(&req),
+ Endpoint::Metrics => self.handle_metrics(),
+ Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
+ Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await,
+ // Layout
+ Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
+ Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
+ Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
+ Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
+ // Keys
+ Endpoint::ListKeys => handle_list_keys(&self.garage).await,
+ Endpoint::GetKeyInfo { id, search } => {
+ handle_get_key_info(&self.garage, id, search).await
+ }
+ Endpoint::CreateKey => handle_create_key(&self.garage, req).await,
+ Endpoint::ImportKey => handle_import_key(&self.garage, req).await,
+ Endpoint::UpdateKey { id } => handle_update_key(&self.garage, id, req).await,
+ Endpoint::DeleteKey { id } => handle_delete_key(&self.garage, id).await,
+ // Buckets
+ Endpoint::ListBuckets => handle_list_buckets(&self.garage).await,
+ Endpoint::GetBucketInfo { id, global_alias } => {
+ handle_get_bucket_info(&self.garage, id, global_alias).await
+ }
+ Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await,
+ Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await,
+ Endpoint::PutBucketWebsite { id } => {
+ handle_put_bucket_website(&self.garage, id, req).await
+ }
+ Endpoint::DeleteBucketWebsite { id } => {
+ handle_delete_bucket_website(&self.garage, id).await
+ }
+ // Bucket-key permissions
+ Endpoint::BucketAllowKey => {
+ handle_bucket_change_key_perm(&self.garage, req, true).await
+ }
+ Endpoint::BucketDenyKey => {
+ handle_bucket_change_key_perm(&self.garage, req, false).await
+ }
+ // Bucket aliasing
+ Endpoint::GlobalAliasBucket { id, alias } => {
+ handle_global_alias_bucket(&self.garage, id, alias).await
+ }
+ Endpoint::GlobalUnaliasBucket { id, alias } => {
+ handle_global_unalias_bucket(&self.garage, id, alias).await
+ }
+ Endpoint::LocalAliasBucket {
+ id,
+ access_key_id,
+ alias,
+ } => handle_local_alias_bucket(&self.garage, id, access_key_id, alias).await,
+ Endpoint::LocalUnaliasBucket {
+ id,
+ access_key_id,
+ alias,
+ } => handle_local_unalias_bucket(&self.garage, id, access_key_id, alias).await,
+ }
+ }
+}
+
+impl ApiEndpoint for Endpoint {
+ fn name(&self) -> &'static str {
+ Endpoint::name(self)
+ }
+
+ fn add_span_attributes(&self, _span: SpanRef<'_>) {}
+}
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
new file mode 100644
index 00000000..849d28ac
--- /dev/null
+++ b/src/api/admin/bucket.rs
@@ -0,0 +1,549 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+use garage_util::time::*;
+
+use garage_table::*;
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::*;
+use garage_model::garage::Garage;
+use garage_model::permission::*;
+
+use crate::admin::error::*;
+use crate::admin::key::ApiBucketKeyPerm;
+use crate::common_error::CommonError;
+use crate::helpers::parse_json_body;
+
+pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let buckets = garage
+ .bucket_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ let res = buckets
+ .into_iter()
+ .map(|b| {
+ let state = b.state.as_option().unwrap();
+ ListBucketResultItem {
+ id: hex::encode(b.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ local_aliases: state
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|((k, n), _, _)| BucketLocalAlias {
+ access_key_id: k.to_string(),
+ alias: n.to_string(),
+ })
+ .collect::<Vec<_>>(),
+ }
+ })
+ .collect::<Vec<_>>();
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct ListBucketResultItem {
+ id: String,
+ global_aliases: Vec<String>,
+ local_aliases: Vec<BucketLocalAlias>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct BucketLocalAlias {
+ access_key_id: String,
+ alias: String,
+}
+
+pub async fn handle_get_bucket_info(
+ garage: &Arc<Garage>,
+ id: Option<String>,
+ global_alias: Option<String>,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = match (id, global_alias) {
+ (Some(id), None) => parse_bucket_id(&id)?,
+ (None, Some(ga)) => garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&ga)
+ .await?
+ .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
+ _ => {
+ return Err(Error::bad_request(
+ "Either id or globalAlias must be provided (but not both)",
+ ));
+ }
+ };
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+async fn bucket_info_results(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+) -> Result<Response<Body>, Error> {
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let mut relevant_keys = HashMap::new();
+ for (k, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_keys
+ .items()
+ .iter()
+ {
+ if let Some(key) = garage
+ .key_table
+ .get(&EmptyKey, k)
+ .await?
+ .filter(|k| !k.is_deleted())
+ {
+ if !key.state.is_deleted() {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ }
+ for ((k, _), _, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .items()
+ .iter()
+ {
+ if relevant_keys.contains_key(k) {
+ continue;
+ }
+ if let Some(key) = garage.key_table.get(&EmptyKey, k).await? {
+ if !key.state.is_deleted() {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ }
+
+ let state = bucket.state.as_option().unwrap();
+
+ let res =
+ GetBucketInfoResult {
+ id: hex::encode(&bucket.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ website_access: state.website_config.get().is_some(),
+ website_config: state.website_config.get().clone().map(|wsc| {
+ GetBucketInfoWebsiteResult {
+ index_document: wsc.index_document,
+ error_document: wsc.error_document,
+ }
+ }),
+ keys: relevant_keys
+ .into_iter()
+ .map(|(_, key)| {
+ let p = key.state.as_option().unwrap();
+ GetBucketInfoKey {
+ access_key_id: key.key_id,
+ name: p.name.get().to_string(),
+ permissions: p
+ .authorized_buckets
+ .get(&bucket.id)
+ .map(|p| ApiBucketKeyPerm {
+ read: p.allow_read,
+ write: p.allow_write,
+ owner: p.allow_owner,
+ })
+ .unwrap_or_default(),
+ bucket_local_aliases: p
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, b)| *b == Some(bucket.id))
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ }
+ })
+ .collect::<Vec<_>>(),
+ };
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetBucketInfoResult {
+ id: String,
+ global_aliases: Vec<String>,
+ website_access: bool,
+ #[serde(default)]
+ website_config: Option<GetBucketInfoWebsiteResult>,
+ keys: Vec<GetBucketInfoKey>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetBucketInfoWebsiteResult {
+ index_document: String,
+ error_document: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetBucketInfoKey {
+ access_key_id: String,
+ name: String,
+ permissions: ApiBucketKeyPerm,
+ bucket_local_aliases: Vec<String>,
+}
+
+pub async fn handle_create_bucket(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<CreateBucketRequest>(req).await?;
+
+ if let Some(ga) = &req.global_alias {
+ if !is_valid_bucket_name(ga) {
+ return Err(Error::bad_request(format!(
+ "{}: {}",
+ ga, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? {
+ if alias.state.get().is_some() {
+ return Err(CommonError::BucketAlreadyExists.into());
+ }
+ }
+ }
+
+ if let Some(la) = &req.local_alias {
+ if !is_valid_bucket_name(&la.alias) {
+ return Err(Error::bad_request(format!(
+ "{}: {}",
+ la.alias, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ let key = garage
+ .key_helper()
+ .get_existing_key(&la.access_key_id)
+ .await?;
+ let state = key.state.as_option().unwrap();
+ if matches!(state.local_aliases.get(&la.alias), Some(_)) {
+ return Err(Error::bad_request("Local alias already exists"));
+ }
+ }
+
+ let bucket = Bucket::new();
+ garage.bucket_table.insert(&bucket).await?;
+
+ if let Some(ga) = &req.global_alias {
+ garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket.id, ga)
+ .await?;
+ }
+
+ if let Some(la) = &req.local_alias {
+ garage
+ .bucket_helper()
+ .set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias)
+ .await?;
+
+ if la.allow.read || la.allow.write || la.allow.owner {
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(
+ bucket.id,
+ &la.access_key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read: la.allow.read,
+ allow_write: la.allow.write,
+ allow_owner: la.allow.owner,
+ },
+ )
+ .await?;
+ }
+ }
+
+ bucket_info_results(garage, bucket.id).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateBucketRequest {
+ global_alias: Option<String>,
+ local_alias: Option<CreateBucketLocalAlias>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateBucketLocalAlias {
+ access_key_id: String,
+ alias: String,
+ #[serde(default)]
+ allow: ApiBucketKeyPerm,
+}
+
+pub async fn handle_delete_bucket(
+ garage: &Arc<Garage>,
+ id: String,
+) -> Result<Response<Body>, Error> {
+ let helper = garage.bucket_helper();
+
+ let bucket_id = parse_bucket_id(&id)?;
+
+ let mut bucket = helper.get_existing_bucket(bucket_id).await?;
+ let state = bucket.state.as_option().unwrap();
+
+ // Check bucket is empty
+ if !helper.is_bucket_empty(bucket_id).await? {
+ return Err(CommonError::BucketNotEmpty.into());
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, perm) in bucket.authorized_keys() {
+ if perm.is_any() {
+ helper
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+ }
+ // 2. delete all local aliases
+ for ((key_id, alias), _, active) in state.local_aliases.items().iter() {
+ if *active {
+ helper
+ .unset_local_bucket_alias(bucket.id, key_id, alias)
+ .await?;
+ }
+ }
+ // 3. delete all global aliases
+ for (alias, _, active) in state.aliases.items().iter() {
+ if *active {
+ helper.purge_global_bucket_alias(bucket.id, alias).await?;
+ }
+ }
+
+ // 4. delete bucket
+ bucket.state = Deletable::delete();
+ garage.bucket_table.insert(&bucket).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+// ---- BUCKET WEBSITE CONFIGURATION ----
+
+pub async fn handle_put_bucket_website(
+ garage: &Arc<Garage>,
+ id: String,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<PutBucketWebsiteRequest>(req).await?;
+ let bucket_id = parse_bucket_id(&id)?;
+
+ let mut bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let state = bucket.state.as_option_mut().unwrap();
+ state.website_config.update(Some(WebsiteConfig {
+ index_document: req.index_document,
+ error_document: req.error_document,
+ }));
+
+ garage.bucket_table.insert(&bucket).await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PutBucketWebsiteRequest {
+ index_document: String,
+ #[serde(default)]
+ error_document: Option<String>,
+}
+
+pub async fn handle_delete_bucket_website(
+ garage: &Arc<Garage>,
+ id: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&id)?;
+
+ let mut bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let state = bucket.state.as_option_mut().unwrap();
+ state.website_config.update(None);
+
+ garage.bucket_table.insert(&bucket).await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+// ---- BUCKET/KEY PERMISSIONS ----
+
+pub async fn handle_bucket_change_key_perm(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+ new_perm_flag: bool,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?;
+
+ let bucket_id = parse_bucket_id(&req.bucket_id)?;
+
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let state = bucket.state.as_option().unwrap();
+
+ let key = garage
+ .key_helper()
+ .get_existing_key(&req.access_key_id)
+ .await?;
+
+ let mut perm = state
+ .authorized_keys
+ .get(&key.key_id)
+ .cloned()
+ .unwrap_or(BucketKeyPerm::NO_PERMISSIONS);
+
+ if req.permissions.read {
+ perm.allow_read = new_perm_flag;
+ }
+ if req.permissions.write {
+ perm.allow_write = new_perm_flag;
+ }
+ if req.permissions.owner {
+ perm.allow_owner = new_perm_flag;
+ }
+
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(bucket.id, &key.key_id, perm)
+ .await?;
+
+ bucket_info_results(garage, bucket.id).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct BucketKeyPermChangeRequest {
+ bucket_id: String,
+ access_key_id: String,
+ permissions: ApiBucketKeyPerm,
+}
+
+// ---- BUCKET ALIASES ----
+
+pub async fn handle_global_alias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+pub async fn handle_global_unalias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .unset_global_bucket_alias(bucket_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+pub async fn handle_local_alias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ access_key_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .set_local_bucket_alias(bucket_id, &access_key_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+pub async fn handle_local_unalias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ access_key_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .unset_local_bucket_alias(bucket_id, &access_key_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+// ---- HELPER ----
+
+fn parse_bucket_id(id: &str) -> Result<Uuid, Error> {
+ let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?;
+ Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?)
+}
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
new file mode 100644
index 00000000..3401be42
--- /dev/null
+++ b/src/api/admin/cluster.rs
@@ -0,0 +1,198 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_rpc::layout::*;
+
+use garage_model::garage::Garage;
+
+use crate::admin::error::*;
+use crate::helpers::parse_json_body;
+
+pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let res = GetClusterStatusResponse {
+ node: hex::encode(garage.system.id),
+ garage_version: garage.system.garage_version(),
+ known_nodes: garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|i| {
+ (
+ hex::encode(i.id),
+ KnownNodeResp {
+ addr: i.addr,
+ is_up: i.is_up,
+ last_seen_secs_ago: i.last_seen_secs_ago,
+ hostname: i.status.hostname,
+ },
+ )
+ })
+ .collect(),
+ layout: get_cluster_layout(garage),
+ };
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+pub async fn handle_connect_cluster_nodes(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<Vec<String>>(req).await?;
+
+ let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node)))
+ .await
+ .into_iter()
+ .map(|r| match r {
+ Ok(()) => ConnectClusterNodesResponse {
+ success: true,
+ error: None,
+ },
+ Err(e) => ConnectClusterNodesResponse {
+ success: false,
+ error: Some(format!("{}", e)),
+ },
+ })
+ .collect::<Vec<_>>();
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let res = get_cluster_layout(garage);
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
+ let layout = garage.system.get_cluster_layout();
+
+ GetClusterLayoutResponse {
+ version: layout.version,
+ roles: layout
+ .roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| v.0.is_some())
+ .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
+ .collect(),
+ staged_role_changes: layout
+ .staging
+ .items()
+ .iter()
+ .filter(|(k, _, v)| layout.roles.get(k) != Some(v))
+ .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
+ .collect(),
+ }
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetClusterStatusResponse {
+ node: String,
+ garage_version: &'static str,
+ known_nodes: HashMap<String, KnownNodeResp>,
+ layout: GetClusterLayoutResponse,
+}
+
+#[derive(Serialize)]
+struct ConnectClusterNodesResponse {
+ success: bool,
+ error: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetClusterLayoutResponse {
+ version: u64,
+ roles: HashMap<String, Option<NodeRole>>,
+ staged_role_changes: HashMap<String, Option<NodeRole>>,
+}
+
+#[derive(Serialize)]
+struct KnownNodeResp {
+ addr: SocketAddr,
+ is_up: bool,
+ last_seen_secs_ago: Option<u64>,
+ hostname: String,
+}
+
+pub async fn handle_update_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
+
+ let mut layout = garage.system.get_cluster_layout();
+
+ let mut roles = layout.roles.clone();
+ roles.merge(&layout.staging);
+
+ for (node, role) in updates {
+ let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
+ let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
+
+ layout
+ .staging
+ .merge(&roles.update_mutator(node, NodeRoleV(role)));
+ }
+
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_apply_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.apply_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_revert_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.revert_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>;
+
+#[derive(Deserialize)]
+struct ApplyRevertLayoutRequest {
+ version: u64,
+}
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
new file mode 100644
index 00000000..c4613cb3
--- /dev/null
+++ b/src/api/admin/error.rs
@@ -0,0 +1,96 @@
+use err_derive::Error;
+use hyper::header::HeaderValue;
+use hyper::{Body, HeaderMap, StatusCode};
+
+pub use garage_model::helper::error::Error as HelperError;
+
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+use crate::generic_server::ApiError;
+use crate::helpers::CustomApiErrorBody;
+
+/// Errors of this crate
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
+
+ // Category: cannot process
+ /// The API access key does not exist
+ #[error(display = "Access key not found: {}", _0)]
+ NoSuchAccessKey(String),
+
+ /// In Import key, the key already exists
+ #[error(
+ display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
+ _0
+ )]
+ KeyAlreadyExists(String),
+}
+
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
+
+impl From<HelperError> for Error {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
+ HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
+ HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
+ HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
+ HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n),
+ }
+ }
+}
+
+impl Error {
+ fn code(&self) -> &'static str {
+ match self {
+ Error::Common(c) => c.aws_code(),
+ Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
+ Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
+ }
+ }
+}
+
+impl ApiError for Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
+ fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::Common(c) => c.http_status_code(),
+ Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
+ Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
+ }
+ }
+
+ fn add_http_headers(&self, _header_map: &mut HeaderMap<HeaderValue>) {
+ // nothing
+ }
+
+ fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ let error = CustomApiErrorBody {
+ code: self.code().to_string(),
+ message: format!("{}", self),
+ path: path.to_string(),
+ region: garage_region.to_string(),
+ };
+ Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ r#"
+{
+ "code": "InternalError",
+ "message": "JSON encoding of error failed"
+}
+ "#
+ .into()
+ }))
+ }
+}
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
new file mode 100644
index 00000000..f30b5dbb
--- /dev/null
+++ b/src/api/admin/key.rs
@@ -0,0 +1,264 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::error::Error as GarageError;
+
+use garage_table::*;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::*;
+
+use crate::admin::error::*;
+use crate::helpers::parse_json_body;
+
+pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let res = garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .iter()
+ .map(|k| ListKeyResultItem {
+ id: k.key_id.to_string(),
+ name: k.params().unwrap().name.get().clone(),
+ })
+ .collect::<Vec<_>>();
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+struct ListKeyResultItem {
+ id: String,
+ name: String,
+}
+
+pub async fn handle_get_key_info(
+ garage: &Arc<Garage>,
+ id: Option<String>,
+ search: Option<String>,
+) -> Result<Response<Body>, Error> {
+ let key = if let Some(id) = id {
+ garage.key_helper().get_existing_key(&id).await?
+ } else if let Some(search) = search {
+ garage
+ .key_helper()
+ .get_existing_matching_key(&search)
+ .await?
+ } else {
+ unreachable!();
+ };
+
+ key_info_results(garage, key).await
+}
+
+pub async fn handle_create_key(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<CreateKeyRequest>(req).await?;
+
+ let key = Key::new(&req.name);
+ garage.key_table.insert(&key).await?;
+
+ key_info_results(garage, key).await
+}
+
+#[derive(Deserialize)]
+struct CreateKeyRequest {
+ name: String,
+}
+
+pub async fn handle_import_key(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<ImportKeyRequest>(req).await?;
+
+ let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::KeyAlreadyExists(req.access_key_id.to_string()));
+ }
+
+ let imported_key = Key::import(&req.access_key_id, &req.secret_access_key, &req.name);
+ garage.key_table.insert(&imported_key).await?;
+
+ key_info_results(garage, imported_key).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ImportKeyRequest {
+ access_key_id: String,
+ secret_access_key: String,
+ name: String,
+}
+
+pub async fn handle_update_key(
+ garage: &Arc<Garage>,
+ id: String,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<UpdateKeyRequest>(req).await?;
+
+ let mut key = garage.key_helper().get_existing_key(&id).await?;
+
+ let key_state = key.state.as_option_mut().unwrap();
+
+ if let Some(new_name) = req.name {
+ key_state.name.update(new_name);
+ }
+ if let Some(allow) = req.allow {
+ if allow.create_bucket {
+ key_state.allow_create_bucket.update(true);
+ }
+ }
+ if let Some(deny) = req.deny {
+ if deny.create_bucket {
+ key_state.allow_create_bucket.update(false);
+ }
+ }
+
+ garage.key_table.insert(&key).await?;
+
+ key_info_results(garage, key).await
+}
+
+#[derive(Deserialize)]
+struct UpdateKeyRequest {
+ name: Option<String>,
+ allow: Option<KeyPerm>,
+ deny: Option<KeyPerm>,
+}
+
+pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Response<Body>, Error> {
+ let mut key = garage.key_helper().get_existing_key(&id).await?;
+
+ key.state.as_option().unwrap();
+
+ garage.key_helper().delete_key(&mut key).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+async fn key_info_results(garage: &Arc<Garage>, key: Key) -> Result<Response<Body>, Error> {
+ let mut relevant_buckets = HashMap::new();
+
+ let key_state = key.state.as_option().unwrap();
+
+ for id in key_state
+ .authorized_buckets
+ .items()
+ .iter()
+ .map(|(id, _)| id)
+ .chain(
+ key_state
+ .local_aliases
+ .items()
+ .iter()
+ .filter_map(|(_, _, v)| v.as_ref()),
+ ) {
+ if !relevant_buckets.contains_key(id) {
+ if let Some(b) = garage.bucket_table.get(&EmptyKey, id).await? {
+ if b.state.as_option().is_some() {
+ relevant_buckets.insert(*id, b);
+ }
+ }
+ }
+ }
+
+ let res = GetKeyInfoResult {
+ name: key_state.name.get().clone(),
+ access_key_id: key.key_id.clone(),
+ secret_access_key: key_state.secret_key.clone(),
+ permissions: KeyPerm {
+ create_bucket: *key_state.allow_create_bucket.get(),
+ },
+ buckets: relevant_buckets
+ .into_iter()
+ .map(|(_, bucket)| {
+ let state = bucket.state.as_option().unwrap();
+ KeyInfoBucketResult {
+ id: hex::encode(bucket.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ local_aliases: state
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|((k, _), _, a)| *a && *k == key.key_id)
+ .map(|((_, n), _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ permissions: key_state
+ .authorized_buckets
+ .get(&bucket.id)
+ .map(|p| ApiBucketKeyPerm {
+ read: p.allow_read,
+ write: p.allow_write,
+ owner: p.allow_owner,
+ })
+ .unwrap_or_default(),
+ }
+ })
+ .collect::<Vec<_>>(),
+ };
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetKeyInfoResult {
+ name: String,
+ access_key_id: String,
+ secret_access_key: String,
+ permissions: KeyPerm,
+ buckets: Vec<KeyInfoBucketResult>,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct KeyPerm {
+ #[serde(default)]
+ create_bucket: bool,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct KeyInfoBucketResult {
+ id: String,
+ global_aliases: Vec<String>,
+ local_aliases: Vec<String>,
+ permissions: ApiBucketKeyPerm,
+}
+
+#[derive(Serialize, Deserialize, Default)]
+pub(crate) struct ApiBucketKeyPerm {
+ #[serde(default)]
+ pub(crate) read: bool,
+ #[serde(default)]
+ pub(crate) write: bool,
+ #[serde(default)]
+ pub(crate) owner: bool,
+}
diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs
new file mode 100644
index 00000000..c4857c10
--- /dev/null
+++ b/src/api/admin/mod.rs
@@ -0,0 +1,7 @@
+pub mod api_server;
+mod error;
+mod router;
+
+mod bucket;
+mod cluster;
+mod key;
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
new file mode 100644
index 00000000..93639873
--- /dev/null
+++ b/src/api/admin/router.rs
@@ -0,0 +1,149 @@
+use std::borrow::Cow;
+
+use hyper::{Method, Request};
+
+use crate::admin::error::*;
+use crate::router_macros::*;
+
+pub enum Authorization {
+ MetricsToken,
+ AdminToken,
+}
+
+router_match! {@func
+
+/// List of all Admin API endpoints.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Endpoint {
+ Options,
+ Metrics,
+ GetClusterStatus,
+ ConnectClusterNodes,
+ // Layout
+ GetClusterLayout,
+ UpdateClusterLayout,
+ ApplyClusterLayout,
+ RevertClusterLayout,
+ // Keys
+ ListKeys,
+ CreateKey,
+ ImportKey,
+ GetKeyInfo {
+ id: Option<String>,
+ search: Option<String>,
+ },
+ DeleteKey {
+ id: String,
+ },
+ UpdateKey {
+ id: String,
+ },
+ // Buckets
+ ListBuckets,
+ CreateBucket,
+ GetBucketInfo {
+ id: Option<String>,
+ global_alias: Option<String>,
+ },
+ DeleteBucket {
+ id: String,
+ },
+ PutBucketWebsite {
+ id: String,
+ },
+ DeleteBucketWebsite {
+ id: String,
+ },
+ // Bucket-Key Permissions
+ BucketAllowKey,
+ BucketDenyKey,
+ // Bucket aliases
+ GlobalAliasBucket {
+ id: String,
+ alias: String,
+ },
+ GlobalUnaliasBucket {
+ id: String,
+ alias: String,
+ },
+ LocalAliasBucket {
+ id: String,
+ access_key_id: String,
+ alias: String,
+ },
+ LocalUnaliasBucket {
+ id: String,
+ access_key_id: String,
+ alias: String,
+ },
+}}
+
+impl Endpoint {
+ /// Determine which S3 endpoint a request is for using the request, and a bucket which was
+ /// possibly extracted from the Host header.
+ /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+ pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> {
+ let uri = req.uri();
+ let path = uri.path();
+ let query = uri.query();
+
+ let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
+
+ let res = router_match!(@gen_path_parser (req.method(), path, query) [
+ OPTIONS _ => Options,
+ GET "/metrics" => Metrics,
+ GET "/v0/status" => GetClusterStatus,
+ POST "/v0/connect" => ConnectClusterNodes,
+ // Layout endpoints
+ GET "/v0/layout" => GetClusterLayout,
+ POST "/v0/layout" => UpdateClusterLayout,
+ POST "/v0/layout/apply" => ApplyClusterLayout,
+ POST "/v0/layout/revert" => RevertClusterLayout,
+ // API key endpoints
+ GET "/v0/key" if id => GetKeyInfo (query_opt::id, query_opt::search),
+ GET "/v0/key" if search => GetKeyInfo (query_opt::id, query_opt::search),
+ POST "/v0/key" if id => UpdateKey (query::id),
+ POST "/v0/key" => CreateKey,
+ POST "/v0/key/import" => ImportKey,
+ DELETE "/v0/key" if id => DeleteKey (query::id),
+ GET "/v0/key" => ListKeys,
+ // Bucket endpoints
+ GET "/v0/bucket" if id => GetBucketInfo (query_opt::id, query_opt::global_alias),
+ GET "/v0/bucket" if global_alias => GetBucketInfo (query_opt::id, query_opt::global_alias),
+ GET "/v0/bucket" => ListBuckets,
+ POST "/v0/bucket" => CreateBucket,
+ DELETE "/v0/bucket" if id => DeleteBucket (query::id),
+ PUT "/v0/bucket/website" if id => PutBucketWebsite (query::id),
+ DELETE "/v0/bucket/website" if id => DeleteBucketWebsite (query::id),
+ // Bucket-key permissions
+ POST "/v0/bucket/allow" => BucketAllowKey,
+ POST "/v0/bucket/deny" => BucketDenyKey,
+ // Bucket aliases
+ PUT "/v0/bucket/alias/global" => GlobalAliasBucket (query::id, query::alias),
+ DELETE "/v0/bucket/alias/global" => GlobalUnaliasBucket (query::id, query::alias),
+ PUT "/v0/bucket/alias/local" => LocalAliasBucket (query::id, query::access_key_id, query::alias),
+ DELETE "/v0/bucket/alias/local" => LocalUnaliasBucket (query::id, query::access_key_id, query::alias),
+ ]);
+
+ if let Some(message) = query.nonempty_message() {
+ debug!("Unused query parameter: {}", message)
+ }
+
+ Ok(res)
+ }
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ match self {
+ Self::Metrics => Authorization::MetricsToken,
+ _ => Authorization::AdminToken,
+ }
+ }
+}
+
+generateQueryParameters! {
+ "id" => id,
+ "search" => search,
+ "globalAlias" => global_alias,
+ "alias" => alias,
+ "accessKeyId" => access_key_id
+}