Diffstat (limited to 'src')
-rw-r--r--  src/admin/Cargo.toml | 29
-rw-r--r--  src/admin/lib.rs | 6
-rw-r--r--  src/admin/metrics.rs | 146
-rw-r--r--  src/api/Cargo.toml | 3
-rw-r--r--  src/api/admin/api_server.rs | 189
-rw-r--r--  src/api/admin/bucket.rs | 542
-rw-r--r--  src/api/admin/cluster.rs | 198
-rw-r--r--  src/api/admin/error.rs | 96
-rw-r--r--  src/api/admin/key.rs | 264
-rw-r--r--  src/api/admin/mod.rs | 7
-rw-r--r--  src/api/admin/router.rs | 149
-rw-r--r--  src/api/common_error.rs | 177
-rw-r--r--  src/api/generic_server.rs | 21
-rw-r--r--  src/api/helpers.rs | 53
-rw-r--r--  src/api/k2v/api_server.rs | 33
-rw-r--r--  src/api/k2v/batch.rs | 19
-rw-r--r--  src/api/k2v/error.rs | 134
-rw-r--r--  src/api/k2v/index.rs | 2
-rw-r--r--  src/api/k2v/item.rs | 2
-rw-r--r--  src/api/k2v/mod.rs | 1
-rw-r--r--  src/api/k2v/range.rs | 4
-rw-r--r--  src/api/k2v/router.rs | 6
-rw-r--r--  src/api/lib.rs | 6
-rw-r--r--  src/api/router_macros.rs | 33
-rw-r--r--  src/api/s3/api_server.rs | 28
-rw-r--r--  src/api/s3/bucket.rs | 24
-rw-r--r--  src/api/s3/copy.rs | 25
-rw-r--r--  src/api/s3/cors.rs | 34
-rw-r--r--  src/api/s3/delete.rs | 2
-rw-r--r--  src/api/s3/error.rs (renamed from src/api/error.rs) | 243
-rw-r--r--  src/api/s3/get.rs | 10
-rw-r--r--  src/api/s3/list.rs | 14
-rw-r--r--  src/api/s3/mod.rs | 1
-rw-r--r--  src/api/s3/post_object.rs | 62
-rw-r--r--  src/api/s3/put.rs | 18
-rw-r--r--  src/api/s3/router.rs | 5
-rw-r--r--  src/api/s3/website.rs | 49
-rw-r--r--  src/api/s3/xml.rs | 2
-rw-r--r--  src/api/signature/error.rs | 36
-rw-r--r--  src/api/signature/mod.rs | 7
-rw-r--r--  src/api/signature/payload.rs | 20
-rw-r--r--  src/api/signature/streaming.rs | 8
-rw-r--r--  src/garage/Cargo.toml | 7
-rw-r--r--  src/garage/admin.rs | 77
-rw-r--r--  src/garage/cli/layout.rs | 47
-rw-r--r--  src/garage/main.rs | 2
-rw-r--r--  src/garage/server.rs | 36
-rw-r--r--  src/garage/tracing_setup.rs (renamed from src/admin/tracing_setup.rs) | 0
-rw-r--r--  src/model/garage.rs | 4
-rw-r--r--  src/model/helper/bucket.rs | 152
-rw-r--r--  src/model/helper/error.rs | 10
-rw-r--r--  src/model/helper/key.rs | 102
-rw-r--r--  src/model/helper/mod.rs | 1
-rw-r--r--  src/rpc/Cargo.toml | 2
-rw-r--r--  src/rpc/layout.rs | 56
-rw-r--r--  src/rpc/system.rs | 132
-rw-r--r--  src/util/config.rs | 4
-rw-r--r--  src/util/crdt/lww_map.rs | 5
-rw-r--r--  src/web/error.rs | 36
-rw-r--r--  src/web/web_server.rs | 10
60 files changed, 2534 insertions(+), 857 deletions(-)
diff --git a/src/admin/Cargo.toml b/src/admin/Cargo.toml
deleted file mode 100644
index 2db4bb08..00000000
--- a/src/admin/Cargo.toml
+++ /dev/null
@@ -1,29 +0,0 @@
-[package]
-name = "garage_admin"
-version = "0.7.0"
-authors = ["Maximilien Richer <code@mricher.fr>"]
-edition = "2018"
-license = "AGPL-3.0"
-description = "Administration and metrics REST HTTP server for Garage"
-repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
-
-[lib]
-path = "lib.rs"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-garage_util = { version = "0.7.0", path = "../util" }
-
-hex = "0.4"
-
-futures = "0.3"
-futures-util = "0.3"
-http = "0.2"
-hyper = "0.14"
-tracing = "0.1.30"
-
-opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
-opentelemetry-prometheus = "0.10"
-opentelemetry-otlp = "0.10"
-prometheus = "0.13"
diff --git a/src/admin/lib.rs b/src/admin/lib.rs
deleted file mode 100644
index b5b0775b..00000000
--- a/src/admin/lib.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-//! Crate for handling the admin and metric HTTP APIs
-#[macro_use]
-extern crate tracing;
-
-pub mod metrics;
-pub mod tracing_setup;
diff --git a/src/admin/metrics.rs b/src/admin/metrics.rs
deleted file mode 100644
index 7edc36c6..00000000
--- a/src/admin/metrics.rs
+++ /dev/null
@@ -1,146 +0,0 @@
-use std::convert::Infallible;
-use std::net::SocketAddr;
-use std::sync::Arc;
-use std::time::SystemTime;
-
-use futures::future::*;
-use hyper::{
- header::CONTENT_TYPE,
- service::{make_service_fn, service_fn},
- Body, Method, Request, Response, Server,
-};
-
-use opentelemetry::{
- global,
- metrics::{BoundCounter, BoundValueRecorder},
- trace::{FutureExt, TraceContextExt, Tracer},
- Context,
-};
-use opentelemetry_prometheus::PrometheusExporter;
-
-use prometheus::{Encoder, TextEncoder};
-
-use garage_util::error::Error as GarageError;
-use garage_util::metrics::*;
-
-// serve_req on metric endpoint
-async fn serve_req(
- req: Request<Body>,
- admin_server: Arc<AdminServer>,
-) -> Result<Response<Body>, hyper::Error> {
- debug!("Receiving request at path {}", req.uri());
- let request_start = SystemTime::now();
-
- admin_server.metrics.http_counter.add(1);
-
- let response = match (req.method(), req.uri().path()) {
- (&Method::GET, "/metrics") => {
- let mut buffer = vec![];
- let encoder = TextEncoder::new();
-
- let tracer = opentelemetry::global::tracer("garage");
- let metric_families = tracer.in_span("admin/gather_metrics", |_| {
- admin_server.exporter.registry().gather()
- });
-
- encoder.encode(&metric_families, &mut buffer).unwrap();
- admin_server
- .metrics
- .http_body_gauge
- .record(buffer.len() as u64);
-
- Response::builder()
- .status(200)
- .header(CONTENT_TYPE, encoder.format_type())
- .body(Body::from(buffer))
- .unwrap()
- }
- _ => Response::builder()
- .status(404)
- .body(Body::from("Not implemented"))
- .unwrap(),
- };
-
- admin_server
- .metrics
- .http_req_histogram
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
- Ok(response)
-}
-
-// AdminServer hold the admin server internal admin_server and the metric exporter
-pub struct AdminServer {
- exporter: PrometheusExporter,
- metrics: AdminServerMetrics,
-}
-
-// GarageMetricadmin_server holds the metrics counter definition for Garage
-// FIXME: we would rather have that split up among the different libraries?
-struct AdminServerMetrics {
- http_counter: BoundCounter<u64>,
- http_body_gauge: BoundValueRecorder<u64>,
- http_req_histogram: BoundValueRecorder<f64>,
-}
-
-impl AdminServer {
- /// init initilialize the AdminServer and background metric server
- pub fn init() -> AdminServer {
- let exporter = opentelemetry_prometheus::exporter().init();
- let meter = global::meter("garage/admin_server");
- AdminServer {
- exporter,
- metrics: AdminServerMetrics {
- http_counter: meter
- .u64_counter("admin.http_requests_total")
- .with_description("Total number of HTTP requests made.")
- .init()
- .bind(&[]),
- http_body_gauge: meter
- .u64_value_recorder("admin.http_response_size_bytes")
- .with_description("The metrics HTTP response sizes in bytes.")
- .init()
- .bind(&[]),
- http_req_histogram: meter
- .f64_value_recorder("admin.http_request_duration_seconds")
- .with_description("The HTTP request latencies in seconds.")
- .init()
- .bind(&[]),
- },
- }
- }
- /// run execute the admin server on the designated HTTP port and listen for requests
- pub async fn run(
- self,
- bind_addr: SocketAddr,
- shutdown_signal: impl Future<Output = ()>,
- ) -> Result<(), GarageError> {
- let admin_server = Arc::new(self);
- // For every connection, we must make a `Service` to handle all
- // incoming HTTP requests on said connection.
- let make_svc = make_service_fn(move |_conn| {
- let admin_server = admin_server.clone();
- // This is the `Service` that will handle the connection.
- // `service_fn` is a helper to convert a function that
- // returns a Response into a `Service`.
- async move {
- Ok::<_, Infallible>(service_fn(move |req| {
- let tracer = opentelemetry::global::tracer("garage");
- let span = tracer
- .span_builder("admin/request")
- .with_trace_id(gen_trace_id())
- .start(&tracer);
-
- serve_req(req, admin_server.clone())
- .with_context(Context::current_with_span(span))
- }))
- }
- });
-
- let server = Server::bind(&bind_addr).serve(make_svc);
- let graceful = server.with_graceful_shutdown(shutdown_signal);
- info!("Admin server listening on http://{}", bind_addr);
-
- graceful.await?;
- Ok(())
- }
-}
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 29b26e5e..db77cf38 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -54,6 +54,9 @@ quick-xml = { version = "0.21", features = [ "serialize" ] }
url = "2.1"
opentelemetry = "0.17"
+opentelemetry-prometheus = "0.10"
+opentelemetry-otlp = "0.10"
+prometheus = "0.13"
[features]
k2v = [ "garage_util/k2v", "garage_model/k2v" ]
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
new file mode 100644
index 00000000..a0af9bd9
--- /dev/null
+++ b/src/api/admin/api_server.rs
@@ -0,0 +1,189 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use http::header::{
+ ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW, CONTENT_TYPE,
+};
+use hyper::{Body, Request, Response};
+
+use opentelemetry::trace::{SpanRef, Tracer};
+use opentelemetry_prometheus::PrometheusExporter;
+use prometheus::{Encoder, TextEncoder};
+
+use garage_model::garage::Garage;
+use garage_util::error::Error as GarageError;
+
+use crate::generic_server::*;
+
+use crate::admin::bucket::*;
+use crate::admin::cluster::*;
+use crate::admin::error::*;
+use crate::admin::key::*;
+use crate::admin::router::{Authorization, Endpoint};
+
+pub struct AdminApiServer {
+ garage: Arc<Garage>,
+ exporter: PrometheusExporter,
+ metrics_token: Option<String>,
+ admin_token: Option<String>,
+}
+
+impl AdminApiServer {
+ pub fn new(garage: Arc<Garage>) -> Self {
+ let exporter = opentelemetry_prometheus::exporter().init();
+ let cfg = &garage.config.admin;
+ let metrics_token = cfg
+ .metrics_token
+ .as_ref()
+ .map(|tok| format!("Bearer {}", tok));
+ let admin_token = cfg
+ .admin_token
+ .as_ref()
+ .map(|tok| format!("Bearer {}", tok));
+ Self {
+ garage,
+ exporter,
+ metrics_token,
+ admin_token,
+ }
+ }
+
+ pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> {
+ if let Some(bind_addr) = self.garage.config.admin.api_bind_addr {
+ let region = self.garage.config.s3_api.s3_region.clone();
+ ApiServer::new(region, self)
+ .run_server(bind_addr, shutdown_signal)
+ .await
+ } else {
+ Ok(())
+ }
+ }
+
+ fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
+ Ok(Response::builder()
+ .status(204)
+ .header(ALLOW, "OPTIONS, GET, POST")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .body(Body::empty())?)
+ }
+
+ fn handle_metrics(&self) -> Result<Response<Body>, Error> {
+ let mut buffer = vec![];
+ let encoder = TextEncoder::new();
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let metric_families = tracer.in_span("admin/gather_metrics", |_| {
+ self.exporter.registry().gather()
+ });
+
+ encoder
+ .encode(&metric_families, &mut buffer)
+ .ok_or_internal_error("Could not serialize metrics")?;
+
+ Ok(Response::builder()
+ .status(200)
+ .header(CONTENT_TYPE, encoder.format_type())
+ .body(Body::from(buffer))?)
+ }
+}
+
+#[async_trait]
+impl ApiHandler for AdminApiServer {
+ const API_NAME: &'static str = "admin";
+ const API_NAME_DISPLAY: &'static str = "Admin";
+
+ type Endpoint = Endpoint;
+ type Error = Error;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> {
+ Endpoint::from_request(req)
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: Endpoint,
+ ) -> Result<Response<Body>, Error> {
+ let expected_auth_header = match endpoint.authorization_type() {
+ Authorization::MetricsToken => self.metrics_token.as_ref(),
+ Authorization::AdminToken => self.admin_token.as_ref(),
+ };
+
+ if let Some(h) = expected_auth_header {
+ match req.headers().get("Authorization") {
+ None => Err(Error::forbidden("Authorization token must be provided")),
+ Some(v) if v.to_str().map(|hv| hv == h).unwrap_or(false) => Ok(()),
+ _ => Err(Error::forbidden("Invalid authorization token provided")),
+ }?;
+ }
+
+ match endpoint {
+ Endpoint::Options => self.handle_options(&req),
+ Endpoint::Metrics => self.handle_metrics(),
+ Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
+ Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await,
+ // Layout
+ Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
+ Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
+ Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
+ Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
+ // Keys
+ Endpoint::ListKeys => handle_list_keys(&self.garage).await,
+ Endpoint::GetKeyInfo { id, search } => {
+ handle_get_key_info(&self.garage, id, search).await
+ }
+ Endpoint::CreateKey => handle_create_key(&self.garage, req).await,
+ Endpoint::ImportKey => handle_import_key(&self.garage, req).await,
+ Endpoint::UpdateKey { id } => handle_update_key(&self.garage, id, req).await,
+ Endpoint::DeleteKey { id } => handle_delete_key(&self.garage, id).await,
+ // Buckets
+ Endpoint::ListBuckets => handle_list_buckets(&self.garage).await,
+ Endpoint::GetBucketInfo { id, global_alias } => {
+ handle_get_bucket_info(&self.garage, id, global_alias).await
+ }
+ Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await,
+ Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await,
+ Endpoint::PutBucketWebsite { id } => {
+ handle_put_bucket_website(&self.garage, id, req).await
+ }
+ Endpoint::DeleteBucketWebsite { id } => {
+ handle_delete_bucket_website(&self.garage, id).await
+ }
+ // Bucket-key permissions
+ Endpoint::BucketAllowKey => {
+ handle_bucket_change_key_perm(&self.garage, req, true).await
+ }
+ Endpoint::BucketDenyKey => {
+ handle_bucket_change_key_perm(&self.garage, req, false).await
+ }
+ // Bucket aliasing
+ Endpoint::GlobalAliasBucket { id, alias } => {
+ handle_global_alias_bucket(&self.garage, id, alias).await
+ }
+ Endpoint::GlobalUnaliasBucket { id, alias } => {
+ handle_global_unalias_bucket(&self.garage, id, alias).await
+ }
+ Endpoint::LocalAliasBucket {
+ id,
+ access_key_id,
+ alias,
+ } => handle_local_alias_bucket(&self.garage, id, access_key_id, alias).await,
+ Endpoint::LocalUnaliasBucket {
+ id,
+ access_key_id,
+ alias,
+ } => handle_local_unalias_bucket(&self.garage, id, access_key_id, alias).await,
+ }
+ }
+}
+
+impl ApiEndpoint for Endpoint {
+ fn name(&self) -> &'static str {
+ Endpoint::name(self)
+ }
+
+ fn add_span_attributes(&self, _span: SpanRef<'_>) {}
+}
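
The admin endpoints above are gated by two bearer tokens taken from the configuration: metrics_token for the /metrics endpoint and admin_token for everything else. The expected header value is pre-formatted as "Bearer <token>" in new() and compared verbatim in handle(). The following standalone sketch reproduces just that comparison outside the crate; the function name is_authorized and the main() harness are illustrative, not part of this commit (hyper 0.14 assumed as a dependency).

use hyper::{Body, Request};

// Mirror of the check in AdminApiServer::handle: if a token is configured,
// the Authorization header must equal the pre-formatted "Bearer <token>"
// string exactly; if no token is configured, the endpoint is open.
fn is_authorized(req: &Request<Body>, expected: Option<&String>) -> bool {
    match expected {
        None => true,
        Some(h) => req
            .headers()
            .get("Authorization")
            .and_then(|v| v.to_str().ok())
            .map(|v| v == h)
            .unwrap_or(false),
    }
}

fn main() {
    let expected = Some("Bearer s3cr3t".to_string());
    let req = Request::builder()
        .header("Authorization", "Bearer s3cr3t")
        .body(Body::empty())
        .unwrap();
    assert!(is_authorized(&req, expected.as_ref()));
}
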
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
new file mode 100644
index 00000000..30dc3436
--- /dev/null
+++ b/src/api/admin/bucket.rs
@@ -0,0 +1,542 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::*;
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::*;
+use garage_model::garage::Garage;
+use garage_model::permission::*;
+
+use crate::admin::error::*;
+use crate::admin::key::ApiBucketKeyPerm;
+use crate::common_error::CommonError;
+use crate::helpers::parse_json_body;
+
+pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let buckets = garage
+ .bucket_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ let res = buckets
+ .into_iter()
+ .map(|b| {
+ let state = b.state.as_option().unwrap();
+ ListBucketResultItem {
+ id: hex::encode(b.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ local_aliases: state
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|((k, n), _, _)| BucketLocalAlias {
+ access_key_id: k.to_string(),
+ alias: n.to_string(),
+ })
+ .collect::<Vec<_>>(),
+ }
+ })
+ .collect::<Vec<_>>();
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct ListBucketResultItem {
+ id: String,
+ global_aliases: Vec<String>,
+ local_aliases: Vec<BucketLocalAlias>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct BucketLocalAlias {
+ access_key_id: String,
+ alias: String,
+}
+
+pub async fn handle_get_bucket_info(
+ garage: &Arc<Garage>,
+ id: Option<String>,
+ global_alias: Option<String>,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = match (id, global_alias) {
+ (Some(id), None) => parse_bucket_id(&id)?,
+ (None, Some(ga)) => garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&ga)
+ .await?
+ .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
+ _ => {
+ return Err(Error::bad_request(
+ "Either id or globalAlias must be provided (but not both)",
+ ));
+ }
+ };
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+async fn bucket_info_results(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+) -> Result<Response<Body>, Error> {
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let mut relevant_keys = HashMap::new();
+ for (k, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_keys
+ .items()
+ .iter()
+ {
+ if let Some(key) = garage
+ .key_table
+ .get(&EmptyKey, k)
+ .await?
+ .filter(|k| !k.is_deleted())
+ {
+ if !key.state.is_deleted() {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ }
+ for ((k, _), _, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .items()
+ .iter()
+ {
+ if relevant_keys.contains_key(k) {
+ continue;
+ }
+ if let Some(key) = garage.key_table.get(&EmptyKey, k).await? {
+ if !key.state.is_deleted() {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ }
+
+ let state = bucket.state.as_option().unwrap();
+
+ let res =
+ GetBucketInfoResult {
+ id: hex::encode(&bucket.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ website_access: state.website_config.get().is_some(),
+ website_config: state.website_config.get().clone().map(|wsc| {
+ GetBucketInfoWebsiteResult {
+ index_document: wsc.index_document,
+ error_document: wsc.error_document,
+ }
+ }),
+ keys: relevant_keys
+ .into_iter()
+ .map(|(_, key)| {
+ let p = key.state.as_option().unwrap();
+ GetBucketInfoKey {
+ access_key_id: key.key_id,
+ name: p.name.get().to_string(),
+ permissions: p
+ .authorized_buckets
+ .get(&bucket.id)
+ .map(|p| ApiBucketKeyPerm {
+ read: p.allow_read,
+ write: p.allow_write,
+ owner: p.allow_owner,
+ })
+ .unwrap_or_default(),
+ bucket_local_aliases: p
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, b)| *b == Some(bucket.id))
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ }
+ })
+ .collect::<Vec<_>>(),
+ };
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetBucketInfoResult {
+ id: String,
+ global_aliases: Vec<String>,
+ website_access: bool,
+ #[serde(default)]
+ website_config: Option<GetBucketInfoWebsiteResult>,
+ keys: Vec<GetBucketInfoKey>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetBucketInfoWebsiteResult {
+ index_document: String,
+ error_document: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetBucketInfoKey {
+ access_key_id: String,
+ name: String,
+ permissions: ApiBucketKeyPerm,
+ bucket_local_aliases: Vec<String>,
+}
+
+pub async fn handle_create_bucket(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<CreateBucketRequest>(req).await?;
+
+ if let Some(ga) = &req.global_alias {
+ if !is_valid_bucket_name(ga) {
+ return Err(Error::bad_request(format!(
+ "{}: {}",
+ ga, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? {
+ if alias.state.get().is_some() {
+ return Err(CommonError::BucketAlreadyExists.into());
+ }
+ }
+ }
+
+ if let Some(la) = &req.local_alias {
+ if !is_valid_bucket_name(&la.alias) {
+ return Err(Error::bad_request(format!(
+ "{}: {}",
+ la.alias, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ let key = garage
+ .key_helper()
+ .get_existing_key(&la.access_key_id)
+ .await?;
+ let state = key.state.as_option().unwrap();
+ if matches!(state.local_aliases.get(&la.alias), Some(_)) {
+ return Err(Error::bad_request("Local alias already exists"));
+ }
+ }
+
+ let bucket = Bucket::new();
+ garage.bucket_table.insert(&bucket).await?;
+
+ if let Some(ga) = &req.global_alias {
+ garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket.id, ga)
+ .await?;
+ }
+
+ if let Some(la) = &req.local_alias {
+ garage
+ .bucket_helper()
+ .set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias)
+ .await?;
+ if la.all_permissions {
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(
+ bucket.id,
+ &la.access_key_id,
+ BucketKeyPerm::ALL_PERMISSIONS,
+ )
+ .await?;
+ }
+ }
+
+ bucket_info_results(garage, bucket.id).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateBucketRequest {
+ global_alias: Option<String>,
+ local_alias: Option<CreateBucketLocalAlias>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateBucketLocalAlias {
+ access_key_id: String,
+ alias: String,
+ #[serde(default)]
+ all_permissions: bool,
+}
+
+pub async fn handle_delete_bucket(
+ garage: &Arc<Garage>,
+ id: String,
+) -> Result<Response<Body>, Error> {
+ let helper = garage.bucket_helper();
+
+ let bucket_id = parse_bucket_id(&id)?;
+
+ let mut bucket = helper.get_existing_bucket(bucket_id).await?;
+ let state = bucket.state.as_option().unwrap();
+
+ // Check bucket is empty
+ if !helper.is_bucket_empty(bucket_id).await? {
+ return Err(CommonError::BucketNotEmpty.into());
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, perm) in bucket.authorized_keys() {
+ if perm.is_any() {
+ helper
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+ }
+ // 2. delete all local aliases
+ for ((key_id, alias), _, active) in state.local_aliases.items().iter() {
+ if *active {
+ helper
+ .unset_local_bucket_alias(bucket.id, key_id, alias)
+ .await?;
+ }
+ }
+ // 3. delete all global aliases
+ for (alias, _, active) in state.aliases.items().iter() {
+ if *active {
+ helper.purge_global_bucket_alias(bucket.id, alias).await?;
+ }
+ }
+
+ // 4. delete bucket
+ bucket.state = Deletable::delete();
+ garage.bucket_table.insert(&bucket).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+// ---- BUCKET WEBSITE CONFIGURATION ----
+
+pub async fn handle_put_bucket_website(
+ garage: &Arc<Garage>,
+ id: String,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<PutBucketWebsiteRequest>(req).await?;
+ let bucket_id = parse_bucket_id(&id)?;
+
+ let mut bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let state = bucket.state.as_option_mut().unwrap();
+ state.website_config.update(Some(WebsiteConfig {
+ index_document: req.index_document,
+ error_document: req.error_document,
+ }));
+
+ garage.bucket_table.insert(&bucket).await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PutBucketWebsiteRequest {
+ index_document: String,
+ #[serde(default)]
+ error_document: Option<String>,
+}
+
+pub async fn handle_delete_bucket_website(
+ garage: &Arc<Garage>,
+ id: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&id)?;
+
+ let mut bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let state = bucket.state.as_option_mut().unwrap();
+ state.website_config.update(None);
+
+ garage.bucket_table.insert(&bucket).await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+// ---- BUCKET/KEY PERMISSIONS ----
+
+pub async fn handle_bucket_change_key_perm(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+ new_perm_flag: bool,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?;
+
+ let bucket_id = parse_bucket_id(&req.bucket_id)?;
+
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let state = bucket.state.as_option().unwrap();
+
+ let key = garage
+ .key_helper()
+ .get_existing_key(&req.access_key_id)
+ .await?;
+
+ let mut perm = state
+ .authorized_keys
+ .get(&key.key_id)
+ .cloned()
+ .unwrap_or(BucketKeyPerm::NO_PERMISSIONS);
+
+ if req.permissions.read {
+ perm.allow_read = new_perm_flag;
+ }
+ if req.permissions.write {
+ perm.allow_write = new_perm_flag;
+ }
+ if req.permissions.owner {
+ perm.allow_owner = new_perm_flag;
+ }
+
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(bucket.id, &key.key_id, perm)
+ .await?;
+
+ bucket_info_results(garage, bucket.id).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct BucketKeyPermChangeRequest {
+ bucket_id: String,
+ access_key_id: String,
+ permissions: ApiBucketKeyPerm,
+}
+
+// ---- BUCKET ALIASES ----
+
+pub async fn handle_global_alias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+pub async fn handle_global_unalias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .unset_global_bucket_alias(bucket_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+pub async fn handle_local_alias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ access_key_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .set_local_bucket_alias(bucket_id, &access_key_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+pub async fn handle_local_unalias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ access_key_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .unset_local_bucket_alias(bucket_id, &access_key_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+// ---- HELPER ----
+
+fn parse_bucket_id(id: &str) -> Result<Uuid, Error> {
+ let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?;
+ Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?)
+}
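
All request and response bodies in this module use #[serde(rename_all = "camelCase")], so the JSON wire format carries camelCase keys (globalAlias, accessKeyId, allPermissions, ...). A small sketch of what a CreateBucket request body looks like, using local mirror structs since the real ones are private to the module; serde and serde_json are assumed as dependencies.

use serde::Serialize;

// Local, Serialize-able mirrors of CreateBucketRequest / CreateBucketLocalAlias,
// used only to print the JSON a client would POST to /v0/bucket.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct CreateBucketRequest {
    global_alias: Option<String>,
    local_alias: Option<CreateBucketLocalAlias>,
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct CreateBucketLocalAlias {
    access_key_id: String,
    alias: String,
    all_permissions: bool,
}

fn main() {
    let req = CreateBucketRequest {
        global_alias: Some("my-bucket".into()),
        local_alias: None,
    };
    // Prints: {"globalAlias":"my-bucket","localAlias":null}
    println!("{}", serde_json::to_string(&req).unwrap());
}
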
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
new file mode 100644
index 00000000..3401be42
--- /dev/null
+++ b/src/api/admin/cluster.rs
@@ -0,0 +1,198 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_rpc::layout::*;
+
+use garage_model::garage::Garage;
+
+use crate::admin::error::*;
+use crate::helpers::parse_json_body;
+
+pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let res = GetClusterStatusResponse {
+ node: hex::encode(garage.system.id),
+ garage_version: garage.system.garage_version(),
+ known_nodes: garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|i| {
+ (
+ hex::encode(i.id),
+ KnownNodeResp {
+ addr: i.addr,
+ is_up: i.is_up,
+ last_seen_secs_ago: i.last_seen_secs_ago,
+ hostname: i.status.hostname,
+ },
+ )
+ })
+ .collect(),
+ layout: get_cluster_layout(garage),
+ };
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+pub async fn handle_connect_cluster_nodes(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<Vec<String>>(req).await?;
+
+ let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node)))
+ .await
+ .into_iter()
+ .map(|r| match r {
+ Ok(()) => ConnectClusterNodesResponse {
+ success: true,
+ error: None,
+ },
+ Err(e) => ConnectClusterNodesResponse {
+ success: false,
+ error: Some(format!("{}", e)),
+ },
+ })
+ .collect::<Vec<_>>();
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let res = get_cluster_layout(garage);
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
+ let layout = garage.system.get_cluster_layout();
+
+ GetClusterLayoutResponse {
+ version: layout.version,
+ roles: layout
+ .roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| v.0.is_some())
+ .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
+ .collect(),
+ staged_role_changes: layout
+ .staging
+ .items()
+ .iter()
+ .filter(|(k, _, v)| layout.roles.get(k) != Some(v))
+ .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
+ .collect(),
+ }
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetClusterStatusResponse {
+ node: String,
+ garage_version: &'static str,
+ known_nodes: HashMap<String, KnownNodeResp>,
+ layout: GetClusterLayoutResponse,
+}
+
+#[derive(Serialize)]
+struct ConnectClusterNodesResponse {
+ success: bool,
+ error: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetClusterLayoutResponse {
+ version: u64,
+ roles: HashMap<String, Option<NodeRole>>,
+ staged_role_changes: HashMap<String, Option<NodeRole>>,
+}
+
+#[derive(Serialize)]
+struct KnownNodeResp {
+ addr: SocketAddr,
+ is_up: bool,
+ last_seen_secs_ago: Option<u64>,
+ hostname: String,
+}
+
+pub async fn handle_update_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
+
+ let mut layout = garage.system.get_cluster_layout();
+
+ let mut roles = layout.roles.clone();
+ roles.merge(&layout.staging);
+
+ for (node, role) in updates {
+ let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
+ let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
+
+ layout
+ .staging
+ .merge(&roles.update_mutator(node, NodeRoleV(role)));
+ }
+
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_apply_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.apply_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_revert_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.revert_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>;
+
+#[derive(Deserialize)]
+struct ApplyRevertLayoutRequest {
+ version: u64,
+}
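
ApplyClusterLayout and RevertClusterLayout both read a small JSON body that names the layout version to act on. A minimal sketch of that body and its deserialization; the struct is a local mirror of the private ApplyRevertLayoutRequest above, with serde and serde_json assumed as dependencies.

use serde::Deserialize;

// Body of POST /v0/layout/apply and POST /v0/layout/revert: {"version": <u64>}.
#[derive(Deserialize)]
struct ApplyRevertLayoutRequest {
    version: u64,
}

fn main() {
    let body = r#"{ "version": 3 }"#;
    let req: ApplyRevertLayoutRequest = serde_json::from_str(body).unwrap();
    assert_eq!(req.version, 3);
}
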
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
new file mode 100644
index 00000000..c4613cb3
--- /dev/null
+++ b/src/api/admin/error.rs
@@ -0,0 +1,96 @@
+use err_derive::Error;
+use hyper::header::HeaderValue;
+use hyper::{Body, HeaderMap, StatusCode};
+
+pub use garage_model::helper::error::Error as HelperError;
+
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+use crate::generic_server::ApiError;
+use crate::helpers::CustomApiErrorBody;
+
+/// Errors of this crate
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
+
+ // Category: cannot process
+ /// The API access key does not exist
+ #[error(display = "Access key not found: {}", _0)]
+ NoSuchAccessKey(String),
+
+ /// In Import key, the key already exists
+ #[error(
+ display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
+ _0
+ )]
+ KeyAlreadyExists(String),
+}
+
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
+
+impl From<HelperError> for Error {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
+ HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
+ HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
+ HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
+ HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n),
+ }
+ }
+}
+
+impl Error {
+ fn code(&self) -> &'static str {
+ match self {
+ Error::Common(c) => c.aws_code(),
+ Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
+ Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
+ }
+ }
+}
+
+impl ApiError for Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
+ fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::Common(c) => c.http_status_code(),
+ Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
+ Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
+ }
+ }
+
+ fn add_http_headers(&self, _header_map: &mut HeaderMap<HeaderValue>) {
+ // nothing
+ }
+
+ fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ let error = CustomApiErrorBody {
+ code: self.code().to_string(),
+ message: format!("{}", self),
+ path: path.to_string(),
+ region: garage_region.to_string(),
+ };
+ Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ r#"
+{
+ "code": "InternalError",
+ "message": "JSON encoding of error failed"
+}
+ "#
+ .into()
+ }))
+ }
+}
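
The blanket impl<T> From<T> for Error where CommonError: From<T> is what lets handlers use ? on hyper, http and GarageError results in a single hop, while keeping the admin-specific variants (NoSuchAccessKey, KeyAlreadyExists) with their own codes. The same pattern in a self-contained sketch, with deliberately simplified local error types that are illustrative rather than the ones from this commit.

// Simplified stand-ins for CommonError and the admin Error type.
#[derive(Debug)]
enum CommonError {
    BadRequest(String),
}

#[derive(Debug)]
enum Error {
    Common(CommonError),
}

impl From<std::num::ParseIntError> for CommonError {
    fn from(e: std::num::ParseIntError) -> Self {
        CommonError::BadRequest(e.to_string())
    }
}

// Blanket impl: anything that converts into CommonError also converts into
// Error, so `?` forwards lower-level errors without per-type boilerplate.
impl<T> From<T> for Error
where
    CommonError: From<T>,
{
    fn from(err: T) -> Self {
        Error::Common(CommonError::from(err))
    }
}

fn parse_count(s: &str) -> Result<u32, Error> {
    // ParseIntError -> CommonError -> Error, both steps via From.
    Ok(s.parse::<u32>()?)
}

fn main() {
    assert!(parse_count("12").is_ok());
    assert!(matches!(parse_count("nope"), Err(Error::Common(_))));
}
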
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
new file mode 100644
index 00000000..f30b5dbb
--- /dev/null
+++ b/src/api/admin/key.rs
@@ -0,0 +1,264 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::error::Error as GarageError;
+
+use garage_table::*;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::*;
+
+use crate::admin::error::*;
+use crate::helpers::parse_json_body;
+
+pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let res = garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .iter()
+ .map(|k| ListKeyResultItem {
+ id: k.key_id.to_string(),
+ name: k.params().unwrap().name.get().clone(),
+ })
+ .collect::<Vec<_>>();
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+struct ListKeyResultItem {
+ id: String,
+ name: String,
+}
+
+pub async fn handle_get_key_info(
+ garage: &Arc<Garage>,
+ id: Option<String>,
+ search: Option<String>,
+) -> Result<Response<Body>, Error> {
+ let key = if let Some(id) = id {
+ garage.key_helper().get_existing_key(&id).await?
+ } else if let Some(search) = search {
+ garage
+ .key_helper()
+ .get_existing_matching_key(&search)
+ .await?
+ } else {
+ unreachable!();
+ };
+
+ key_info_results(garage, key).await
+}
+
+pub async fn handle_create_key(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<CreateKeyRequest>(req).await?;
+
+ let key = Key::new(&req.name);
+ garage.key_table.insert(&key).await?;
+
+ key_info_results(garage, key).await
+}
+
+#[derive(Deserialize)]
+struct CreateKeyRequest {
+ name: String,
+}
+
+pub async fn handle_import_key(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<ImportKeyRequest>(req).await?;
+
+ let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::KeyAlreadyExists(req.access_key_id.to_string()));
+ }
+
+ let imported_key = Key::import(&req.access_key_id, &req.secret_access_key, &req.name);
+ garage.key_table.insert(&imported_key).await?;
+
+ key_info_results(garage, imported_key).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ImportKeyRequest {
+ access_key_id: String,
+ secret_access_key: String,
+ name: String,
+}
+
+pub async fn handle_update_key(
+ garage: &Arc<Garage>,
+ id: String,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<UpdateKeyRequest>(req).await?;
+
+ let mut key = garage.key_helper().get_existing_key(&id).await?;
+
+ let key_state = key.state.as_option_mut().unwrap();
+
+ if let Some(new_name) = req.name {
+ key_state.name.update(new_name);
+ }
+ if let Some(allow) = req.allow {
+ if allow.create_bucket {
+ key_state.allow_create_bucket.update(true);
+ }
+ }
+ if let Some(deny) = req.deny {
+ if deny.create_bucket {
+ key_state.allow_create_bucket.update(false);
+ }
+ }
+
+ garage.key_table.insert(&key).await?;
+
+ key_info_results(garage, key).await
+}
+
+#[derive(Deserialize)]
+struct UpdateKeyRequest {
+ name: Option<String>,
+ allow: Option<KeyPerm>,
+ deny: Option<KeyPerm>,
+}
+
+pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Response<Body>, Error> {
+ let mut key = garage.key_helper().get_existing_key(&id).await?;
+
+ key.state.as_option().unwrap();
+
+ garage.key_helper().delete_key(&mut key).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+async fn key_info_results(garage: &Arc<Garage>, key: Key) -> Result<Response<Body>, Error> {
+ let mut relevant_buckets = HashMap::new();
+
+ let key_state = key.state.as_option().unwrap();
+
+ for id in key_state
+ .authorized_buckets
+ .items()
+ .iter()
+ .map(|(id, _)| id)
+ .chain(
+ key_state
+ .local_aliases
+ .items()
+ .iter()
+ .filter_map(|(_, _, v)| v.as_ref()),
+ ) {
+ if !relevant_buckets.contains_key(id) {
+ if let Some(b) = garage.bucket_table.get(&EmptyKey, id).await? {
+ if b.state.as_option().is_some() {
+ relevant_buckets.insert(*id, b);
+ }
+ }
+ }
+ }
+
+ let res = GetKeyInfoResult {
+ name: key_state.name.get().clone(),
+ access_key_id: key.key_id.clone(),
+ secret_access_key: key_state.secret_key.clone(),
+ permissions: KeyPerm {
+ create_bucket: *key_state.allow_create_bucket.get(),
+ },
+ buckets: relevant_buckets
+ .into_iter()
+ .map(|(_, bucket)| {
+ let state = bucket.state.as_option().unwrap();
+ KeyInfoBucketResult {
+ id: hex::encode(bucket.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ local_aliases: state
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|((k, _), _, a)| *a && *k == key.key_id)
+ .map(|((_, n), _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ permissions: key_state
+ .authorized_buckets
+ .get(&bucket.id)
+ .map(|p| ApiBucketKeyPerm {
+ read: p.allow_read,
+ write: p.allow_write,
+ owner: p.allow_owner,
+ })
+ .unwrap_or_default(),
+ }
+ })
+ .collect::<Vec<_>>(),
+ };
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetKeyInfoResult {
+ name: String,
+ access_key_id: String,
+ secret_access_key: String,
+ permissions: KeyPerm,
+ buckets: Vec<KeyInfoBucketResult>,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct KeyPerm {
+ #[serde(default)]
+ create_bucket: bool,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct KeyInfoBucketResult {
+ id: String,
+ global_aliases: Vec<String>,
+ local_aliases: Vec<String>,
+ permissions: ApiBucketKeyPerm,
+}
+
+#[derive(Serialize, Deserialize, Default)]
+pub(crate) struct ApiBucketKeyPerm {
+ #[serde(default)]
+ pub(crate) read: bool,
+ #[serde(default)]
+ pub(crate) write: bool,
+ #[serde(default)]
+ pub(crate) owner: bool,
+}
diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs
new file mode 100644
index 00000000..c4857c10
--- /dev/null
+++ b/src/api/admin/mod.rs
@@ -0,0 +1,7 @@
+pub mod api_server;
+mod error;
+mod router;
+
+mod bucket;
+mod cluster;
+mod key;
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
new file mode 100644
index 00000000..93639873
--- /dev/null
+++ b/src/api/admin/router.rs
@@ -0,0 +1,149 @@
+use std::borrow::Cow;
+
+use hyper::{Method, Request};
+
+use crate::admin::error::*;
+use crate::router_macros::*;
+
+pub enum Authorization {
+ MetricsToken,
+ AdminToken,
+}
+
+router_match! {@func
+
+/// List of all Admin API endpoints.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Endpoint {
+ Options,
+ Metrics,
+ GetClusterStatus,
+ ConnectClusterNodes,
+ // Layout
+ GetClusterLayout,
+ UpdateClusterLayout,
+ ApplyClusterLayout,
+ RevertClusterLayout,
+ // Keys
+ ListKeys,
+ CreateKey,
+ ImportKey,
+ GetKeyInfo {
+ id: Option<String>,
+ search: Option<String>,
+ },
+ DeleteKey {
+ id: String,
+ },
+ UpdateKey {
+ id: String,
+ },
+ // Buckets
+ ListBuckets,
+ CreateBucket,
+ GetBucketInfo {
+ id: Option<String>,
+ global_alias: Option<String>,
+ },
+ DeleteBucket {
+ id: String,
+ },
+ PutBucketWebsite {
+ id: String,
+ },
+ DeleteBucketWebsite {
+ id: String,
+ },
+ // Bucket-Key Permissions
+ BucketAllowKey,
+ BucketDenyKey,
+ // Bucket aliases
+ GlobalAliasBucket {
+ id: String,
+ alias: String,
+ },
+ GlobalUnaliasBucket {
+ id: String,
+ alias: String,
+ },
+ LocalAliasBucket {
+ id: String,
+ access_key_id: String,
+ alias: String,
+ },
+ LocalUnaliasBucket {
+ id: String,
+ access_key_id: String,
+ alias: String,
+ },
+}}
+
+impl Endpoint {
+ /// Determine which admin API endpoint a request is for, based on the
+ /// request method, path and query parameters.
+ pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> {
+ let uri = req.uri();
+ let path = uri.path();
+ let query = uri.query();
+
+ let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
+
+ let res = router_match!(@gen_path_parser (req.method(), path, query) [
+ OPTIONS _ => Options,
+ GET "/metrics" => Metrics,
+ GET "/v0/status" => GetClusterStatus,
+ POST "/v0/connect" => ConnectClusterNodes,
+ // Layout endpoints
+ GET "/v0/layout" => GetClusterLayout,
+ POST "/v0/layout" => UpdateClusterLayout,
+ POST "/v0/layout/apply" => ApplyClusterLayout,
+ POST "/v0/layout/revert" => RevertClusterLayout,
+ // API key endpoints
+ GET "/v0/key" if id => GetKeyInfo (query_opt::id, query_opt::search),
+ GET "/v0/key" if search => GetKeyInfo (query_opt::id, query_opt::search),
+ POST "/v0/key" if id => UpdateKey (query::id),
+ POST "/v0/key" => CreateKey,
+ POST "/v0/key/import" => ImportKey,
+ DELETE "/v0/key" if id => DeleteKey (query::id),
+ GET "/v0/key" => ListKeys,
+ // Bucket endpoints
+ GET "/v0/bucket" if id => GetBucketInfo (query_opt::id, query_opt::global_alias),
+ GET "/v0/bucket" if global_alias => GetBucketInfo (query_opt::id, query_opt::global_alias),
+ GET "/v0/bucket" => ListBuckets,
+ POST "/v0/bucket" => CreateBucket,
+ DELETE "/v0/bucket" if id => DeleteBucket (query::id),
+ PUT "/v0/bucket/website" if id => PutBucketWebsite (query::id),
+ DELETE "/v0/bucket/website" if id => DeleteBucketWebsite (query::id),
+ // Bucket-key permissions
+ POST "/v0/bucket/allow" => BucketAllowKey,
+ POST "/v0/bucket/deny" => BucketDenyKey,
+ // Bucket aliases
+ PUT "/v0/bucket/alias/global" => GlobalAliasBucket (query::id, query::alias),
+ DELETE "/v0/bucket/alias/global" => GlobalUnaliasBucket (query::id, query::alias),
+ PUT "/v0/bucket/alias/local" => LocalAliasBucket (query::id, query::access_key_id, query::alias),
+ DELETE "/v0/bucket/alias/local" => LocalUnaliasBucket (query::id, query::access_key_id, query::alias),
+ ]);
+
+ if let Some(message) = query.nonempty_message() {
+ debug!("Unused query parameter: {}", message)
+ }
+
+ Ok(res)
+ }
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ match self {
+ Self::Metrics => Authorization::MetricsToken,
+ _ => Authorization::AdminToken,
+ }
+ }
+}
+
+generateQueryParameters! {
+ "id" => id,
+ "search" => search,
+ "globalAlias" => global_alias,
+ "alias" => alias,
+ "accessKeyId" => access_key_id
+}
diff --git a/src/api/common_error.rs b/src/api/common_error.rs
new file mode 100644
index 00000000..20f9f266
--- /dev/null
+++ b/src/api/common_error.rs
@@ -0,0 +1,177 @@
+use err_derive::Error;
+use hyper::StatusCode;
+
+use garage_util::error::Error as GarageError;
+
+/// Errors of this crate
+#[derive(Debug, Error)]
+pub enum CommonError {
+ // ---- INTERNAL ERRORS ----
+ /// Error related to deeper parts of Garage
+ #[error(display = "Internal error: {}", _0)]
+ InternalError(#[error(source)] GarageError),
+
+ /// Error related to Hyper
+ #[error(display = "Internal error (Hyper error): {}", _0)]
+ Hyper(#[error(source)] hyper::Error),
+
+ /// Error related to HTTP
+ #[error(display = "Internal error (HTTP error): {}", _0)]
+ Http(#[error(source)] http::Error),
+
+ // ---- GENERIC CLIENT ERRORS ----
+ /// Proper authentication was not provided
+ #[error(display = "Forbidden: {}", _0)]
+ Forbidden(String),
+
+ /// Generic bad request response with custom message
+ #[error(display = "Bad request: {}", _0)]
+ BadRequest(String),
+
+ // ---- SPECIFIC ERROR CONDITIONS ----
+ // These have to be error codes referenced in the S3 spec here:
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
+ /// The requested bucket does not exist
+ #[error(display = "Bucket not found: {}", _0)]
+ NoSuchBucket(String),
+
+ /// Tried to create a bucket that already exists
+ #[error(display = "Bucket already exists")]
+ BucketAlreadyExists,
+
+ /// Tried to delete a non-empty bucket
+ #[error(display = "Tried to delete a non-empty bucket")]
+ BucketNotEmpty,
+
+ // Category: bad request
+ /// Bucket name is not valid according to AWS S3 specs
+ #[error(display = "Invalid bucket name: {}", _0)]
+ InvalidBucketName(String),
+}
+
+impl CommonError {
+ pub fn http_status_code(&self) -> StatusCode {
+ match self {
+ CommonError::InternalError(
+ GarageError::Timeout
+ | GarageError::RemoteError(_)
+ | GarageError::Quorum(_, _, _, _),
+ ) => StatusCode::SERVICE_UNAVAILABLE,
+ CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => {
+ StatusCode::INTERNAL_SERVER_ERROR
+ }
+ CommonError::BadRequest(_) => StatusCode::BAD_REQUEST,
+ CommonError::Forbidden(_) => StatusCode::FORBIDDEN,
+ CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND,
+ CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT,
+ CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST,
+ }
+ }
+
+ pub fn aws_code(&self) -> &'static str {
+ match self {
+ CommonError::Forbidden(_) => "AccessDenied",
+ CommonError::InternalError(
+ GarageError::Timeout
+ | GarageError::RemoteError(_)
+ | GarageError::Quorum(_, _, _, _),
+ ) => "ServiceUnavailable",
+ CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => {
+ "InternalError"
+ }
+ CommonError::BadRequest(_) => "InvalidRequest",
+ CommonError::NoSuchBucket(_) => "NoSuchBucket",
+ CommonError::BucketAlreadyExists => "BucketAlreadyExists",
+ CommonError::BucketNotEmpty => "BucketNotEmpty",
+ CommonError::InvalidBucketName(_) => "InvalidBucketName",
+ }
+ }
+
+ pub fn bad_request<M: ToString>(msg: M) -> Self {
+ CommonError::BadRequest(msg.to_string())
+ }
+}
+
+pub trait CommonErrorDerivative: From<CommonError> {
+ fn internal_error<M: ToString>(msg: M) -> Self {
+ Self::from(CommonError::InternalError(GarageError::Message(
+ msg.to_string(),
+ )))
+ }
+
+ fn bad_request<M: ToString>(msg: M) -> Self {
+ Self::from(CommonError::BadRequest(msg.to_string()))
+ }
+
+ fn forbidden<M: ToString>(msg: M) -> Self {
+ Self::from(CommonError::Forbidden(msg.to_string()))
+ }
+}
+
+/// Trait to map an error to the Bad Request error code
+pub trait OkOrBadRequest {
+ type S;
+ fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, CommonError>;
+}
+
+impl<T, E> OkOrBadRequest for Result<T, E>
+where
+ E: std::fmt::Display,
+{
+ type S = T;
+ fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> {
+ match self {
+ Ok(x) => Ok(x),
+ Err(e) => Err(CommonError::BadRequest(format!(
+ "{}: {}",
+ reason.as_ref(),
+ e
+ ))),
+ }
+ }
+}
+
+impl<T> OkOrBadRequest for Option<T> {
+ type S = T;
+ fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> {
+ match self {
+ Some(x) => Ok(x),
+ None => Err(CommonError::BadRequest(reason.as_ref().to_string())),
+ }
+ }
+}
+
+/// Trait to map an error to an Internal Error code
+pub trait OkOrInternalError {
+ type S;
+ fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<Self::S, CommonError>;
+}
+
+impl<T, E> OkOrInternalError for Result<T, E>
+where
+ E: std::fmt::Display,
+{
+ type S = T;
+ fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> {
+ match self {
+ Ok(x) => Ok(x),
+ Err(e) => Err(CommonError::InternalError(GarageError::Message(format!(
+ "{}: {}",
+ reason.as_ref(),
+ e
+ )))),
+ }
+ }
+}
+
+impl<T> OkOrInternalError for Option<T> {
+ type S = T;
+ fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> {
+ match self {
+ Some(x) => Ok(x),
+ None => Err(CommonError::InternalError(GarageError::Message(
+ reason.as_ref().to_string(),
+ ))),
+ }
+ }
+}
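
OkOrBadRequest and OkOrInternalError are extension traits that attach a reason string while converting an Option or Result into a CommonError, replacing the ok_or_else/map_err boilerplate previously scattered through the handlers. A pared-down, self-contained version of the same idiom; the error type is reduced to String purely for illustration.

// Local, simplified copy of the OkOrBadRequest idea: attach a reason while
// converting Option<T> into a Result.
trait OkOrBadRequest {
    type S;
    fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, String>;
}

impl<T> OkOrBadRequest for Option<T> {
    type S = T;
    fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, String> {
        self.ok_or_else(|| format!("Bad request: {}", reason.as_ref()))
    }
}

fn main() {
    let q: Option<&str> = None;
    // In the real handlers the error type is CommonError, not String.
    let err = q.ok_or_bad_request("Missing query parameter 'id'").unwrap_err();
    assert_eq!(err, "Bad request: Missing query parameter 'id'");
}
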
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index 9281e596..77278908 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -5,9 +5,11 @@ use async_trait::async_trait;
use futures::future::Future;
+use hyper::header::HeaderValue;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
+use hyper::{HeaderMap, StatusCode};
use opentelemetry::{
global,
@@ -19,26 +21,31 @@ use opentelemetry::{
use garage_util::error::Error as GarageError;
use garage_util::metrics::{gen_trace_id, RecordDuration};
-use crate::error::*;
-
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> &'static str;
fn add_span_attributes(&self, span: SpanRef<'_>);
}
+pub trait ApiError: std::error::Error + Send + Sync + 'static {
+ fn http_status_code(&self) -> StatusCode;
+ fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>);
+ fn http_body(&self, garage_region: &str, path: &str) -> Body;
+}
+
#[async_trait]
pub(crate) trait ApiHandler: Send + Sync + 'static {
const API_NAME: &'static str;
const API_NAME_DISPLAY: &'static str;
type Endpoint: ApiEndpoint;
+ type Error: ApiError;
- fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Error>;
+ fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Self::Error>;
async fn handle(
&self,
req: Request<Body>,
endpoint: Self::Endpoint,
- ) -> Result<Response<Body>, Error>;
+ ) -> Result<Response<Body>, Self::Error>;
}
pub(crate) struct ApiServer<A: ApiHandler> {
@@ -142,13 +149,13 @@ impl<A: ApiHandler> ApiServer<A> {
Ok(x)
}
Err(e) => {
- let body: Body = Body::from(e.aws_xml(&self.region, uri.path()));
+ let body: Body = e.http_body(&self.region, uri.path());
let mut http_error_builder = Response::builder()
.status(e.http_status_code())
.header("Content-Type", "application/xml");
if let Some(header_map) = http_error_builder.headers_mut() {
- e.add_headers(header_map)
+ e.add_http_headers(header_map)
}
let http_error = http_error_builder.body(body)?;
@@ -163,7 +170,7 @@ impl<A: ApiHandler> ApiServer<A> {
}
}
- async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
+ async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, A::Error> {
let endpoint = self.api_handler.parse_endpoint(&req)?;
debug!("Endpoint: {}", endpoint.name());
diff --git a/src/api/helpers.rs b/src/api/helpers.rs
index a994b82f..9fb12dbe 100644
--- a/src/api/helpers.rs
+++ b/src/api/helpers.rs
@@ -1,11 +1,8 @@
+use hyper::{Body, Request};
use idna::domain_to_unicode;
+use serde::{Deserialize, Serialize};
-use garage_util::data::*;
-
-use garage_model::garage::Garage;
-use garage_model::key_table::Key;
-
-use crate::error::*;
+use crate::common_error::{CommonError as Error, *};
/// What kind of authorization is required to perform a given action
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -50,7 +47,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> {
let mut iter = authority.chars().enumerate();
let (_, first_char) = iter
.next()
- .ok_or_else(|| Error::BadRequest("Authority is empty".to_string()))?;
+ .ok_or_else(|| Error::bad_request("Authority is empty".to_string()))?;
let split = match first_char {
'[' => {
@@ -58,7 +55,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> {
match iter.next() {
Some((_, ']')) => iter.next(),
_ => {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Authority {} has an illegal format",
authority
)))
@@ -71,7 +68,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> {
let authority = match split {
Some((i, ':')) => Ok(&authority[..i]),
None => Ok(authority),
- Some((_, _)) => Err(Error::BadRequest(format!(
+ Some((_, _)) => Err(Error::bad_request(format!(
"Authority {} has an illegal format",
authority
))),
@@ -79,28 +76,6 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> {
authority.map(|h| domain_to_unicode(h).0)
}
-#[allow(clippy::ptr_arg)]
-pub async fn resolve_bucket(
- garage: &Garage,
- bucket_name: &String,
- api_key: &Key,
-) -> Result<Uuid, Error> {
- let api_key_params = api_key
- .state
- .as_option()
- .ok_or_internal_error("Key should not be deleted at this point")?;
-
- if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
- Ok(*bucket_id)
- } else {
- Ok(garage
- .bucket_helper()
- .resolve_global_bucket_name(bucket_name)
- .await?
- .ok_or(Error::NoSuchBucket)?)
- }
-}
-
/// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in
/// the host header of the request
///
@@ -132,7 +107,7 @@ pub fn parse_bucket_key<'a>(
None => (path, None),
};
if bucket.is_empty() {
- return Err(Error::BadRequest("No bucket specified".to_string()));
+ return Err(Error::bad_request("No bucket specified"));
}
Ok((bucket, key))
}
@@ -163,6 +138,12 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> {
None
}
+pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ Ok(resp)
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -298,3 +279,11 @@ mod tests {
);
}
}
+
+#[derive(Serialize)]
+pub(crate) struct CustomApiErrorBody {
+ pub(crate) code: String,
+ pub(crate) message: String,
+ pub(crate) region: String,
+ pub(crate) path: String,
+}
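
parse_json_body is the new shared helper that buffers a request body and deserializes it into any Deserialize type; the admin handlers and the reworked k2v batch handlers below call it instead of open-coding hyper::body::to_bytes plus serde_json::from_slice. A simplified, self-contained version of the flow; error handling is collapsed to Box<dyn Error>, CreateKeyRequest mirrors the private struct from key.rs, and hyper 0.14, serde, serde_json and tokio are assumed as dependencies.

use hyper::{Body, Request};
use serde::Deserialize;

#[derive(Deserialize)]
struct CreateKeyRequest {
    name: String,
}

// Simplified stand-in for crate::helpers::parse_json_body: collect the body
// bytes, then deserialize them as JSON into the requested type.
async fn parse_json_body<T: for<'de> Deserialize<'de>>(
    req: Request<Body>,
) -> Result<T, Box<dyn std::error::Error + Send + Sync>> {
    let body = hyper::body::to_bytes(req.into_body()).await?;
    Ok(serde_json::from_slice(&body)?)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let req = Request::builder()
        .method("POST")
        .uri("/v0/key")
        .body(Body::from(r#"{ "name": "backup-key" }"#))?;
    let parsed: CreateKeyRequest = parse_json_body(req).await?;
    assert_eq!(parsed.name, "backup-key");
    Ok(())
}
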
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index 5f5e9030..eb0fbdd7 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -7,13 +7,12 @@ use hyper::{Body, Method, Request, Response};
use opentelemetry::{trace::SpanRef, KeyValue};
-use garage_table::util::*;
use garage_util::error::Error as GarageError;
use garage_model::garage::Garage;
-use crate::error::*;
use crate::generic_server::*;
+use crate::k2v::error::*;
use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::*;
@@ -60,6 +59,7 @@ impl ApiHandler for K2VApiServer {
const API_NAME_DISPLAY: &'static str = "K2V";
type Endpoint = K2VApiEndpoint;
+ type Error = Error;
fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
let (endpoint, bucket_name) = Endpoint::from_request(req)?;
@@ -83,13 +83,14 @@ impl ApiHandler for K2VApiServer {
		// The OPTIONS method is processed early, before we even check for an API key
if let Endpoint::Options = endpoint {
- return handle_options_s3api(garage, &req, Some(bucket_name)).await;
+ return Ok(handle_options_s3api(garage, &req, Some(bucket_name))
+ .await
+ .ok_or_bad_request("Error handling OPTIONS")?);
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
- let api_key = api_key.ok_or_else(|| {
- Error::Forbidden("Garage does not support anonymous access yet".to_string())
- })?;
+ let api_key = api_key
+ .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
let req = parse_streaming_body(
&api_key,
@@ -99,13 +100,14 @@ impl ApiHandler for K2VApiServer {
"k2v",
)?;
- let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
+ let bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&bucket_name, &api_key)
+ .await?;
let bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .filter(|b| !b.state.is_deleted())
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
@@ -115,9 +117,7 @@ impl ApiHandler for K2VApiServer {
};
if !allowed {
- return Err(Error::Forbidden(
- "Operation is not allowed for this key.".to_string(),
- ));
+ return Err(Error::forbidden("Operation is not allowed for this key."));
}
// Look up what CORS rule might apply to response.
@@ -125,7 +125,8 @@ impl ApiHandler for K2VApiServer {
// are always preflighted, i.e. the browser should make
// an OPTIONS call before to check it is allowed
let matching_cors_rule = match *req.method() {
- Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
+ Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)
+ .ok_or_internal_error("Error looking up CORS rule")?,
_ => None,
};
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 4ecddeb9..db9901cf 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -12,7 +12,8 @@ use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
-use crate::error::*;
+use crate::helpers::*;
+use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
@@ -20,9 +21,7 @@ pub async fn handle_insert_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let items: Vec<InsertBatchItem> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
let mut items2 = vec![];
for it in items {
@@ -52,9 +51,7 @@ pub async fn handle_read_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let queries: Vec<ReadBatchQuery> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -91,7 +88,7 @@ async fn handle_read_batch_query(
let (items, more, next_start) = if query.single_item {
if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse {
- return Err(Error::BadRequest("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.".into()));
+ return Err(Error::bad_request("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true."));
}
let sk = query
.start
@@ -149,9 +146,7 @@ pub async fn handle_delete_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let queries: Vec<DeleteBatchQuery> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -188,7 +183,7 @@ async fn handle_delete_batch_query(
let deleted_items = if query.single_item {
if query.prefix.is_some() || query.end.is_some() {
- return Err(Error::BadRequest("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.".into()));
+ return Err(Error::bad_request("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true."));
}
let sk = query
.start
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
new file mode 100644
index 00000000..4c55d8b5
--- /dev/null
+++ b/src/api/k2v/error.rs
@@ -0,0 +1,134 @@
+use err_derive::Error;
+use hyper::header::HeaderValue;
+use hyper::{Body, HeaderMap, StatusCode};
+
+use garage_model::helper::error::Error as HelperError;
+
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+use crate::generic_server::ApiError;
+use crate::helpers::CustomApiErrorBody;
+use crate::signature::error::Error as SignatureError;
+
+/// Errors of this crate
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
+
+ // Category: cannot process
+ /// Authorization Header Malformed
+ #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ AuthorizationHeaderMalformed(String),
+
+	/// The requested object does not exist
+ #[error(display = "Key not found")]
+ NoSuchKey,
+
+ /// Some base64 encoded data was badly encoded
+ #[error(display = "Invalid base64: {}", _0)]
+ InvalidBase64(#[error(source)] base64::DecodeError),
+
+	/// The client sent a header with an invalid value
+ #[error(display = "Invalid header value: {}", _0)]
+ InvalidHeader(#[error(source)] hyper::header::ToStrError),
+
+ /// The client asked for an invalid return format (invalid Accept header)
+ #[error(display = "Not acceptable: {}", _0)]
+ NotAcceptable(String),
+
+ /// The request contained an invalid UTF-8 sequence in its path or in other parameters
+ #[error(display = "Invalid UTF-8: {}", _0)]
+ InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
+}
+
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
+
+impl From<HelperError> for Error {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
+ HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
+ HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
+ HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
+ e => Self::Common(CommonError::BadRequest(format!("{}", e))),
+ }
+ }
+}
+
+impl From<SignatureError> for Error {
+ fn from(err: SignatureError) -> Self {
+ match err {
+ SignatureError::Common(c) => Self::Common(c),
+ SignatureError::AuthorizationHeaderMalformed(c) => {
+ Self::AuthorizationHeaderMalformed(c)
+ }
+ SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
+ SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
+ }
+ }
+}
+
+impl Error {
+ /// This returns a keyword for the corresponding error.
+ /// Here, these keywords are not necessarily those from AWS S3,
+ /// as we are building a custom API
+ fn code(&self) -> &'static str {
+ match self {
+ Error::Common(c) => c.aws_code(),
+ Error::NoSuchKey => "NoSuchKey",
+ Error::NotAcceptable(_) => "NotAcceptable",
+ Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
+ Error::InvalidBase64(_) => "InvalidBase64",
+ Error::InvalidHeader(_) => "InvalidHeaderValue",
+ Error::InvalidUtf8Str(_) => "InvalidUtf8String",
+ }
+ }
+}
+
+impl ApiError for Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
+ fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::Common(c) => c.http_status_code(),
+ Error::NoSuchKey => StatusCode::NOT_FOUND,
+ Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
+ Error::AuthorizationHeaderMalformed(_)
+ | Error::InvalidBase64(_)
+ | Error::InvalidHeader(_)
+ | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST,
+ }
+ }
+
+ fn add_http_headers(&self, _header_map: &mut HeaderMap<HeaderValue>) {
+ // nothing
+ }
+
+ fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ let error = CustomApiErrorBody {
+ code: self.code().to_string(),
+ message: format!("{}", self),
+ path: path.to_string(),
+ region: garage_region.to_string(),
+ };
+ Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ r#"
+{
+ "code": "InternalError",
+ "message": "JSON encoding of error failed"
+}
+ "#
+ .into()
+ }))
+ }
+}
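A small, hypothetical sketch of what this module gives the K2V handlers: constructor helpers such as forbidden() and bad_request() presumably come from CommonErrorDerivative, the blanket From impl lets ? convert anything CommonError already handles, and the ApiError impl fixes the HTTP mapping. Only Error::NoSuchKey and its status code are taken from the code above; the function itself is made up.

use hyper::StatusCode;

use crate::generic_server::ApiError;
use crate::k2v::error::Error;

fn demo_k2v_errors(allowed: bool) -> Result<(), Error> {
	if !allowed {
		// Shared constructor, also available on the S3 error type.
		return Err(Error::forbidden("Operation is not allowed for this key."));
	}
	// Variant-specific mapping defined by the ApiError impl above.
	assert_eq!(Error::NoSuchKey.http_status_code(), StatusCode::NOT_FOUND);
	Ok(())
}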
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 896dbcf0..d5db906d 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -12,7 +12,7 @@ use garage_table::util::*;
use garage_model::garage::Garage;
use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
-use crate::error::*;
+use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_read_index(
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 1860863e..836d386f 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -10,7 +10,7 @@ use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
-use crate::error::*;
+use crate::k2v::error::*;
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs
index ee210ad5..b6a8c5cf 100644
--- a/src/api/k2v/mod.rs
+++ b/src/api/k2v/mod.rs
@@ -1,4 +1,5 @@
pub mod api_server;
+mod error;
mod router;
mod batch;
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
index 295c34aa..bb9d3be5 100644
--- a/src/api/k2v/range.rs
+++ b/src/api/k2v/range.rs
@@ -7,8 +7,8 @@ use std::sync::Arc;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::error::*;
use crate::helpers::key_after_prefix;
+use crate::k2v::error::*;
/// Read range in a Garage table.
/// Returns (entries, more?, nextStart)
@@ -31,7 +31,7 @@ where
(None, Some(s)) => (Some(s.clone()), false),
(Some(p), Some(s)) => {
if !s.starts_with(p) {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Start key '{}' does not start with prefix '{}'",
s, p
)));
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
index f948ffce..50e6965b 100644
--- a/src/api/k2v/router.rs
+++ b/src/api/k2v/router.rs
@@ -1,4 +1,4 @@
-use crate::error::*;
+use crate::k2v::error::*;
use std::borrow::Cow;
@@ -62,7 +62,7 @@ impl Endpoint {
.unwrap_or((path.to_owned(), ""));
if bucket.is_empty() {
- return Err(Error::BadRequest("Missing bucket name".to_owned()));
+ return Err(Error::bad_request("Missing bucket name"));
}
if *req.method() == Method::OPTIONS {
@@ -83,7 +83,7 @@ impl Endpoint {
Method::PUT => Self::from_put(partition_key, &mut query)?,
Method::DELETE => Self::from_delete(partition_key, &mut query)?,
_ if req.method() == method_search => Self::from_search(partition_key, &mut query)?,
- _ => return Err(Error::BadRequest("Unknown method".to_owned())),
+ _ => return Err(Error::bad_request("Unknown method")),
};
if let Some(message) = query.nonempty_message() {
diff --git a/src/api/lib.rs b/src/api/lib.rs
index 0078f7b5..370dfd7a 100644
--- a/src/api/lib.rs
+++ b/src/api/lib.rs
@@ -2,16 +2,16 @@
#[macro_use]
extern crate tracing;
-pub mod error;
-pub use error::Error;
+pub mod common_error;
mod encoding;
-mod generic_server;
+pub mod generic_server;
pub mod helpers;
mod router_macros;
/// This module is public only to help testing. Don't expect stability here
pub mod signature;
+pub mod admin;
#[cfg(feature = "k2v")]
pub mod k2v;
pub mod s3;
diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs
index 8471407c..4c593300 100644
--- a/src/api/router_macros.rs
+++ b/src/api/router_macros.rs
@@ -23,6 +23,29 @@ macro_rules! router_match {
_ => None
}
}};
+ (@gen_path_parser ($method:expr, $reqpath:expr, $query:expr)
+ [
+ $($meth:ident $path:pat $(if $required:ident)? => $api:ident $(($($conv:ident :: $param:ident),*))?,)*
+ ]) => {{
+ {
+ use Endpoint::*;
+ match ($method, $reqpath) {
+ $(
+ (&Method::$meth, $path) if true $(&& $query.$required.is_some())? => $api {
+ $($(
+ $param: router_match!(@@parse_param $query, $conv, $param),
+ )*)?
+ },
+ )*
+ (m, p) => {
+ return Err(Error::bad_request(format!(
+ "Unknown API endpoint: {} {}",
+ m, p
+ )))
+ }
+ }
+ }
+ }};
(@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr),
key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*],
no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{
@@ -55,7 +78,7 @@ macro_rules! router_match {
)*)?
}),
)*
- (kw, _) => Err(Error::BadRequest(format!("Invalid endpoint: {}", kw)))
+ (kw, _) => Err(Error::bad_request(format!("Invalid endpoint: {}", kw)))
}
}};
@@ -74,14 +97,14 @@ macro_rules! router_match {
.take()
.map(|param| param.parse())
.transpose()
- .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
+ .map_err(|_| Error::bad_request("Failed to parse query parameter"))?
}};
(@@parse_param $query:expr, parse, $param:ident) => {{
// extract and parse mandatory query parameter
// both missing and un-parseable parameters are reported as errors
$query.$param.take().ok_or_bad_request("Missing argument for endpoint")?
.parse()
- .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
+ .map_err(|_| Error::bad_request("Failed to parse query parameter"))?
}};
(@func
$(#[$doc:meta])*
@@ -150,7 +173,7 @@ macro_rules! generateQueryParameters {
false
} else if v.as_ref().is_empty() {
if res.keyword.replace(k).is_some() {
- return Err(Error::BadRequest("Multiple keywords".to_owned()));
+ return Err(Error::bad_request("Multiple keywords"));
}
continue;
} else {
@@ -160,7 +183,7 @@ macro_rules! generateQueryParameters {
}
};
if repeated {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Query parameter repeated: '{}'",
k
)));
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 78a69d53..ecc417ab 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -8,14 +8,13 @@ use hyper::{Body, Method, Request, Response};
use opentelemetry::{trace::SpanRef, KeyValue};
-use garage_table::util::*;
use garage_util::error::Error as GarageError;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
-use crate::error::*;
use crate::generic_server::*;
+use crate::s3::error::*;
use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::*;
@@ -75,6 +74,7 @@ impl ApiHandler for S3ApiServer {
const API_NAME_DISPLAY: &'static str = "S3";
type Endpoint = S3ApiEndpoint;
+ type Error = Error;
fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> {
let authority = req
@@ -122,9 +122,8 @@ impl ApiHandler for S3ApiServer {
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
- let api_key = api_key.ok_or_else(|| {
- Error::Forbidden("Garage does not support anonymous access yet".to_string())
- })?;
+ let api_key = api_key
+ .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
let req = parse_streaming_body(
&api_key,
@@ -148,13 +147,14 @@ impl ApiHandler for S3ApiServer {
return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await;
}
- let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
+ let bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&bucket_name, &api_key)
+ .await?;
let bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .filter(|b| !b.state.is_deleted())
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
@@ -164,9 +164,7 @@ impl ApiHandler for S3ApiServer {
};
if !allowed {
- return Err(Error::Forbidden(
- "Operation is not allowed for this key.".to_string(),
- ));
+ return Err(Error::forbidden("Operation is not allowed for this key."));
}
// Look up what CORS rule might apply to response.
@@ -309,7 +307,7 @@ impl ApiHandler for S3ApiServer {
)
.await
} else {
- Err(Error::BadRequest(format!(
+ Err(Error::bad_request(format!(
"Invalid endpoint: list-type={}",
list_type
)))
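For illustration, the bucket-lookup pattern the S3 and K2V servers now share in place of the old crate-local resolve_bucket() free function (removed in the helpers.rs hunk above). The lookup_bucket wrapper is hypothetical, and it assumes the helper errors convert through the From<HelperError> impls shown in this patch.

use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::key_table::Key;

use crate::s3::error::Error;

async fn lookup_bucket(garage: &Garage, bucket_name: &str, api_key: &Key) -> Result<Bucket, Error> {
	// resolve_bucket() checks the key's local aliases before falling back to
	// the global alias table; get_existing_bucket() replaces the manual
	// bucket_table.get() + is_deleted() filtering each call site used to do.
	let bucket_id = garage
		.bucket_helper()
		.resolve_bucket(&bucket_name.to_string(), api_key)
		.await?;
	let bucket = garage
		.bucket_helper()
		.get_existing_bucket(bucket_id)
		.await?;
	Ok(bucket)
}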
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 93048a8c..2071fe55 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -8,13 +8,13 @@ use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_model::permission::BucketKeyPerm;
-use garage_model::s3::object_table::ObjectFilter;
use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::time::*;
-use crate::error::*;
+use crate::common_error::CommonError;
+use crate::s3::error::*;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
@@ -130,7 +130,7 @@ pub async fn handle_create_bucket(
if let Some(location_constraint) = cmd {
if location_constraint != garage.config.s3_api.s3_region {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Cannot satisfy location constraint `{}`: buckets can only be created in region `{}`",
location_constraint,
garage.config.s3_api.s3_region
@@ -158,12 +158,12 @@ pub async fn handle_create_bucket(
// otherwise return a forbidden error.
let kp = api_key.bucket_permissions(&bucket_id);
if !(kp.allow_write || kp.allow_owner) {
- return Err(Error::BucketAlreadyExists);
+ return Err(CommonError::BucketAlreadyExists.into());
}
} else {
// Create the bucket!
if !is_valid_bucket_name(&bucket_name) {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"{}: {}",
bucket_name, INVALID_BUCKET_NAME_MESSAGE
)));
@@ -228,18 +228,8 @@ pub async fn handle_delete_bucket(
// Delete bucket
// Check bucket is empty
- let objects = garage
- .object_table
- .get_range(
- &bucket_id,
- None,
- Some(ObjectFilter::IsData),
- 10,
- EnumerationOrder::Forward,
- )
- .await?;
- if !objects.is_empty() {
- return Err(Error::BucketNotEmpty);
+ if !garage.bucket_helper().is_bucket_empty(bucket_id).await? {
+ return Err(CommonError::BucketNotEmpty.into());
}
// --- done checking, now commit ---
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 4e94d887..0fc16993 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -18,8 +18,8 @@ use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::error::*;
-use crate::helpers::{parse_bucket_key, resolve_bucket};
+use crate::helpers::parse_bucket_key;
+use crate::s3::error::*;
use crate::s3::put::{decode_upload_id, get_headers};
use crate::s3::xml::{self as s3_xml, xmlns_tag};
@@ -201,8 +201,8 @@ pub async fn handle_upload_part_copy(
let mut ranges = http_range::HttpRange::parse(range_str, source_version_meta.size)
.map_err(|e| (e, source_version_meta.size))?;
if ranges.len() != 1 {
- return Err(Error::BadRequest(
- "Invalid x-amz-copy-source-range header: exactly 1 range must be given".into(),
+ return Err(Error::bad_request(
+ "Invalid x-amz-copy-source-range header: exactly 1 range must be given",
));
} else {
ranges.pop().unwrap()
@@ -230,8 +230,8 @@ pub async fn handle_upload_part_copy(
			// This only happens for small files; we don't bother handling it.
			// (In AWS, UploadPartCopy works for parts of at least 5MB, which
			// is never the case for an inline object.)
- return Err(Error::BadRequest(
- "Source object is too small (minimum part size is 5Mb)".into(),
+ return Err(Error::bad_request(
+ "Source object is too small (minimum part size is 5Mb)",
));
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (),
@@ -250,7 +250,7 @@ pub async fn handle_upload_part_copy(
// Check this part number hasn't yet been uploaded
if let Some(dv) = dest_version {
if dv.has_part_number(part_number) {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Part number {} has already been uploaded",
part_number
)));
@@ -413,10 +413,13 @@ async fn get_copy_source(
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
- let source_bucket_id = resolve_bucket(garage, &source_bucket.to_string(), api_key).await?;
+ let source_bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&source_bucket.to_string(), api_key)
+ .await?;
if !api_key.allow_read(&source_bucket_id) {
- return Err(Error::Forbidden(format!(
+ return Err(Error::forbidden(format!(
"Reading from bucket {} not allowed for this key",
source_bucket
)));
@@ -536,8 +539,8 @@ impl CopyPreconditionHeaders {
(None, None, None, Some(ims)) => v_date > *ims,
(None, None, None, None) => true,
_ => {
- return Err(Error::BadRequest(
- "Invalid combination of x-amz-copy-source-if-xxxxx headers".into(),
+ return Err(Error::bad_request(
+ "Invalid combination of x-amz-copy-source-if-xxxxx headers",
))
}
};
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index 37ea2e43..c7273464 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -9,13 +9,12 @@ use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use crate::error::*;
+use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_model::garage::Garage;
-use garage_table::*;
use garage_util::data::*;
pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
@@ -48,14 +47,11 @@ pub async fn handle_delete_cors(
bucket_id: Uuid,
) -> Result<Response<Body>, Error> {
let mut bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
+ let param = bucket.params_mut().unwrap();
param.cors_config.update(None);
garage.bucket_table.insert(&bucket).await?;
@@ -78,14 +74,11 @@ pub async fn handle_put_cors(
}
let mut bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
+ let param = bucket.params_mut().unwrap();
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
@@ -119,12 +112,7 @@ pub async fn handle_options_s3api(
let helper = garage.bucket_helper();
let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
if let Some(id) = bucket_id {
- let bucket = garage
- .bucket_table
- .get(&EmptyKey, &id)
- .await?
- .filter(|b| !b.state.is_deleted())
- .ok_or(Error::NoSuchBucket)?;
+ let bucket = garage.bucket_helper().get_existing_bucket(id).await?;
handle_options_for_bucket(req, &bucket)
} else {
// If there is a bucket name in the request, but that name
@@ -185,7 +173,7 @@ pub fn handle_options_for_bucket(
}
}
- Err(Error::Forbidden("This CORS request is not allowed.".into()))
+ Err(Error::forbidden("This CORS request is not allowed."))
}
pub fn find_matching_cors_rule<'a>(
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index 1e3f1249..5065b285 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -8,7 +8,7 @@ use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
-use crate::error::*;
+use crate::s3::error::*;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
diff --git a/src/api/error.rs b/src/api/s3/error.rs
index 4b7254d2..ac632540 100644
--- a/src/api/error.rs
+++ b/src/api/s3/error.rs
@@ -2,34 +2,24 @@ use std::convert::TryInto;
use err_derive::Error;
use hyper::header::HeaderValue;
-use hyper::{HeaderMap, StatusCode};
+use hyper::{Body, HeaderMap, StatusCode};
use garage_model::helper::error::Error as HelperError;
-use garage_util::error::Error as GarageError;
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+use crate::generic_server::ApiError;
use crate::s3::xml as s3_xml;
+use crate::signature::error::Error as SignatureError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
- // Category: internal error
- /// Error related to deeper parts of Garage
- #[error(display = "Internal error: {}", _0)]
- InternalError(#[error(source)] GarageError),
-
- /// Error related to Hyper
- #[error(display = "Internal error (Hyper error): {}", _0)]
- Hyper(#[error(source)] hyper::Error),
-
- /// Error related to HTTP
- #[error(display = "Internal error (HTTP error): {}", _0)]
- Http(#[error(source)] http::Error),
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
// Category: cannot process
- /// No proper api key was used, or the signature was invalid
- #[error(display = "Forbidden: {}", _0)]
- Forbidden(String),
-
/// Authorization Header Malformed
#[error(display = "Authorization header malformed, expected scope: {}", _0)]
AuthorizationHeaderMalformed(String),
@@ -38,22 +28,10 @@ pub enum Error {
#[error(display = "Key not found")]
NoSuchKey,
- /// The bucket requested don't exists
- #[error(display = "Bucket not found")]
- NoSuchBucket,
-
	/// The requested multipart upload does not exist
#[error(display = "Upload not found")]
NoSuchUpload,
- /// Tried to create a bucket that already exist
- #[error(display = "Bucket already exists")]
- BucketAlreadyExists,
-
- /// Tried to delete a non-empty bucket
- #[error(display = "Tried to delete a non-empty bucket")]
- BucketNotEmpty,
-
/// Precondition failed (e.g. x-amz-copy-source-if-match)
#[error(display = "At least one of the preconditions you specified did not hold")]
PreconditionFailed,
@@ -80,10 +58,6 @@ pub enum Error {
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUtf8String(#[error(source)] std::string::FromUtf8Error),
- /// Some base64 encoded data was badly encoded
- #[error(display = "Invalid base64: {}", _0)]
- InvalidBase64(#[error(source)] base64::DecodeError),
-
/// The client sent invalid XML data
#[error(display = "Invalid XML: {}", _0)]
InvalidXml(String),
@@ -96,19 +70,34 @@ pub enum Error {
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
- /// The client sent an invalid request
- #[error(display = "Bad request: {}", _0)]
- BadRequest(String),
-
- /// The client asked for an invalid return format (invalid Accept header)
- #[error(display = "Not acceptable: {}", _0)]
- NotAcceptable(String),
-
/// The client sent a request for an action not supported by garage
#[error(display = "Unimplemented action: {}", _0)]
NotImplemented(String),
}
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
+
+impl From<HelperError> for Error {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
+ HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
+ HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
+ HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
+ e => Self::bad_request(format!("{}", e)),
+ }
+ }
+}
+
impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self {
Self::InvalidXml(format!("{}", err))
@@ -121,88 +110,67 @@ impl From<quick_xml::de::DeError> for Error {
}
}
-impl From<HelperError> for Error {
- fn from(err: HelperError) -> Self {
+impl From<SignatureError> for Error {
+ fn from(err: SignatureError) -> Self {
match err {
- HelperError::Internal(i) => Self::InternalError(i),
- HelperError::BadRequest(b) => Self::BadRequest(b),
+ SignatureError::Common(c) => Self::Common(c),
+ SignatureError::AuthorizationHeaderMalformed(c) => {
+ Self::AuthorizationHeaderMalformed(c)
+ }
+ SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
+ SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
}
}
}
impl From<multer::Error> for Error {
fn from(err: multer::Error) -> Self {
- Self::BadRequest(err.to_string())
+ Self::bad_request(err)
}
}
impl Error {
- /// Get the HTTP status code that best represents the meaning of the error for the client
- pub fn http_status_code(&self) -> StatusCode {
- match self {
- Error::NoSuchKey | Error::NoSuchBucket | Error::NoSuchUpload => StatusCode::NOT_FOUND,
- Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT,
- Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
- Error::Forbidden(_) => StatusCode::FORBIDDEN,
- Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
- Error::InternalError(
- GarageError::Timeout
- | GarageError::RemoteError(_)
- | GarageError::Quorum(_, _, _, _),
- ) => StatusCode::SERVICE_UNAVAILABLE,
- Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => {
- StatusCode::INTERNAL_SERVER_ERROR
- }
- Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE,
- Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED,
- _ => StatusCode::BAD_REQUEST,
- }
- }
-
pub fn aws_code(&self) -> &'static str {
match self {
+ Error::Common(c) => c.aws_code(),
Error::NoSuchKey => "NoSuchKey",
- Error::NoSuchBucket => "NoSuchBucket",
Error::NoSuchUpload => "NoSuchUpload",
- Error::BucketAlreadyExists => "BucketAlreadyExists",
- Error::BucketNotEmpty => "BucketNotEmpty",
Error::PreconditionFailed => "PreconditionFailed",
Error::InvalidPart => "InvalidPart",
Error::InvalidPartOrder => "InvalidPartOrder",
Error::EntityTooSmall => "EntityTooSmall",
- Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::NotImplemented(_) => "NotImplemented",
- Error::InternalError(
- GarageError::Timeout
- | GarageError::RemoteError(_)
- | GarageError::Quorum(_, _, _, _),
- ) => "ServiceUnavailable",
- Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError",
- _ => "InvalidRequest",
+ Error::InvalidXml(_) => "MalformedXML",
+ Error::InvalidRange(_) => "InvalidRange",
+ Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => {
+ "InvalidRequest"
+ }
}
}
+}
- pub fn aws_xml(&self, garage_region: &str, path: &str) -> String {
- let error = s3_xml::Error {
- code: s3_xml::Value(self.aws_code().to_string()),
- message: s3_xml::Value(format!("{}", self)),
- resource: Some(s3_xml::Value(path.to_string())),
- region: Some(s3_xml::Value(garage_region.to_string())),
- };
- s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
- r#"
-<?xml version="1.0" encoding="UTF-8"?>
-<Error>
- <Code>InternalError</Code>
- <Message>XML encoding of error failed</Message>
-</Error>
- "#
- .into()
- })
+impl ApiError for Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
+ fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::Common(c) => c.http_status_code(),
+ Error::NoSuchKey | Error::NoSuchUpload => StatusCode::NOT_FOUND,
+ Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
+ Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE,
+ Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED,
+ Error::AuthorizationHeaderMalformed(_)
+ | Error::InvalidPart
+ | Error::InvalidPartOrder
+ | Error::EntityTooSmall
+ | Error::InvalidXml(_)
+ | Error::InvalidUtf8Str(_)
+ | Error::InvalidUtf8String(_)
+ | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST,
+ }
}
- pub fn add_headers(&self, header_map: &mut HeaderMap<HeaderValue>) {
+ fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>) {
use hyper::header;
#[allow(clippy::single_match)]
match self {
@@ -217,68 +185,23 @@ impl Error {
_ => (),
}
}
-}
-
-/// Trait to map error to the Bad Request error code
-pub trait OkOrBadRequest {
- type S;
- fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>;
-}
-
-impl<T, E> OkOrBadRequest for Result<T, E>
-where
- E: std::fmt::Display,
-{
- type S = T;
- fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
- match self {
- Ok(x) => Ok(x),
- Err(e) => Err(Error::BadRequest(format!("{}: {}", reason.as_ref(), e))),
- }
- }
-}
-
-impl<T> OkOrBadRequest for Option<T> {
- type S = T;
- fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
- match self {
- Some(x) => Ok(x),
- None => Err(Error::BadRequest(reason.as_ref().to_string())),
- }
- }
-}
-
-/// Trait to map an error to an Internal Error code
-pub trait OkOrInternalError {
- type S;
- fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>;
-}
-
-impl<T, E> OkOrInternalError for Result<T, E>
-where
- E: std::fmt::Display,
-{
- type S = T;
- fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
- match self {
- Ok(x) => Ok(x),
- Err(e) => Err(Error::InternalError(GarageError::Message(format!(
- "{}: {}",
- reason.as_ref(),
- e
- )))),
- }
- }
-}
-impl<T> OkOrInternalError for Option<T> {
- type S = T;
- fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
- match self {
- Some(x) => Ok(x),
- None => Err(Error::InternalError(GarageError::Message(
- reason.as_ref().to_string(),
- ))),
- }
+ fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ let error = s3_xml::Error {
+ code: s3_xml::Value(self.aws_code().to_string()),
+ message: s3_xml::Value(format!("{}", self)),
+ resource: Some(s3_xml::Value(path.to_string())),
+ region: Some(s3_xml::Value(garage_region.to_string())),
+ };
+ Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
+ r#"
+<?xml version="1.0" encoding="UTF-8"?>
+<Error>
+ <Code>InternalError</Code>
+ <Message>XML encoding of error failed</Message>
+</Error>
+ "#
+ .into()
+ }))
}
}
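A hypothetical helper, for illustration only, showing roughly how a server loop can turn any ApiError into an HTTP response using the three trait methods implemented above; the real wiring lives in generic_server.rs and is not shown in this diff.

use hyper::{Body, Response};

use crate::generic_server::ApiError;

fn error_response<E: ApiError>(err: &E, garage_region: &str, path: &str) -> Result<Response<Body>, http::Error> {
	let mut resp = Response::builder()
		.status(err.http_status_code())
		.body(err.http_body(garage_region, path))?;
	// Some variants attach extra response headers here.
	err.add_http_headers(resp.headers_mut());
	Ok(resp)
}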
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 3edf22a6..7fa1a177 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -17,7 +17,7 @@ use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::error::*;
+use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
@@ -210,8 +210,8 @@ pub async fn handle_get(
match (part_number, parse_range_header(req, last_v_meta.size)?) {
(Some(_), Some(_)) => {
- return Err(Error::BadRequest(
- "Cannot specify both partNumber and Range header".into(),
+ return Err(Error::bad_request(
+ "Cannot specify both partNumber and Range header",
));
}
(Some(pn), None) => {
@@ -302,9 +302,9 @@ async fn handle_get_range(
let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec());
Ok(resp_builder.body(body)?)
} else {
- None.ok_or_internal_error(
+ Err(Error::internal_error(
"Requested range not present in inline bytes when it should have been",
- )
+ ))
}
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index e2848c57..e5f486c8 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -16,8 +16,8 @@ use garage_model::s3::version_table::Version;
use garage_table::{EmptyKey, EnumerationOrder};
use crate::encoding::*;
-use crate::error::*;
use crate::helpers::key_after_prefix;
+use crate::s3::error::*;
use crate::s3::put as s3_put;
use crate::s3::xml as s3_xml;
@@ -582,13 +582,19 @@ impl ListObjectsQuery {
// representing the key to start with.
(Some(token), _) => match &token[..1] {
"[" => Ok(RangeBegin::IncludingKey {
- key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
+ key: String::from_utf8(
+ base64::decode(token[1..].as_bytes())
+ .ok_or_bad_request("Invalid continuation token")?,
+ )?,
fallback_key: None,
}),
"]" => Ok(RangeBegin::AfterKey {
- key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
+ key: String::from_utf8(
+ base64::decode(token[1..].as_bytes())
+ .ok_or_bad_request("Invalid continuation token")?,
+ )?,
}),
- _ => Err(Error::BadRequest("Invalid continuation token".to_string())),
+ _ => Err(Error::bad_request("Invalid continuation token")),
},
// StartAfter has defined semantics in the spec:
diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs
index 3f5c1915..7b56d4d8 100644
--- a/src/api/s3/mod.rs
+++ b/src/api/s3/mod.rs
@@ -1,4 +1,5 @@
pub mod api_server;
+pub mod error;
mod bucket;
mod copy;
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index 86fa7880..dc640f43 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -14,8 +14,7 @@ use serde::Deserialize;
use garage_model::garage::Garage;
-use crate::error::*;
-use crate::helpers::resolve_bucket;
+use crate::s3::error::*;
use crate::s3::put::{get_headers, save_stream};
use crate::s3::xml as s3_xml;
use crate::signature::payload::{parse_date, verify_v4};
@@ -48,9 +47,7 @@ pub async fn handle_post_object(
let field = if let Some(field) = multipart.next_field().await? {
field
} else {
- return Err(Error::BadRequest(
- "Request did not contain a file".to_owned(),
- ));
+ return Err(Error::bad_request("Request did not contain a file"));
};
let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) {
name
@@ -66,14 +63,14 @@ pub async fn handle_post_object(
"tag" => (/* tag need to be reencoded, but we don't support them yet anyway */),
"acl" => {
if params.insert("x-amz-acl", content).is_some() {
- return Err(Error::BadRequest(
- "Field 'acl' provided more than one time".to_string(),
+ return Err(Error::bad_request(
+ "Field 'acl' provided more than one time",
));
}
}
_ => {
if params.insert(&name, content).is_some() {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Field '{}' provided more than one time",
name
)));
@@ -90,9 +87,7 @@ pub async fn handle_post_object(
.to_str()?;
let credential = params
.get("x-amz-credential")
- .ok_or_else(|| {
- Error::Forbidden("Garage does not support anonymous access yet".to_string())
- })?
+ .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?
.to_str()?;
let policy = params
.get("policy")
@@ -129,15 +124,16 @@ pub async fn handle_post_object(
)
.await?;
- let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?;
+ let bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&bucket, &api_key)
+ .await?;
if !api_key.allow_write(&bucket_id) {
- return Err(Error::Forbidden(
- "Operation is not allowed for this key.".to_string(),
- ));
+ return Err(Error::forbidden("Operation is not allowed for this key."));
}
- let decoded_policy = base64::decode(&policy)?;
+ let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?;
let decoded_policy: Policy =
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
@@ -145,9 +141,7 @@ pub async fn handle_post_object(
.ok_or_bad_request("Invalid expiration date")?
.into();
if Utc::now() - expiration > Duration::zero() {
- return Err(Error::BadRequest(
- "Expiration date is in the paste".to_string(),
- ));
+		return Err(Error::bad_request("Expiration date is in the past"));
}
let mut conditions = decoded_policy.into_conditions()?;
@@ -159,7 +153,7 @@ pub async fn handle_post_object(
"policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields
"content-type" => {
let conds = conditions.params.remove("content-type").ok_or_else(|| {
- Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
+ Error::bad_request(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
@@ -169,7 +163,7 @@ pub async fn handle_post_object(
}
};
if !ok {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
@@ -178,7 +172,7 @@ pub async fn handle_post_object(
}
"key" => {
let conds = conditions.params.remove("key").ok_or_else(|| {
- Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
+ Error::bad_request(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
@@ -186,7 +180,7 @@ pub async fn handle_post_object(
Operation::StartsWith(s) => key.starts_with(&s),
};
if !ok {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
@@ -201,7 +195,7 @@ pub async fn handle_post_object(
continue;
}
let conds = conditions.params.remove(&param_key).ok_or_else(|| {
- Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
+ Error::bad_request(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
@@ -209,7 +203,7 @@ pub async fn handle_post_object(
Operation::StartsWith(s) => value.to_str()?.starts_with(s.as_str()),
};
if !ok {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
@@ -220,7 +214,7 @@ pub async fn handle_post_object(
}
if let Some((param_key, _)) = conditions.params.iter().next() {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Key '{}' is required in policy, but no value was provided",
param_key
)));
@@ -326,7 +320,7 @@ impl Policy {
match condition {
PolicyCondition::Equal(map) => {
if map.len() != 1 {
- return Err(Error::BadRequest("Invalid policy item".to_owned()));
+ return Err(Error::bad_request("Invalid policy item"));
}
let (mut k, v) = map.into_iter().next().expect("size was verified");
k.make_ascii_lowercase();
@@ -334,7 +328,7 @@ impl Policy {
}
PolicyCondition::OtherOp([cond, mut key, value]) => {
if key.remove(0) != '$' {
- return Err(Error::BadRequest("Invalid policy item".to_owned()));
+ return Err(Error::bad_request("Invalid policy item"));
}
key.make_ascii_lowercase();
match cond.as_str() {
@@ -347,7 +341,7 @@ impl Policy {
.or_default()
.push(Operation::StartsWith(value));
}
- _ => return Err(Error::BadRequest("Invalid policy item".to_owned())),
+ _ => return Err(Error::bad_request("Invalid policy item")),
}
}
PolicyCondition::SizeRange(key, min, max) => {
@@ -355,7 +349,7 @@ impl Policy {
length.0 = length.0.max(min);
length.1 = length.1.min(max);
} else {
- return Err(Error::BadRequest("Invalid policy item".to_owned()));
+ return Err(Error::bad_request("Invalid policy item"));
}
}
}
@@ -420,15 +414,15 @@ where
self.read += bytes.len() as u64;
// optimization to fail early when we know before the end it's too long
if self.length.end() < &self.read {
- return Poll::Ready(Some(Err(Error::BadRequest(
- "File size does not match policy".to_owned(),
+ return Poll::Ready(Some(Err(Error::bad_request(
+ "File size does not match policy",
))));
}
}
Poll::Ready(None) => {
if !self.length.contains(&self.read) {
- return Poll::Ready(Some(Err(Error::BadRequest(
- "File size does not match policy".to_owned(),
+ return Poll::Ready(Some(Err(Error::bad_request(
+ "File size does not match policy",
))));
}
}
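For reference, an example of the kind of POST policy document this parser accepts (all values here are made up), with one condition per PolicyCondition variant handled above: a map-style equality, a "starts-with" operation, and a content-length-range.

// Illustrative only; not taken from the test suite or the AWS docs verbatim.
const EXAMPLE_POST_POLICY: &str = r#"{
  "expiration": "2022-06-01T12:00:00.000Z",
  "conditions": [
    { "acl": "public-read" },
    [ "starts-with", "$key", "user/uploads/" ],
    [ "content-length-range", 1048576, 10485760 ]
  ]
}"#;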
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 89aa8d84..8b06ef3f 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -19,7 +19,7 @@ use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::error::*;
+use crate::s3::error::*;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
@@ -183,8 +183,8 @@ fn ensure_checksum_matches(
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != data_sha256sum {
- return Err(Error::BadRequest(
- "Unable to validate x-amz-content-sha256".to_string(),
+ return Err(Error::bad_request(
+ "Unable to validate x-amz-content-sha256",
));
} else {
trace!("Successfully validated x-amz-content-sha256");
@@ -192,9 +192,7 @@ fn ensure_checksum_matches(
}
if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
- return Err(Error::BadRequest(
- "Unable to validate content-md5".to_string(),
- ));
+ return Err(Error::bad_request("Unable to validate content-md5"));
} else {
trace!("Successfully validated content-md5");
}
@@ -428,7 +426,7 @@ pub async fn handle_put_part(
// Check part hasn't already been uploaded
if let Some(v) = version {
if v.has_part_number(part_number) {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Part number {} has already been uploaded",
part_number
)));
@@ -513,7 +511,7 @@ pub async fn handle_complete_multipart_upload(
let version = version.ok_or(Error::NoSuchKey)?;
if version.blocks.is_empty() {
- return Err(Error::BadRequest("No data was uploaded".to_string()));
+ return Err(Error::bad_request("No data was uploaded"));
}
let headers = match object_version.state {
@@ -574,8 +572,8 @@ pub async fn handle_complete_multipart_upload(
.map(|x| x.part_number)
.eq(block_parts.into_iter());
if !same_parts {
- return Err(Error::BadRequest(
- "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(),
+ return Err(Error::bad_request(
+ "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again."
));
}
diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs
index 0525c649..44f581ff 100644
--- a/src/api/s3/router.rs
+++ b/src/api/s3/router.rs
@@ -1,5 +1,3 @@
-use crate::error::{Error, OkOrBadRequest};
-
use std::borrow::Cow;
use hyper::header::HeaderValue;
@@ -7,6 +5,7 @@ use hyper::{HeaderMap, Method, Request};
use crate::helpers::Authorization;
use crate::router_macros::{generateQueryParameters, router_match};
+use crate::s3::error::*;
router_match! {@func
@@ -343,7 +342,7 @@ impl Endpoint {
Method::POST => Self::from_post(key, &mut query)?,
Method::PUT => Self::from_put(key, &mut query, req.headers())?,
Method::DELETE => Self::from_delete(key, &mut query)?,
- _ => return Err(Error::BadRequest("Unknown method".to_owned())),
+ _ => return Err(Error::bad_request("Unknown method")),
};
if let Some(message) = query.nonempty_message() {
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index 561130dc..77738971 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -4,13 +4,12 @@ use std::sync::Arc;
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use crate::error::*;
+use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
-use garage_table::*;
use garage_util::data::*;
pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> {
@@ -47,14 +46,11 @@ pub async fn handle_delete_website(
bucket_id: Uuid,
) -> Result<Response<Body>, Error> {
let mut bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
+ let param = bucket.params_mut().unwrap();
param.website_config.update(None);
garage.bucket_table.insert(&bucket).await?;
@@ -77,14 +73,11 @@ pub async fn handle_put_website(
}
let mut bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
+ let param = bucket.params_mut().unwrap();
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
@@ -176,8 +169,8 @@ impl WebsiteConfiguration {
|| self.index_document.is_some()
|| self.routing_rules.is_some())
{
- return Err(Error::BadRequest(
- "Bad XML: can't have RedirectAllRequestsTo and other fields".to_owned(),
+ return Err(Error::bad_request(
+ "Bad XML: can't have RedirectAllRequestsTo and other fields",
));
}
if let Some(ref ed) = self.error_document {
@@ -222,8 +215,8 @@ impl WebsiteConfiguration {
impl Key {
pub fn validate(&self) -> Result<(), Error> {
if self.key.0.is_empty() {
- Err(Error::BadRequest(
- "Bad XML: error document specified but empty".to_owned(),
+ Err(Error::bad_request(
+ "Bad XML: error document specified but empty",
))
} else {
Ok(())
@@ -234,8 +227,8 @@ impl Key {
impl Suffix {
pub fn validate(&self) -> Result<(), Error> {
if self.suffix.0.is_empty() | self.suffix.0.contains('/') {
- Err(Error::BadRequest(
- "Bad XML: index document is empty or contains /".to_owned(),
+ Err(Error::bad_request(
+ "Bad XML: index document is empty or contains /",
))
} else {
Ok(())
@@ -247,7 +240,7 @@ impl Target {
pub fn validate(&self) -> Result<(), Error> {
if let Some(ref protocol) = self.protocol {
if protocol.0 != "http" && protocol.0 != "https" {
- return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned()));
+ return Err(Error::bad_request("Bad XML: invalid protocol"));
}
}
Ok(())
@@ -269,19 +262,19 @@ impl Redirect {
pub fn validate(&self, has_prefix: bool) -> Result<(), Error> {
if self.replace_prefix.is_some() {
if self.replace_full.is_some() {
- return Err(Error::BadRequest(
- "Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set".to_owned(),
+ return Err(Error::bad_request(
+ "Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set",
));
}
if !has_prefix {
- return Err(Error::BadRequest(
- "Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't".to_owned(),
+ return Err(Error::bad_request(
+ "Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't",
));
}
}
if let Some(ref protocol) = self.protocol {
if protocol.0 != "http" && protocol.0 != "https" {
- return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned()));
+ return Err(Error::bad_request("Bad XML: invalid protocol"));
}
}
		// TODO there are probably more invalid cases, but which ones?
diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs
index 75ec4559..111657a0 100644
--- a/src/api/s3/xml.rs
+++ b/src/api/s3/xml.rs
@@ -1,7 +1,7 @@
use quick_xml::se::to_string;
use serde::{Deserialize, Serialize, Serializer};
-use crate::Error as ApiError;
+use crate::s3::error::Error as ApiError;
pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> {
let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string();
diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs
new file mode 100644
index 00000000..f5a067bd
--- /dev/null
+++ b/src/api/signature/error.rs
@@ -0,0 +1,36 @@
+use err_derive::Error;
+
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+
+/// Errors of this crate
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
+
+ /// Authorization Header Malformed
+ #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ AuthorizationHeaderMalformed(String),
+
+ // Category: bad request
+ /// The request contained an invalid UTF-8 sequence in its path or in other parameters
+ #[error(display = "Invalid UTF-8: {}", _0)]
+ InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
+
+	/// The client sent a header with an invalid value
+ #[error(display = "Invalid header value: {}", _0)]
+ InvalidHeader(#[error(source)] hyper::header::ToStrError),
+}
+
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs
index 5646f4fa..dd5b590c 100644
--- a/src/api/signature/mod.rs
+++ b/src/api/signature/mod.rs
@@ -4,11 +4,12 @@ use sha2::Sha256;
use garage_util::data::{sha256sum, Hash};
-use crate::error::*;
-
+pub mod error;
pub mod payload;
pub mod streaming;
+use error::*;
+
pub const SHORT_DATE: &str = "%Y%m%d";
pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
@@ -16,7 +17,7 @@ type HmacSha256 = Hmac<Sha256>;
pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
if expected_sha256 != sha256sum(body) {
- return Err(Error::BadRequest(
+ return Err(Error::bad_request(
"Request content hash does not match signed hash".to_string(),
));
}
diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs
index 9137dd2d..4c7934e5 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/signature/payload.rs
@@ -15,7 +15,7 @@ use super::LONG_DATETIME;
use super::{compute_scope, signing_hmac};
use crate::encoding::uri_encode;
-use crate::error::*;
+use crate::signature::error::*;
pub async fn check_payload_signature(
garage: &Garage,
@@ -105,7 +105,7 @@ fn parse_authorization(
let (auth_kind, rest) = authorization.split_at(first_space);
if auth_kind != "AWS4-HMAC-SHA256" {
- return Err(Error::BadRequest("Unsupported authorization method".into()));
+ return Err(Error::bad_request("Unsupported authorization method"));
}
let mut auth_params = HashMap::new();
@@ -129,10 +129,11 @@ fn parse_authorization(
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")
+ .map_err(Error::from)
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::hours(24) {
- return Err(Error::BadRequest("Date is too old".to_string()));
+ return Err(Error::bad_request("Date is too old".to_string()));
}
let auth = Authorization {
@@ -156,7 +157,7 @@ fn parse_query_authorization(
headers: &HashMap<String, String>,
) -> Result<Authorization, Error> {
if algorithm != "AWS4-HMAC-SHA256" {
- return Err(Error::BadRequest(
+ return Err(Error::bad_request(
"Unsupported authorization method".to_string(),
));
}
@@ -179,10 +180,10 @@ fn parse_query_authorization(
.get("x-amz-expires")
.ok_or_bad_request("X-Amz-Expires not found in query parameters")?
.parse()
- .map_err(|_| Error::BadRequest("X-Amz-Expires is not a number".to_string()))?;
+ .map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?;
if duration > 7 * 24 * 3600 {
- return Err(Error::BadRequest(
+ return Err(Error::bad_request(
"X-Amz-Exprires may not exceed a week".to_string(),
));
}
@@ -190,10 +191,11 @@ fn parse_query_authorization(
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")
+ .map_err(Error::from)
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::seconds(duration) {
- return Err(Error::BadRequest("Date is too old".to_string()));
+ return Err(Error::bad_request("Date is too old".to_string()));
}
Ok(Authorization {
@@ -301,7 +303,7 @@ pub async fn verify_v4(
.get(&EmptyKey, &key_id)
.await?
.filter(|k| !k.state.is_deleted())
- .ok_or_else(|| Error::Forbidden(format!("No such key: {}", &key_id)))?;
+ .ok_or_else(|| Error::forbidden(format!("No such key: {}", &key_id)))?;
let key_p = key.params().unwrap();
let mut hmac = signing_hmac(
@@ -314,7 +316,7 @@ pub async fn verify_v4(
hmac.update(payload);
let our_signature = hex::encode(hmac.finalize().into_bytes());
if signature != our_signature {
- return Err(Error::Forbidden("Invalid signature".to_string()));
+ return Err(Error::forbidden("Invalid signature".to_string()));
}
Ok(key)
diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs
index ded9d993..c8358c4f 100644
--- a/src/api/signature/streaming.rs
+++ b/src/api/signature/streaming.rs
@@ -12,7 +12,7 @@ use garage_util::data::Hash;
use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME};
-use crate::error::*;
+use crate::signature::error::*;
pub fn parse_streaming_body(
api_key: &Key,
@@ -87,7 +87,7 @@ fn compute_streaming_payload_signature(
let mut hmac = signing_hmac.clone();
hmac.update(string_to_sign.as_bytes());
- Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")
+ Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?)
}
mod payload {
@@ -163,10 +163,10 @@ impl From<SignedPayloadStreamError> for Error {
match err {
SignedPayloadStreamError::Stream(e) => e,
SignedPayloadStreamError::InvalidSignature => {
- Error::BadRequest("Invalid payload signature".into())
+ Error::bad_request("Invalid payload signature")
}
SignedPayloadStreamError::Message(e) => {
- Error::BadRequest(format!("Chunk format error: {}", e))
+ Error::bad_request(format!("Chunk format error: {}", e))
}
}
}
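
Note: the hunks above switch call sites from constructing the Error::BadRequest(...) / Error::Forbidden(...) variants directly to the new Error::bad_request() / Error::forbidden() helper constructors of the per-module error types. A minimal, self-contained sketch of that constructor pattern (toy enum; the real garage_api error types and variant names differ):

#[derive(Debug)]
enum Error {
    BadRequest(String),
    Forbidden(String),
}

impl Error {
    // Accepts &str, String or format!() output, matching the call sites above.
    fn bad_request(msg: impl ToString) -> Self {
        Error::BadRequest(msg.to_string())
    }
    fn forbidden(msg: impl ToString) -> Self {
        Error::Forbidden(msg.to_string())
    }
}

fn main() {
    let _ = Error::bad_request("Unsupported authorization method");
    let _ = Error::forbidden(format!("No such key: {}", "GK-example"));
}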
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 3b69d7bc..902f67f8 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -27,10 +27,8 @@ garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
-garage_admin = { version = "0.7.0", path = "../admin" }
bytes = "1.0"
-git-version = "0.3.4"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
@@ -54,6 +52,11 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"
+opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
+opentelemetry-prometheus = "0.10"
+opentelemetry-otlp = "0.10"
+prometheus = "0.13"
+
[dev-dependencies]
aws-sdk-s3 = "0.8"
chrono = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index af0c3f22..bc1f494a 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -22,7 +22,6 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::permission::*;
-use garage_model::s3::object_table::ObjectFilter;
use crate::cli::*;
use crate::repair::Repair;
@@ -213,18 +212,7 @@ impl AdminRpcHandler {
}
// Check bucket is empty
- let objects = self
- .garage
- .object_table
- .get_range(
- &bucket_id,
- None,
- Some(ObjectFilter::IsData),
- 10,
- EnumerationOrder::Forward,
- )
- .await?;
- if !objects.is_empty() {
+ if !helper.is_bucket_empty(bucket_id).await? {
return Err(Error::BadRequest(format!(
"Bucket {} is not empty",
query.name
@@ -261,6 +249,7 @@ impl AdminRpcHandler {
async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
let bucket_id = helper
.resolve_global_bucket_name(&query.existing_bucket)
@@ -268,7 +257,7 @@ impl AdminRpcHandler {
.ok_or_bad_request("Bucket not found")?;
if let Some(key_pattern) = &query.local {
- let key = helper.get_existing_matching_key(key_pattern).await?;
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
helper
.set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
@@ -290,9 +279,10 @@ impl AdminRpcHandler {
async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
if let Some(key_pattern) = &query.local {
- let key = helper.get_existing_matching_key(key_pattern).await?;
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
let bucket_id = key
.state
@@ -331,12 +321,15 @@ impl AdminRpcHandler {
async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
let bucket_id = helper
.resolve_global_bucket_name(&query.bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
- let key = helper.get_existing_matching_key(&query.key_pattern).await?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
let allow_read = query.read || key.allow_read(&bucket_id);
let allow_write = query.write || key.allow_write(&bucket_id);
@@ -363,12 +356,15 @@ impl AdminRpcHandler {
async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
let bucket_id = helper
.resolve_global_bucket_name(&query.bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
- let key = helper.get_existing_matching_key(&query.key_pattern).await?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
let allow_read = !query.read && key.allow_read(&bucket_id);
let allow_write = !query.write && key.allow_write(&bucket_id);
@@ -469,7 +465,7 @@ impl AdminRpcHandler {
async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
let key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
self.key_info_result(key).await
@@ -484,7 +480,7 @@ impl AdminRpcHandler {
async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
key.params_mut()
@@ -496,9 +492,11 @@ impl AdminRpcHandler {
}
async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
- let mut key = helper.get_existing_matching_key(&query.key_pattern).await?;
+ let mut key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
if !query.yes {
return Err(Error::BadRequest(
@@ -506,32 +504,7 @@ impl AdminRpcHandler {
));
}
- let state = key.state.as_option_mut().unwrap();
-
- // --- done checking, now commit ---
- // (the step at unset_local_bucket_alias will fail if a bucket
- // does not have another alias, the deletion will be
- // interrupted in the middle if that happens)
-
- // 1. Delete local aliases
- for (alias, _, to) in state.local_aliases.items().iter() {
- if let Some(bucket_id) = to {
- helper
- .unset_local_bucket_alias(*bucket_id, &key.key_id, alias)
- .await?;
- }
- }
-
- // 2. Remove permissions on all authorized buckets
- for (ab_id, _auth) in state.authorized_buckets.items().iter() {
- helper
- .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS)
- .await?;
- }
-
- // 3. Actually delete key
- key.state = Deletable::delete();
- self.garage.key_table.insert(&key).await?;
+ key_helper.delete_key(&mut key).await?;
Ok(AdminRpc::Ok(format!(
"Key {} was deleted successfully.",
@@ -542,7 +515,7 @@ impl AdminRpcHandler {
async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
if query.create_bucket {
@@ -555,7 +528,7 @@ impl AdminRpcHandler {
async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
if query.create_bucket {
@@ -696,11 +669,7 @@ impl AdminRpcHandler {
writeln!(
&mut ret,
"\nGarage version: {}",
- option_env!("GIT_VERSION").unwrap_or(git_version::git_version!(
- prefix = "git:",
- cargo_prefix = "cargo:",
- fallback = "unknown"
- ))
+ self.garage.system.garage_version(),
)
.unwrap();
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 0247c32b..db0af57c 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,5 +1,4 @@
use garage_util::crdt::Crdt;
-use garage_util::data::*;
use garage_util::error::*;
use garage_util::formater::format_table;
@@ -212,31 +211,9 @@ pub async fn cmd_apply_layout(
rpc_host: NodeID,
apply_opt: ApplyLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match apply_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.roles.merge(&layout.staging);
-
- if !layout.calculate_partition_assignation() {
- return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
- }
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
-
- layout.version += 1;
+ let layout = layout.apply_staged_changes(apply_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -251,25 +228,9 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match revert_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.version += 1;
+ let layout = layout.revert_staged_changes(revert_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
diff --git a/src/garage/main.rs b/src/garage/main.rs
index e898e680..bd09b6ea 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -8,6 +8,7 @@ mod admin;
mod cli;
mod repair;
mod server;
+mod tracing_setup;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -141,6 +142,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await {
Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))),
Err(HelperError::BadRequest(b)) => Err(Error::Message(b)),
+ Err(e) => Err(Error::Message(format!("{}", e))),
Ok(x) => Ok(x),
}
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 24bb25b3..b58ad286 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -6,8 +6,7 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
-use garage_admin::metrics::*;
-use garage_admin::tracing_setup::*;
+use garage_api::admin::api_server::AdminApiServer;
use garage_api::s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
use garage_web::run_web_server;
@@ -16,6 +15,7 @@ use garage_web::run_web_server;
use garage_api::k2v::api_server::K2VApiServer;
use crate::admin::*;
+use crate::tracing_setup::*;
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
@@ -39,9 +39,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.open()
.expect("Unable to open sled DB");
- info!("Initialize admin web server and metric backend...");
- let admin_server_init = AdminServer::init();
-
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
@@ -54,6 +51,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
init_tracing(&export_to, garage.system.id)?;
}
+ info!("Initialize Admin API server and metrics collector...");
+ let admin_server = AdminApiServer::new(garage.clone());
+
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
@@ -80,39 +80,41 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
wait_from(watch_cancel.clone()),
));
- let admin_server = if let Some(admin_bind_addr) = config.admin.api_bind_addr {
- info!("Configure and run admin web server...");
- Some(tokio::spawn(
- admin_server_init.run(admin_bind_addr, wait_from(watch_cancel.clone())),
- ))
- } else {
- None
- };
+ info!("Launching Admin API server...");
+ let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone())));
// Stuff runs
// When a cancel signal is sent, stuff stops
if let Err(e) = s3_api_server.await? {
warn!("S3 API server exited with error: {}", e);
+ } else {
+ info!("S3 API server exited without error.");
}
#[cfg(feature = "k2v")]
if let Err(e) = k2v_api_server.await? {
warn!("K2V API server exited with error: {}", e);
+ } else {
+ info!("K2V API server exited without error.");
}
if let Err(e) = web_server.await? {
warn!("Web server exited with error: {}", e);
+ } else {
+ info!("Web server exited without error.");
}
- if let Some(a) = admin_server {
- if let Err(e) = a.await? {
- warn!("Admin web server exited with error: {}", e);
- }
+ if let Err(e) = admin_server.await? {
+ warn!("Admin web server exited with error: {}", e);
+ } else {
+ info!("Admin API server exited without error.");
}
// Remove RPC handlers for system to break reference cycles
garage.system.netapp.drop_all_handlers();
+ opentelemetry::global::shutdown_tracer_provider();
// Await for netapp RPC system to end
run_system.await?;
+ info!("Netapp exited");
// Drop all references so that stuff can terminate properly
drop(garage);
diff --git a/src/admin/tracing_setup.rs b/src/garage/tracing_setup.rs
index 55fc4094..55fc4094 100644
--- a/src/admin/tracing_setup.rs
+++ b/src/garage/tracing_setup.rs
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 03e21f8a..2f99bd68 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -191,6 +191,10 @@ impl Garage {
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
helper::bucket::BucketHelper(self)
}
+
+ pub fn key_helper(&self) -> helper::key::KeyHelper {
+ helper::key::KeyHelper(self)
+ }
}
#[cfg(feature = "k2v")]
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 54d2f97b..130ba5be 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -1,15 +1,18 @@
-use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
use garage_util::time::*;
+use garage_table::util::*;
+
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::garage::Garage;
use crate::helper::error::*;
-use crate::key_table::{Key, KeyFilter};
+use crate::helper::key::KeyHelper;
+use crate::key_table::*;
use crate::permission::BucketKeyPerm;
+use crate::s3::object_table::ObjectFilter;
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
@@ -49,6 +52,23 @@ impl<'a> BucketHelper<'a> {
}
}
+ #[allow(clippy::ptr_arg)]
+ pub async fn resolve_bucket(&self, bucket_name: &String, api_key: &Key) -> Result<Uuid, Error> {
+ let api_key_params = api_key
+ .state
+ .as_option()
+ .ok_or_message("Key should not be deleted at this point")?;
+
+ if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
+ Ok(*bucket_id)
+ } else {
+ Ok(self
+ .resolve_global_bucket_name(bucket_name)
+ .await?
+ .ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))?)
+ }
+ }
+
/// Returns a Bucket if it is present in bucket table,
/// even if it is in deleted state. Querying a non-existing
/// bucket ID returns an internal error.
@@ -71,64 +91,7 @@ impl<'a> BucketHelper<'a> {
.get(&EmptyKey, &bucket_id)
.await?
.filter(|b| !b.is_deleted())
- .ok_or_bad_request(format!(
- "Bucket {:?} does not exist or has been deleted",
- bucket_id
- ))
- }
-
- /// Returns a Key if it is present in key table,
- /// even if it is in deleted state. Querying a non-existing
- /// key ID returns an internal error.
- pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> {
- Ok(self
- .0
- .key_table
- .get(&EmptyKey, key_id)
- .await?
- .ok_or_message(format!("Key {} does not exist", key_id))?)
- }
-
- /// Returns a Key if it is present in key table,
- /// only if it is in non-deleted state.
- /// Querying a non-existing key ID or a deleted key
- /// returns a bad request error.
- pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> {
- self.0
- .key_table
- .get(&EmptyKey, key_id)
- .await?
- .filter(|b| !b.state.is_deleted())
- .ok_or_bad_request(format!("Key {} does not exist or has been deleted", key_id))
- }
-
- /// Returns a Key if it is present in key table,
- /// looking it up by key ID or by a match on its name,
- /// only if it is in non-deleted state.
- /// Querying a non-existing key ID or a deleted key
- /// returns a bad request error.
- pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> {
- let candidates = self
- .0
- .key_table
- .get_range(
- &EmptyKey,
- None,
- Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())),
- 10,
- EnumerationOrder::Forward,
- )
- .await?
- .into_iter()
- .collect::<Vec<_>>();
- if candidates.len() != 1 {
- Err(Error::BadRequest(format!(
- "{} matching keys",
- candidates.len()
- )))
- } else {
- Ok(candidates.into_iter().next().unwrap())
- }
+ .ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id)))
}
/// Sets a new alias for a bucket in global namespace.
@@ -142,10 +105,7 @@ impl<'a> BucketHelper<'a> {
alias_name: &String,
) -> Result<(), Error> {
if !is_valid_bucket_name(alias_name) {
- return Err(Error::BadRequest(format!(
- "{}: {}",
- alias_name, INVALID_BUCKET_NAME_MESSAGE
- )));
+ return Err(Error::InvalidBucketName(alias_name.to_string()));
}
let mut bucket = self.get_existing_bucket(bucket_id).await?;
@@ -176,7 +136,7 @@ impl<'a> BucketHelper<'a> {
let alias = match alias {
None => BucketAlias::new(alias_name.clone(), alias_ts, Some(bucket_id))
- .ok_or_bad_request(format!("{}: {}", alias_name, INVALID_BUCKET_NAME_MESSAGE))?,
+ .ok_or_else(|| Error::InvalidBucketName(alias_name.clone()))?,
Some(mut a) => {
a.state = Lww::raw(alias_ts, Some(bucket_id));
a
@@ -264,7 +224,7 @@ impl<'a> BucketHelper<'a> {
.bucket_alias_table
.get(&EmptyKey, alias_name)
.await?
- .ok_or_message(format!("Alias {} not found", alias_name))?;
+ .ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?;
// Checks ok, remove alias
let alias_ts = match bucket.state.as_option() {
@@ -303,15 +263,14 @@ impl<'a> BucketHelper<'a> {
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
+ let key_helper = KeyHelper(self.0);
+
if !is_valid_bucket_name(alias_name) {
- return Err(Error::BadRequest(format!(
- "{}: {}",
- alias_name, INVALID_BUCKET_NAME_MESSAGE
- )));
+ return Err(Error::InvalidBucketName(alias_name.to_string()));
}
let mut bucket = self.get_existing_bucket(bucket_id).await?;
- let mut key = self.get_existing_key(key_id).await?;
+ let mut key = key_helper.get_existing_key(key_id).await?;
let mut key_param = key.state.as_option_mut().unwrap();
@@ -360,8 +319,10 @@ impl<'a> BucketHelper<'a> {
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
+ let key_helper = KeyHelper(self.0);
+
let mut bucket = self.get_existing_bucket(bucket_id).await?;
- let mut key = self.get_existing_key(key_id).await?;
+ let mut key = key_helper.get_existing_key(key_id).await?;
let mut bucket_p = bucket.state.as_option_mut().unwrap();
@@ -429,8 +390,10 @@ impl<'a> BucketHelper<'a> {
key_id: &String,
mut perm: BucketKeyPerm,
) -> Result<(), Error> {
+ let key_helper = KeyHelper(self.0);
+
let mut bucket = self.get_internal_bucket(bucket_id).await?;
- let mut key = self.get_internal_key(key_id).await?;
+ let mut key = key_helper.get_internal_key(key_id).await?;
if let Some(bstate) = bucket.state.as_option() {
if let Some(kp) = bstate.authorized_keys.get(key_id) {
@@ -466,4 +429,47 @@ impl<'a> BucketHelper<'a> {
Ok(())
}
+
+ pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result<bool, Error> {
+ let objects = self
+ .0
+ .object_table
+ .get_range(
+ &bucket_id,
+ None,
+ Some(ObjectFilter::IsData),
+ 10,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ if !objects.is_empty() {
+ return Ok(false);
+ }
+
+ #[cfg(feature = "k2v")]
+ {
+ use garage_rpc::ring::Ring;
+ use std::sync::Arc;
+
+ let ring: Arc<Ring> = self.0.system.ring.borrow().clone();
+ let k2vindexes = self
+ .0
+ .k2v
+ .counter_table
+ .table
+ .get_range(
+ &bucket_id,
+ None,
+ Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ 10,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ if !k2vindexes.is_empty() {
+ return Ok(false);
+ }
+ }
+
+ Ok(true)
+ }
}
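
Note: resolve_bucket() and is_bucket_empty() are the two new public entry points on BucketHelper: the former checks the key's local aliases before falling back to the global namespace, the latter also scans K2V index counters when that feature is enabled. A hedged sketch of how they compose (wrapper function and message are illustrative; the helper signatures are the ones added above):

use garage_model::garage::Garage;
use garage_model::helper::error::Error;
use garage_model::key_table::Key;

// Illustrative only: refuse to act on a bucket that still contains data.
async fn ensure_bucket_empty(
    garage: &Garage,
    bucket_name: &String,
    api_key: &Key,
) -> Result<(), Error> {
    let helper = garage.bucket_helper();
    let bucket_id = helper.resolve_bucket(bucket_name, api_key).await?;
    if !helper.is_bucket_empty(bucket_id).await? {
        return Err(Error::BadRequest(format!(
            "Bucket {} is not empty",
            bucket_name
        )));
    }
    Ok(())
}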
diff --git a/src/model/helper/error.rs b/src/model/helper/error.rs
index 30b2ba32..3ca8f55c 100644
--- a/src/model/helper/error.rs
+++ b/src/model/helper/error.rs
@@ -10,6 +10,16 @@ pub enum Error {
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
+
+ /// Bucket name is not valid according to AWS S3 specs
+ #[error(display = "Invalid bucket name: {}", _0)]
+ InvalidBucketName(String),
+
+ #[error(display = "Access key not found: {}", _0)]
+ NoSuchAccessKey(String),
+
+ #[error(display = "Bucket not found: {}", _0)]
+ NoSuchBucket(String),
}
impl From<netapp::error::Error> for Error {
diff --git a/src/model/helper/key.rs b/src/model/helper/key.rs
new file mode 100644
index 00000000..c1a8e974
--- /dev/null
+++ b/src/model/helper/key.rs
@@ -0,0 +1,102 @@
+use garage_table::util::*;
+use garage_util::crdt::*;
+use garage_util::error::OkOrMessage;
+
+use crate::garage::Garage;
+use crate::helper::bucket::BucketHelper;
+use crate::helper::error::*;
+use crate::key_table::{Key, KeyFilter};
+use crate::permission::BucketKeyPerm;
+
+pub struct KeyHelper<'a>(pub(crate) &'a Garage);
+
+#[allow(clippy::ptr_arg)]
+impl<'a> KeyHelper<'a> {
+ /// Returns a Key if it is present in key table,
+ /// even if it is in deleted state. Querying a non-existing
+ /// key ID returns an internal error.
+ pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> {
+ Ok(self
+ .0
+ .key_table
+ .get(&EmptyKey, key_id)
+ .await?
+ .ok_or_message(format!("Key {} does not exist", key_id))?)
+ }
+
+ /// Returns a Key if it is present in key table,
+ /// only if it is in non-deleted state.
+ /// Querying a non-existing key ID or a deleted key
+ /// returns a bad request error.
+ pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> {
+ self.0
+ .key_table
+ .get(&EmptyKey, key_id)
+ .await?
+ .filter(|b| !b.state.is_deleted())
+ .ok_or_else(|| Error::NoSuchAccessKey(key_id.to_string()))
+ }
+
+ /// Returns a Key if it is present in key table,
+ /// looking it up by key ID or by a match on its name,
+ /// only if it is in non-deleted state.
+ /// Querying a non-existing key ID or a deleted key
+ /// returns a bad request error.
+ pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> {
+ let candidates = self
+ .0
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())),
+ 10,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .into_iter()
+ .collect::<Vec<_>>();
+ if candidates.len() != 1 {
+ Err(Error::BadRequest(format!(
+ "{} matching keys",
+ candidates.len()
+ )))
+ } else {
+ Ok(candidates.into_iter().next().unwrap())
+ }
+ }
+
+ /// Deletes an API access key
+ pub async fn delete_key(&self, key: &mut Key) -> Result<(), Error> {
+ let bucket_helper = BucketHelper(self.0);
+
+ let state = key.state.as_option_mut().unwrap();
+
+ // --- done checking, now commit ---
+ // (the step at unset_local_bucket_alias will fail if a bucket
+ // does not have another alias, the deletion will be
+ // interrupted in the middle if that happens)
+
+ // 1. Delete local aliases
+ for (alias, _, to) in state.local_aliases.items().iter() {
+ if let Some(bucket_id) = to {
+ bucket_helper
+ .unset_local_bucket_alias(*bucket_id, &key.key_id, alias)
+ .await?;
+ }
+ }
+
+ // 2. Remove permissions on all authorized buckets
+ for (ab_id, _auth) in state.authorized_buckets.items().iter() {
+ bucket_helper
+ .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+
+ // 3. Actually delete key
+ key.state = Deletable::delete();
+ self.0.key_table.insert(key).await?;
+
+ Ok(())
+ }
+}
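
Note: delete_key() now owns the three-step teardown (unset local aliases, revoke bucket permissions, mark the key deleted) that previously lived inline in the CLI admin handler. A hedged sketch of the call path, mirroring what handle_delete_key in src/garage/admin.rs does after this commit:

use garage_model::garage::Garage;
use garage_model::helper::error::Error;

// Illustrative only: resolve a key by pattern, then run the shared deletion helper.
async fn delete_key_by_pattern(garage: &Garage, pattern: &str) -> Result<(), Error> {
    let key_helper = garage.key_helper();
    let mut key = key_helper.get_existing_matching_key(pattern).await?;
    key_helper.delete_key(&mut key).await?;
    Ok(())
}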
diff --git a/src/model/helper/mod.rs b/src/model/helper/mod.rs
index 2f4e8898..dd947c86 100644
--- a/src/model/helper/mod.rs
+++ b/src/model/helper/mod.rs
@@ -1,2 +1,3 @@
pub mod bucket;
pub mod error;
+pub mod key;
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index bed7f44a..73328993 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -15,11 +15,11 @@ path = "lib.rs"
[dependencies]
garage_util = { version = "0.7.0", path = "../util" }
-garage_admin = { version = "0.7.0", path = "../admin" }
arc-swap = "1.0"
bytes = "1.0"
gethostname = "0.2"
+git-version = "0.3.4"
hex = "0.4"
tracing = "0.1.30"
rand = "0.8"
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index b9c02c21..f517f36f 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
+use garage_util::error::*;
use crate::ring::*;
@@ -100,6 +101,61 @@ impl ClusterLayout {
}
}
+ pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.roles.merge(&self.staging);
+ self.roles.retain(|(_, _, v)| v.0.is_some());
+
+ if !self.calculate_partition_assignation() {
+ return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
+ }
+
+ self.staging.clear();
+ self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+
+ self.version += 1;
+
+ Ok(self)
+ }
+
+ pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.staging.clear();
+ self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+
+ self.version += 1;
+
+ Ok(self)
+ }
+
/// Returns a list of IDs of nodes that currently have
/// a role in the cluster
pub fn node_ids(&self) -> &[Uuid] {
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 68d94ea5..1d7c3ea4 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -312,6 +312,84 @@ impl System {
);
}
+ // ---- Administrative operations (directly available and
+ // also available through RPC) ----
+
+ pub fn garage_version(&self) -> &'static str {
+ option_env!("GIT_VERSION").unwrap_or(git_version::git_version!(
+ prefix = "git:",
+ cargo_prefix = "cargo:",
+ fallback = "unknown"
+ ))
+ }
+
+ pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
+ let node_status = self.node_status.read().unwrap();
+ let known_nodes = self
+ .fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|n| KnownNodeInfo {
+ id: n.id.into(),
+ addr: n.addr,
+ is_up: n.is_up(),
+ last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()),
+ status: node_status
+ .get(&n.id.into())
+ .cloned()
+ .map(|(_, st)| st)
+ .unwrap_or(NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ cluster_layout_version: 0,
+ cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ }),
+ })
+ .collect::<Vec<_>>();
+ known_nodes
+ }
+
+ pub fn get_cluster_layout(&self) -> ClusterLayout {
+ self.ring.borrow().layout.clone()
+ }
+
+ pub async fn update_cluster_layout(
+ self: &Arc<Self>,
+ layout: &ClusterLayout,
+ ) -> Result<(), Error> {
+ self.handle_advertise_cluster_layout(layout).await?;
+ Ok(())
+ }
+
+ pub async fn connect(&self, node: &str) -> Result<(), Error> {
+ let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
+ Error::Message(format!(
+ "Unable to parse or resolve node specification: {}",
+ node
+ ))
+ })?;
+ let mut errors = vec![];
+ for ip in addrs.iter() {
+ match self
+ .netapp
+ .clone()
+ .try_connect(*ip, pubkey)
+ .await
+ .err_context(CONNECT_ERROR_MESSAGE)
+ {
+ Ok(()) => return Ok(()),
+ Err(e) => {
+ errors.push((*ip, e));
+ }
+ }
+ }
+ if errors.len() == 1 {
+ Err(Error::Message(errors[0].1.to_string()))
+ } else {
+ Err(Error::Message(format!("{:?}", errors)))
+ }
+ }
+
// ---- INTERNALS ----
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
@@ -384,32 +462,11 @@ impl System {
self.local_status.swap(Arc::new(new_si));
}
+ // --- RPC HANDLERS ---
+
async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> {
- let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
- Error::Message(format!(
- "Unable to parse or resolve node specification: {}",
- node
- ))
- })?;
- let mut errors = vec![];
- for ip in addrs.iter() {
- match self
- .netapp
- .clone()
- .try_connect(*ip, pubkey)
- .await
- .err_context(CONNECT_ERROR_MESSAGE)
- {
- Ok(()) => return Ok(SystemRpc::Ok),
- Err(e) => {
- errors.push((*ip, e));
- }
- }
- }
- return Err(Error::Message(format!(
- "Could not connect to specified peers. Errors: {:?}",
- errors
- )));
+ self.connect(node).await?;
+ Ok(SystemRpc::Ok)
}
fn handle_pull_cluster_layout(&self) -> SystemRpc {
@@ -418,28 +475,7 @@ impl System {
}
fn handle_get_known_nodes(&self) -> SystemRpc {
- let node_status = self.node_status.read().unwrap();
- let known_nodes = self
- .fullmesh
- .get_peer_list()
- .iter()
- .map(|n| KnownNodeInfo {
- id: n.id.into(),
- addr: n.addr,
- is_up: n.is_up(),
- last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()),
- status: node_status
- .get(&n.id.into())
- .cloned()
- .map(|(_, st)| st)
- .unwrap_or(NodeStatus {
- hostname: "?".to_string(),
- replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
- }),
- })
- .collect::<Vec<_>>();
+ let known_nodes = self.get_known_nodes();
SystemRpc::ReturnKnownNodes(known_nodes)
}
@@ -476,7 +512,7 @@ impl System {
}
async fn handle_advertise_cluster_layout(
- self: Arc<Self>,
+ self: &Arc<Self>,
adv: &ClusterLayout,
) -> Result<SystemRpc, Error> {
let update_ring = self.update_ring.lock().await;
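
Note: garage_version(), get_known_nodes(), get_cluster_layout(), update_cluster_layout() and connect() are now plain methods on System, so the Admin API server can call them directly instead of going through SystemRpc. A hedged usage sketch (module paths and the surrounding function are assumptions; the method signatures are the ones added above):

use std::sync::Arc;

use garage_rpc::system::System;
use garage_util::error::Error;

// Illustrative only: connect to a peer, then print what this node knows.
async fn bootstrap(system: &Arc<System>, peer: &str) -> Result<(), Error> {
    println!("Garage version: {}", system.garage_version());
    system.connect(peer).await?;
    for node in system.get_known_nodes() {
        println!("{} (up: {})", node.addr, node.is_up);
    }
    Ok(())
}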
diff --git a/src/util/config.rs b/src/util/config.rs
index 4d66bfe4..99ebce31 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -121,6 +121,10 @@ pub struct WebConfig {
pub struct AdminConfig {
/// Address and port to bind for admin API serving
pub api_bind_addr: Option<SocketAddr>,
+ /// Bearer token to use to scrape metrics
+ pub metrics_token: Option<String>,
+ /// Bearer token to use to access Admin API endpoints
+ pub admin_token: Option<String>,
/// OTLP server to where to export traces
pub trace_sink: Option<String>,
}
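
Note: for reference, a hedged sketch of what the extended AdminConfig looks like once an [admin] configuration section with the two new token options has been parsed (all values are placeholders, including the port and the OTLP endpoint):

use std::net::SocketAddr;

use garage_util::config::AdminConfig;

// Illustrative only: every field of the [admin] section is optional.
fn example_admin_config() -> AdminConfig {
    AdminConfig {
        api_bind_addr: Some("0.0.0.0:3903".parse::<SocketAddr>().unwrap()),
        metrics_token: Some("<metrics bearer token>".to_string()),
        admin_token: Some("<admin bearer token>".to_string()),
        trace_sink: Some("http://localhost:4317".to_string()),
    }
}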
diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs
index c155c3a8..91d24c7f 100644
--- a/src/util/crdt/lww_map.rs
+++ b/src/util/crdt/lww_map.rs
@@ -140,6 +140,11 @@ where
self.vals.clear();
}
+ /// Retain only values that match a certain predicate
+ pub fn retain(&mut self, pred: impl FnMut(&(K, u64, V)) -> bool) {
+ self.vals.retain(pred);
+ }
+
/// Get a reference to the value assigned to a key
pub fn get(&self, k: &K) -> Option<&V> {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
diff --git a/src/web/error.rs b/src/web/error.rs
index 55990e9d..bd8f17b5 100644
--- a/src/web/error.rs
+++ b/src/web/error.rs
@@ -2,57 +2,47 @@ use err_derive::Error;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, StatusCode};
-use garage_util::error::Error as GarageError;
+use garage_api::generic_server::ApiError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
/// An error received from the API crate
#[error(display = "API error: {}", _0)]
- ApiError(#[error(source)] garage_api::Error),
-
- // Category: internal error
- /// Error internal to garage
- #[error(display = "Internal error: {}", _0)]
- InternalError(#[error(source)] GarageError),
+ ApiError(garage_api::s3::error::Error),
/// The file does not exist
#[error(display = "Not found")]
NotFound,
- /// The request contained an invalid UTF-8 sequence in its path or in other parameters
- #[error(display = "Invalid UTF-8: {}", _0)]
- InvalidUtf8(#[error(source)] std::str::Utf8Error),
-
- /// The client send a header with invalid value
- #[error(display = "Invalid header value: {}", _0)]
- InvalidHeader(#[error(source)] hyper::header::ToStrError),
-
/// The client sent a request without host, or with unsupported method
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
}
+impl<T> From<T> for Error
+where
+ garage_api::s3::error::Error: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::ApiError(garage_api::s3::error::Error::from(err))
+ }
+}
+
impl Error {
/// Transform errors into http status code
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
Error::ApiError(e) => e.http_status_code(),
- Error::InternalError(
- GarageError::Timeout
- | GarageError::RemoteError(_)
- | GarageError::Quorum(_, _, _, _),
- ) => StatusCode::SERVICE_UNAVAILABLE,
- Error::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
- _ => StatusCode::BAD_REQUEST,
+ Error::BadRequest(_) => StatusCode::BAD_REQUEST,
}
}
pub fn add_headers(&self, header_map: &mut HeaderMap<HeaderValue>) {
#[allow(clippy::single_match)]
match self {
- Error::ApiError(e) => e.add_headers(header_map),
+ Error::ApiError(e) => e.add_http_headers(header_map),
_ => (),
}
}
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 867adc51..c30d8957 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -18,9 +18,11 @@ use opentelemetry::{
use crate::error::*;
-use garage_api::error::{Error as ApiError, OkOrBadRequest, OkOrInternalError};
use garage_api::helpers::{authority_to_host, host_to_bucket};
use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
+use garage_api::s3::error::{
+ CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
+};
use garage_api::s3::get::{handle_get, handle_head};
use garage_model::garage::Garage;
@@ -207,7 +209,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
Method::OPTIONS => handle_options_for_bucket(req, &bucket),
Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await,
Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await,
- _ => Err(ApiError::BadRequest("HTTP method not supported".into())),
+ _ => Err(ApiError::bad_request("HTTP method not supported")),
}
.map_err(Error::from);
@@ -290,9 +292,7 @@ fn path_to_key<'a>(path: &'a str, index: &str) -> Result<Cow<'a, str>, Error> {
let path_utf8 = percent_encoding::percent_decode_str(path).decode_utf8()?;
if !path_utf8.starts_with('/') {
- return Err(Error::BadRequest(
- "Path must start with a / (slash)".to_string(),
- ));
+ return Err(Error::BadRequest("Path must start with a / (slash)".into()));
}
match path_utf8.chars().last() {