diff options
Diffstat (limited to 'src')
60 files changed, 2249 insertions, 831 deletions
diff --git a/src/admin/Cargo.toml b/src/admin/Cargo.toml deleted file mode 100644 index 2db4bb08..00000000 --- a/src/admin/Cargo.toml +++ /dev/null @@ -1,29 +0,0 @@ -[package] -name = "garage_admin" -version = "0.7.0" -authors = ["Maximilien Richer <code@mricher.fr>"] -edition = "2018" -license = "AGPL-3.0" -description = "Administration and metrics REST HTTP server for Garage" -repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" - -[lib] -path = "lib.rs" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -garage_util = { version = "0.7.0", path = "../util" } - -hex = "0.4" - -futures = "0.3" -futures-util = "0.3" -http = "0.2" -hyper = "0.14" -tracing = "0.1.30" - -opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } -opentelemetry-prometheus = "0.10" -opentelemetry-otlp = "0.10" -prometheus = "0.13" diff --git a/src/admin/lib.rs b/src/admin/lib.rs deleted file mode 100644 index b5b0775b..00000000 --- a/src/admin/lib.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! Crate for handling the admin and metric HTTP APIs -#[macro_use] -extern crate tracing; - -pub mod metrics; -pub mod tracing_setup; diff --git a/src/admin/metrics.rs b/src/admin/metrics.rs deleted file mode 100644 index 7edc36c6..00000000 --- a/src/admin/metrics.rs +++ /dev/null @@ -1,146 +0,0 @@ -use std::convert::Infallible; -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::SystemTime; - -use futures::future::*; -use hyper::{ - header::CONTENT_TYPE, - service::{make_service_fn, service_fn}, - Body, Method, Request, Response, Server, -}; - -use opentelemetry::{ - global, - metrics::{BoundCounter, BoundValueRecorder}, - trace::{FutureExt, TraceContextExt, Tracer}, - Context, -}; -use opentelemetry_prometheus::PrometheusExporter; - -use prometheus::{Encoder, TextEncoder}; - -use garage_util::error::Error as GarageError; -use garage_util::metrics::*; - -// serve_req on metric endpoint -async fn serve_req( - req: Request<Body>, - admin_server: Arc<AdminServer>, -) -> Result<Response<Body>, hyper::Error> { - debug!("Receiving request at path {}", req.uri()); - let request_start = SystemTime::now(); - - admin_server.metrics.http_counter.add(1); - - let response = match (req.method(), req.uri().path()) { - (&Method::GET, "/metrics") => { - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - - let tracer = opentelemetry::global::tracer("garage"); - let metric_families = tracer.in_span("admin/gather_metrics", |_| { - admin_server.exporter.registry().gather() - }); - - encoder.encode(&metric_families, &mut buffer).unwrap(); - admin_server - .metrics - .http_body_gauge - .record(buffer.len() as u64); - - Response::builder() - .status(200) - .header(CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) - .unwrap() - } - _ => Response::builder() - .status(404) - .body(Body::from("Not implemented")) - .unwrap(), - }; - - admin_server - .metrics - .http_req_histogram - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); - Ok(response) -} - -// AdminServer hold the admin server internal admin_server and the metric exporter -pub struct AdminServer { - exporter: PrometheusExporter, - metrics: AdminServerMetrics, -} - -// GarageMetricadmin_server holds the metrics counter definition for Garage -// FIXME: we would rather have that split up among the different libraries? -struct AdminServerMetrics { - http_counter: BoundCounter<u64>, - http_body_gauge: BoundValueRecorder<u64>, - http_req_histogram: BoundValueRecorder<f64>, -} - -impl AdminServer { - /// init initilialize the AdminServer and background metric server - pub fn init() -> AdminServer { - let exporter = opentelemetry_prometheus::exporter().init(); - let meter = global::meter("garage/admin_server"); - AdminServer { - exporter, - metrics: AdminServerMetrics { - http_counter: meter - .u64_counter("admin.http_requests_total") - .with_description("Total number of HTTP requests made.") - .init() - .bind(&[]), - http_body_gauge: meter - .u64_value_recorder("admin.http_response_size_bytes") - .with_description("The metrics HTTP response sizes in bytes.") - .init() - .bind(&[]), - http_req_histogram: meter - .f64_value_recorder("admin.http_request_duration_seconds") - .with_description("The HTTP request latencies in seconds.") - .init() - .bind(&[]), - }, - } - } - /// run execute the admin server on the designated HTTP port and listen for requests - pub async fn run( - self, - bind_addr: SocketAddr, - shutdown_signal: impl Future<Output = ()>, - ) -> Result<(), GarageError> { - let admin_server = Arc::new(self); - // For every connection, we must make a `Service` to handle all - // incoming HTTP requests on said connection. - let make_svc = make_service_fn(move |_conn| { - let admin_server = admin_server.clone(); - // This is the `Service` that will handle the connection. - // `service_fn` is a helper to convert a function that - // returns a Response into a `Service`. - async move { - Ok::<_, Infallible>(service_fn(move |req| { - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder("admin/request") - .with_trace_id(gen_trace_id()) - .start(&tracer); - - serve_req(req, admin_server.clone()) - .with_context(Context::current_with_span(span)) - })) - } - }); - - let server = Server::bind(&bind_addr).serve(make_svc); - let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("Admin server listening on http://{}", bind_addr); - - graceful.await?; - Ok(()) - } -} diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 29b26e5e..db77cf38 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -54,6 +54,9 @@ quick-xml = { version = "0.21", features = [ "serialize" ] } url = "2.1" opentelemetry = "0.17" +opentelemetry-prometheus = "0.10" +opentelemetry-otlp = "0.10" +prometheus = "0.13" [features] k2v = [ "garage_util/k2v", "garage_model/k2v" ] diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs new file mode 100644 index 00000000..098a54aa --- /dev/null +++ b/src/api/admin/api_server.rs @@ -0,0 +1,164 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; +use http::header::{ + ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW, CONTENT_TYPE, +}; +use hyper::{Body, Request, Response}; + +use opentelemetry::trace::{SpanRef, Tracer}; +use opentelemetry_prometheus::PrometheusExporter; +use prometheus::{Encoder, TextEncoder}; + +use garage_model::garage::Garage; +use garage_util::error::Error as GarageError; + +use crate::generic_server::*; + +use crate::admin::bucket::*; +use crate::admin::cluster::*; +use crate::admin::error::*; +use crate::admin::key::*; +use crate::admin::router::{Authorization, Endpoint}; + +pub struct AdminApiServer { + garage: Arc<Garage>, + exporter: PrometheusExporter, + metrics_token: Option<String>, + admin_token: Option<String>, +} + +impl AdminApiServer { + pub fn new(garage: Arc<Garage>) -> Self { + let exporter = opentelemetry_prometheus::exporter().init(); + let cfg = &garage.config.admin; + let metrics_token = cfg + .metrics_token + .as_ref() + .map(|tok| format!("Bearer {}", tok)); + let admin_token = cfg + .admin_token + .as_ref() + .map(|tok| format!("Bearer {}", tok)); + Self { + garage, + exporter, + metrics_token, + admin_token, + } + } + + pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> { + if let Some(bind_addr) = self.garage.config.admin.api_bind_addr { + let region = self.garage.config.s3_api.s3_region.clone(); + ApiServer::new(region, self) + .run_server(bind_addr, shutdown_signal) + .await + } else { + Ok(()) + } + } + + fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> { + Ok(Response::builder() + .status(204) + .header(ALLOW, "OPTIONS, GET, POST") + .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST") + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .body(Body::empty())?) + } + + fn handle_metrics(&self) -> Result<Response<Body>, Error> { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + + let tracer = opentelemetry::global::tracer("garage"); + let metric_families = tracer.in_span("admin/gather_metrics", |_| { + self.exporter.registry().gather() + }); + + encoder + .encode(&metric_families, &mut buffer) + .ok_or_internal_error("Could not serialize metrics")?; + + Ok(Response::builder() + .status(200) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer))?) + } +} + +#[async_trait] +impl ApiHandler for AdminApiServer { + const API_NAME: &'static str = "admin"; + const API_NAME_DISPLAY: &'static str = "Admin"; + + type Endpoint = Endpoint; + type Error = Error; + + fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> { + Endpoint::from_request(req) + } + + async fn handle( + &self, + req: Request<Body>, + endpoint: Endpoint, + ) -> Result<Response<Body>, Error> { + let expected_auth_header = match endpoint.authorization_type() { + Authorization::MetricsToken => self.metrics_token.as_ref(), + Authorization::AdminToken => self.admin_token.as_ref(), + }; + + if let Some(h) = expected_auth_header { + match req.headers().get("Authorization") { + None => Err(Error::forbidden("Authorization token must be provided")), + Some(v) if v.to_str().map(|hv| hv == h).unwrap_or(false) => Ok(()), + _ => Err(Error::forbidden("Invalid authorization token provided")), + }?; + } + + match endpoint { + Endpoint::Options => self.handle_options(&req), + Endpoint::Metrics => self.handle_metrics(), + Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, + // Layout + Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await, + Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await, + Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await, + Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await, + // Keys + Endpoint::ListKeys => handle_list_keys(&self.garage).await, + Endpoint::GetKeyInfo { id, search } => { + handle_get_key_info(&self.garage, id, search).await + } + Endpoint::CreateKey => handle_create_key(&self.garage, req).await, + Endpoint::UpdateKey { id } => handle_update_key(&self.garage, id, req).await, + Endpoint::DeleteKey { id } => handle_delete_key(&self.garage, id).await, + // Buckets + Endpoint::ListBuckets => handle_list_buckets(&self.garage).await, + Endpoint::GetBucketInfo { id, global_alias } => { + handle_get_bucket_info(&self.garage, id, global_alias).await + } + Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await, + Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await, + // Bucket-key permissions + Endpoint::BucketAllowKey => { + handle_bucket_change_key_perm(&self.garage, req, true).await + } + Endpoint::BucketDenyKey => { + handle_bucket_change_key_perm(&self.garage, req, false).await + } + } + } +} + +impl ApiEndpoint for Endpoint { + fn name(&self) -> &'static str { + Endpoint::name(self) + } + + fn add_span_attributes(&self, _span: SpanRef<'_>) {} +} diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs new file mode 100644 index 00000000..00450319 --- /dev/null +++ b/src/api/admin/bucket.rs @@ -0,0 +1,429 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; + +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_table::*; + +use garage_model::bucket_alias_table::*; +use garage_model::bucket_table::*; +use garage_model::garage::Garage; +use garage_model::permission::*; +use garage_model::s3::object_table::ObjectFilter; + +use crate::admin::error::*; +use crate::admin::key::ApiBucketKeyPerm; +use crate::common_error::CommonError; +use crate::helpers::parse_json_body; + +pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { + let buckets = garage + .bucket_table + .get_range( + &EmptyKey, + None, + Some(DeletedFilter::NotDeleted), + 10000, + EnumerationOrder::Forward, + ) + .await?; + + let res = buckets + .into_iter() + .map(|b| { + let state = b.state.as_option().unwrap(); + ListBucketResultItem { + id: hex::encode(b.id), + global_aliases: state + .aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + local_aliases: state + .local_aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|((k, n), _, _)| BucketLocalAlias { + access_key_id: k.to_string(), + alias: n.to_string(), + }) + .collect::<Vec<_>>(), + } + }) + .collect::<Vec<_>>(); + + let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +#[derive(Serialize)] +struct ListBucketResultItem { + id: String, + #[serde(rename = "globalAliases")] + global_aliases: Vec<String>, + #[serde(rename = "localAliases")] + local_aliases: Vec<BucketLocalAlias>, +} + +#[derive(Serialize)] +struct BucketLocalAlias { + #[serde(rename = "accessKeyId")] + access_key_id: String, + alias: String, +} + +pub async fn handle_get_bucket_info( + garage: &Arc<Garage>, + id: Option<String>, + global_alias: Option<String>, +) -> Result<Response<Body>, Error> { + let bucket_id = match (id, global_alias) { + (Some(id), None) => { + let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?; + Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")? + } + (None, Some(ga)) => garage + .bucket_helper() + .resolve_global_bucket_name(&ga) + .await? + .ok_or_bad_request("Bucket not found")?, + _ => { + return Err(Error::bad_request( + "Either id or globalAlias must be provided (but not both)", + )); + } + }; + + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + bucket_info_results(garage, bucket).await +} + +async fn bucket_info_results( + garage: &Arc<Garage>, + bucket: Bucket, +) -> Result<Response<Body>, Error> { + let mut relevant_keys = HashMap::new(); + for (k, _) in bucket + .state + .as_option() + .unwrap() + .authorized_keys + .items() + .iter() + { + if let Some(key) = garage + .key_table + .get(&EmptyKey, k) + .await? + .filter(|k| !k.is_deleted()) + { + if !key.state.is_deleted() { + relevant_keys.insert(k.clone(), key); + } + } + } + for ((k, _), _, _) in bucket + .state + .as_option() + .unwrap() + .local_aliases + .items() + .iter() + { + if relevant_keys.contains_key(k) { + continue; + } + if let Some(key) = garage.key_table.get(&EmptyKey, k).await? { + if !key.state.is_deleted() { + relevant_keys.insert(k.clone(), key); + } + } + } + + let state = bucket.state.as_option().unwrap(); + + let res = GetBucketInfoResult { + id: hex::encode(&bucket.id), + global_aliases: state + .aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + keys: relevant_keys + .into_iter() + .map(|(_, key)| { + let p = key.state.as_option().unwrap(); + GetBucketInfoKey { + access_key_id: key.key_id, + name: p.name.get().to_string(), + permissions: p + .authorized_buckets + .get(&bucket.id) + .map(|p| ApiBucketKeyPerm { + read: p.allow_read, + write: p.allow_write, + owner: p.allow_owner, + }) + .unwrap_or_default(), + bucket_local_aliases: p + .local_aliases + .items() + .iter() + .filter(|(_, _, b)| *b == Some(bucket.id)) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + } + }) + .collect::<Vec<_>>(), + }; + + let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +#[derive(Serialize)] +struct GetBucketInfoResult { + id: String, + #[serde(rename = "globalAliases")] + global_aliases: Vec<String>, + keys: Vec<GetBucketInfoKey>, +} + +#[derive(Serialize)] +struct GetBucketInfoKey { + #[serde(rename = "accessKeyId")] + access_key_id: String, + #[serde(rename = "name")] + name: String, + permissions: ApiBucketKeyPerm, + #[serde(rename = "bucketLocalAliases")] + bucket_local_aliases: Vec<String>, +} + +pub async fn handle_create_bucket( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<CreateBucketRequest>(req).await?; + + if let Some(ga) = &req.global_alias { + if !is_valid_bucket_name(ga) { + return Err(Error::bad_request(format!( + "{}: {}", + ga, INVALID_BUCKET_NAME_MESSAGE + ))); + } + + if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? { + if alias.state.get().is_some() { + return Err(CommonError::BucketAlreadyExists.into()); + } + } + } + + if let Some(la) = &req.local_alias { + if !is_valid_bucket_name(&la.alias) { + return Err(Error::bad_request(format!( + "{}: {}", + la.alias, INVALID_BUCKET_NAME_MESSAGE + ))); + } + + let key = garage + .key_helper() + .get_existing_key(&la.access_key_id) + .await?; + let state = key.state.as_option().unwrap(); + if matches!(state.local_aliases.get(&la.alias), Some(_)) { + return Err(Error::bad_request("Local alias already exists")); + } + } + + let bucket = Bucket::new(); + garage.bucket_table.insert(&bucket).await?; + + if let Some(ga) = &req.global_alias { + garage + .bucket_helper() + .set_global_bucket_alias(bucket.id, ga) + .await?; + } + + if let Some(la) = &req.local_alias { + garage + .bucket_helper() + .set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias) + .await?; + if la.all_permissions { + garage + .bucket_helper() + .set_bucket_key_permissions( + bucket.id, + &la.access_key_id, + BucketKeyPerm::ALL_PERMISSIONS, + ) + .await?; + } + } + + let bucket = garage + .bucket_table + .get(&EmptyKey, &bucket.id) + .await? + .ok_or_internal_error("Bucket should now exist but doesn't")?; + bucket_info_results(garage, bucket).await +} + +#[derive(Deserialize)] +struct CreateBucketRequest { + #[serde(rename = "globalAlias")] + global_alias: Option<String>, + #[serde(rename = "localAlias")] + local_alias: Option<CreateBucketLocalAlias>, +} + +#[derive(Deserialize)] +struct CreateBucketLocalAlias { + #[serde(rename = "accessKeyId")] + access_key_id: String, + alias: String, + #[serde(rename = "allPermissions", default)] + all_permissions: bool, +} + +pub async fn handle_delete_bucket( + garage: &Arc<Garage>, + id: String, +) -> Result<Response<Body>, Error> { + let helper = garage.bucket_helper(); + + let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?; + let bucket_id = Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?; + + let mut bucket = helper.get_existing_bucket(bucket_id).await?; + let state = bucket.state.as_option().unwrap(); + + // Check bucket is empty + let objects = garage + .object_table + .get_range( + &bucket_id, + None, + Some(ObjectFilter::IsData), + 10, + EnumerationOrder::Forward, + ) + .await?; + if !objects.is_empty() { + return Err(CommonError::BucketNotEmpty.into()); + } + + // --- done checking, now commit --- + // 1. delete authorization from keys that had access + for (key_id, perm) in bucket.authorized_keys() { + if perm.is_any() { + helper + .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS) + .await?; + } + } + // 2. delete all local aliases + for ((key_id, alias), _, active) in state.local_aliases.items().iter() { + if *active { + helper + .unset_local_bucket_alias(bucket.id, key_id, alias) + .await?; + } + } + // 3. delete all global aliases + for (alias, _, active) in state.aliases.items().iter() { + if *active { + helper.purge_global_bucket_alias(bucket.id, alias).await?; + } + } + + // 4. delete bucket + bucket.state = Deletable::delete(); + garage.bucket_table.insert(&bucket).await?; + + Ok(Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?) +} + +pub async fn handle_bucket_change_key_perm( + garage: &Arc<Garage>, + req: Request<Body>, + new_perm_flag: bool, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?; + + let id_hex = hex::decode(&req.bucket_id).ok_or_bad_request("Invalid bucket id")?; + let bucket_id = Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?; + + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let state = bucket.state.as_option().unwrap(); + + let key = garage + .key_helper() + .get_existing_key(&req.access_key_id) + .await?; + + let mut perm = state + .authorized_keys + .get(&key.key_id) + .cloned() + .unwrap_or(BucketKeyPerm::NO_PERMISSIONS); + + if req.permissions.read { + perm.allow_read = new_perm_flag; + } + if req.permissions.write { + perm.allow_write = new_perm_flag; + } + if req.permissions.owner { + perm.allow_owner = new_perm_flag; + } + + garage + .bucket_helper() + .set_bucket_key_permissions(bucket.id, &key.key_id, perm) + .await?; + + let bucket = garage + .bucket_table + .get(&EmptyKey, &bucket.id) + .await? + .ok_or_internal_error("Bucket should now exist but doesn't")?; + bucket_info_results(garage, bucket).await +} + +#[derive(Deserialize)] +struct BucketKeyPermChangeRequest { + #[serde(rename = "bucketId")] + bucket_id: String, + #[serde(rename = "accessKeyId")] + access_key_id: String, + permissions: ApiBucketKeyPerm, +} diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs new file mode 100644 index 00000000..91d99d8a --- /dev/null +++ b/src/api/admin/cluster.rs @@ -0,0 +1,165 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; + +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_rpc::layout::*; + +use garage_model::garage::Garage; + +use crate::admin::error::*; +use crate::helpers::parse_json_body; + +pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { + let res = GetClusterStatusResponse { + node: hex::encode(garage.system.id), + garage_version: garage.system.garage_version(), + known_nodes: garage + .system + .get_known_nodes() + .into_iter() + .map(|i| { + ( + hex::encode(i.id), + KnownNodeResp { + addr: i.addr, + is_up: i.is_up, + last_seen_secs_ago: i.last_seen_secs_ago, + hostname: i.status.hostname, + }, + ) + }) + .collect(), + layout: get_cluster_layout(garage), + }; + + let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { + let res = get_cluster_layout(garage); + let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse { + let layout = garage.system.get_cluster_layout(); + + GetClusterLayoutResponse { + version: layout.version, + roles: layout + .roles + .items() + .iter() + .filter(|(_, _, v)| v.0.is_some()) + .map(|(k, _, v)| (hex::encode(k), v.0.clone())) + .collect(), + staged_role_changes: layout + .staging + .items() + .iter() + .filter(|(k, _, v)| layout.roles.get(k) != Some(v)) + .map(|(k, _, v)| (hex::encode(k), v.0.clone())) + .collect(), + } +} + +#[derive(Serialize)] +struct GetClusterStatusResponse { + node: String, + garage_version: &'static str, + #[serde(rename = "knownNodes")] + known_nodes: HashMap<String, KnownNodeResp>, + layout: GetClusterLayoutResponse, +} + +#[derive(Serialize)] +struct GetClusterLayoutResponse { + version: u64, + roles: HashMap<String, Option<NodeRole>>, + #[serde(rename = "stagedRoleChanges")] + staged_role_changes: HashMap<String, Option<NodeRole>>, +} + +#[derive(Serialize)] +struct KnownNodeResp { + addr: SocketAddr, + is_up: bool, + last_seen_secs_ago: Option<u64>, + hostname: String, +} + +pub async fn handle_update_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?; + + let mut layout = garage.system.get_cluster_layout(); + + let mut roles = layout.roles.clone(); + roles.merge(&layout.staging); + + for (node, role) in updates { + let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?; + let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; + + layout + .staging + .merge(&roles.update_mutator(node, NodeRoleV(role))); + } + + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_apply_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + + let layout = garage.system.get_cluster_layout(); + let layout = layout.apply_staged_changes(Some(param.version))?; + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_revert_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + + let layout = garage.system.get_cluster_layout(); + let layout = layout.revert_staged_changes(Some(param.version))?; + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>; + +#[derive(Deserialize)] +struct ApplyRevertLayoutRequest { + version: u64, +} diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs new file mode 100644 index 00000000..cd7e6af7 --- /dev/null +++ b/src/api/admin/error.rs @@ -0,0 +1,89 @@ +use err_derive::Error; +use hyper::header::HeaderValue; +use hyper::{Body, HeaderMap, StatusCode}; + +use garage_model::helper::error::Error as HelperError; + +use crate::common_error::CommonError; +pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; +use crate::generic_server::ApiError; +use crate::helpers::CustomApiErrorBody; + +/// Errors of this crate +#[derive(Debug, Error)] +pub enum Error { + #[error(display = "{}", _0)] + /// Error from common error + CommonError(CommonError), + + // Category: cannot process + /// The API access key does not exist + #[error(display = "Access key not found: {}", _0)] + NoSuchAccessKey(String), +} + +impl<T> From<T> for Error +where + CommonError: From<T>, +{ + fn from(err: T) -> Self { + Error::CommonError(CommonError::from(err)) + } +} + +impl CommonErrorDerivative for Error {} + +impl From<HelperError> for Error { + fn from(err: HelperError) -> Self { + match err { + HelperError::Internal(i) => Self::CommonError(CommonError::InternalError(i)), + HelperError::BadRequest(b) => Self::CommonError(CommonError::BadRequest(b)), + HelperError::InvalidBucketName(n) => { + Self::CommonError(CommonError::InvalidBucketName(n)) + } + HelperError::NoSuchBucket(n) => Self::CommonError(CommonError::NoSuchBucket(n)), + HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n), + } + } +} + +impl Error { + fn code(&self) -> &'static str { + match self { + Error::CommonError(c) => c.aws_code(), + Error::NoSuchAccessKey(_) => "NoSuchAccessKey", + } + } +} + +impl ApiError for Error { + /// Get the HTTP status code that best represents the meaning of the error for the client + fn http_status_code(&self) -> StatusCode { + match self { + Error::CommonError(c) => c.http_status_code(), + Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND, + } + } + + fn add_http_headers(&self, _header_map: &mut HeaderMap<HeaderValue>) { + // nothing + } + + fn http_body(&self, garage_region: &str, path: &str) -> Body { + let error = CustomApiErrorBody { + code: self.code().to_string(), + message: format!("{}", self), + path: path.to_string(), + region: garage_region.to_string(), + }; + Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + r#" +{ + "code": "InternalError", + "message": "JSON encoding of error failed" +} + "# + .into() + })) + } +} diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs new file mode 100644 index 00000000..1e910d52 --- /dev/null +++ b/src/api/admin/key.rs @@ -0,0 +1,240 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; + +use garage_util::error::Error as GarageError; + +use garage_table::*; + +use garage_model::garage::Garage; +use garage_model::key_table::*; + +use crate::admin::error::*; +use crate::helpers::parse_json_body; + +pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { + let res = garage + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), + 10000, + EnumerationOrder::Forward, + ) + .await? + .iter() + .map(|k| ListKeyResultItem { + id: k.key_id.to_string(), + name: k.params().unwrap().name.get().clone(), + }) + .collect::<Vec<_>>(); + + let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +#[derive(Serialize)] +struct ListKeyResultItem { + id: String, + name: String, +} + +pub async fn handle_get_key_info( + garage: &Arc<Garage>, + id: Option<String>, + search: Option<String>, +) -> Result<Response<Body>, Error> { + let key = if let Some(id) = id { + garage.key_helper().get_existing_key(&id).await? + } else if let Some(search) = search { + garage + .key_helper() + .get_existing_matching_key(&search) + .await? + } else { + unreachable!(); + }; + + key_info_results(garage, key).await +} + +pub async fn handle_create_key( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<CreateKeyRequest>(req).await?; + + let key = Key::new(&req.name); + garage.key_table.insert(&key).await?; + + key_info_results(garage, key).await +} + +#[derive(Deserialize)] +struct CreateKeyRequest { + name: String, +} + +pub async fn handle_update_key( + garage: &Arc<Garage>, + id: String, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<UpdateKeyRequest>(req).await?; + + let mut key = garage.key_helper().get_existing_key(&id).await?; + + let key_state = key.state.as_option_mut().unwrap(); + + if let Some(new_name) = req.name { + key_state.name.update(new_name); + } + if let Some(allow) = req.allow { + if allow.create_bucket { + key_state.allow_create_bucket.update(true); + } + } + if let Some(deny) = req.deny { + if deny.create_bucket { + key_state.allow_create_bucket.update(false); + } + } + + garage.key_table.insert(&key).await?; + + key_info_results(garage, key).await +} + +#[derive(Deserialize)] +struct UpdateKeyRequest { + name: Option<String>, + allow: Option<KeyPerm>, + deny: Option<KeyPerm>, +} + +pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Response<Body>, Error> { + let mut key = garage.key_helper().get_existing_key(&id).await?; + + key.state.as_option().unwrap(); + + garage.key_helper().delete_key(&mut key).await?; + + Ok(Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?) +} + +async fn key_info_results(garage: &Arc<Garage>, key: Key) -> Result<Response<Body>, Error> { + let mut relevant_buckets = HashMap::new(); + + let key_state = key.state.as_option().unwrap(); + + for id in key_state + .authorized_buckets + .items() + .iter() + .map(|(id, _)| id) + .chain( + key_state + .local_aliases + .items() + .iter() + .filter_map(|(_, _, v)| v.as_ref()), + ) { + if !relevant_buckets.contains_key(id) { + if let Some(b) = garage.bucket_table.get(&EmptyKey, id).await? { + if b.state.as_option().is_some() { + relevant_buckets.insert(*id, b); + } + } + } + } + + let res = GetKeyInfoResult { + name: key_state.name.get().clone(), + access_key_id: key.key_id.clone(), + secret_access_key: key_state.secret_key.clone(), + permissions: KeyPerm { + create_bucket: *key_state.allow_create_bucket.get(), + }, + buckets: relevant_buckets + .into_iter() + .map(|(_, bucket)| { + let state = bucket.state.as_option().unwrap(); + KeyInfoBucketResult { + id: hex::encode(bucket.id), + global_aliases: state + .aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + local_aliases: state + .local_aliases + .items() + .iter() + .filter(|((k, _), _, a)| *a && *k == key.key_id) + .map(|((_, n), _, _)| n.to_string()) + .collect::<Vec<_>>(), + permissions: key_state + .authorized_buckets + .get(&bucket.id) + .map(|p| ApiBucketKeyPerm { + read: p.allow_read, + write: p.allow_write, + owner: p.allow_owner, + }) + .unwrap_or_default(), + } + }) + .collect::<Vec<_>>(), + }; + + let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +#[derive(Serialize)] +struct GetKeyInfoResult { + name: String, + #[serde(rename = "accessKeyId")] + access_key_id: String, + #[serde(rename = "secretAccessKey")] + secret_access_key: String, + permissions: KeyPerm, + buckets: Vec<KeyInfoBucketResult>, +} + +#[derive(Serialize, Deserialize)] +struct KeyPerm { + #[serde(rename = "createBucket", default)] + create_bucket: bool, +} + +#[derive(Serialize)] +struct KeyInfoBucketResult { + id: String, + #[serde(rename = "globalAliases")] + global_aliases: Vec<String>, + #[serde(rename = "localAliases")] + local_aliases: Vec<String>, + permissions: ApiBucketKeyPerm, +} + +#[derive(Serialize, Deserialize, Default)] +pub(crate) struct ApiBucketKeyPerm { + #[serde(default)] + pub(crate) read: bool, + #[serde(default)] + pub(crate) write: bool, + #[serde(default)] + pub(crate) owner: bool, +} diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs new file mode 100644 index 00000000..c4857c10 --- /dev/null +++ b/src/api/admin/mod.rs @@ -0,0 +1,7 @@ +pub mod api_server; +mod error; +mod router; + +mod bucket; +mod cluster; +mod key; diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs new file mode 100644 index 00000000..2a5098bf --- /dev/null +++ b/src/api/admin/router.rs @@ -0,0 +1,111 @@ +use std::borrow::Cow; + +use hyper::{Method, Request}; + +use crate::admin::error::*; +use crate::router_macros::*; + +pub enum Authorization { + MetricsToken, + AdminToken, +} + +router_match! {@func + +/// List of all Admin API endpoints. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Endpoint { + Options, + Metrics, + GetClusterStatus, + // Layout + GetClusterLayout, + UpdateClusterLayout, + ApplyClusterLayout, + RevertClusterLayout, + // Keys + ListKeys, + CreateKey, + GetKeyInfo { + id: Option<String>, + search: Option<String>, + }, + DeleteKey { + id: String, + }, + UpdateKey { + id: String, + }, + // Buckets + ListBuckets, + CreateBucket, + GetBucketInfo { + id: Option<String>, + global_alias: Option<String>, + }, + DeleteBucket { + id: String, + }, + // Bucket-Key Permissions + BucketAllowKey, + BucketDenyKey, +}} + +impl Endpoint { + /// Determine which S3 endpoint a request is for using the request, and a bucket which was + /// possibly extracted from the Host header. + /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets + pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> { + let uri = req.uri(); + let path = uri.path(); + let query = uri.query(); + + let mut query = QueryParameters::from_query(query.unwrap_or_default())?; + + let res = router_match!(@gen_path_parser (req.method(), path, query) [ + OPTIONS _ => Options, + GET "/metrics" => Metrics, + GET "/status" => GetClusterStatus, + // Layout endpoints + GET "/layout" => GetClusterLayout, + POST "/layout" => UpdateClusterLayout, + POST "/layout/apply" => ApplyClusterLayout, + POST "/layout/revert" => RevertClusterLayout, + // API key endpoints + GET "/key" if id => GetKeyInfo (query_opt::id, query_opt::search), + GET "/key" if search => GetKeyInfo (query_opt::id, query_opt::search), + POST "/key" if id => UpdateKey (query::id), + POST "/key" => CreateKey, + DELETE "/key" if id => DeleteKey (query::id), + GET "/key" => ListKeys, + // Bucket endpoints + GET "/bucket" if id => GetBucketInfo (query_opt::id, query_opt::global_alias), + GET "/bucket" if global_alias => GetBucketInfo (query_opt::id, query_opt::global_alias), + GET "/bucket" => ListBuckets, + POST "/bucket" => CreateBucket, + DELETE "/bucket" if id => DeleteBucket (query::id), + // Bucket-key permissions + POST "/bucket/allow" => BucketAllowKey, + POST "/bucket/deny" => BucketDenyKey, + ]); + + if let Some(message) = query.nonempty_message() { + debug!("Unused query parameter: {}", message) + } + + Ok(res) + } + /// Get the kind of authorization which is required to perform the operation. + pub fn authorization_type(&self) -> Authorization { + match self { + Self::Metrics => Authorization::MetricsToken, + _ => Authorization::AdminToken, + } + } +} + +generateQueryParameters! { + "id" => id, + "search" => search, + "globalAlias" => global_alias +} diff --git a/src/api/common_error.rs b/src/api/common_error.rs new file mode 100644 index 00000000..20f9f266 --- /dev/null +++ b/src/api/common_error.rs @@ -0,0 +1,177 @@ +use err_derive::Error; +use hyper::StatusCode; + +use garage_util::error::Error as GarageError; + +/// Errors of this crate +#[derive(Debug, Error)] +pub enum CommonError { + // ---- INTERNAL ERRORS ---- + /// Error related to deeper parts of Garage + #[error(display = "Internal error: {}", _0)] + InternalError(#[error(source)] GarageError), + + /// Error related to Hyper + #[error(display = "Internal error (Hyper error): {}", _0)] + Hyper(#[error(source)] hyper::Error), + + /// Error related to HTTP + #[error(display = "Internal error (HTTP error): {}", _0)] + Http(#[error(source)] http::Error), + + // ---- GENERIC CLIENT ERRORS ---- + /// Proper authentication was not provided + #[error(display = "Forbidden: {}", _0)] + Forbidden(String), + + /// Generic bad request response with custom message + #[error(display = "Bad request: {}", _0)] + BadRequest(String), + + // ---- SPECIFIC ERROR CONDITIONS ---- + // These have to be error codes referenced in the S3 spec here: + // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList + /// The bucket requested don't exists + #[error(display = "Bucket not found: {}", _0)] + NoSuchBucket(String), + + /// Tried to create a bucket that already exist + #[error(display = "Bucket already exists")] + BucketAlreadyExists, + + /// Tried to delete a non-empty bucket + #[error(display = "Tried to delete a non-empty bucket")] + BucketNotEmpty, + + // Category: bad request + /// Bucket name is not valid according to AWS S3 specs + #[error(display = "Invalid bucket name: {}", _0)] + InvalidBucketName(String), +} + +impl CommonError { + pub fn http_status_code(&self) -> StatusCode { + match self { + CommonError::InternalError( + GarageError::Timeout + | GarageError::RemoteError(_) + | GarageError::Quorum(_, _, _, _), + ) => StatusCode::SERVICE_UNAVAILABLE, + CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => { + StatusCode::INTERNAL_SERVER_ERROR + } + CommonError::BadRequest(_) => StatusCode::BAD_REQUEST, + CommonError::Forbidden(_) => StatusCode::FORBIDDEN, + CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND, + CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT, + CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST, + } + } + + pub fn aws_code(&self) -> &'static str { + match self { + CommonError::Forbidden(_) => "AccessDenied", + CommonError::InternalError( + GarageError::Timeout + | GarageError::RemoteError(_) + | GarageError::Quorum(_, _, _, _), + ) => "ServiceUnavailable", + CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => { + "InternalError" + } + CommonError::BadRequest(_) => "InvalidRequest", + CommonError::NoSuchBucket(_) => "NoSuchBucket", + CommonError::BucketAlreadyExists => "BucketAlreadyExists", + CommonError::BucketNotEmpty => "BucketNotEmpty", + CommonError::InvalidBucketName(_) => "InvalidBucketName", + } + } + + pub fn bad_request<M: ToString>(msg: M) -> Self { + CommonError::BadRequest(msg.to_string()) + } +} + +pub trait CommonErrorDerivative: From<CommonError> { + fn internal_error<M: ToString>(msg: M) -> Self { + Self::from(CommonError::InternalError(GarageError::Message( + msg.to_string(), + ))) + } + + fn bad_request<M: ToString>(msg: M) -> Self { + Self::from(CommonError::BadRequest(msg.to_string())) + } + + fn forbidden<M: ToString>(msg: M) -> Self { + Self::from(CommonError::Forbidden(msg.to_string())) + } +} + +/// Trait to map error to the Bad Request error code +pub trait OkOrBadRequest { + type S; + fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, CommonError>; +} + +impl<T, E> OkOrBadRequest for Result<T, E> +where + E: std::fmt::Display, +{ + type S = T; + fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> { + match self { + Ok(x) => Ok(x), + Err(e) => Err(CommonError::BadRequest(format!( + "{}: {}", + reason.as_ref(), + e + ))), + } + } +} + +impl<T> OkOrBadRequest for Option<T> { + type S = T; + fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> { + match self { + Some(x) => Ok(x), + None => Err(CommonError::BadRequest(reason.as_ref().to_string())), + } + } +} + +/// Trait to map an error to an Internal Error code +pub trait OkOrInternalError { + type S; + fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<Self::S, CommonError>; +} + +impl<T, E> OkOrInternalError for Result<T, E> +where + E: std::fmt::Display, +{ + type S = T; + fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> { + match self { + Ok(x) => Ok(x), + Err(e) => Err(CommonError::InternalError(GarageError::Message(format!( + "{}: {}", + reason.as_ref(), + e + )))), + } + } +} + +impl<T> OkOrInternalError for Option<T> { + type S = T; + fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> { + match self { + Some(x) => Ok(x), + None => Err(CommonError::InternalError(GarageError::Message( + reason.as_ref().to_string(), + ))), + } + } +} diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 9281e596..77278908 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -5,9 +5,11 @@ use async_trait::async_trait; use futures::future::Future; +use hyper::header::HeaderValue; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; +use hyper::{HeaderMap, StatusCode}; use opentelemetry::{ global, @@ -19,26 +21,31 @@ use opentelemetry::{ use garage_util::error::Error as GarageError; use garage_util::metrics::{gen_trace_id, RecordDuration}; -use crate::error::*; - pub(crate) trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; fn add_span_attributes(&self, span: SpanRef<'_>); } +pub trait ApiError: std::error::Error + Send + Sync + 'static { + fn http_status_code(&self) -> StatusCode; + fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>); + fn http_body(&self, garage_region: &str, path: &str) -> Body; +} + #[async_trait] pub(crate) trait ApiHandler: Send + Sync + 'static { const API_NAME: &'static str; const API_NAME_DISPLAY: &'static str; type Endpoint: ApiEndpoint; + type Error: ApiError; - fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Error>; + fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Self::Error>; async fn handle( &self, req: Request<Body>, endpoint: Self::Endpoint, - ) -> Result<Response<Body>, Error>; + ) -> Result<Response<Body>, Self::Error>; } pub(crate) struct ApiServer<A: ApiHandler> { @@ -142,13 +149,13 @@ impl<A: ApiHandler> ApiServer<A> { Ok(x) } Err(e) => { - let body: Body = Body::from(e.aws_xml(&self.region, uri.path())); + let body: Body = e.http_body(&self.region, uri.path()); let mut http_error_builder = Response::builder() .status(e.http_status_code()) .header("Content-Type", "application/xml"); if let Some(header_map) = http_error_builder.headers_mut() { - e.add_headers(header_map) + e.add_http_headers(header_map) } let http_error = http_error_builder.body(body)?; @@ -163,7 +170,7 @@ impl<A: ApiHandler> ApiServer<A> { } } - async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, Error> { + async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, A::Error> { let endpoint = self.api_handler.parse_endpoint(&req)?; debug!("Endpoint: {}", endpoint.name()); diff --git a/src/api/helpers.rs b/src/api/helpers.rs index a994b82f..9fb12dbe 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,11 +1,8 @@ +use hyper::{Body, Request}; use idna::domain_to_unicode; +use serde::{Deserialize, Serialize}; -use garage_util::data::*; - -use garage_model::garage::Garage; -use garage_model::key_table::Key; - -use crate::error::*; +use crate::common_error::{CommonError as Error, *}; /// What kind of authorization is required to perform a given action #[derive(Debug, Clone, PartialEq, Eq)] @@ -50,7 +47,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> { let mut iter = authority.chars().enumerate(); let (_, first_char) = iter .next() - .ok_or_else(|| Error::BadRequest("Authority is empty".to_string()))?; + .ok_or_else(|| Error::bad_request("Authority is empty".to_string()))?; let split = match first_char { '[' => { @@ -58,7 +55,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> { match iter.next() { Some((_, ']')) => iter.next(), _ => { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Authority {} has an illegal format", authority ))) @@ -71,7 +68,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> { let authority = match split { Some((i, ':')) => Ok(&authority[..i]), None => Ok(authority), - Some((_, _)) => Err(Error::BadRequest(format!( + Some((_, _)) => Err(Error::bad_request(format!( "Authority {} has an illegal format", authority ))), @@ -79,28 +76,6 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> { authority.map(|h| domain_to_unicode(h).0) } -#[allow(clippy::ptr_arg)] -pub async fn resolve_bucket( - garage: &Garage, - bucket_name: &String, - api_key: &Key, -) -> Result<Uuid, Error> { - let api_key_params = api_key - .state - .as_option() - .ok_or_internal_error("Key should not be deleted at this point")?; - - if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) { - Ok(*bucket_id) - } else { - Ok(garage - .bucket_helper() - .resolve_global_bucket_name(bucket_name) - .await? - .ok_or(Error::NoSuchBucket)?) - } -} - /// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in /// the host header of the request /// @@ -132,7 +107,7 @@ pub fn parse_bucket_key<'a>( None => (path, None), }; if bucket.is_empty() { - return Err(Error::BadRequest("No bucket specified".to_string())); + return Err(Error::bad_request("No bucket specified")); } Ok((bucket, key)) } @@ -163,6 +138,12 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> { None } +pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + Ok(resp) +} + #[cfg(test)] mod tests { use super::*; @@ -298,3 +279,11 @@ mod tests { ); } } + +#[derive(Serialize)] +pub(crate) struct CustomApiErrorBody { + pub(crate) code: String, + pub(crate) message: String, + pub(crate) region: String, + pub(crate) path: String, +} diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 5f5e9030..eb0fbdd7 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -7,13 +7,12 @@ use hyper::{Body, Method, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; -use garage_table::util::*; use garage_util::error::Error as GarageError; use garage_model::garage::Garage; -use crate::error::*; use crate::generic_server::*; +use crate::k2v::error::*; use crate::signature::payload::check_payload_signature; use crate::signature::streaming::*; @@ -60,6 +59,7 @@ impl ApiHandler for K2VApiServer { const API_NAME_DISPLAY: &'static str = "K2V"; type Endpoint = K2VApiEndpoint; + type Error = Error; fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> { let (endpoint, bucket_name) = Endpoint::from_request(req)?; @@ -83,13 +83,14 @@ impl ApiHandler for K2VApiServer { // The OPTIONS method is procesed early, before we even check for an API key if let Endpoint::Options = endpoint { - return handle_options_s3api(garage, &req, Some(bucket_name)).await; + return Ok(handle_options_s3api(garage, &req, Some(bucket_name)) + .await + .ok_or_bad_request("Error handling OPTIONS")?); } let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; - let api_key = api_key.ok_or_else(|| { - Error::Forbidden("Garage does not support anonymous access yet".to_string()) - })?; + let api_key = api_key + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; let req = parse_streaming_body( &api_key, @@ -99,13 +100,14 @@ impl ApiHandler for K2VApiServer { "k2v", )?; - let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; + let bucket_id = garage + .bucket_helper() + .resolve_bucket(&bucket_name, &api_key) + .await?; let bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; let allowed = match endpoint.authorization_type() { Authorization::Read => api_key.allow_read(&bucket_id), @@ -115,9 +117,7 @@ impl ApiHandler for K2VApiServer { }; if !allowed { - return Err(Error::Forbidden( - "Operation is not allowed for this key.".to_string(), - )); + return Err(Error::forbidden("Operation is not allowed for this key.")); } // Look up what CORS rule might apply to response. @@ -125,7 +125,8 @@ impl ApiHandler for K2VApiServer { // are always preflighted, i.e. the browser should make // an OPTIONS call before to check it is allowed let matching_cors_rule = match *req.method() { - Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?, + Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req) + .ok_or_internal_error("Error looking up CORS rule")?, _ => None, }; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 4ecddeb9..db9901cf 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -12,7 +12,8 @@ use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; -use crate::error::*; +use crate::helpers::*; +use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( @@ -20,9 +21,7 @@ pub async fn handle_insert_batch( bucket_id: Uuid, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - let items: Vec<InsertBatchItem> = - serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?; let mut items2 = vec![]; for it in items { @@ -52,9 +51,7 @@ pub async fn handle_read_batch( bucket_id: Uuid, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - let queries: Vec<ReadBatchQuery> = - serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?; let resp_results = futures::future::join_all( queries @@ -91,7 +88,7 @@ async fn handle_read_batch_query( let (items, more, next_start) = if query.single_item { if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse { - return Err(Error::BadRequest("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.".into())); + return Err(Error::bad_request("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.")); } let sk = query .start @@ -149,9 +146,7 @@ pub async fn handle_delete_batch( bucket_id: Uuid, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - let queries: Vec<DeleteBatchQuery> = - serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?; let resp_results = futures::future::join_all( queries @@ -188,7 +183,7 @@ async fn handle_delete_batch_query( let deleted_items = if query.single_item { if query.prefix.is_some() || query.end.is_some() { - return Err(Error::BadRequest("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.".into())); + return Err(Error::bad_request("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.")); } let sk = query .start diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs new file mode 100644 index 00000000..dbd9a5f5 --- /dev/null +++ b/src/api/k2v/error.rs @@ -0,0 +1,136 @@ +use err_derive::Error; +use hyper::header::HeaderValue; +use hyper::{Body, HeaderMap, StatusCode}; + +use garage_model::helper::error::Error as HelperError; + +use crate::common_error::CommonError; +pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; +use crate::generic_server::ApiError; +use crate::helpers::CustomApiErrorBody; +use crate::signature::error::Error as SignatureError; + +/// Errors of this crate +#[derive(Debug, Error)] +pub enum Error { + #[error(display = "{}", _0)] + /// Error from common error + CommonError(CommonError), + + // Category: cannot process + /// Authorization Header Malformed + #[error(display = "Authorization header malformed, expected scope: {}", _0)] + AuthorizationHeaderMalformed(String), + + /// The object requested don't exists + #[error(display = "Key not found")] + NoSuchKey, + + /// Some base64 encoded data was badly encoded + #[error(display = "Invalid base64: {}", _0)] + InvalidBase64(#[error(source)] base64::DecodeError), + + /// The client sent a header with invalid value + #[error(display = "Invalid header value: {}", _0)] + InvalidHeader(#[error(source)] hyper::header::ToStrError), + + /// The client asked for an invalid return format (invalid Accept header) + #[error(display = "Not acceptable: {}", _0)] + NotAcceptable(String), + + /// The request contained an invalid UTF-8 sequence in its path or in other parameters + #[error(display = "Invalid UTF-8: {}", _0)] + InvalidUtf8Str(#[error(source)] std::str::Utf8Error), +} + +impl<T> From<T> for Error +where + CommonError: From<T>, +{ + fn from(err: T) -> Self { + Error::CommonError(CommonError::from(err)) + } +} + +impl CommonErrorDerivative for Error {} + +impl From<HelperError> for Error { + fn from(err: HelperError) -> Self { + match err { + HelperError::Internal(i) => Self::CommonError(CommonError::InternalError(i)), + HelperError::BadRequest(b) => Self::CommonError(CommonError::BadRequest(b)), + HelperError::InvalidBucketName(n) => { + Self::CommonError(CommonError::InvalidBucketName(n)) + } + HelperError::NoSuchBucket(n) => Self::CommonError(CommonError::NoSuchBucket(n)), + e => Self::CommonError(CommonError::BadRequest(format!("{}", e))), + } + } +} + +impl From<SignatureError> for Error { + fn from(err: SignatureError) -> Self { + match err { + SignatureError::CommonError(c) => Self::CommonError(c), + SignatureError::AuthorizationHeaderMalformed(c) => { + Self::AuthorizationHeaderMalformed(c) + } + SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), + SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), + } + } +} + +impl Error { + /// This returns a keyword for the corresponding error. + /// Here, these keywords are not necessarily those from AWS S3, + /// as we are building a custom API + fn code(&self) -> &'static str { + match self { + Error::CommonError(c) => c.aws_code(), + Error::NoSuchKey => "NoSuchKey", + Error::NotAcceptable(_) => "NotAcceptable", + Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", + Error::InvalidBase64(_) => "InvalidBase64", + Error::InvalidHeader(_) => "InvalidHeaderValue", + Error::InvalidUtf8Str(_) => "InvalidUtf8String", + } + } +} + +impl ApiError for Error { + /// Get the HTTP status code that best represents the meaning of the error for the client + fn http_status_code(&self) -> StatusCode { + match self { + Error::CommonError(c) => c.http_status_code(), + Error::NoSuchKey => StatusCode::NOT_FOUND, + Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, + Error::AuthorizationHeaderMalformed(_) + | Error::InvalidBase64(_) + | Error::InvalidHeader(_) + | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST, + } + } + + fn add_http_headers(&self, _header_map: &mut HeaderMap<HeaderValue>) { + // nothing + } + + fn http_body(&self, garage_region: &str, path: &str) -> Body { + let error = CustomApiErrorBody { + code: self.code().to_string(), + message: format!("{}", self), + path: path.to_string(), + region: garage_region.to_string(), + }; + Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + r#" +{ + "code": "InternalError", + "message": "JSON encoding of error failed" +} + "# + .into() + })) + } +} diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 896dbcf0..d5db906d 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -12,7 +12,7 @@ use garage_table::util::*; use garage_model::garage::Garage; use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; -use crate::error::*; +use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_read_index( diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 1860863e..836d386f 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -10,7 +10,7 @@ use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; -use crate::error::*; +use crate::k2v::error::*; pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs index ee210ad5..b6a8c5cf 100644 --- a/src/api/k2v/mod.rs +++ b/src/api/k2v/mod.rs @@ -1,4 +1,5 @@ pub mod api_server; +mod error; mod router; mod batch; diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index 295c34aa..bb9d3be5 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -7,8 +7,8 @@ use std::sync::Arc; use garage_table::replication::TableShardedReplication; use garage_table::*; -use crate::error::*; use crate::helpers::key_after_prefix; +use crate::k2v::error::*; /// Read range in a Garage table. /// Returns (entries, more?, nextStart) @@ -31,7 +31,7 @@ where (None, Some(s)) => (Some(s.clone()), false), (Some(p), Some(s)) => { if !s.starts_with(p) { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Start key '{}' does not start with prefix '{}'", s, p ))); diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index f948ffce..50e6965b 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -1,4 +1,4 @@ -use crate::error::*; +use crate::k2v::error::*; use std::borrow::Cow; @@ -62,7 +62,7 @@ impl Endpoint { .unwrap_or((path.to_owned(), "")); if bucket.is_empty() { - return Err(Error::BadRequest("Missing bucket name".to_owned())); + return Err(Error::bad_request("Missing bucket name")); } if *req.method() == Method::OPTIONS { @@ -83,7 +83,7 @@ impl Endpoint { Method::PUT => Self::from_put(partition_key, &mut query)?, Method::DELETE => Self::from_delete(partition_key, &mut query)?, _ if req.method() == method_search => Self::from_search(partition_key, &mut query)?, - _ => return Err(Error::BadRequest("Unknown method".to_owned())), + _ => return Err(Error::bad_request("Unknown method")), }; if let Some(message) = query.nonempty_message() { diff --git a/src/api/lib.rs b/src/api/lib.rs index 0078f7b5..370dfd7a 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -2,16 +2,16 @@ #[macro_use] extern crate tracing; -pub mod error; -pub use error::Error; +pub mod common_error; mod encoding; -mod generic_server; +pub mod generic_server; pub mod helpers; mod router_macros; /// This mode is public only to help testing. Don't expect stability here pub mod signature; +pub mod admin; #[cfg(feature = "k2v")] pub mod k2v; pub mod s3; diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs index 8471407c..4c593300 100644 --- a/src/api/router_macros.rs +++ b/src/api/router_macros.rs @@ -23,6 +23,29 @@ macro_rules! router_match { _ => None } }}; + (@gen_path_parser ($method:expr, $reqpath:expr, $query:expr) + [ + $($meth:ident $path:pat $(if $required:ident)? => $api:ident $(($($conv:ident :: $param:ident),*))?,)* + ]) => {{ + { + use Endpoint::*; + match ($method, $reqpath) { + $( + (&Method::$meth, $path) if true $(&& $query.$required.is_some())? => $api { + $($( + $param: router_match!(@@parse_param $query, $conv, $param), + )*)? + }, + )* + (m, p) => { + return Err(Error::bad_request(format!( + "Unknown API endpoint: {} {}", + m, p + ))) + } + } + } + }}; (@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr), key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*], no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{ @@ -55,7 +78,7 @@ macro_rules! router_match { )*)? }), )* - (kw, _) => Err(Error::BadRequest(format!("Invalid endpoint: {}", kw))) + (kw, _) => Err(Error::bad_request(format!("Invalid endpoint: {}", kw))) } }}; @@ -74,14 +97,14 @@ macro_rules! router_match { .take() .map(|param| param.parse()) .transpose() - .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))? + .map_err(|_| Error::bad_request("Failed to parse query parameter"))? }}; (@@parse_param $query:expr, parse, $param:ident) => {{ // extract and parse mandatory query parameter // both missing and un-parseable parameters are reported as errors $query.$param.take().ok_or_bad_request("Missing argument for endpoint")? .parse() - .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))? + .map_err(|_| Error::bad_request("Failed to parse query parameter"))? }}; (@func $(#[$doc:meta])* @@ -150,7 +173,7 @@ macro_rules! generateQueryParameters { false } else if v.as_ref().is_empty() { if res.keyword.replace(k).is_some() { - return Err(Error::BadRequest("Multiple keywords".to_owned())); + return Err(Error::bad_request("Multiple keywords")); } continue; } else { @@ -160,7 +183,7 @@ macro_rules! generateQueryParameters { } }; if repeated { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Query parameter repeated: '{}'", k ))); diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 78a69d53..ecc417ab 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -8,14 +8,13 @@ use hyper::{Body, Method, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; -use garage_table::util::*; use garage_util::error::Error as GarageError; use garage_model::garage::Garage; use garage_model::key_table::Key; -use crate::error::*; use crate::generic_server::*; +use crate::s3::error::*; use crate::signature::payload::check_payload_signature; use crate::signature::streaming::*; @@ -75,6 +74,7 @@ impl ApiHandler for S3ApiServer { const API_NAME_DISPLAY: &'static str = "S3"; type Endpoint = S3ApiEndpoint; + type Error = Error; fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> { let authority = req @@ -122,9 +122,8 @@ impl ApiHandler for S3ApiServer { } let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?; - let api_key = api_key.ok_or_else(|| { - Error::Forbidden("Garage does not support anonymous access yet".to_string()) - })?; + let api_key = api_key + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; let req = parse_streaming_body( &api_key, @@ -148,13 +147,14 @@ impl ApiHandler for S3ApiServer { return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await; } - let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; + let bucket_id = garage + .bucket_helper() + .resolve_bucket(&bucket_name, &api_key) + .await?; let bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; let allowed = match endpoint.authorization_type() { Authorization::Read => api_key.allow_read(&bucket_id), @@ -164,9 +164,7 @@ impl ApiHandler for S3ApiServer { }; if !allowed { - return Err(Error::Forbidden( - "Operation is not allowed for this key.".to_string(), - )); + return Err(Error::forbidden("Operation is not allowed for this key.")); } // Look up what CORS rule might apply to response. @@ -309,7 +307,7 @@ impl ApiHandler for S3ApiServer { ) .await } else { - Err(Error::BadRequest(format!( + Err(Error::bad_request(format!( "Invalid endpoint: list-type={}", list_type ))) diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 93048a8c..1304cc07 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -14,7 +14,8 @@ use garage_util::crdt::*; use garage_util::data::*; use garage_util::time::*; -use crate::error::*; +use crate::common_error::CommonError; +use crate::s3::error::*; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; @@ -130,7 +131,7 @@ pub async fn handle_create_bucket( if let Some(location_constraint) = cmd { if location_constraint != garage.config.s3_api.s3_region { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Cannot satisfy location constraint `{}`: buckets can only be created in region `{}`", location_constraint, garage.config.s3_api.s3_region @@ -158,12 +159,12 @@ pub async fn handle_create_bucket( // otherwise return a forbidden error. let kp = api_key.bucket_permissions(&bucket_id); if !(kp.allow_write || kp.allow_owner) { - return Err(Error::BucketAlreadyExists); + return Err(CommonError::BucketAlreadyExists.into()); } } else { // Create the bucket! if !is_valid_bucket_name(&bucket_name) { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "{}: {}", bucket_name, INVALID_BUCKET_NAME_MESSAGE ))); @@ -239,7 +240,7 @@ pub async fn handle_delete_bucket( ) .await?; if !objects.is_empty() { - return Err(Error::BucketNotEmpty); + return Err(CommonError::BucketNotEmpty.into()); } // --- done checking, now commit --- diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 4e94d887..0fc16993 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -18,8 +18,8 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::error::*; -use crate::helpers::{parse_bucket_key, resolve_bucket}; +use crate::helpers::parse_bucket_key; +use crate::s3::error::*; use crate::s3::put::{decode_upload_id, get_headers}; use crate::s3::xml::{self as s3_xml, xmlns_tag}; @@ -201,8 +201,8 @@ pub async fn handle_upload_part_copy( let mut ranges = http_range::HttpRange::parse(range_str, source_version_meta.size) .map_err(|e| (e, source_version_meta.size))?; if ranges.len() != 1 { - return Err(Error::BadRequest( - "Invalid x-amz-copy-source-range header: exactly 1 range must be given".into(), + return Err(Error::bad_request( + "Invalid x-amz-copy-source-range header: exactly 1 range must be given", )); } else { ranges.pop().unwrap() @@ -230,8 +230,8 @@ pub async fn handle_upload_part_copy( // This is only for small files, we don't bother handling this. // (in AWS UploadPartCopy works for parts at least 5MB which // is never the case of an inline object) - return Err(Error::BadRequest( - "Source object is too small (minimum part size is 5Mb)".into(), + return Err(Error::bad_request( + "Source object is too small (minimum part size is 5Mb)", )); } ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (), @@ -250,7 +250,7 @@ pub async fn handle_upload_part_copy( // Check this part number hasn't yet been uploaded if let Some(dv) = dest_version { if dv.has_part_number(part_number) { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Part number {} has already been uploaded", part_number ))); @@ -413,10 +413,13 @@ async fn get_copy_source( let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; let (source_bucket, source_key) = parse_bucket_key(©_source, None)?; - let source_bucket_id = resolve_bucket(garage, &source_bucket.to_string(), api_key).await?; + let source_bucket_id = garage + .bucket_helper() + .resolve_bucket(&source_bucket.to_string(), api_key) + .await?; if !api_key.allow_read(&source_bucket_id) { - return Err(Error::Forbidden(format!( + return Err(Error::forbidden(format!( "Reading from bucket {} not allowed for this key", source_bucket ))); @@ -536,8 +539,8 @@ impl CopyPreconditionHeaders { (None, None, None, Some(ims)) => v_date > *ims, (None, None, None, None) => true, _ => { - return Err(Error::BadRequest( - "Invalid combination of x-amz-copy-source-if-xxxxx headers".into(), + return Err(Error::bad_request( + "Invalid combination of x-amz-copy-source-if-xxxxx headers", )) } }; diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 37ea2e43..c7273464 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -9,13 +9,12 @@ use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::error::*; +use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; use garage_model::garage::Garage; -use garage_table::*; use garage_util::data::*; pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> { @@ -48,14 +47,11 @@ pub async fn handle_delete_cors( bucket_id: Uuid, ) -> Result<Response<Body>, Error> { let mut bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; + let param = bucket.params_mut().unwrap(); param.cors_config.update(None); garage.bucket_table.insert(&bucket).await?; @@ -78,14 +74,11 @@ pub async fn handle_put_cors( } let mut bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; + let param = bucket.params_mut().unwrap(); let conf: CorsConfiguration = from_reader(&body as &[u8])?; conf.validate()?; @@ -119,12 +112,7 @@ pub async fn handle_options_s3api( let helper = garage.bucket_helper(); let bucket_id = helper.resolve_global_bucket_name(&bn).await?; if let Some(id) = bucket_id { - let bucket = garage - .bucket_table - .get(&EmptyKey, &id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or(Error::NoSuchBucket)?; + let bucket = garage.bucket_helper().get_existing_bucket(id).await?; handle_options_for_bucket(req, &bucket) } else { // If there is a bucket name in the request, but that name @@ -185,7 +173,7 @@ pub fn handle_options_for_bucket( } } - Err(Error::Forbidden("This CORS request is not allowed.".into())) + Err(Error::forbidden("This CORS request is not allowed.")) } pub fn find_matching_cors_rule<'a>( diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 1e3f1249..5065b285 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -8,7 +8,7 @@ use garage_util::time::*; use garage_model::garage::Garage; use garage_model::s3::object_table::*; -use crate::error::*; +use crate::s3::error::*; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; diff --git a/src/api/error.rs b/src/api/s3/error.rs index 4b7254d2..f2096bea 100644 --- a/src/api/error.rs +++ b/src/api/s3/error.rs @@ -2,34 +2,24 @@ use std::convert::TryInto; use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{HeaderMap, StatusCode}; +use hyper::{Body, HeaderMap, StatusCode}; use garage_model::helper::error::Error as HelperError; -use garage_util::error::Error as GarageError; +use crate::common_error::CommonError; +pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; +use crate::generic_server::ApiError; use crate::s3::xml as s3_xml; +use crate::signature::error::Error as SignatureError; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { - // Category: internal error - /// Error related to deeper parts of Garage - #[error(display = "Internal error: {}", _0)] - InternalError(#[error(source)] GarageError), - - /// Error related to Hyper - #[error(display = "Internal error (Hyper error): {}", _0)] - Hyper(#[error(source)] hyper::Error), - - /// Error related to HTTP - #[error(display = "Internal error (HTTP error): {}", _0)] - Http(#[error(source)] http::Error), + #[error(display = "{}", _0)] + /// Error from common error + CommonError(CommonError), // Category: cannot process - /// No proper api key was used, or the signature was invalid - #[error(display = "Forbidden: {}", _0)] - Forbidden(String), - /// Authorization Header Malformed #[error(display = "Authorization header malformed, expected scope: {}", _0)] AuthorizationHeaderMalformed(String), @@ -38,22 +28,10 @@ pub enum Error { #[error(display = "Key not found")] NoSuchKey, - /// The bucket requested don't exists - #[error(display = "Bucket not found")] - NoSuchBucket, - /// The multipart upload requested don't exists #[error(display = "Upload not found")] NoSuchUpload, - /// Tried to create a bucket that already exist - #[error(display = "Bucket already exists")] - BucketAlreadyExists, - - /// Tried to delete a non-empty bucket - #[error(display = "Tried to delete a non-empty bucket")] - BucketNotEmpty, - /// Precondition failed (e.g. x-amz-copy-source-if-match) #[error(display = "At least one of the preconditions you specified did not hold")] PreconditionFailed, @@ -80,10 +58,6 @@ pub enum Error { #[error(display = "Invalid UTF-8: {}", _0)] InvalidUtf8String(#[error(source)] std::string::FromUtf8Error), - /// Some base64 encoded data was badly encoded - #[error(display = "Invalid base64: {}", _0)] - InvalidBase64(#[error(source)] base64::DecodeError), - /// The client sent invalid XML data #[error(display = "Invalid XML: {}", _0)] InvalidXml(String), @@ -96,19 +70,36 @@ pub enum Error { #[error(display = "Invalid HTTP range: {:?}", _0)] InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)), - /// The client sent an invalid request - #[error(display = "Bad request: {}", _0)] - BadRequest(String), - - /// The client asked for an invalid return format (invalid Accept header) - #[error(display = "Not acceptable: {}", _0)] - NotAcceptable(String), - /// The client sent a request for an action not supported by garage #[error(display = "Unimplemented action: {}", _0)] NotImplemented(String), } +impl<T> From<T> for Error +where + CommonError: From<T>, +{ + fn from(err: T) -> Self { + Error::CommonError(CommonError::from(err)) + } +} + +impl CommonErrorDerivative for Error {} + +impl From<HelperError> for Error { + fn from(err: HelperError) -> Self { + match err { + HelperError::Internal(i) => Self::CommonError(CommonError::InternalError(i)), + HelperError::BadRequest(b) => Self::CommonError(CommonError::BadRequest(b)), + HelperError::InvalidBucketName(n) => { + Self::CommonError(CommonError::InvalidBucketName(n)) + } + HelperError::NoSuchBucket(n) => Self::CommonError(CommonError::NoSuchBucket(n)), + e => Self::bad_request(format!("{}", e)), + } + } +} + impl From<roxmltree::Error> for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) @@ -121,88 +112,67 @@ impl From<quick_xml::de::DeError> for Error { } } -impl From<HelperError> for Error { - fn from(err: HelperError) -> Self { +impl From<SignatureError> for Error { + fn from(err: SignatureError) -> Self { match err { - HelperError::Internal(i) => Self::InternalError(i), - HelperError::BadRequest(b) => Self::BadRequest(b), + SignatureError::CommonError(c) => Self::CommonError(c), + SignatureError::AuthorizationHeaderMalformed(c) => { + Self::AuthorizationHeaderMalformed(c) + } + SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), + SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), } } } impl From<multer::Error> for Error { fn from(err: multer::Error) -> Self { - Self::BadRequest(err.to_string()) + Self::bad_request(err) } } impl Error { - /// Get the HTTP status code that best represents the meaning of the error for the client - pub fn http_status_code(&self) -> StatusCode { - match self { - Error::NoSuchKey | Error::NoSuchBucket | Error::NoSuchUpload => StatusCode::NOT_FOUND, - Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT, - Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED, - Error::Forbidden(_) => StatusCode::FORBIDDEN, - Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, - Error::InternalError( - GarageError::Timeout - | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), - ) => StatusCode::SERVICE_UNAVAILABLE, - Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => { - StatusCode::INTERNAL_SERVER_ERROR - } - Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE, - Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED, - _ => StatusCode::BAD_REQUEST, - } - } - pub fn aws_code(&self) -> &'static str { match self { + Error::CommonError(c) => c.aws_code(), Error::NoSuchKey => "NoSuchKey", - Error::NoSuchBucket => "NoSuchBucket", Error::NoSuchUpload => "NoSuchUpload", - Error::BucketAlreadyExists => "BucketAlreadyExists", - Error::BucketNotEmpty => "BucketNotEmpty", Error::PreconditionFailed => "PreconditionFailed", Error::InvalidPart => "InvalidPart", Error::InvalidPartOrder => "InvalidPartOrder", Error::EntityTooSmall => "EntityTooSmall", - Error::Forbidden(_) => "AccessDenied", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::NotImplemented(_) => "NotImplemented", - Error::InternalError( - GarageError::Timeout - | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), - ) => "ServiceUnavailable", - Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError", - _ => "InvalidRequest", + Error::InvalidXml(_) => "MalformedXML", + Error::InvalidRange(_) => "InvalidRange", + Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => { + "InvalidRequest" + } } } +} - pub fn aws_xml(&self, garage_region: &str, path: &str) -> String { - let error = s3_xml::Error { - code: s3_xml::Value(self.aws_code().to_string()), - message: s3_xml::Value(format!("{}", self)), - resource: Some(s3_xml::Value(path.to_string())), - region: Some(s3_xml::Value(garage_region.to_string())), - }; - s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { - r#" -<?xml version="1.0" encoding="UTF-8"?> -<Error> - <Code>InternalError</Code> - <Message>XML encoding of error failed</Message> -</Error> - "# - .into() - }) +impl ApiError for Error { + /// Get the HTTP status code that best represents the meaning of the error for the client + fn http_status_code(&self) -> StatusCode { + match self { + Error::CommonError(c) => c.http_status_code(), + Error::NoSuchKey | Error::NoSuchUpload => StatusCode::NOT_FOUND, + Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED, + Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE, + Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED, + Error::AuthorizationHeaderMalformed(_) + | Error::InvalidPart + | Error::InvalidPartOrder + | Error::EntityTooSmall + | Error::InvalidXml(_) + | Error::InvalidUtf8Str(_) + | Error::InvalidUtf8String(_) + | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST, + } } - pub fn add_headers(&self, header_map: &mut HeaderMap<HeaderValue>) { + fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>) { use hyper::header; #[allow(clippy::single_match)] match self { @@ -217,68 +187,23 @@ impl Error { _ => (), } } -} - -/// Trait to map error to the Bad Request error code -pub trait OkOrBadRequest { - type S; - fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>; -} - -impl<T, E> OkOrBadRequest for Result<T, E> -where - E: std::fmt::Display, -{ - type S = T; - fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> { - match self { - Ok(x) => Ok(x), - Err(e) => Err(Error::BadRequest(format!("{}: {}", reason.as_ref(), e))), - } - } -} - -impl<T> OkOrBadRequest for Option<T> { - type S = T; - fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> { - match self { - Some(x) => Ok(x), - None => Err(Error::BadRequest(reason.as_ref().to_string())), - } - } -} - -/// Trait to map an error to an Internal Error code -pub trait OkOrInternalError { - type S; - fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>; -} - -impl<T, E> OkOrInternalError for Result<T, E> -where - E: std::fmt::Display, -{ - type S = T; - fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> { - match self { - Ok(x) => Ok(x), - Err(e) => Err(Error::InternalError(GarageError::Message(format!( - "{}: {}", - reason.as_ref(), - e - )))), - } - } -} -impl<T> OkOrInternalError for Option<T> { - type S = T; - fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> { - match self { - Some(x) => Ok(x), - None => Err(Error::InternalError(GarageError::Message( - reason.as_ref().to_string(), - ))), - } + fn http_body(&self, garage_region: &str, path: &str) -> Body { + let error = s3_xml::Error { + code: s3_xml::Value(self.aws_code().to_string()), + message: s3_xml::Value(format!("{}", self)), + resource: Some(s3_xml::Value(path.to_string())), + region: Some(s3_xml::Value(garage_region.to_string())), + }; + Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { + r#" +<?xml version="1.0" encoding="UTF-8"?> +<Error> + <Code>InternalError</Code> + <Message>XML encoding of error failed</Message> +</Error> + "# + .into() + })) } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 3edf22a6..7fa1a177 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -17,7 +17,7 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::error::*; +use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -210,8 +210,8 @@ pub async fn handle_get( match (part_number, parse_range_header(req, last_v_meta.size)?) { (Some(_), Some(_)) => { - return Err(Error::BadRequest( - "Cannot specify both partNumber and Range header".into(), + return Err(Error::bad_request( + "Cannot specify both partNumber and Range header", )); } (Some(pn), None) => { @@ -302,9 +302,9 @@ async fn handle_get_range( let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); Ok(resp_builder.body(body)?) } else { - None.ok_or_internal_error( + Err(Error::internal_error( "Requested range not present in inline bytes when it should have been", - ) + )) } } ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index e2848c57..e5f486c8 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -16,8 +16,8 @@ use garage_model::s3::version_table::Version; use garage_table::{EmptyKey, EnumerationOrder}; use crate::encoding::*; -use crate::error::*; use crate::helpers::key_after_prefix; +use crate::s3::error::*; use crate::s3::put as s3_put; use crate::s3::xml as s3_xml; @@ -582,13 +582,19 @@ impl ListObjectsQuery { // representing the key to start with. (Some(token), _) => match &token[..1] { "[" => Ok(RangeBegin::IncludingKey { - key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?, + key: String::from_utf8( + base64::decode(token[1..].as_bytes()) + .ok_or_bad_request("Invalid continuation token")?, + )?, fallback_key: None, }), "]" => Ok(RangeBegin::AfterKey { - key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?, + key: String::from_utf8( + base64::decode(token[1..].as_bytes()) + .ok_or_bad_request("Invalid continuation token")?, + )?, }), - _ => Err(Error::BadRequest("Invalid continuation token".to_string())), + _ => Err(Error::bad_request("Invalid continuation token")), }, // StartAfter has defined semantics in the spec: diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs index 3f5c1915..7b56d4d8 100644 --- a/src/api/s3/mod.rs +++ b/src/api/s3/mod.rs @@ -1,4 +1,5 @@ pub mod api_server; +pub mod error; mod bucket; mod copy; diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 86fa7880..dc640f43 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -14,8 +14,7 @@ use serde::Deserialize; use garage_model::garage::Garage; -use crate::error::*; -use crate::helpers::resolve_bucket; +use crate::s3::error::*; use crate::s3::put::{get_headers, save_stream}; use crate::s3::xml as s3_xml; use crate::signature::payload::{parse_date, verify_v4}; @@ -48,9 +47,7 @@ pub async fn handle_post_object( let field = if let Some(field) = multipart.next_field().await? { field } else { - return Err(Error::BadRequest( - "Request did not contain a file".to_owned(), - )); + return Err(Error::bad_request("Request did not contain a file")); }; let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) { name @@ -66,14 +63,14 @@ pub async fn handle_post_object( "tag" => (/* tag need to be reencoded, but we don't support them yet anyway */), "acl" => { if params.insert("x-amz-acl", content).is_some() { - return Err(Error::BadRequest( - "Field 'acl' provided more than one time".to_string(), + return Err(Error::bad_request( + "Field 'acl' provided more than one time", )); } } _ => { if params.insert(&name, content).is_some() { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Field '{}' provided more than one time", name ))); @@ -90,9 +87,7 @@ pub async fn handle_post_object( .to_str()?; let credential = params .get("x-amz-credential") - .ok_or_else(|| { - Error::Forbidden("Garage does not support anonymous access yet".to_string()) - })? + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))? .to_str()?; let policy = params .get("policy") @@ -129,15 +124,16 @@ pub async fn handle_post_object( ) .await?; - let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?; + let bucket_id = garage + .bucket_helper() + .resolve_bucket(&bucket, &api_key) + .await?; if !api_key.allow_write(&bucket_id) { - return Err(Error::Forbidden( - "Operation is not allowed for this key.".to_string(), - )); + return Err(Error::forbidden("Operation is not allowed for this key.")); } - let decoded_policy = base64::decode(&policy)?; + let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; @@ -145,9 +141,7 @@ pub async fn handle_post_object( .ok_or_bad_request("Invalid expiration date")? .into(); if Utc::now() - expiration > Duration::zero() { - return Err(Error::BadRequest( - "Expiration date is in the paste".to_string(), - )); + return Err(Error::bad_request("Expiration date is in the paste")); } let mut conditions = decoded_policy.into_conditions()?; @@ -159,7 +153,7 @@ pub async fn handle_post_object( "policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields "content-type" => { let conds = conditions.params.remove("content-type").ok_or_else(|| { - Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { let ok = match cond { @@ -169,7 +163,7 @@ pub async fn handle_post_object( } }; if !ok { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' has value not allowed in policy", param_key ))); @@ -178,7 +172,7 @@ pub async fn handle_post_object( } "key" => { let conds = conditions.params.remove("key").ok_or_else(|| { - Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { let ok = match cond { @@ -186,7 +180,7 @@ pub async fn handle_post_object( Operation::StartsWith(s) => key.starts_with(&s), }; if !ok { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' has value not allowed in policy", param_key ))); @@ -201,7 +195,7 @@ pub async fn handle_post_object( continue; } let conds = conditions.params.remove(¶m_key).ok_or_else(|| { - Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { let ok = match cond { @@ -209,7 +203,7 @@ pub async fn handle_post_object( Operation::StartsWith(s) => value.to_str()?.starts_with(s.as_str()), }; if !ok { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' has value not allowed in policy", param_key ))); @@ -220,7 +214,7 @@ pub async fn handle_post_object( } if let Some((param_key, _)) = conditions.params.iter().next() { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' is required in policy, but no value was provided", param_key ))); @@ -326,7 +320,7 @@ impl Policy { match condition { PolicyCondition::Equal(map) => { if map.len() != 1 { - return Err(Error::BadRequest("Invalid policy item".to_owned())); + return Err(Error::bad_request("Invalid policy item")); } let (mut k, v) = map.into_iter().next().expect("size was verified"); k.make_ascii_lowercase(); @@ -334,7 +328,7 @@ impl Policy { } PolicyCondition::OtherOp([cond, mut key, value]) => { if key.remove(0) != '$' { - return Err(Error::BadRequest("Invalid policy item".to_owned())); + return Err(Error::bad_request("Invalid policy item")); } key.make_ascii_lowercase(); match cond.as_str() { @@ -347,7 +341,7 @@ impl Policy { .or_default() .push(Operation::StartsWith(value)); } - _ => return Err(Error::BadRequest("Invalid policy item".to_owned())), + _ => return Err(Error::bad_request("Invalid policy item")), } } PolicyCondition::SizeRange(key, min, max) => { @@ -355,7 +349,7 @@ impl Policy { length.0 = length.0.max(min); length.1 = length.1.min(max); } else { - return Err(Error::BadRequest("Invalid policy item".to_owned())); + return Err(Error::bad_request("Invalid policy item")); } } } @@ -420,15 +414,15 @@ where self.read += bytes.len() as u64; // optimization to fail early when we know before the end it's too long if self.length.end() < &self.read { - return Poll::Ready(Some(Err(Error::BadRequest( - "File size does not match policy".to_owned(), + return Poll::Ready(Some(Err(Error::bad_request( + "File size does not match policy", )))); } } Poll::Ready(None) => { if !self.length.contains(&self.read) { - return Poll::Ready(Some(Err(Error::BadRequest( - "File size does not match policy".to_owned(), + return Poll::Ready(Some(Err(Error::bad_request( + "File size does not match policy", )))); } } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 89aa8d84..8b06ef3f 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -19,7 +19,7 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::error::*; +use crate::s3::error::*; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; @@ -183,8 +183,8 @@ fn ensure_checksum_matches( ) -> Result<(), Error> { if let Some(expected_sha256) = content_sha256 { if expected_sha256 != data_sha256sum { - return Err(Error::BadRequest( - "Unable to validate x-amz-content-sha256".to_string(), + return Err(Error::bad_request( + "Unable to validate x-amz-content-sha256", )); } else { trace!("Successfully validated x-amz-content-sha256"); @@ -192,9 +192,7 @@ fn ensure_checksum_matches( } if let Some(expected_md5) = content_md5 { if expected_md5.trim_matches('"') != base64::encode(data_md5sum) { - return Err(Error::BadRequest( - "Unable to validate content-md5".to_string(), - )); + return Err(Error::bad_request("Unable to validate content-md5")); } else { trace!("Successfully validated content-md5"); } @@ -428,7 +426,7 @@ pub async fn handle_put_part( // Check part hasn't already been uploaded if let Some(v) = version { if v.has_part_number(part_number) { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Part number {} has already been uploaded", part_number ))); @@ -513,7 +511,7 @@ pub async fn handle_complete_multipart_upload( let version = version.ok_or(Error::NoSuchKey)?; if version.blocks.is_empty() { - return Err(Error::BadRequest("No data was uploaded".to_string())); + return Err(Error::bad_request("No data was uploaded")); } let headers = match object_version.state { @@ -574,8 +572,8 @@ pub async fn handle_complete_multipart_upload( .map(|x| x.part_number) .eq(block_parts.into_iter()); if !same_parts { - return Err(Error::BadRequest( - "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(), + return Err(Error::bad_request( + "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again." )); } diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs index 0525c649..44f581ff 100644 --- a/src/api/s3/router.rs +++ b/src/api/s3/router.rs @@ -1,5 +1,3 @@ -use crate::error::{Error, OkOrBadRequest}; - use std::borrow::Cow; use hyper::header::HeaderValue; @@ -7,6 +5,7 @@ use hyper::{HeaderMap, Method, Request}; use crate::helpers::Authorization; use crate::router_macros::{generateQueryParameters, router_match}; +use crate::s3::error::*; router_match! {@func @@ -343,7 +342,7 @@ impl Endpoint { Method::POST => Self::from_post(key, &mut query)?, Method::PUT => Self::from_put(key, &mut query, req.headers())?, Method::DELETE => Self::from_delete(key, &mut query)?, - _ => return Err(Error::BadRequest("Unknown method".to_owned())), + _ => return Err(Error::bad_request("Unknown method")), }; if let Some(message) = query.nonempty_message() { diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index 561130dc..77738971 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -4,13 +4,12 @@ use std::sync::Arc; use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::error::*; +use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; use garage_model::bucket_table::*; use garage_model::garage::Garage; -use garage_table::*; use garage_util::data::*; pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> { @@ -47,14 +46,11 @@ pub async fn handle_delete_website( bucket_id: Uuid, ) -> Result<Response<Body>, Error> { let mut bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; + let param = bucket.params_mut().unwrap(); param.website_config.update(None); garage.bucket_table.insert(&bucket).await?; @@ -77,14 +73,11 @@ pub async fn handle_put_website( } let mut bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; + let param = bucket.params_mut().unwrap(); let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; @@ -176,8 +169,8 @@ impl WebsiteConfiguration { || self.index_document.is_some() || self.routing_rules.is_some()) { - return Err(Error::BadRequest( - "Bad XML: can't have RedirectAllRequestsTo and other fields".to_owned(), + return Err(Error::bad_request( + "Bad XML: can't have RedirectAllRequestsTo and other fields", )); } if let Some(ref ed) = self.error_document { @@ -222,8 +215,8 @@ impl WebsiteConfiguration { impl Key { pub fn validate(&self) -> Result<(), Error> { if self.key.0.is_empty() { - Err(Error::BadRequest( - "Bad XML: error document specified but empty".to_owned(), + Err(Error::bad_request( + "Bad XML: error document specified but empty", )) } else { Ok(()) @@ -234,8 +227,8 @@ impl Key { impl Suffix { pub fn validate(&self) -> Result<(), Error> { if self.suffix.0.is_empty() | self.suffix.0.contains('/') { - Err(Error::BadRequest( - "Bad XML: index document is empty or contains /".to_owned(), + Err(Error::bad_request( + "Bad XML: index document is empty or contains /", )) } else { Ok(()) @@ -247,7 +240,7 @@ impl Target { pub fn validate(&self) -> Result<(), Error> { if let Some(ref protocol) = self.protocol { if protocol.0 != "http" && protocol.0 != "https" { - return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned())); + return Err(Error::bad_request("Bad XML: invalid protocol")); } } Ok(()) @@ -269,19 +262,19 @@ impl Redirect { pub fn validate(&self, has_prefix: bool) -> Result<(), Error> { if self.replace_prefix.is_some() { if self.replace_full.is_some() { - return Err(Error::BadRequest( - "Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set".to_owned(), + return Err(Error::bad_request( + "Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set", )); } if !has_prefix { - return Err(Error::BadRequest( - "Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't".to_owned(), + return Err(Error::bad_request( + "Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't", )); } } if let Some(ref protocol) = self.protocol { if protocol.0 != "http" && protocol.0 != "https" { - return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned())); + return Err(Error::bad_request("Bad XML: invalid protocol")); } } // TODO there are probably more invalide cases, but which ones? diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs index 75ec4559..111657a0 100644 --- a/src/api/s3/xml.rs +++ b/src/api/s3/xml.rs @@ -1,7 +1,7 @@ use quick_xml::se::to_string; use serde::{Deserialize, Serialize, Serializer}; -use crate::Error as ApiError; +use crate::s3::error::Error as ApiError; pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> { let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string(); diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs new file mode 100644 index 00000000..3ef5cdcd --- /dev/null +++ b/src/api/signature/error.rs @@ -0,0 +1,36 @@ +use err_derive::Error; + +use crate::common_error::CommonError; +pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; + +/// Errors of this crate +#[derive(Debug, Error)] +pub enum Error { + #[error(display = "{}", _0)] + /// Error from common error + CommonError(CommonError), + + /// Authorization Header Malformed + #[error(display = "Authorization header malformed, expected scope: {}", _0)] + AuthorizationHeaderMalformed(String), + + // Category: bad request + /// The request contained an invalid UTF-8 sequence in its path or in other parameters + #[error(display = "Invalid UTF-8: {}", _0)] + InvalidUtf8Str(#[error(source)] std::str::Utf8Error), + + /// The client sent a header with invalid value + #[error(display = "Invalid header value: {}", _0)] + InvalidHeader(#[error(source)] hyper::header::ToStrError), +} + +impl<T> From<T> for Error +where + CommonError: From<T>, +{ + fn from(err: T) -> Self { + Error::CommonError(CommonError::from(err)) + } +} + +impl CommonErrorDerivative for Error {} diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs index 5646f4fa..dd5b590c 100644 --- a/src/api/signature/mod.rs +++ b/src/api/signature/mod.rs @@ -4,11 +4,12 @@ use sha2::Sha256; use garage_util::data::{sha256sum, Hash}; -use crate::error::*; - +pub mod error; pub mod payload; pub mod streaming; +use error::*; + pub const SHORT_DATE: &str = "%Y%m%d"; pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; @@ -16,7 +17,7 @@ type HmacSha256 = Hmac<Sha256>; pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { if expected_sha256 != sha256sum(body) { - return Err(Error::BadRequest( + return Err(Error::bad_request( "Request content hash does not match signed hash".to_string(), )); } diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index 9137dd2d..4c7934e5 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -15,7 +15,7 @@ use super::LONG_DATETIME; use super::{compute_scope, signing_hmac}; use crate::encoding::uri_encode; -use crate::error::*; +use crate::signature::error::*; pub async fn check_payload_signature( garage: &Garage, @@ -105,7 +105,7 @@ fn parse_authorization( let (auth_kind, rest) = authorization.split_at(first_space); if auth_kind != "AWS4-HMAC-SHA256" { - return Err(Error::BadRequest("Unsupported authorization method".into())); + return Err(Error::bad_request("Unsupported authorization method")); } let mut auth_params = HashMap::new(); @@ -129,10 +129,11 @@ fn parse_authorization( let date = headers .get("x-amz-date") .ok_or_bad_request("Missing X-Amz-Date field") + .map_err(Error::from) .and_then(|d| parse_date(d))?; if Utc::now() - date > Duration::hours(24) { - return Err(Error::BadRequest("Date is too old".to_string())); + return Err(Error::bad_request("Date is too old".to_string())); } let auth = Authorization { @@ -156,7 +157,7 @@ fn parse_query_authorization( headers: &HashMap<String, String>, ) -> Result<Authorization, Error> { if algorithm != "AWS4-HMAC-SHA256" { - return Err(Error::BadRequest( + return Err(Error::bad_request( "Unsupported authorization method".to_string(), )); } @@ -179,10 +180,10 @@ fn parse_query_authorization( .get("x-amz-expires") .ok_or_bad_request("X-Amz-Expires not found in query parameters")? .parse() - .map_err(|_| Error::BadRequest("X-Amz-Expires is not a number".to_string()))?; + .map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?; if duration > 7 * 24 * 3600 { - return Err(Error::BadRequest( + return Err(Error::bad_request( "X-Amz-Exprires may not exceed a week".to_string(), )); } @@ -190,10 +191,11 @@ fn parse_query_authorization( let date = headers .get("x-amz-date") .ok_or_bad_request("Missing X-Amz-Date field") + .map_err(Error::from) .and_then(|d| parse_date(d))?; if Utc::now() - date > Duration::seconds(duration) { - return Err(Error::BadRequest("Date is too old".to_string())); + return Err(Error::bad_request("Date is too old".to_string())); } Ok(Authorization { @@ -301,7 +303,7 @@ pub async fn verify_v4( .get(&EmptyKey, &key_id) .await? .filter(|k| !k.state.is_deleted()) - .ok_or_else(|| Error::Forbidden(format!("No such key: {}", &key_id)))?; + .ok_or_else(|| Error::forbidden(format!("No such key: {}", &key_id)))?; let key_p = key.params().unwrap(); let mut hmac = signing_hmac( @@ -314,7 +316,7 @@ pub async fn verify_v4( hmac.update(payload); let our_signature = hex::encode(hmac.finalize().into_bytes()); if signature != our_signature { - return Err(Error::Forbidden("Invalid signature".to_string())); + return Err(Error::forbidden("Invalid signature".to_string())); } Ok(key) diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index ded9d993..c8358c4f 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -12,7 +12,7 @@ use garage_util::data::Hash; use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; -use crate::error::*; +use crate::signature::error::*; pub fn parse_streaming_body( api_key: &Key, @@ -87,7 +87,7 @@ fn compute_streaming_payload_signature( let mut hmac = signing_hmac.clone(); hmac.update(string_to_sign.as_bytes()); - Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature") + Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?) } mod payload { @@ -163,10 +163,10 @@ impl From<SignedPayloadStreamError> for Error { match err { SignedPayloadStreamError::Stream(e) => e, SignedPayloadStreamError::InvalidSignature => { - Error::BadRequest("Invalid payload signature".into()) + Error::bad_request("Invalid payload signature") } SignedPayloadStreamError::Message(e) => { - Error::BadRequest(format!("Chunk format error: {}", e)) + Error::bad_request(format!("Chunk format error: {}", e)) } } } diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 3b69d7bc..902f67f8 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -27,10 +27,8 @@ garage_rpc = { version = "0.7.0", path = "../rpc" } garage_table = { version = "0.7.0", path = "../table" } garage_util = { version = "0.7.0", path = "../util" } garage_web = { version = "0.7.0", path = "../web" } -garage_admin = { version = "0.7.0", path = "../admin" } bytes = "1.0" -git-version = "0.3.4" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } pretty_env_logger = "0.4" @@ -54,6 +52,11 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi #netapp = { version = "0.4", path = "../../../netapp" } netapp = "0.4" +opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } +opentelemetry-prometheus = "0.10" +opentelemetry-otlp = "0.10" +prometheus = "0.13" + [dev-dependencies] aws-sdk-s3 = "0.8" chrono = "0.4" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index af0c3f22..c1ba297b 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -261,6 +261,7 @@ impl AdminRpcHandler { async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> { let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); let bucket_id = helper .resolve_global_bucket_name(&query.existing_bucket) @@ -268,7 +269,7 @@ impl AdminRpcHandler { .ok_or_bad_request("Bucket not found")?; if let Some(key_pattern) = &query.local { - let key = helper.get_existing_matching_key(key_pattern).await?; + let key = key_helper.get_existing_matching_key(key_pattern).await?; helper .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name) @@ -290,9 +291,10 @@ impl AdminRpcHandler { async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> { let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); if let Some(key_pattern) = &query.local { - let key = helper.get_existing_matching_key(key_pattern).await?; + let key = key_helper.get_existing_matching_key(key_pattern).await?; let bucket_id = key .state @@ -331,12 +333,15 @@ impl AdminRpcHandler { async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> { let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); let bucket_id = helper .resolve_global_bucket_name(&query.bucket) .await? .ok_or_bad_request("Bucket not found")?; - let key = helper.get_existing_matching_key(&query.key_pattern).await?; + let key = key_helper + .get_existing_matching_key(&query.key_pattern) + .await?; let allow_read = query.read || key.allow_read(&bucket_id); let allow_write = query.write || key.allow_write(&bucket_id); @@ -363,12 +368,15 @@ impl AdminRpcHandler { async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> { let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); let bucket_id = helper .resolve_global_bucket_name(&query.bucket) .await? .ok_or_bad_request("Bucket not found")?; - let key = helper.get_existing_matching_key(&query.key_pattern).await?; + let key = key_helper + .get_existing_matching_key(&query.key_pattern) + .await?; let allow_read = !query.read && key.allow_read(&bucket_id); let allow_write = !query.write && key.allow_write(&bucket_id); @@ -469,7 +477,7 @@ impl AdminRpcHandler { async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> { let key = self .garage - .bucket_helper() + .key_helper() .get_existing_matching_key(&query.key_pattern) .await?; self.key_info_result(key).await @@ -484,7 +492,7 @@ impl AdminRpcHandler { async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> { let mut key = self .garage - .bucket_helper() + .key_helper() .get_existing_matching_key(&query.key_pattern) .await?; key.params_mut() @@ -496,9 +504,11 @@ impl AdminRpcHandler { } async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> { - let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); - let mut key = helper.get_existing_matching_key(&query.key_pattern).await?; + let mut key = key_helper + .get_existing_matching_key(&query.key_pattern) + .await?; if !query.yes { return Err(Error::BadRequest( @@ -506,32 +516,7 @@ impl AdminRpcHandler { )); } - let state = key.state.as_option_mut().unwrap(); - - // --- done checking, now commit --- - // (the step at unset_local_bucket_alias will fail if a bucket - // does not have another alias, the deletion will be - // interrupted in the middle if that happens) - - // 1. Delete local aliases - for (alias, _, to) in state.local_aliases.items().iter() { - if let Some(bucket_id) = to { - helper - .unset_local_bucket_alias(*bucket_id, &key.key_id, alias) - .await?; - } - } - - // 2. Remove permissions on all authorized buckets - for (ab_id, _auth) in state.authorized_buckets.items().iter() { - helper - .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS) - .await?; - } - - // 3. Actually delete key - key.state = Deletable::delete(); - self.garage.key_table.insert(&key).await?; + key_helper.delete_key(&mut key).await?; Ok(AdminRpc::Ok(format!( "Key {} was deleted successfully.", @@ -542,7 +527,7 @@ impl AdminRpcHandler { async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> { let mut key = self .garage - .bucket_helper() + .key_helper() .get_existing_matching_key(&query.key_pattern) .await?; if query.create_bucket { @@ -555,7 +540,7 @@ impl AdminRpcHandler { async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> { let mut key = self .garage - .bucket_helper() + .key_helper() .get_existing_matching_key(&query.key_pattern) .await?; if query.create_bucket { @@ -696,11 +681,7 @@ impl AdminRpcHandler { writeln!( &mut ret, "\nGarage version: {}", - option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( - prefix = "git:", - cargo_prefix = "cargo:", - fallback = "unknown" - )) + self.garage.system.garage_version(), ) .unwrap(); diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 88941d78..cdd3869b 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -1,5 +1,4 @@ use garage_util::crdt::Crdt; -use garage_util::data::*; use garage_util::error::*; use garage_rpc::layout::*; @@ -211,31 +210,9 @@ pub async fn cmd_apply_layout( rpc_host: NodeID, apply_opt: ApplyLayoutOpt, ) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match apply_opt.version { - None => { - println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); - println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); - return Err(Error::Message("--version flag is missing".into())); - } - Some(v) => { - if v != layout.version + 1 { - return Err(Error::Message("Invalid value of --version flag".into())); - } - } - } - - layout.roles.merge(&layout.staging); - - if !layout.calculate_partition_assignation() { - return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); - } + let layout = fetch_layout(rpc_cli, rpc_host).await?; - layout.staging.clear(); - layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); - - layout.version += 1; + let layout = layout.apply_staged_changes(apply_opt.version)?; send_layout(rpc_cli, rpc_host, layout).await?; @@ -250,25 +227,9 @@ pub async fn cmd_revert_layout( rpc_host: NodeID, revert_opt: RevertLayoutOpt, ) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match revert_opt.version { - None => { - println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); - println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); - return Err(Error::Message("--version flag is missing".into())); - } - Some(v) => { - if v != layout.version + 1 { - return Err(Error::Message("Invalid value of --version flag".into())); - } - } - } - - layout.staging.clear(); - layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); + let layout = fetch_layout(rpc_cli, rpc_host).await?; - layout.version += 1; + let layout = layout.revert_staged_changes(revert_opt.version)?; send_layout(rpc_cli, rpc_host, layout).await?; diff --git a/src/garage/main.rs b/src/garage/main.rs index e898e680..bd09b6ea 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -8,6 +8,7 @@ mod admin; mod cli; mod repair; mod server; +mod tracing_setup; use std::net::SocketAddr; use std::path::PathBuf; @@ -141,6 +142,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await { Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))), Err(HelperError::BadRequest(b)) => Err(Error::Message(b)), + Err(e) => Err(Error::Message(format!("{}", e))), Ok(x) => Ok(x), } } diff --git a/src/garage/server.rs b/src/garage/server.rs index 24bb25b3..b58ad286 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -6,8 +6,7 @@ use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; -use garage_admin::metrics::*; -use garage_admin::tracing_setup::*; +use garage_api::admin::api_server::AdminApiServer; use garage_api::s3::api_server::S3ApiServer; use garage_model::garage::Garage; use garage_web::run_web_server; @@ -16,6 +15,7 @@ use garage_web::run_web_server; use garage_api::k2v::api_server::K2VApiServer; use crate::admin::*; +use crate::tracing_setup::*; async fn wait_from(mut chan: watch::Receiver<bool>) { while !*chan.borrow() { @@ -39,9 +39,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .open() .expect("Unable to open sled DB"); - info!("Initialize admin web server and metric backend..."); - let admin_server_init = AdminServer::init(); - info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); @@ -54,6 +51,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { init_tracing(&export_to, garage.system.id)?; } + info!("Initialize Admin API server and metrics collector..."); + let admin_server = AdminApiServer::new(garage.clone()); + let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); @@ -80,39 +80,41 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { wait_from(watch_cancel.clone()), )); - let admin_server = if let Some(admin_bind_addr) = config.admin.api_bind_addr { - info!("Configure and run admin web server..."); - Some(tokio::spawn( - admin_server_init.run(admin_bind_addr, wait_from(watch_cancel.clone())), - )) - } else { - None - }; + info!("Launching Admin API server..."); + let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone()))); // Stuff runs // When a cancel signal is sent, stuff stops if let Err(e) = s3_api_server.await? { warn!("S3 API server exited with error: {}", e); + } else { + info!("S3 API server exited without error."); } #[cfg(feature = "k2v")] if let Err(e) = k2v_api_server.await? { warn!("K2V API server exited with error: {}", e); + } else { + info!("K2V API server exited without error."); } if let Err(e) = web_server.await? { warn!("Web server exited with error: {}", e); + } else { + info!("Web server exited without error."); } - if let Some(a) = admin_server { - if let Err(e) = a.await? { - warn!("Admin web server exited with error: {}", e); - } + if let Err(e) = admin_server.await? { + warn!("Admin web server exited with error: {}", e); + } else { + info!("Admin API server exited without error."); } // Remove RPC handlers for system to break reference cycles garage.system.netapp.drop_all_handlers(); + opentelemetry::global::shutdown_tracer_provider(); // Await for netapp RPC system to end run_system.await?; + info!("Netapp exited"); // Drop all references so that stuff can terminate properly drop(garage); diff --git a/src/admin/tracing_setup.rs b/src/garage/tracing_setup.rs index 55fc4094..55fc4094 100644 --- a/src/admin/tracing_setup.rs +++ b/src/garage/tracing_setup.rs diff --git a/src/model/garage.rs b/src/model/garage.rs index 03e21f8a..2f99bd68 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -191,6 +191,10 @@ impl Garage { pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { helper::bucket::BucketHelper(self) } + + pub fn key_helper(&self) -> helper::key::KeyHelper { + helper::key::KeyHelper(self) + } } #[cfg(feature = "k2v")] diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 54d2f97b..734cb40e 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -8,7 +8,8 @@ use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::garage::Garage; use crate::helper::error::*; -use crate::key_table::{Key, KeyFilter}; +use crate::helper::key::KeyHelper; +use crate::key_table::*; use crate::permission::BucketKeyPerm; pub struct BucketHelper<'a>(pub(crate) &'a Garage); @@ -49,6 +50,23 @@ impl<'a> BucketHelper<'a> { } } + #[allow(clippy::ptr_arg)] + pub async fn resolve_bucket(&self, bucket_name: &String, api_key: &Key) -> Result<Uuid, Error> { + let api_key_params = api_key + .state + .as_option() + .ok_or_message("Key should not be deleted at this point")?; + + if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) { + Ok(*bucket_id) + } else { + Ok(self + .resolve_global_bucket_name(bucket_name) + .await? + .ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))?) + } + } + /// Returns a Bucket if it is present in bucket table, /// even if it is in deleted state. Querying a non-existing /// bucket ID returns an internal error. @@ -71,64 +89,7 @@ impl<'a> BucketHelper<'a> { .get(&EmptyKey, &bucket_id) .await? .filter(|b| !b.is_deleted()) - .ok_or_bad_request(format!( - "Bucket {:?} does not exist or has been deleted", - bucket_id - )) - } - - /// Returns a Key if it is present in key table, - /// even if it is in deleted state. Querying a non-existing - /// key ID returns an internal error. - pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> { - Ok(self - .0 - .key_table - .get(&EmptyKey, key_id) - .await? - .ok_or_message(format!("Key {} does not exist", key_id))?) - } - - /// Returns a Key if it is present in key table, - /// only if it is in non-deleted state. - /// Querying a non-existing key ID or a deleted key - /// returns a bad request error. - pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> { - self.0 - .key_table - .get(&EmptyKey, key_id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or_bad_request(format!("Key {} does not exist or has been deleted", key_id)) - } - - /// Returns a Key if it is present in key table, - /// looking it up by key ID or by a match on its name, - /// only if it is in non-deleted state. - /// Querying a non-existing key ID or a deleted key - /// returns a bad request error. - pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> { - let candidates = self - .0 - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), - 10, - EnumerationOrder::Forward, - ) - .await? - .into_iter() - .collect::<Vec<_>>(); - if candidates.len() != 1 { - Err(Error::BadRequest(format!( - "{} matching keys", - candidates.len() - ))) - } else { - Ok(candidates.into_iter().next().unwrap()) - } + .ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id))) } /// Sets a new alias for a bucket in global namespace. @@ -142,10 +103,7 @@ impl<'a> BucketHelper<'a> { alias_name: &String, ) -> Result<(), Error> { if !is_valid_bucket_name(alias_name) { - return Err(Error::BadRequest(format!( - "{}: {}", - alias_name, INVALID_BUCKET_NAME_MESSAGE - ))); + return Err(Error::InvalidBucketName(alias_name.to_string())); } let mut bucket = self.get_existing_bucket(bucket_id).await?; @@ -176,7 +134,7 @@ impl<'a> BucketHelper<'a> { let alias = match alias { None => BucketAlias::new(alias_name.clone(), alias_ts, Some(bucket_id)) - .ok_or_bad_request(format!("{}: {}", alias_name, INVALID_BUCKET_NAME_MESSAGE))?, + .ok_or_else(|| Error::InvalidBucketName(alias_name.clone()))?, Some(mut a) => { a.state = Lww::raw(alias_ts, Some(bucket_id)); a @@ -264,7 +222,7 @@ impl<'a> BucketHelper<'a> { .bucket_alias_table .get(&EmptyKey, alias_name) .await? - .ok_or_message(format!("Alias {} not found", alias_name))?; + .ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?; // Checks ok, remove alias let alias_ts = match bucket.state.as_option() { @@ -303,15 +261,14 @@ impl<'a> BucketHelper<'a> { key_id: &String, alias_name: &String, ) -> Result<(), Error> { + let key_helper = KeyHelper(self.0); + if !is_valid_bucket_name(alias_name) { - return Err(Error::BadRequest(format!( - "{}: {}", - alias_name, INVALID_BUCKET_NAME_MESSAGE - ))); + return Err(Error::InvalidBucketName(alias_name.to_string())); } let mut bucket = self.get_existing_bucket(bucket_id).await?; - let mut key = self.get_existing_key(key_id).await?; + let mut key = key_helper.get_existing_key(key_id).await?; let mut key_param = key.state.as_option_mut().unwrap(); @@ -360,8 +317,10 @@ impl<'a> BucketHelper<'a> { key_id: &String, alias_name: &String, ) -> Result<(), Error> { + let key_helper = KeyHelper(self.0); + let mut bucket = self.get_existing_bucket(bucket_id).await?; - let mut key = self.get_existing_key(key_id).await?; + let mut key = key_helper.get_existing_key(key_id).await?; let mut bucket_p = bucket.state.as_option_mut().unwrap(); @@ -429,8 +388,10 @@ impl<'a> BucketHelper<'a> { key_id: &String, mut perm: BucketKeyPerm, ) -> Result<(), Error> { + let key_helper = KeyHelper(self.0); + let mut bucket = self.get_internal_bucket(bucket_id).await?; - let mut key = self.get_internal_key(key_id).await?; + let mut key = key_helper.get_internal_key(key_id).await?; if let Some(bstate) = bucket.state.as_option() { if let Some(kp) = bstate.authorized_keys.get(key_id) { diff --git a/src/model/helper/error.rs b/src/model/helper/error.rs index 30b2ba32..3ca8f55c 100644 --- a/src/model/helper/error.rs +++ b/src/model/helper/error.rs @@ -10,6 +10,16 @@ pub enum Error { #[error(display = "Bad request: {}", _0)] BadRequest(String), + + /// Bucket name is not valid according to AWS S3 specs + #[error(display = "Invalid bucket name: {}", _0)] + InvalidBucketName(String), + + #[error(display = "Access key not found: {}", _0)] + NoSuchAccessKey(String), + + #[error(display = "Bucket not found: {}", _0)] + NoSuchBucket(String), } impl From<netapp::error::Error> for Error { diff --git a/src/model/helper/key.rs b/src/model/helper/key.rs new file mode 100644 index 00000000..c1a8e974 --- /dev/null +++ b/src/model/helper/key.rs @@ -0,0 +1,102 @@ +use garage_table::util::*; +use garage_util::crdt::*; +use garage_util::error::OkOrMessage; + +use crate::garage::Garage; +use crate::helper::bucket::BucketHelper; +use crate::helper::error::*; +use crate::key_table::{Key, KeyFilter}; +use crate::permission::BucketKeyPerm; + +pub struct KeyHelper<'a>(pub(crate) &'a Garage); + +#[allow(clippy::ptr_arg)] +impl<'a> KeyHelper<'a> { + /// Returns a Key if it is present in key table, + /// even if it is in deleted state. Querying a non-existing + /// key ID returns an internal error. + pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> { + Ok(self + .0 + .key_table + .get(&EmptyKey, key_id) + .await? + .ok_or_message(format!("Key {} does not exist", key_id))?) + } + + /// Returns a Key if it is present in key table, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> { + self.0 + .key_table + .get(&EmptyKey, key_id) + .await? + .filter(|b| !b.state.is_deleted()) + .ok_or_else(|| Error::NoSuchAccessKey(key_id.to_string())) + } + + /// Returns a Key if it is present in key table, + /// looking it up by key ID or by a match on its name, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> { + let candidates = self + .0 + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), + 10, + EnumerationOrder::Forward, + ) + .await? + .into_iter() + .collect::<Vec<_>>(); + if candidates.len() != 1 { + Err(Error::BadRequest(format!( + "{} matching keys", + candidates.len() + ))) + } else { + Ok(candidates.into_iter().next().unwrap()) + } + } + + /// Deletes an API access key + pub async fn delete_key(&self, key: &mut Key) -> Result<(), Error> { + let bucket_helper = BucketHelper(self.0); + + let state = key.state.as_option_mut().unwrap(); + + // --- done checking, now commit --- + // (the step at unset_local_bucket_alias will fail if a bucket + // does not have another alias, the deletion will be + // interrupted in the middle if that happens) + + // 1. Delete local aliases + for (alias, _, to) in state.local_aliases.items().iter() { + if let Some(bucket_id) = to { + bucket_helper + .unset_local_bucket_alias(*bucket_id, &key.key_id, alias) + .await?; + } + } + + // 2. Remove permissions on all authorized buckets + for (ab_id, _auth) in state.authorized_buckets.items().iter() { + bucket_helper + .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS) + .await?; + } + + // 3. Actually delete key + key.state = Deletable::delete(); + self.0.key_table.insert(key).await?; + + Ok(()) + } +} diff --git a/src/model/helper/mod.rs b/src/model/helper/mod.rs index 2f4e8898..dd947c86 100644 --- a/src/model/helper/mod.rs +++ b/src/model/helper/mod.rs @@ -1,2 +1,3 @@ pub mod bucket; pub mod error; +pub mod key; diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index bed7f44a..73328993 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -15,11 +15,11 @@ path = "lib.rs" [dependencies] garage_util = { version = "0.7.0", path = "../util" } -garage_admin = { version = "0.7.0", path = "../admin" } arc-swap = "1.0" bytes = "1.0" gethostname = "0.2" +git-version = "0.3.4" hex = "0.4" tracing = "0.1.30" rand = "0.8" diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index b9c02c21..f517f36f 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; +use garage_util::error::*; use crate::ring::*; @@ -100,6 +101,61 @@ impl ClusterLayout { } } + pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.roles.merge(&self.staging); + self.roles.retain(|(_, _, v)| v.0.is_some()); + + if !self.calculate_partition_assignation() { + return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); + } + + self.staging.clear(); + self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + + self.version += 1; + + Ok(self) + } + + pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.staging.clear(); + self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + + self.version += 1; + + Ok(self) + } + /// Returns a list of IDs of nodes that currently have /// a role in the cluster pub fn node_ids(&self) -> &[Uuid] { diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 68d94ea5..eb2f2e42 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -312,6 +312,83 @@ impl System { ); } + // ---- Administrative operations (directly available and + // also available through RPC) ---- + + pub fn garage_version(&self) -> &'static str { + option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( + prefix = "git:", + cargo_prefix = "cargo:", + fallback = "unknown" + )) + } + + pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> { + let node_status = self.node_status.read().unwrap(); + let known_nodes = self + .fullmesh + .get_peer_list() + .iter() + .map(|n| KnownNodeInfo { + id: n.id.into(), + addr: n.addr, + is_up: n.is_up(), + last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), + status: node_status + .get(&n.id.into()) + .cloned() + .map(|(_, st)| st) + .unwrap_or(NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), + }), + }) + .collect::<Vec<_>>(); + known_nodes + } + + pub fn get_cluster_layout(&self) -> ClusterLayout { + self.ring.borrow().layout.clone() + } + + pub async fn update_cluster_layout( + self: &Arc<Self>, + layout: &ClusterLayout, + ) -> Result<(), Error> { + self.handle_advertise_cluster_layout(layout).await?; + Ok(()) + } + + pub async fn connect(&self, node: &str) -> Result<(), Error> { + let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { + Error::Message(format!( + "Unable to parse or resolve node specification: {}", + node + )) + })?; + let mut errors = vec![]; + for ip in addrs.iter() { + match self + .netapp + .clone() + .try_connect(*ip, pubkey) + .await + .err_context(CONNECT_ERROR_MESSAGE) + { + Ok(()) => return Ok(()), + Err(e) => { + errors.push((*ip, e)); + } + } + } + return Err(Error::Message(format!( + "Could not connect to specified peers. Errors: {:?}", + errors + ))); + } + // ---- INTERNALS ---- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { @@ -384,32 +461,11 @@ impl System { self.local_status.swap(Arc::new(new_si)); } + // --- RPC HANDLERS --- + async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> { - let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { - Error::Message(format!( - "Unable to parse or resolve node specification: {}", - node - )) - })?; - let mut errors = vec![]; - for ip in addrs.iter() { - match self - .netapp - .clone() - .try_connect(*ip, pubkey) - .await - .err_context(CONNECT_ERROR_MESSAGE) - { - Ok(()) => return Ok(SystemRpc::Ok), - Err(e) => { - errors.push((*ip, e)); - } - } - } - return Err(Error::Message(format!( - "Could not connect to specified peers. Errors: {:?}", - errors - ))); + self.connect(node).await?; + Ok(SystemRpc::Ok) } fn handle_pull_cluster_layout(&self) -> SystemRpc { @@ -418,28 +474,7 @@ impl System { } fn handle_get_known_nodes(&self) -> SystemRpc { - let node_status = self.node_status.read().unwrap(); - let known_nodes = self - .fullmesh - .get_peer_list() - .iter() - .map(|n| KnownNodeInfo { - id: n.id.into(), - addr: n.addr, - is_up: n.is_up(), - last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), - status: node_status - .get(&n.id.into()) - .cloned() - .map(|(_, st)| st) - .unwrap_or(NodeStatus { - hostname: "?".to_string(), - replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), - }), - }) - .collect::<Vec<_>>(); + let known_nodes = self.get_known_nodes(); SystemRpc::ReturnKnownNodes(known_nodes) } @@ -476,7 +511,7 @@ impl System { } async fn handle_advertise_cluster_layout( - self: Arc<Self>, + self: &Arc<Self>, adv: &ClusterLayout, ) -> Result<SystemRpc, Error> { let update_ring = self.update_ring.lock().await; diff --git a/src/util/config.rs b/src/util/config.rs index 4d66bfe4..99ebce31 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -121,6 +121,10 @@ pub struct WebConfig { pub struct AdminConfig { /// Address and port to bind for admin API serving pub api_bind_addr: Option<SocketAddr>, + /// Bearer token to use to scrape metrics + pub metrics_token: Option<String>, + /// Bearer token to use to access Admin API endpoints + pub admin_token: Option<String>, /// OTLP server to where to export traces pub trace_sink: Option<String>, } diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs index c155c3a8..91d24c7f 100644 --- a/src/util/crdt/lww_map.rs +++ b/src/util/crdt/lww_map.rs @@ -140,6 +140,11 @@ where self.vals.clear(); } + /// Retain only values that match a certain predicate + pub fn retain(&mut self, pred: impl FnMut(&(K, u64, V)) -> bool) { + self.vals.retain(pred); + } + /// Get a reference to the value assigned to a key pub fn get(&self, k: &K) -> Option<&V> { match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { diff --git a/src/web/error.rs b/src/web/error.rs index 55990e9d..bd8f17b5 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -2,57 +2,47 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; -use garage_util::error::Error as GarageError; +use garage_api::generic_server::ApiError; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { /// An error received from the API crate #[error(display = "API error: {}", _0)] - ApiError(#[error(source)] garage_api::Error), - - // Category: internal error - /// Error internal to garage - #[error(display = "Internal error: {}", _0)] - InternalError(#[error(source)] GarageError), + ApiError(garage_api::s3::error::Error), /// The file does not exist #[error(display = "Not found")] NotFound, - /// The request contained an invalid UTF-8 sequence in its path or in other parameters - #[error(display = "Invalid UTF-8: {}", _0)] - InvalidUtf8(#[error(source)] std::str::Utf8Error), - - /// The client send a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), - /// The client sent a request without host, or with unsupported method #[error(display = "Bad request: {}", _0)] BadRequest(String), } +impl<T> From<T> for Error +where + garage_api::s3::error::Error: From<T>, +{ + fn from(err: T) -> Self { + Error::ApiError(garage_api::s3::error::Error::from(err)) + } +} + impl Error { /// Transform errors into http status code pub fn http_status_code(&self) -> StatusCode { match self { Error::NotFound => StatusCode::NOT_FOUND, Error::ApiError(e) => e.http_status_code(), - Error::InternalError( - GarageError::Timeout - | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), - ) => StatusCode::SERVICE_UNAVAILABLE, - Error::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, - _ => StatusCode::BAD_REQUEST, + Error::BadRequest(_) => StatusCode::BAD_REQUEST, } } pub fn add_headers(&self, header_map: &mut HeaderMap<HeaderValue>) { #[allow(clippy::single_match)] match self { - Error::ApiError(e) => e.add_headers(header_map), + Error::ApiError(e) => e.add_http_headers(header_map), _ => (), } } diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 867adc51..c30d8957 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -18,9 +18,11 @@ use opentelemetry::{ use crate::error::*; -use garage_api::error::{Error as ApiError, OkOrBadRequest, OkOrInternalError}; use garage_api::helpers::{authority_to_host, host_to_bucket}; use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; +use garage_api::s3::error::{ + CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError, +}; use garage_api::s3::get::{handle_get, handle_head}; use garage_model::garage::Garage; @@ -207,7 +209,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response Method::OPTIONS => handle_options_for_bucket(req, &bucket), Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await, Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await, - _ => Err(ApiError::BadRequest("HTTP method not supported".into())), + _ => Err(ApiError::bad_request("HTTP method not supported")), } .map_err(Error::from); @@ -290,9 +292,7 @@ fn path_to_key<'a>(path: &'a str, index: &str) -> Result<Cow<'a, str>, Error> { let path_utf8 = percent_encoding::percent_decode_str(path).decode_utf8()?; if !path_utf8.starts_with('/') { - return Err(Error::BadRequest( - "Path must start with a / (slash)".to_string(), - )); + return Err(Error::BadRequest("Path must start with a / (slash)".into())); } match path_utf8.chars().last() { |