diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/Cargo.toml | 27 | ||||
-rw-r--r-- | src/api/admin/api_server.rs | 209 | ||||
-rw-r--r-- | src/api/admin/bucket.rs | 580 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 193 | ||||
-rw-r--r-- | src/api/admin/error.rs | 97 | ||||
-rw-r--r-- | src/api/admin/key.rs | 256 | ||||
-rw-r--r-- | src/api/admin/mod.rs | 7 | ||||
-rw-r--r-- | src/api/admin/router.rs | 145 | ||||
-rw-r--r-- | src/api/api_server.rs | 645 | ||||
-rw-r--r-- | src/api/common_error.rs | 177 | ||||
-rw-r--r-- | src/api/generic_server.rs | 211 | ||||
-rw-r--r-- | src/api/helpers.rs | 191 | ||||
-rw-r--r-- | src/api/k2v/api_server.rs | 190 | ||||
-rw-r--r-- | src/api/k2v/batch.rs | 363 | ||||
-rw-r--r-- | src/api/k2v/error.rs | 135 | ||||
-rw-r--r-- | src/api/k2v/index.rs | 100 | ||||
-rw-r--r-- | src/api/k2v/item.rs | 230 | ||||
-rw-r--r-- | src/api/k2v/mod.rs | 9 | ||||
-rw-r--r-- | src/api/k2v/range.rs | 100 | ||||
-rw-r--r-- | src/api/k2v/router.rs | 252 | ||||
-rw-r--r-- | src/api/lib.rs | 26 | ||||
-rw-r--r-- | src/api/router_macros.rs | 213 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 390 | ||||
-rw-r--r-- | src/api/s3/bucket.rs (renamed from src/api/s3_bucket.rs) | 21 | ||||
-rw-r--r-- | src/api/s3/copy.rs (renamed from src/api/s3_copy.rs) | 66 | ||||
-rw-r--r-- | src/api/s3/cors.rs (renamed from src/api/s3_cors.rs) | 36 | ||||
-rw-r--r-- | src/api/s3/delete.rs (renamed from src/api/s3_delete.rs) | 21 | ||||
-rw-r--r-- | src/api/s3/error.rs (renamed from src/api/error.rs) | 243 | ||||
-rw-r--r-- | src/api/s3/get.rs (renamed from src/api/s3_get.rs) | 180 | ||||
-rw-r--r-- | src/api/s3/list.rs (renamed from src/api/s3_list.rs) | 106 | ||||
-rw-r--r-- | src/api/s3/mod.rs | 15 | ||||
-rw-r--r-- | src/api/s3/post_object.rs (renamed from src/api/s3_post_object.rs) | 89 | ||||
-rw-r--r-- | src/api/s3/put.rs (renamed from src/api/s3_put.rs) | 217 | ||||
-rw-r--r-- | src/api/s3/router.rs (renamed from src/api/s3_router.rs) | 225 | ||||
-rw-r--r-- | src/api/s3/website.rs (renamed from src/api/s3_website.rs) | 51 | ||||
-rw-r--r-- | src/api/s3/xml.rs (renamed from src/api/s3_xml.rs) | 44 | ||||
-rw-r--r-- | src/api/signature/error.rs | 36 | ||||
-rw-r--r-- | src/api/signature/mod.rs | 30 | ||||
-rw-r--r-- | src/api/signature/payload.rs | 35 | ||||
-rw-r--r-- | src/api/signature/streaming.rs | 71 |
40 files changed, 4787 insertions, 1445 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 5e96b081..7c3ed43b 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_api" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,28 +14,31 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_model = { version = "0.7.0", path = "../model" } -garage_table = { version = "0.7.0", path = "../table" } -garage_block = { version = "0.7.0", path = "../block" } -garage_util = { version = "0.7.0", path = "../util" } +garage_model = { version = "0.8.0", path = "../model" } +garage_table = { version = "0.8.0", path = "../table" } +garage_block = { version = "0.8.0", path = "../block" } +garage_util = { version = "0.8.0", path = "../util" } +garage_rpc = { version = "0.8.0", path = "../rpc" } +async-trait = "0.1.7" base64 = "0.13" bytes = "1.0" chrono = "0.4" -crypto-mac = "0.10" +crypto-common = "0.1" err-derive = "0.3" hex = "0.4" -hmac = "0.10" +hmac = "0.12" idna = "0.2" tracing = "0.1.30" -md-5 = "0.9" +md-5 = "0.10" nom = "7.1" -sha2 = "0.9" +sha2 = "0.10" futures = "0.3" futures-util = "0.3" pin-project = "1.0" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +tokio-stream = "0.1" form_urlencoded = "1.0.0" http = "0.2" @@ -52,3 +55,9 @@ quick-xml = { version = "0.21", features = [ "serialize" ] } url = "2.1" opentelemetry = "0.17" +opentelemetry-prometheus = { version = "0.10", optional = true } +prometheus = { version = "0.13", optional = true } + +[features] +k2v = [ "garage_util/k2v", "garage_model/k2v" ] +metrics = [ "opentelemetry-prometheus", "prometheus" ] diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs new file mode 100644 index 00000000..0816bda1 --- /dev/null +++ b/src/api/admin/api_server.rs @@ -0,0 +1,209 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; +use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; +use hyper::{Body, Request, Response}; + +use opentelemetry::trace::SpanRef; + +#[cfg(feature = "metrics")] +use opentelemetry_prometheus::PrometheusExporter; +#[cfg(feature = "metrics")] +use prometheus::{Encoder, TextEncoder}; + +use garage_model::garage::Garage; +use garage_util::error::Error as GarageError; + +use crate::generic_server::*; + +use crate::admin::bucket::*; +use crate::admin::cluster::*; +use crate::admin::error::*; +use crate::admin::key::*; +use crate::admin::router::{Authorization, Endpoint}; + +pub struct AdminApiServer { + garage: Arc<Garage>, + #[cfg(feature = "metrics")] + exporter: PrometheusExporter, + metrics_token: Option<String>, + admin_token: Option<String>, +} + +impl AdminApiServer { + pub fn new( + garage: Arc<Garage>, + #[cfg(feature = "metrics")] exporter: PrometheusExporter, + ) -> Self { + let cfg = &garage.config.admin; + let metrics_token = cfg + .metrics_token + .as_ref() + .map(|tok| format!("Bearer {}", tok)); + let admin_token = cfg + .admin_token + .as_ref() + .map(|tok| format!("Bearer {}", tok)); + Self { + garage, + #[cfg(feature = "metrics")] + exporter, + metrics_token, + admin_token, + } + } + + pub async fn run( + self, + bind_addr: SocketAddr, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let region = self.garage.config.s3_api.s3_region.clone(); + ApiServer::new(region, self) + .run_server(bind_addr, shutdown_signal) + .await + } + + fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> { + Ok(Response::builder() + .status(204) + .header(ALLOW, "OPTIONS, GET, POST") + .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST") + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .body(Body::empty())?) + } + + fn handle_metrics(&self) -> Result<Response<Body>, Error> { + #[cfg(feature = "metrics")] + { + use opentelemetry::trace::Tracer; + + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + + let tracer = opentelemetry::global::tracer("garage"); + let metric_families = tracer.in_span("admin/gather_metrics", |_| { + self.exporter.registry().gather() + }); + + encoder + .encode(&metric_families, &mut buffer) + .ok_or_internal_error("Could not serialize metrics")?; + + Ok(Response::builder() + .status(200) + .header(http::header::CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer))?) + } + #[cfg(not(feature = "metrics"))] + Err(Error::bad_request( + "Garage was built without the metrics feature".to_string(), + )) + } +} + +#[async_trait] +impl ApiHandler for AdminApiServer { + const API_NAME: &'static str = "admin"; + const API_NAME_DISPLAY: &'static str = "Admin"; + + type Endpoint = Endpoint; + type Error = Error; + + fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> { + Endpoint::from_request(req) + } + + async fn handle( + &self, + req: Request<Body>, + endpoint: Endpoint, + ) -> Result<Response<Body>, Error> { + let expected_auth_header = + match endpoint.authorization_type() { + Authorization::MetricsToken => self.metrics_token.as_ref(), + Authorization::AdminToken => match &self.admin_token { + None => return Err(Error::forbidden( + "Admin token isn't configured, admin API access is disabled for security.", + )), + Some(t) => Some(t), + }, + }; + + if let Some(h) = expected_auth_header { + match req.headers().get("Authorization") { + None => return Err(Error::forbidden("Authorization token must be provided")), + Some(v) => { + let authorized = v.to_str().map(|hv| hv.trim() == h).unwrap_or(false); + if !authorized { + return Err(Error::forbidden("Invalid authorization token provided")); + } + } + } + } + + match endpoint { + Endpoint::Options => self.handle_options(&req), + Endpoint::Metrics => self.handle_metrics(), + Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, + Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await, + // Layout + Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await, + Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await, + Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await, + Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await, + // Keys + Endpoint::ListKeys => handle_list_keys(&self.garage).await, + Endpoint::GetKeyInfo { id, search } => { + handle_get_key_info(&self.garage, id, search).await + } + Endpoint::CreateKey => handle_create_key(&self.garage, req).await, + Endpoint::ImportKey => handle_import_key(&self.garage, req).await, + Endpoint::UpdateKey { id } => handle_update_key(&self.garage, id, req).await, + Endpoint::DeleteKey { id } => handle_delete_key(&self.garage, id).await, + // Buckets + Endpoint::ListBuckets => handle_list_buckets(&self.garage).await, + Endpoint::GetBucketInfo { id, global_alias } => { + handle_get_bucket_info(&self.garage, id, global_alias).await + } + Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await, + Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await, + Endpoint::UpdateBucket { id } => handle_update_bucket(&self.garage, id, req).await, + // Bucket-key permissions + Endpoint::BucketAllowKey => { + handle_bucket_change_key_perm(&self.garage, req, true).await + } + Endpoint::BucketDenyKey => { + handle_bucket_change_key_perm(&self.garage, req, false).await + } + // Bucket aliasing + Endpoint::GlobalAliasBucket { id, alias } => { + handle_global_alias_bucket(&self.garage, id, alias).await + } + Endpoint::GlobalUnaliasBucket { id, alias } => { + handle_global_unalias_bucket(&self.garage, id, alias).await + } + Endpoint::LocalAliasBucket { + id, + access_key_id, + alias, + } => handle_local_alias_bucket(&self.garage, id, access_key_id, alias).await, + Endpoint::LocalUnaliasBucket { + id, + access_key_id, + alias, + } => handle_local_unalias_bucket(&self.garage, id, access_key_id, alias).await, + } + } +} + +impl ApiEndpoint for Endpoint { + fn name(&self) -> &'static str { + Endpoint::name(self) + } + + fn add_span_attributes(&self, _span: SpanRef<'_>) {} +} diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs new file mode 100644 index 00000000..ac8a8a40 --- /dev/null +++ b/src/api/admin/bucket.rs @@ -0,0 +1,580 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; + +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::time::*; + +use garage_table::*; + +use garage_model::bucket_alias_table::*; +use garage_model::bucket_table::*; +use garage_model::garage::Garage; +use garage_model::permission::*; +use garage_model::s3::object_table::*; + +use crate::admin::error::*; +use crate::admin::key::ApiBucketKeyPerm; +use crate::common_error::CommonError; +use crate::helpers::{json_ok_response, parse_json_body}; + +pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { + let buckets = garage + .bucket_table + .get_range( + &EmptyKey, + None, + Some(DeletedFilter::NotDeleted), + 10000, + EnumerationOrder::Forward, + ) + .await?; + + let res = buckets + .into_iter() + .map(|b| { + let state = b.state.as_option().unwrap(); + ListBucketResultItem { + id: hex::encode(b.id), + global_aliases: state + .aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + local_aliases: state + .local_aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|((k, n), _, _)| BucketLocalAlias { + access_key_id: k.to_string(), + alias: n.to_string(), + }) + .collect::<Vec<_>>(), + } + }) + .collect::<Vec<_>>(); + + Ok(json_ok_response(&res)?) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct ListBucketResultItem { + id: String, + global_aliases: Vec<String>, + local_aliases: Vec<BucketLocalAlias>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct BucketLocalAlias { + access_key_id: String, + alias: String, +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ApiBucketQuotas { + max_size: Option<u64>, + max_objects: Option<u64>, +} + +pub async fn handle_get_bucket_info( + garage: &Arc<Garage>, + id: Option<String>, + global_alias: Option<String>, +) -> Result<Response<Body>, Error> { + let bucket_id = match (id, global_alias) { + (Some(id), None) => parse_bucket_id(&id)?, + (None, Some(ga)) => garage + .bucket_helper() + .resolve_global_bucket_name(&ga) + .await? + .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?, + _ => { + return Err(Error::bad_request( + "Either id or globalAlias must be provided (but not both)", + )); + } + }; + + bucket_info_results(garage, bucket_id).await +} + +async fn bucket_info_results( + garage: &Arc<Garage>, + bucket_id: Uuid, +) -> Result<Response<Body>, Error> { + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let counters = garage + .object_counter_table + .table + .get(&bucket_id, &EmptyKey) + .await? + .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .unwrap_or_default(); + + let mut relevant_keys = HashMap::new(); + for (k, _) in bucket + .state + .as_option() + .unwrap() + .authorized_keys + .items() + .iter() + { + if let Some(key) = garage + .key_table + .get(&EmptyKey, k) + .await? + .filter(|k| !k.is_deleted()) + { + if !key.state.is_deleted() { + relevant_keys.insert(k.clone(), key); + } + } + } + for ((k, _), _, _) in bucket + .state + .as_option() + .unwrap() + .local_aliases + .items() + .iter() + { + if relevant_keys.contains_key(k) { + continue; + } + if let Some(key) = garage.key_table.get(&EmptyKey, k).await? { + if !key.state.is_deleted() { + relevant_keys.insert(k.clone(), key); + } + } + } + + let state = bucket.state.as_option().unwrap(); + + let quotas = state.quotas.get(); + let res = + GetBucketInfoResult { + id: hex::encode(&bucket.id), + global_aliases: state + .aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + website_access: state.website_config.get().is_some(), + website_config: state.website_config.get().clone().map(|wsc| { + GetBucketInfoWebsiteResult { + index_document: wsc.index_document, + error_document: wsc.error_document, + } + }), + keys: relevant_keys + .into_iter() + .map(|(_, key)| { + let p = key.state.as_option().unwrap(); + GetBucketInfoKey { + access_key_id: key.key_id, + name: p.name.get().to_string(), + permissions: p + .authorized_buckets + .get(&bucket.id) + .map(|p| ApiBucketKeyPerm { + read: p.allow_read, + write: p.allow_write, + owner: p.allow_owner, + }) + .unwrap_or_default(), + bucket_local_aliases: p + .local_aliases + .items() + .iter() + .filter(|(_, _, b)| *b == Some(bucket.id)) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + } + }) + .collect::<Vec<_>>(), + objects: counters.get(OBJECTS).cloned().unwrap_or_default(), + bytes: counters.get(BYTES).cloned().unwrap_or_default(), + unfinshed_uploads: counters + .get(UNFINISHED_UPLOADS) + .cloned() + .unwrap_or_default(), + quotas: ApiBucketQuotas { + max_size: quotas.max_size, + max_objects: quotas.max_objects, + }, + }; + + Ok(json_ok_response(&res)?) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct GetBucketInfoResult { + id: String, + global_aliases: Vec<String>, + website_access: bool, + #[serde(default)] + website_config: Option<GetBucketInfoWebsiteResult>, + keys: Vec<GetBucketInfoKey>, + objects: i64, + bytes: i64, + unfinshed_uploads: i64, + quotas: ApiBucketQuotas, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct GetBucketInfoWebsiteResult { + index_document: String, + error_document: Option<String>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct GetBucketInfoKey { + access_key_id: String, + name: String, + permissions: ApiBucketKeyPerm, + bucket_local_aliases: Vec<String>, +} + +pub async fn handle_create_bucket( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<CreateBucketRequest>(req).await?; + + if let Some(ga) = &req.global_alias { + if !is_valid_bucket_name(ga) { + return Err(Error::bad_request(format!( + "{}: {}", + ga, INVALID_BUCKET_NAME_MESSAGE + ))); + } + + if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? { + if alias.state.get().is_some() { + return Err(CommonError::BucketAlreadyExists.into()); + } + } + } + + if let Some(la) = &req.local_alias { + if !is_valid_bucket_name(&la.alias) { + return Err(Error::bad_request(format!( + "{}: {}", + la.alias, INVALID_BUCKET_NAME_MESSAGE + ))); + } + + let key = garage + .key_helper() + .get_existing_key(&la.access_key_id) + .await?; + let state = key.state.as_option().unwrap(); + if matches!(state.local_aliases.get(&la.alias), Some(_)) { + return Err(Error::bad_request("Local alias already exists")); + } + } + + let bucket = Bucket::new(); + garage.bucket_table.insert(&bucket).await?; + + if let Some(ga) = &req.global_alias { + garage + .bucket_helper() + .set_global_bucket_alias(bucket.id, ga) + .await?; + } + + if let Some(la) = &req.local_alias { + garage + .bucket_helper() + .set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias) + .await?; + + if la.allow.read || la.allow.write || la.allow.owner { + garage + .bucket_helper() + .set_bucket_key_permissions( + bucket.id, + &la.access_key_id, + BucketKeyPerm { + timestamp: now_msec(), + allow_read: la.allow.read, + allow_write: la.allow.write, + allow_owner: la.allow.owner, + }, + ) + .await?; + } + } + + bucket_info_results(garage, bucket.id).await +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateBucketRequest { + global_alias: Option<String>, + local_alias: Option<CreateBucketLocalAlias>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateBucketLocalAlias { + access_key_id: String, + alias: String, + #[serde(default)] + allow: ApiBucketKeyPerm, +} + +pub async fn handle_delete_bucket( + garage: &Arc<Garage>, + id: String, +) -> Result<Response<Body>, Error> { + let helper = garage.bucket_helper(); + + let bucket_id = parse_bucket_id(&id)?; + + let mut bucket = helper.get_existing_bucket(bucket_id).await?; + let state = bucket.state.as_option().unwrap(); + + // Check bucket is empty + if !helper.is_bucket_empty(bucket_id).await? { + return Err(CommonError::BucketNotEmpty.into()); + } + + // --- done checking, now commit --- + // 1. delete authorization from keys that had access + for (key_id, perm) in bucket.authorized_keys() { + if perm.is_any() { + helper + .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS) + .await?; + } + } + // 2. delete all local aliases + for ((key_id, alias), _, active) in state.local_aliases.items().iter() { + if *active { + helper + .unset_local_bucket_alias(bucket.id, key_id, alias) + .await?; + } + } + // 3. delete all global aliases + for (alias, _, active) in state.aliases.items().iter() { + if *active { + helper.purge_global_bucket_alias(bucket.id, alias).await?; + } + } + + // 4. delete bucket + bucket.state = Deletable::delete(); + garage.bucket_table.insert(&bucket).await?; + + Ok(Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?) +} + +pub async fn handle_update_bucket( + garage: &Arc<Garage>, + id: String, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<UpdateBucketRequest>(req).await?; + let bucket_id = parse_bucket_id(&id)?; + + let mut bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let state = bucket.state.as_option_mut().unwrap(); + + if let Some(wa) = req.website_access { + if wa.enabled { + state.website_config.update(Some(WebsiteConfig { + index_document: wa.index_document.ok_or_bad_request( + "Please specify indexDocument when enabling website access.", + )?, + error_document: wa.error_document, + })); + } else { + if wa.index_document.is_some() || wa.error_document.is_some() { + return Err(Error::bad_request( + "Cannot specify indexDocument or errorDocument when disabling website access.", + )); + } + state.website_config.update(None); + } + } + + if let Some(q) = req.quotas { + state.quotas.update(BucketQuotas { + max_size: q.max_size, + max_objects: q.max_objects, + }); + } + + garage.bucket_table.insert(&bucket).await?; + + bucket_info_results(garage, bucket_id).await +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct UpdateBucketRequest { + website_access: Option<UpdateBucketWebsiteAccess>, + quotas: Option<ApiBucketQuotas>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct UpdateBucketWebsiteAccess { + enabled: bool, + index_document: Option<String>, + error_document: Option<String>, +} + +// ---- BUCKET/KEY PERMISSIONS ---- + +pub async fn handle_bucket_change_key_perm( + garage: &Arc<Garage>, + req: Request<Body>, + new_perm_flag: bool, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?; + + let bucket_id = parse_bucket_id(&req.bucket_id)?; + + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let state = bucket.state.as_option().unwrap(); + + let key = garage + .key_helper() + .get_existing_key(&req.access_key_id) + .await?; + + let mut perm = state + .authorized_keys + .get(&key.key_id) + .cloned() + .unwrap_or(BucketKeyPerm::NO_PERMISSIONS); + + if req.permissions.read { + perm.allow_read = new_perm_flag; + } + if req.permissions.write { + perm.allow_write = new_perm_flag; + } + if req.permissions.owner { + perm.allow_owner = new_perm_flag; + } + + garage + .bucket_helper() + .set_bucket_key_permissions(bucket.id, &key.key_id, perm) + .await?; + + bucket_info_results(garage, bucket.id).await +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct BucketKeyPermChangeRequest { + bucket_id: String, + access_key_id: String, + permissions: ApiBucketKeyPerm, +} + +// ---- BUCKET ALIASES ---- + +pub async fn handle_global_alias_bucket( + garage: &Arc<Garage>, + bucket_id: String, + alias: String, +) -> Result<Response<Body>, Error> { + let bucket_id = parse_bucket_id(&bucket_id)?; + + garage + .bucket_helper() + .set_global_bucket_alias(bucket_id, &alias) + .await?; + + bucket_info_results(garage, bucket_id).await +} + +pub async fn handle_global_unalias_bucket( + garage: &Arc<Garage>, + bucket_id: String, + alias: String, +) -> Result<Response<Body>, Error> { + let bucket_id = parse_bucket_id(&bucket_id)?; + + garage + .bucket_helper() + .unset_global_bucket_alias(bucket_id, &alias) + .await?; + + bucket_info_results(garage, bucket_id).await +} + +pub async fn handle_local_alias_bucket( + garage: &Arc<Garage>, + bucket_id: String, + access_key_id: String, + alias: String, +) -> Result<Response<Body>, Error> { + let bucket_id = parse_bucket_id(&bucket_id)?; + + garage + .bucket_helper() + .set_local_bucket_alias(bucket_id, &access_key_id, &alias) + .await?; + + bucket_info_results(garage, bucket_id).await +} + +pub async fn handle_local_unalias_bucket( + garage: &Arc<Garage>, + bucket_id: String, + access_key_id: String, + alias: String, +) -> Result<Response<Body>, Error> { + let bucket_id = parse_bucket_id(&bucket_id)?; + + garage + .bucket_helper() + .unset_local_bucket_alias(bucket_id, &access_key_id, &alias) + .await?; + + bucket_info_results(garage, bucket_id).await +} + +// ---- HELPER ---- + +fn parse_bucket_id(id: &str) -> Result<Uuid, Error> { + let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?; + Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?) +} diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs new file mode 100644 index 00000000..99c6e332 --- /dev/null +++ b/src/api/admin/cluster.rs @@ -0,0 +1,193 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; + +use garage_util::crdt::*; +use garage_util::data::*; + +use garage_rpc::layout::*; + +use garage_model::garage::Garage; + +use crate::admin::error::*; +use crate::helpers::{json_ok_response, parse_json_body}; + +pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { + let res = GetClusterStatusResponse { + node: hex::encode(garage.system.id), + garage_version: garage_util::version::garage_version(), + garage_features: garage_util::version::garage_features(), + db_engine: garage.db.engine(), + known_nodes: garage + .system + .get_known_nodes() + .into_iter() + .map(|i| { + ( + hex::encode(i.id), + KnownNodeResp { + addr: i.addr, + is_up: i.is_up, + last_seen_secs_ago: i.last_seen_secs_ago, + hostname: i.status.hostname, + }, + ) + }) + .collect(), + layout: get_cluster_layout(garage), + }; + + Ok(json_ok_response(&res)?) +} + +pub async fn handle_connect_cluster_nodes( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<Vec<String>>(req).await?; + + let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node))) + .await + .into_iter() + .map(|r| match r { + Ok(()) => ConnectClusterNodesResponse { + success: true, + error: None, + }, + Err(e) => ConnectClusterNodesResponse { + success: false, + error: Some(format!("{}", e)), + }, + }) + .collect::<Vec<_>>(); + + Ok(json_ok_response(&res)?) +} + +pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { + let res = get_cluster_layout(garage); + + Ok(json_ok_response(&res)?) +} + +fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse { + let layout = garage.system.get_cluster_layout(); + + GetClusterLayoutResponse { + version: layout.version, + roles: layout + .roles + .items() + .iter() + .filter(|(_, _, v)| v.0.is_some()) + .map(|(k, _, v)| (hex::encode(k), v.0.clone())) + .collect(), + staged_role_changes: layout + .staging + .items() + .iter() + .filter(|(k, _, v)| layout.roles.get(k) != Some(v)) + .map(|(k, _, v)| (hex::encode(k), v.0.clone())) + .collect(), + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct GetClusterStatusResponse { + node: String, + garage_version: &'static str, + garage_features: Option<&'static [&'static str]>, + db_engine: String, + known_nodes: HashMap<String, KnownNodeResp>, + layout: GetClusterLayoutResponse, +} + +#[derive(Serialize)] +struct ConnectClusterNodesResponse { + success: bool, + error: Option<String>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct GetClusterLayoutResponse { + version: u64, + roles: HashMap<String, Option<NodeRole>>, + staged_role_changes: HashMap<String, Option<NodeRole>>, +} + +#[derive(Serialize)] +struct KnownNodeResp { + addr: SocketAddr, + is_up: bool, + last_seen_secs_ago: Option<u64>, + hostname: String, +} + +pub async fn handle_update_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?; + + let mut layout = garage.system.get_cluster_layout(); + + let mut roles = layout.roles.clone(); + roles.merge(&layout.staging); + + for (node, role) in updates { + let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?; + let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; + + layout + .staging + .merge(&roles.update_mutator(node, NodeRoleV(role))); + } + + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_apply_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + + let layout = garage.system.get_cluster_layout(); + let layout = layout.apply_staged_changes(Some(param.version))?; + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_revert_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + + let layout = garage.system.get_cluster_layout(); + let layout = layout.revert_staged_changes(Some(param.version))?; + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>; + +#[derive(Deserialize)] +struct ApplyRevertLayoutRequest { + version: u64, +} diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs new file mode 100644 index 00000000..ed1a07bd --- /dev/null +++ b/src/api/admin/error.rs @@ -0,0 +1,97 @@ +use err_derive::Error; +use hyper::header::HeaderValue; +use hyper::{Body, HeaderMap, StatusCode}; + +pub use garage_model::helper::error::Error as HelperError; + +use crate::common_error::CommonError; +pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; +use crate::generic_server::ApiError; +use crate::helpers::CustomApiErrorBody; + +/// Errors of this crate +#[derive(Debug, Error)] +pub enum Error { + #[error(display = "{}", _0)] + /// Error from common error + Common(CommonError), + + // Category: cannot process + /// The API access key does not exist + #[error(display = "Access key not found: {}", _0)] + NoSuchAccessKey(String), + + /// In Import key, the key already exists + #[error( + display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", + _0 + )] + KeyAlreadyExists(String), +} + +impl<T> From<T> for Error +where + CommonError: From<T>, +{ + fn from(err: T) -> Self { + Error::Common(CommonError::from(err)) + } +} + +impl CommonErrorDerivative for Error {} + +impl From<HelperError> for Error { + fn from(err: HelperError) -> Self { + match err { + HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), + HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), + HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), + HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), + HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n), + } + } +} + +impl Error { + fn code(&self) -> &'static str { + match self { + Error::Common(c) => c.aws_code(), + Error::NoSuchAccessKey(_) => "NoSuchAccessKey", + Error::KeyAlreadyExists(_) => "KeyAlreadyExists", + } + } +} + +impl ApiError for Error { + /// Get the HTTP status code that best represents the meaning of the error for the client + fn http_status_code(&self) -> StatusCode { + match self { + Error::Common(c) => c.http_status_code(), + Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND, + Error::KeyAlreadyExists(_) => StatusCode::CONFLICT, + } + } + + fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>) { + use hyper::header; + header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); + } + + fn http_body(&self, garage_region: &str, path: &str) -> Body { + let error = CustomApiErrorBody { + code: self.code().to_string(), + message: format!("{}", self), + path: path.to_string(), + region: garage_region.to_string(), + }; + Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + r#" +{ + "code": "InternalError", + "message": "JSON encoding of error failed" +} + "# + .into() + })) + } +} diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs new file mode 100644 index 00000000..2bbabb7b --- /dev/null +++ b/src/api/admin/key.rs @@ -0,0 +1,256 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; + +use garage_table::*; + +use garage_model::garage::Garage; +use garage_model::key_table::*; + +use crate::admin::error::*; +use crate::helpers::{json_ok_response, parse_json_body}; + +pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { + let res = garage + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), + 10000, + EnumerationOrder::Forward, + ) + .await? + .iter() + .map(|k| ListKeyResultItem { + id: k.key_id.to_string(), + name: k.params().unwrap().name.get().clone(), + }) + .collect::<Vec<_>>(); + + Ok(json_ok_response(&res)?) +} + +#[derive(Serialize)] +struct ListKeyResultItem { + id: String, + name: String, +} + +pub async fn handle_get_key_info( + garage: &Arc<Garage>, + id: Option<String>, + search: Option<String>, +) -> Result<Response<Body>, Error> { + let key = if let Some(id) = id { + garage.key_helper().get_existing_key(&id).await? + } else if let Some(search) = search { + garage + .key_helper() + .get_existing_matching_key(&search) + .await? + } else { + unreachable!(); + }; + + key_info_results(garage, key).await +} + +pub async fn handle_create_key( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<CreateKeyRequest>(req).await?; + + let key = Key::new(&req.name); + garage.key_table.insert(&key).await?; + + key_info_results(garage, key).await +} + +#[derive(Deserialize)] +struct CreateKeyRequest { + name: String, +} + +pub async fn handle_import_key( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<ImportKeyRequest>(req).await?; + + let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; + if prev_key.is_some() { + return Err(Error::KeyAlreadyExists(req.access_key_id.to_string())); + } + + let imported_key = Key::import(&req.access_key_id, &req.secret_access_key, &req.name); + garage.key_table.insert(&imported_key).await?; + + key_info_results(garage, imported_key).await +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ImportKeyRequest { + access_key_id: String, + secret_access_key: String, + name: String, +} + +pub async fn handle_update_key( + garage: &Arc<Garage>, + id: String, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let req = parse_json_body::<UpdateKeyRequest>(req).await?; + + let mut key = garage.key_helper().get_existing_key(&id).await?; + + let key_state = key.state.as_option_mut().unwrap(); + + if let Some(new_name) = req.name { + key_state.name.update(new_name); + } + if let Some(allow) = req.allow { + if allow.create_bucket { + key_state.allow_create_bucket.update(true); + } + } + if let Some(deny) = req.deny { + if deny.create_bucket { + key_state.allow_create_bucket.update(false); + } + } + + garage.key_table.insert(&key).await?; + + key_info_results(garage, key).await +} + +#[derive(Deserialize)] +struct UpdateKeyRequest { + name: Option<String>, + allow: Option<KeyPerm>, + deny: Option<KeyPerm>, +} + +pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Response<Body>, Error> { + let mut key = garage.key_helper().get_existing_key(&id).await?; + + key.state.as_option().unwrap(); + + garage.key_helper().delete_key(&mut key).await?; + + Ok(Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?) +} + +async fn key_info_results(garage: &Arc<Garage>, key: Key) -> Result<Response<Body>, Error> { + let mut relevant_buckets = HashMap::new(); + + let key_state = key.state.as_option().unwrap(); + + for id in key_state + .authorized_buckets + .items() + .iter() + .map(|(id, _)| id) + .chain( + key_state + .local_aliases + .items() + .iter() + .filter_map(|(_, _, v)| v.as_ref()), + ) { + if !relevant_buckets.contains_key(id) { + if let Some(b) = garage.bucket_table.get(&EmptyKey, id).await? { + if b.state.as_option().is_some() { + relevant_buckets.insert(*id, b); + } + } + } + } + + let res = GetKeyInfoResult { + name: key_state.name.get().clone(), + access_key_id: key.key_id.clone(), + secret_access_key: key_state.secret_key.clone(), + permissions: KeyPerm { + create_bucket: *key_state.allow_create_bucket.get(), + }, + buckets: relevant_buckets + .into_iter() + .map(|(_, bucket)| { + let state = bucket.state.as_option().unwrap(); + KeyInfoBucketResult { + id: hex::encode(bucket.id), + global_aliases: state + .aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + local_aliases: state + .local_aliases + .items() + .iter() + .filter(|((k, _), _, a)| *a && *k == key.key_id) + .map(|((_, n), _, _)| n.to_string()) + .collect::<Vec<_>>(), + permissions: key_state + .authorized_buckets + .get(&bucket.id) + .map(|p| ApiBucketKeyPerm { + read: p.allow_read, + write: p.allow_write, + owner: p.allow_owner, + }) + .unwrap_or_default(), + } + }) + .collect::<Vec<_>>(), + }; + + Ok(json_ok_response(&res)?) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct GetKeyInfoResult { + name: String, + access_key_id: String, + secret_access_key: String, + permissions: KeyPerm, + buckets: Vec<KeyInfoBucketResult>, +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct KeyPerm { + #[serde(default)] + create_bucket: bool, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct KeyInfoBucketResult { + id: String, + global_aliases: Vec<String>, + local_aliases: Vec<String>, + permissions: ApiBucketKeyPerm, +} + +#[derive(Serialize, Deserialize, Default)] +pub(crate) struct ApiBucketKeyPerm { + #[serde(default)] + pub(crate) read: bool, + #[serde(default)] + pub(crate) write: bool, + #[serde(default)] + pub(crate) owner: bool, +} diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs new file mode 100644 index 00000000..c4857c10 --- /dev/null +++ b/src/api/admin/mod.rs @@ -0,0 +1,7 @@ +pub mod api_server; +mod error; +mod router; + +mod bucket; +mod cluster; +mod key; diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs new file mode 100644 index 00000000..3eee8b67 --- /dev/null +++ b/src/api/admin/router.rs @@ -0,0 +1,145 @@ +use std::borrow::Cow; + +use hyper::{Method, Request}; + +use crate::admin::error::*; +use crate::router_macros::*; + +pub enum Authorization { + MetricsToken, + AdminToken, +} + +router_match! {@func + +/// List of all Admin API endpoints. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Endpoint { + Options, + Metrics, + GetClusterStatus, + ConnectClusterNodes, + // Layout + GetClusterLayout, + UpdateClusterLayout, + ApplyClusterLayout, + RevertClusterLayout, + // Keys + ListKeys, + CreateKey, + ImportKey, + GetKeyInfo { + id: Option<String>, + search: Option<String>, + }, + DeleteKey { + id: String, + }, + UpdateKey { + id: String, + }, + // Buckets + ListBuckets, + CreateBucket, + GetBucketInfo { + id: Option<String>, + global_alias: Option<String>, + }, + DeleteBucket { + id: String, + }, + UpdateBucket { + id: String, + }, + // Bucket-Key Permissions + BucketAllowKey, + BucketDenyKey, + // Bucket aliases + GlobalAliasBucket { + id: String, + alias: String, + }, + GlobalUnaliasBucket { + id: String, + alias: String, + }, + LocalAliasBucket { + id: String, + access_key_id: String, + alias: String, + }, + LocalUnaliasBucket { + id: String, + access_key_id: String, + alias: String, + }, +}} + +impl Endpoint { + /// Determine which S3 endpoint a request is for using the request, and a bucket which was + /// possibly extracted from the Host header. + /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets + pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> { + let uri = req.uri(); + let path = uri.path(); + let query = uri.query(); + + let mut query = QueryParameters::from_query(query.unwrap_or_default())?; + + let res = router_match!(@gen_path_parser (req.method(), path, query) [ + OPTIONS _ => Options, + GET "/metrics" => Metrics, + GET "/v0/status" => GetClusterStatus, + POST "/v0/connect" => ConnectClusterNodes, + // Layout endpoints + GET "/v0/layout" => GetClusterLayout, + POST "/v0/layout" => UpdateClusterLayout, + POST "/v0/layout/apply" => ApplyClusterLayout, + POST "/v0/layout/revert" => RevertClusterLayout, + // API key endpoints + GET "/v0/key" if id => GetKeyInfo (query_opt::id, query_opt::search), + GET "/v0/key" if search => GetKeyInfo (query_opt::id, query_opt::search), + POST "/v0/key" if id => UpdateKey (query::id), + POST "/v0/key" => CreateKey, + POST "/v0/key/import" => ImportKey, + DELETE "/v0/key" if id => DeleteKey (query::id), + GET "/v0/key" => ListKeys, + // Bucket endpoints + GET "/v0/bucket" if id => GetBucketInfo (query_opt::id, query_opt::global_alias), + GET "/v0/bucket" if global_alias => GetBucketInfo (query_opt::id, query_opt::global_alias), + GET "/v0/bucket" => ListBuckets, + POST "/v0/bucket" => CreateBucket, + DELETE "/v0/bucket" if id => DeleteBucket (query::id), + PUT "/v0/bucket" if id => UpdateBucket (query::id), + // Bucket-key permissions + POST "/v0/bucket/allow" => BucketAllowKey, + POST "/v0/bucket/deny" => BucketDenyKey, + // Bucket aliases + PUT "/v0/bucket/alias/global" => GlobalAliasBucket (query::id, query::alias), + DELETE "/v0/bucket/alias/global" => GlobalUnaliasBucket (query::id, query::alias), + PUT "/v0/bucket/alias/local" => LocalAliasBucket (query::id, query::access_key_id, query::alias), + DELETE "/v0/bucket/alias/local" => LocalUnaliasBucket (query::id, query::access_key_id, query::alias), + ]); + + if let Some(message) = query.nonempty_message() { + debug!("Unused query parameter: {}", message) + } + + Ok(res) + } + /// Get the kind of authorization which is required to perform the operation. + pub fn authorization_type(&self) -> Authorization { + match self { + Self::Metrics => Authorization::MetricsToken, + _ => Authorization::AdminToken, + } + } +} + +generateQueryParameters! { + "id" => id, + "search" => search, + "globalAlias" => global_alias, + "alias" => alias, + "accessKeyId" => access_key_id +} diff --git a/src/api/api_server.rs b/src/api/api_server.rs deleted file mode 100644 index e7b86d9e..00000000 --- a/src/api/api_server.rs +++ /dev/null @@ -1,645 +0,0 @@ -use std::net::SocketAddr; -use std::sync::Arc; - -use chrono::{DateTime, NaiveDateTime, Utc}; -use futures::future::Future; -use futures::prelude::*; -use hyper::header; -use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Method, Request, Response, Server}; - -use opentelemetry::{ - global, - metrics::{Counter, ValueRecorder}, - trace::{FutureExt, TraceContextExt, Tracer}, - Context, KeyValue, -}; - -use garage_util::data::*; -use garage_util::error::Error as GarageError; -use garage_util::metrics::{gen_trace_id, RecordDuration}; - -use garage_model::garage::Garage; -use garage_model::key_table::Key; - -use garage_table::util::*; - -use crate::error::*; -use crate::signature::compute_scope; -use crate::signature::payload::check_payload_signature; -use crate::signature::streaming::SignedPayloadStream; -use crate::signature::LONG_DATETIME; - -use crate::helpers::*; -use crate::s3_bucket::*; -use crate::s3_copy::*; -use crate::s3_cors::*; -use crate::s3_delete::*; -use crate::s3_get::*; -use crate::s3_list::*; -use crate::s3_post_object::handle_post_object; -use crate::s3_put::*; -use crate::s3_router::{Authorization, Endpoint}; -use crate::s3_website::*; - -struct ApiMetrics { - request_counter: Counter<u64>, - error_counter: Counter<u64>, - request_duration: ValueRecorder<f64>, -} - -impl ApiMetrics { - fn new() -> Self { - let meter = global::meter("garage/api"); - Self { - request_counter: meter - .u64_counter("api.request_counter") - .with_description("Number of API calls to the various S3 API endpoints") - .init(), - error_counter: meter - .u64_counter("api.error_counter") - .with_description( - "Number of API calls to the various S3 API endpoints that resulted in errors", - ) - .init(), - request_duration: meter - .f64_value_recorder("api.request_duration") - .with_description("Duration of API calls to the various S3 API endpoints") - .init(), - } - } -} - -/// Run the S3 API server -pub async fn run_api_server( - garage: Arc<Garage>, - shutdown_signal: impl Future<Output = ()>, -) -> Result<(), GarageError> { - let addr = &garage.config.s3_api.api_bind_addr; - - let metrics = Arc::new(ApiMetrics::new()); - - let service = make_service_fn(|conn: &AddrStream| { - let garage = garage.clone(); - let metrics = metrics.clone(); - - let client_addr = conn.remote_addr(); - async move { - Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { - let garage = garage.clone(); - let metrics = metrics.clone(); - - handler(garage, metrics, req, client_addr) - })) - } - }); - - let server = Server::bind(addr).serve(service); - - let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("API server listening on http://{}", addr); - - graceful.await?; - Ok(()) -} - -async fn handler( - garage: Arc<Garage>, - metrics: Arc<ApiMetrics>, - req: Request<Body>, - addr: SocketAddr, -) -> Result<Response<Body>, GarageError> { - let uri = req.uri().clone(); - info!("{} {} {}", addr, req.method(), uri); - debug!("{:?}", req); - - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder("S3 API call (unknown)") - .with_trace_id(gen_trace_id()) - .with_attributes(vec![ - KeyValue::new("method", format!("{}", req.method())), - KeyValue::new("uri", req.uri().to_string()), - ]) - .start(&tracer); - - let res = handler_stage2(garage.clone(), metrics, req) - .with_context(Context::current_with_span(span)) - .await; - - match res { - Ok(x) => { - debug!("{} {:?}", x.status(), x.headers()); - Ok(x) - } - Err(e) => { - let body: Body = Body::from(e.aws_xml(&garage.config.s3_api.s3_region, uri.path())); - let mut http_error_builder = Response::builder() - .status(e.http_status_code()) - .header("Content-Type", "application/xml"); - - if let Some(header_map) = http_error_builder.headers_mut() { - e.add_headers(header_map) - } - - let http_error = http_error_builder.body(body)?; - - if e.http_status_code().is_server_error() { - warn!("Response: error {}, {}", e.http_status_code(), e); - } else { - info!("Response: error {}, {}", e.http_status_code(), e); - } - Ok(http_error) - } - } -} - -async fn handler_stage2( - garage: Arc<Garage>, - metrics: Arc<ApiMetrics>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let authority = req - .headers() - .get(header::HOST) - .ok_or_bad_request("Host header required")? - .to_str()?; - - let host = authority_to_host(authority)?; - - let bucket_name = garage - .config - .s3_api - .root_domain - .as_ref() - .and_then(|root_domain| host_to_bucket(&host, root_domain)); - - let (endpoint, bucket_name) = Endpoint::from_request(&req, bucket_name.map(ToOwned::to_owned))?; - debug!("Endpoint: {:?}", endpoint); - - let current_context = Context::current(); - let current_span = current_context.span(); - current_span.update_name::<String>(format!("S3 API {}", endpoint.name())); - current_span.set_attribute(KeyValue::new("endpoint", endpoint.name())); - current_span.set_attribute(KeyValue::new( - "bucket", - bucket_name.clone().unwrap_or_default(), - )); - - let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())]; - - let res = handler_stage3(garage, req, endpoint, bucket_name) - .record_duration(&metrics.request_duration, &metrics_tags[..]) - .await; - - metrics.request_counter.add(1, &metrics_tags[..]); - - let status_code = match &res { - Ok(r) => r.status(), - Err(e) => e.http_status_code(), - }; - if status_code.is_client_error() || status_code.is_server_error() { - metrics.error_counter.add( - 1, - &[ - metrics_tags[0].clone(), - KeyValue::new("status_code", status_code.as_str().to_string()), - ], - ); - } - - res -} - -async fn handler_stage3( - garage: Arc<Garage>, - req: Request<Body>, - endpoint: Endpoint, - bucket_name: Option<String>, -) -> Result<Response<Body>, Error> { - // Some endpoints are processed early, before we even check for an API key - if let Endpoint::PostObject = endpoint { - return handle_post_object(garage, req, bucket_name.unwrap()).await; - } - if let Endpoint::Options = endpoint { - return handle_options_s3api(garage, &req, bucket_name).await; - } - - let (api_key, mut content_sha256) = check_payload_signature(&garage, &req).await?; - let api_key = api_key.ok_or_else(|| { - Error::Forbidden("Garage does not support anonymous access yet".to_string()) - })?; - - let req = match req.headers().get("x-amz-content-sha256") { - Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { - let signature = content_sha256 - .take() - .ok_or_bad_request("No signature provided")?; - - let secret_key = &api_key - .state - .as_option() - .ok_or_internal_error("Deleted key state")? - .secret_key; - - let date = req - .headers() - .get("x-amz-date") - .ok_or_bad_request("Missing X-Amz-Date field")? - .to_str()?; - let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) - .ok_or_bad_request("Invalid date")?; - let date: DateTime<Utc> = DateTime::from_utc(date, Utc); - - let scope = compute_scope(&date, &garage.config.s3_api.s3_region); - let signing_hmac = crate::signature::signing_hmac( - &date, - secret_key, - &garage.config.s3_api.s3_region, - "s3", - ) - .ok_or_internal_error("Unable to build signing HMAC")?; - - req.map(move |body| { - Body::wrap_stream( - SignedPayloadStream::new( - body.map_err(Error::from), - signing_hmac, - date, - &scope, - signature, - ) - .map_err(Error::from), - ) - }) - } - _ => req, - }; - - let bucket_name = match bucket_name { - None => return handle_request_without_bucket(garage, req, api_key, endpoint).await, - Some(bucket) => bucket.to_string(), - }; - - // Special code path for CreateBucket API endpoint - if let Endpoint::CreateBucket {} = endpoint { - return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await; - } - - let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; - let bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or(Error::NoSuchBucket)?; - - let allowed = match endpoint.authorization_type() { - Authorization::Read => api_key.allow_read(&bucket_id), - Authorization::Write => api_key.allow_write(&bucket_id), - Authorization::Owner => api_key.allow_owner(&bucket_id), - _ => unreachable!(), - }; - - if !allowed { - return Err(Error::Forbidden( - "Operation is not allowed for this key.".to_string(), - )); - } - - // Look up what CORS rule might apply to response. - // Requests for methods different than GET, HEAD or POST - // are always preflighted, i.e. the browser should make - // an OPTIONS call before to check it is allowed - let matching_cors_rule = match *req.method() { - Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?, - _ => None, - }; - - let resp = match endpoint { - Endpoint::HeadObject { - key, part_number, .. - } => handle_head(garage, &req, bucket_id, &key, part_number).await, - Endpoint::GetObject { - key, part_number, .. - } => handle_get(garage, &req, bucket_id, &key, part_number).await, - Endpoint::UploadPart { - key, - part_number, - upload_id, - } => { - handle_put_part( - garage, - req, - bucket_id, - &key, - part_number, - &upload_id, - content_sha256, - ) - .await - } - Endpoint::CopyObject { key } => handle_copy(garage, &api_key, &req, bucket_id, &key).await, - Endpoint::UploadPartCopy { - key, - part_number, - upload_id, - } => { - handle_upload_part_copy( - garage, - &api_key, - &req, - bucket_id, - &key, - part_number, - &upload_id, - ) - .await - } - Endpoint::PutObject { key } => { - handle_put(garage, req, bucket_id, &key, content_sha256).await - } - Endpoint::AbortMultipartUpload { key, upload_id } => { - handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await - } - Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await, - Endpoint::CreateMultipartUpload { key } => { - handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await - } - Endpoint::CompleteMultipartUpload { key, upload_id } => { - handle_complete_multipart_upload( - garage, - req, - &bucket_name, - bucket_id, - &key, - &upload_id, - content_sha256, - ) - .await - } - Endpoint::CreateBucket {} => unreachable!(), - Endpoint::HeadBucket {} => { - let empty_body: Body = Body::from(vec![]); - let response = Response::builder().body(empty_body).unwrap(); - Ok(response) - } - Endpoint::DeleteBucket {} => { - handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await - } - Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage), - Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(), - Endpoint::ListObjects { - delimiter, - encoding_type, - marker, - max_keys, - prefix, - } => { - handle_list( - garage, - &ListObjectsQuery { - common: ListQueryCommon { - bucket_name, - bucket_id, - delimiter: delimiter.map(|d| d.to_string()), - page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), - prefix: prefix.unwrap_or_default(), - urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), - }, - is_v2: false, - marker, - continuation_token: None, - start_after: None, - }, - ) - .await - } - Endpoint::ListObjectsV2 { - delimiter, - encoding_type, - max_keys, - prefix, - continuation_token, - start_after, - list_type, - .. - } => { - if list_type == "2" { - handle_list( - garage, - &ListObjectsQuery { - common: ListQueryCommon { - bucket_name, - bucket_id, - delimiter: delimiter.map(|d| d.to_string()), - page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), - urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), - prefix: prefix.unwrap_or_default(), - }, - is_v2: true, - marker: None, - continuation_token, - start_after, - }, - ) - .await - } else { - Err(Error::BadRequest(format!( - "Invalid endpoint: list-type={}", - list_type - ))) - } - } - Endpoint::ListMultipartUploads { - delimiter, - encoding_type, - key_marker, - max_uploads, - prefix, - upload_id_marker, - } => { - handle_list_multipart_upload( - garage, - &ListMultipartUploadsQuery { - common: ListQueryCommon { - bucket_name, - bucket_id, - delimiter: delimiter.map(|d| d.to_string()), - page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000), - prefix: prefix.unwrap_or_default(), - urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), - }, - key_marker, - upload_id_marker, - }, - ) - .await - } - Endpoint::ListParts { - key, - max_parts, - part_number_marker, - upload_id, - } => { - handle_list_parts( - garage, - &ListPartsQuery { - bucket_name, - bucket_id, - key, - upload_id, - part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)), - max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000), - }, - ) - .await - } - Endpoint::DeleteObjects {} => { - handle_delete_objects(garage, bucket_id, req, content_sha256).await - } - Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await, - Endpoint::PutBucketWebsite {} => { - handle_put_website(garage, bucket_id, req, content_sha256).await - } - Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await, - Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await, - Endpoint::PutBucketCors {} => handle_put_cors(garage, bucket_id, req, content_sha256).await, - Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await, - endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), - }; - - // If request was a success and we have a CORS rule that applies to it, - // add the corresponding CORS headers to the response - let mut resp_ok = resp?; - if let Some(rule) = matching_cors_rule { - add_cors_headers(&mut resp_ok, rule) - .ok_or_internal_error("Invalid bucket CORS configuration")?; - } - - Ok(resp_ok) -} - -async fn handle_request_without_bucket( - garage: Arc<Garage>, - _req: Request<Body>, - api_key: Key, - endpoint: Endpoint, -) -> Result<Response<Body>, Error> { - match endpoint { - Endpoint::ListBuckets => handle_list_buckets(&garage, &api_key).await, - endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), - } -} - -#[allow(clippy::ptr_arg)] -pub async fn resolve_bucket( - garage: &Garage, - bucket_name: &String, - api_key: &Key, -) -> Result<Uuid, Error> { - let api_key_params = api_key - .state - .as_option() - .ok_or_internal_error("Key should not be deleted at this point")?; - - if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) { - Ok(*bucket_id) - } else { - Ok(garage - .bucket_helper() - .resolve_global_bucket_name(bucket_name) - .await? - .ok_or(Error::NoSuchBucket)?) - } -} - -/// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in -/// the host header of the request -/// -/// S3 internally manages only buckets and keys. This function splits -/// an HTTP path to get the corresponding bucket name and key. -pub fn parse_bucket_key<'a>( - path: &'a str, - host_bucket: Option<&'a str>, -) -> Result<(&'a str, Option<&'a str>), Error> { - let path = path.trim_start_matches('/'); - - if let Some(bucket) = host_bucket { - if !path.is_empty() { - return Ok((bucket, Some(path))); - } else { - return Ok((bucket, None)); - } - } - - let (bucket, key) = match path.find('/') { - Some(i) => { - let key = &path[i + 1..]; - if !key.is_empty() { - (&path[..i], Some(key)) - } else { - (&path[..i], None) - } - } - None => (path, None), - }; - if bucket.is_empty() { - return Err(Error::BadRequest("No bucket specified".to_string())); - } - Ok((bucket, key)) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn parse_bucket_containing_a_key() -> Result<(), Error> { - let (bucket, key) = parse_bucket_key("/my_bucket/a/super/file.jpg", None)?; - assert_eq!(bucket, "my_bucket"); - assert_eq!(key.expect("key must be set"), "a/super/file.jpg"); - Ok(()) - } - - #[test] - fn parse_bucket_containing_no_key() -> Result<(), Error> { - let (bucket, key) = parse_bucket_key("/my_bucket/", None)?; - assert_eq!(bucket, "my_bucket"); - assert!(key.is_none()); - let (bucket, key) = parse_bucket_key("/my_bucket", None)?; - assert_eq!(bucket, "my_bucket"); - assert!(key.is_none()); - Ok(()) - } - - #[test] - fn parse_bucket_containing_no_bucket() { - let parsed = parse_bucket_key("", None); - assert!(parsed.is_err()); - let parsed = parse_bucket_key("/", None); - assert!(parsed.is_err()); - let parsed = parse_bucket_key("////", None); - assert!(parsed.is_err()); - } - - #[test] - fn parse_bucket_with_vhost_and_key() -> Result<(), Error> { - let (bucket, key) = parse_bucket_key("/a/super/file.jpg", Some("my-bucket"))?; - assert_eq!(bucket, "my-bucket"); - assert_eq!(key.expect("key must be set"), "a/super/file.jpg"); - Ok(()) - } - - #[test] - fn parse_bucket_with_vhost_no_key() -> Result<(), Error> { - let (bucket, key) = parse_bucket_key("", Some("my-bucket"))?; - assert_eq!(bucket, "my-bucket"); - assert!(key.is_none()); - let (bucket, key) = parse_bucket_key("/", Some("my-bucket"))?; - assert_eq!(bucket, "my-bucket"); - assert!(key.is_none()); - Ok(()) - } -} diff --git a/src/api/common_error.rs b/src/api/common_error.rs new file mode 100644 index 00000000..20f9f266 --- /dev/null +++ b/src/api/common_error.rs @@ -0,0 +1,177 @@ +use err_derive::Error; +use hyper::StatusCode; + +use garage_util::error::Error as GarageError; + +/// Errors of this crate +#[derive(Debug, Error)] +pub enum CommonError { + // ---- INTERNAL ERRORS ---- + /// Error related to deeper parts of Garage + #[error(display = "Internal error: {}", _0)] + InternalError(#[error(source)] GarageError), + + /// Error related to Hyper + #[error(display = "Internal error (Hyper error): {}", _0)] + Hyper(#[error(source)] hyper::Error), + + /// Error related to HTTP + #[error(display = "Internal error (HTTP error): {}", _0)] + Http(#[error(source)] http::Error), + + // ---- GENERIC CLIENT ERRORS ---- + /// Proper authentication was not provided + #[error(display = "Forbidden: {}", _0)] + Forbidden(String), + + /// Generic bad request response with custom message + #[error(display = "Bad request: {}", _0)] + BadRequest(String), + + // ---- SPECIFIC ERROR CONDITIONS ---- + // These have to be error codes referenced in the S3 spec here: + // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList + /// The bucket requested don't exists + #[error(display = "Bucket not found: {}", _0)] + NoSuchBucket(String), + + /// Tried to create a bucket that already exist + #[error(display = "Bucket already exists")] + BucketAlreadyExists, + + /// Tried to delete a non-empty bucket + #[error(display = "Tried to delete a non-empty bucket")] + BucketNotEmpty, + + // Category: bad request + /// Bucket name is not valid according to AWS S3 specs + #[error(display = "Invalid bucket name: {}", _0)] + InvalidBucketName(String), +} + +impl CommonError { + pub fn http_status_code(&self) -> StatusCode { + match self { + CommonError::InternalError( + GarageError::Timeout + | GarageError::RemoteError(_) + | GarageError::Quorum(_, _, _, _), + ) => StatusCode::SERVICE_UNAVAILABLE, + CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => { + StatusCode::INTERNAL_SERVER_ERROR + } + CommonError::BadRequest(_) => StatusCode::BAD_REQUEST, + CommonError::Forbidden(_) => StatusCode::FORBIDDEN, + CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND, + CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT, + CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST, + } + } + + pub fn aws_code(&self) -> &'static str { + match self { + CommonError::Forbidden(_) => "AccessDenied", + CommonError::InternalError( + GarageError::Timeout + | GarageError::RemoteError(_) + | GarageError::Quorum(_, _, _, _), + ) => "ServiceUnavailable", + CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => { + "InternalError" + } + CommonError::BadRequest(_) => "InvalidRequest", + CommonError::NoSuchBucket(_) => "NoSuchBucket", + CommonError::BucketAlreadyExists => "BucketAlreadyExists", + CommonError::BucketNotEmpty => "BucketNotEmpty", + CommonError::InvalidBucketName(_) => "InvalidBucketName", + } + } + + pub fn bad_request<M: ToString>(msg: M) -> Self { + CommonError::BadRequest(msg.to_string()) + } +} + +pub trait CommonErrorDerivative: From<CommonError> { + fn internal_error<M: ToString>(msg: M) -> Self { + Self::from(CommonError::InternalError(GarageError::Message( + msg.to_string(), + ))) + } + + fn bad_request<M: ToString>(msg: M) -> Self { + Self::from(CommonError::BadRequest(msg.to_string())) + } + + fn forbidden<M: ToString>(msg: M) -> Self { + Self::from(CommonError::Forbidden(msg.to_string())) + } +} + +/// Trait to map error to the Bad Request error code +pub trait OkOrBadRequest { + type S; + fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, CommonError>; +} + +impl<T, E> OkOrBadRequest for Result<T, E> +where + E: std::fmt::Display, +{ + type S = T; + fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> { + match self { + Ok(x) => Ok(x), + Err(e) => Err(CommonError::BadRequest(format!( + "{}: {}", + reason.as_ref(), + e + ))), + } + } +} + +impl<T> OkOrBadRequest for Option<T> { + type S = T; + fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> { + match self { + Some(x) => Ok(x), + None => Err(CommonError::BadRequest(reason.as_ref().to_string())), + } + } +} + +/// Trait to map an error to an Internal Error code +pub trait OkOrInternalError { + type S; + fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<Self::S, CommonError>; +} + +impl<T, E> OkOrInternalError for Result<T, E> +where + E: std::fmt::Display, +{ + type S = T; + fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> { + match self { + Ok(x) => Ok(x), + Err(e) => Err(CommonError::InternalError(GarageError::Message(format!( + "{}: {}", + reason.as_ref(), + e + )))), + } + } +} + +impl<T> OkOrInternalError for Option<T> { + type S = T; + fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> { + match self { + Some(x) => Ok(x), + None => Err(CommonError::InternalError(GarageError::Message( + reason.as_ref().to_string(), + ))), + } + } +} diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs new file mode 100644 index 00000000..62fe4e5a --- /dev/null +++ b/src/api/generic_server.rs @@ -0,0 +1,211 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; + +use hyper::header::HeaderValue; +use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response, Server}; +use hyper::{HeaderMap, StatusCode}; + +use opentelemetry::{ + global, + metrics::{Counter, ValueRecorder}, + trace::{FutureExt, SpanRef, TraceContextExt, Tracer}, + Context, KeyValue, +}; + +use garage_util::error::Error as GarageError; +use garage_util::metrics::{gen_trace_id, RecordDuration}; + +pub(crate) trait ApiEndpoint: Send + Sync + 'static { + fn name(&self) -> &'static str; + fn add_span_attributes(&self, span: SpanRef<'_>); +} + +pub trait ApiError: std::error::Error + Send + Sync + 'static { + fn http_status_code(&self) -> StatusCode; + fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>); + fn http_body(&self, garage_region: &str, path: &str) -> Body; +} + +#[async_trait] +pub(crate) trait ApiHandler: Send + Sync + 'static { + const API_NAME: &'static str; + const API_NAME_DISPLAY: &'static str; + + type Endpoint: ApiEndpoint; + type Error: ApiError; + + fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Self::Error>; + async fn handle( + &self, + req: Request<Body>, + endpoint: Self::Endpoint, + ) -> Result<Response<Body>, Self::Error>; +} + +pub(crate) struct ApiServer<A: ApiHandler> { + region: String, + api_handler: A, + + // Metrics + request_counter: Counter<u64>, + error_counter: Counter<u64>, + request_duration: ValueRecorder<f64>, +} + +impl<A: ApiHandler> ApiServer<A> { + pub fn new(region: String, api_handler: A) -> Arc<Self> { + let meter = global::meter("garage/api"); + Arc::new(Self { + region, + api_handler, + request_counter: meter + .u64_counter(format!("api.{}.request_counter", A::API_NAME)) + .with_description(format!( + "Number of API calls to the various {} API endpoints", + A::API_NAME_DISPLAY + )) + .init(), + error_counter: meter + .u64_counter(format!("api.{}.error_counter", A::API_NAME)) + .with_description(format!( + "Number of API calls to the various {} API endpoints that resulted in errors", + A::API_NAME_DISPLAY + )) + .init(), + request_duration: meter + .f64_value_recorder(format!("api.{}.request_duration", A::API_NAME)) + .with_description(format!( + "Duration of API calls to the various {} API endpoints", + A::API_NAME_DISPLAY + )) + .init(), + }) + } + + pub async fn run_server( + self: Arc<Self>, + bind_addr: SocketAddr, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let service = make_service_fn(|conn: &AddrStream| { + let this = self.clone(); + + let client_addr = conn.remote_addr(); + async move { + Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { + let this = this.clone(); + + this.handler(req, client_addr) + })) + } + }); + + let server = Server::bind(&bind_addr).serve(service); + + let graceful = server.with_graceful_shutdown(shutdown_signal); + info!( + "{} API server listening on http://{}", + A::API_NAME_DISPLAY, + bind_addr + ); + + graceful.await?; + Ok(()) + } + + async fn handler( + self: Arc<Self>, + req: Request<Body>, + addr: SocketAddr, + ) -> Result<Response<Body>, GarageError> { + let uri = req.uri().clone(); + info!("{} {} {}", addr, req.method(), uri); + debug!("{:?}", req); + + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer + .span_builder(format!("{} API call (unknown)", A::API_NAME_DISPLAY)) + .with_trace_id(gen_trace_id()) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().to_string()), + ]) + .start(&tracer); + + let res = self + .handler_stage2(req) + .with_context(Context::current_with_span(span)) + .await; + + match res { + Ok(x) => { + debug!("{} {:?}", x.status(), x.headers()); + Ok(x) + } + Err(e) => { + let body: Body = e.http_body(&self.region, uri.path()); + let mut http_error_builder = Response::builder().status(e.http_status_code()); + + if let Some(header_map) = http_error_builder.headers_mut() { + e.add_http_headers(header_map) + } + + let http_error = http_error_builder.body(body)?; + + if e.http_status_code().is_server_error() { + warn!("Response: error {}, {}", e.http_status_code(), e); + } else { + info!("Response: error {}, {}", e.http_status_code(), e); + } + Ok(http_error) + } + } + } + + async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, A::Error> { + let endpoint = self.api_handler.parse_endpoint(&req)?; + debug!("Endpoint: {}", endpoint.name()); + + let current_context = Context::current(); + let current_span = current_context.span(); + current_span.update_name::<String>(format!( + "{} API {}", + A::API_NAME_DISPLAY, + endpoint.name() + )); + current_span.set_attribute(KeyValue::new("endpoint", endpoint.name())); + endpoint.add_span_attributes(current_span); + + let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())]; + + let res = self + .api_handler + .handle(req, endpoint) + .record_duration(&self.request_duration, &metrics_tags[..]) + .await; + + self.request_counter.add(1, &metrics_tags[..]); + + let status_code = match &res { + Ok(r) => r.status(), + Err(e) => e.http_status_code(), + }; + if status_code.is_client_error() || status_code.is_server_error() { + self.error_counter.add( + 1, + &[ + metrics_tags[0].clone(), + KeyValue::new("status_code", status_code.as_str().to_string()), + ], + ); + } + + res + } +} diff --git a/src/api/helpers.rs b/src/api/helpers.rs index c2709bb3..642dbc42 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,5 +1,21 @@ -use crate::Error; +use hyper::{Body, Request, Response}; use idna::domain_to_unicode; +use serde::{Deserialize, Serialize}; + +use crate::common_error::{CommonError as Error, *}; + +/// What kind of authorization is required to perform a given action +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Authorization { + /// No authorization is required + None, + /// Having Read permission on bucket + Read, + /// Having Write permission on bucket + Write, + /// Having Owner permission on bucket + Owner, +} /// Host to bucket /// @@ -31,7 +47,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> { let mut iter = authority.chars().enumerate(); let (_, first_char) = iter .next() - .ok_or_else(|| Error::BadRequest("Authority is empty".to_string()))?; + .ok_or_else(|| Error::bad_request("Authority is empty".to_string()))?; let split = match first_char { '[' => { @@ -39,7 +55,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> { match iter.next() { Some((_, ']')) => iter.next(), _ => { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Authority {} has an illegal format", authority ))) @@ -52,7 +68,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> { let authority = match split { Some((i, ':')) => Ok(&authority[..i]), None => Ok(authority), - Some((_, _)) => Err(Error::BadRequest(format!( + Some((_, _)) => Err(Error::bad_request(format!( "Authority {} has an illegal format", authority ))), @@ -60,11 +76,135 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> { authority.map(|h| domain_to_unicode(h).0) } +/// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in +/// the host header of the request +/// +/// S3 internally manages only buckets and keys. This function splits +/// an HTTP path to get the corresponding bucket name and key. +pub fn parse_bucket_key<'a>( + path: &'a str, + host_bucket: Option<&'a str>, +) -> Result<(&'a str, Option<&'a str>), Error> { + let path = path.trim_start_matches('/'); + + if let Some(bucket) = host_bucket { + if !path.is_empty() { + return Ok((bucket, Some(path))); + } else { + return Ok((bucket, None)); + } + } + + let (bucket, key) = match path.find('/') { + Some(i) => { + let key = &path[i + 1..]; + if !key.is_empty() { + (&path[..i], Some(key)) + } else { + (&path[..i], None) + } + } + None => (path, None), + }; + if bucket.is_empty() { + return Err(Error::bad_request("No bucket specified")); + } + Ok((bucket, key)) +} + +const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}'; + +/// Compute the key after the prefix +pub fn key_after_prefix(pfx: &str) -> Option<String> { + let mut next = pfx.to_string(); + while !next.is_empty() { + let tail = next.pop().unwrap(); + if tail >= char::MAX { + continue; + } + + // Circumvent a limitation of RangeFrom that overflow earlier than needed + // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html + let new_tail = if tail == UTF8_BEFORE_LAST_CHAR { + char::MAX + } else { + (tail..).nth(1).unwrap() + }; + + next.push(new_tail); + return Some(next); + } + + None +} + +pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + Ok(resp) +} + +pub fn json_ok_response<T: Serialize>(res: &T) -> Result<Response<Body>, Error> { + let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?; + Ok(Response::builder() + .status(hyper::StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(resp_json))?) +} + #[cfg(test)] mod tests { use super::*; #[test] + fn parse_bucket_containing_a_key() -> Result<(), Error> { + let (bucket, key) = parse_bucket_key("/my_bucket/a/super/file.jpg", None)?; + assert_eq!(bucket, "my_bucket"); + assert_eq!(key.expect("key must be set"), "a/super/file.jpg"); + Ok(()) + } + + #[test] + fn parse_bucket_containing_no_key() -> Result<(), Error> { + let (bucket, key) = parse_bucket_key("/my_bucket/", None)?; + assert_eq!(bucket, "my_bucket"); + assert!(key.is_none()); + let (bucket, key) = parse_bucket_key("/my_bucket", None)?; + assert_eq!(bucket, "my_bucket"); + assert!(key.is_none()); + Ok(()) + } + + #[test] + fn parse_bucket_containing_no_bucket() { + let parsed = parse_bucket_key("", None); + assert!(parsed.is_err()); + let parsed = parse_bucket_key("/", None); + assert!(parsed.is_err()); + let parsed = parse_bucket_key("////", None); + assert!(parsed.is_err()); + } + + #[test] + fn parse_bucket_with_vhost_and_key() -> Result<(), Error> { + let (bucket, key) = parse_bucket_key("/a/super/file.jpg", Some("my-bucket"))?; + assert_eq!(bucket, "my-bucket"); + assert_eq!(key.expect("key must be set"), "a/super/file.jpg"); + Ok(()) + } + + #[test] + fn parse_bucket_with_vhost_no_key() -> Result<(), Error> { + let (bucket, key) = parse_bucket_key("", Some("my-bucket"))?; + assert_eq!(bucket, "my-bucket"); + assert!(key.is_none()); + let (bucket, key) = parse_bucket_key("/", Some("my-bucket"))?; + assert_eq!(bucket, "my-bucket"); + assert!(key.is_none()); + Ok(()) + } + + #[test] fn authority_to_host_with_port() -> Result<(), Error> { let domain = authority_to_host("[::1]:3902")?; assert_eq!(domain, "[::1]"); @@ -111,4 +251,47 @@ mod tests { assert_eq!(host_to_bucket("not-garage.tld", "garage.tld"), None); assert_eq!(host_to_bucket("not-garage.tld", ".garage.tld"), None); } + + #[test] + fn test_key_after_prefix() { + use std::iter::FromIterator; + + assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1); + assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0"); + assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭"); + assert_eq!( + key_after_prefix("").unwrap().as_str(), + String::from(char::from_u32(0x10FFFE).unwrap()) + ); + + // When the last character is the biggest UTF8 char + let a = String::from_iter(['a', char::MAX].iter()); + assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b"); + + // When all characters are the biggest UTF8 char + let b = String::from_iter([char::MAX; 3].iter()); + assert!(key_after_prefix(b.as_str()).is_none()); + + // Check utf8 surrogates + let c = String::from('\u{D7FF}'); + assert_eq!( + key_after_prefix(c.as_str()).unwrap().as_str(), + String::from('\u{E000}') + ); + + // Check the character before the biggest one + let d = String::from('\u{10FFFE}'); + assert_eq!( + key_after_prefix(d.as_str()).unwrap().as_str(), + String::from(char::MAX) + ); + } +} + +#[derive(Serialize)] +pub(crate) struct CustomApiErrorBody { + pub(crate) code: String, + pub(crate) message: String, + pub(crate) region: String, + pub(crate) path: String, } diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs new file mode 100644 index 00000000..084867b5 --- /dev/null +++ b/src/api/k2v/api_server.rs @@ -0,0 +1,190 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; +use hyper::{Body, Method, Request, Response}; + +use opentelemetry::{trace::SpanRef, KeyValue}; + +use garage_util::error::Error as GarageError; + +use garage_model::garage::Garage; + +use crate::generic_server::*; +use crate::k2v::error::*; + +use crate::signature::payload::check_payload_signature; +use crate::signature::streaming::*; + +use crate::helpers::*; +use crate::k2v::batch::*; +use crate::k2v::index::*; +use crate::k2v::item::*; +use crate::k2v::router::Endpoint; +use crate::s3::cors::*; + +pub struct K2VApiServer { + garage: Arc<Garage>, +} + +pub(crate) struct K2VApiEndpoint { + bucket_name: String, + endpoint: Endpoint, +} + +impl K2VApiServer { + pub async fn run( + garage: Arc<Garage>, + bind_addr: SocketAddr, + s3_region: String, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + ApiServer::new(s3_region, K2VApiServer { garage }) + .run_server(bind_addr, shutdown_signal) + .await + } +} + +#[async_trait] +impl ApiHandler for K2VApiServer { + const API_NAME: &'static str = "k2v"; + const API_NAME_DISPLAY: &'static str = "K2V"; + + type Endpoint = K2VApiEndpoint; + type Error = Error; + + fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> { + let (endpoint, bucket_name) = Endpoint::from_request(req)?; + + Ok(K2VApiEndpoint { + bucket_name, + endpoint, + }) + } + + async fn handle( + &self, + req: Request<Body>, + endpoint: K2VApiEndpoint, + ) -> Result<Response<Body>, Error> { + let K2VApiEndpoint { + bucket_name, + endpoint, + } = endpoint; + let garage = self.garage.clone(); + + // The OPTIONS method is procesed early, before we even check for an API key + if let Endpoint::Options = endpoint { + return Ok(handle_options_s3api(garage, &req, Some(bucket_name)) + .await + .ok_or_bad_request("Error handling OPTIONS")?); + } + + let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; + let api_key = api_key + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; + + let req = parse_streaming_body( + &api_key, + req, + &mut content_sha256, + &garage.config.s3_api.s3_region, + "k2v", + )?; + + let bucket_id = garage + .bucket_helper() + .resolve_bucket(&bucket_name, &api_key) + .await?; + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let allowed = match endpoint.authorization_type() { + Authorization::Read => api_key.allow_read(&bucket_id), + Authorization::Write => api_key.allow_write(&bucket_id), + Authorization::Owner => api_key.allow_owner(&bucket_id), + _ => unreachable!(), + }; + + if !allowed { + return Err(Error::forbidden("Operation is not allowed for this key.")); + } + + // Look up what CORS rule might apply to response. + // Requests for methods different than GET, HEAD or POST + // are always preflighted, i.e. the browser should make + // an OPTIONS call before to check it is allowed + let matching_cors_rule = match *req.method() { + Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req) + .ok_or_internal_error("Error looking up CORS rule")?, + _ => None, + }; + + let resp = match endpoint { + Endpoint::DeleteItem { + partition_key, + sort_key, + } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await, + Endpoint::InsertItem { + partition_key, + sort_key, + } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await, + Endpoint::ReadItem { + partition_key, + sort_key, + } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await, + Endpoint::PollItem { + partition_key, + sort_key, + causality_token, + timeout, + } => { + handle_poll_item( + garage, + &req, + bucket_id, + partition_key, + sort_key, + causality_token, + timeout, + ) + .await + } + Endpoint::ReadIndex { + prefix, + start, + end, + limit, + reverse, + } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await, + Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, + Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, + Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, + Endpoint::Options => unreachable!(), + }; + + // If request was a success and we have a CORS rule that applies to it, + // add the corresponding CORS headers to the response + let mut resp_ok = resp?; + if let Some(rule) = matching_cors_rule { + add_cors_headers(&mut resp_ok, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + + Ok(resp_ok) + } +} + +impl ApiEndpoint for K2VApiEndpoint { + fn name(&self) -> &'static str { + self.endpoint.name() + } + + fn add_span_attributes(&self, span: SpanRef<'_>) { + span.set_attribute(KeyValue::new("bucket", self.bucket_name.clone())); + } +} diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs new file mode 100644 index 00000000..db9901cf --- /dev/null +++ b/src/api/k2v/batch.rs @@ -0,0 +1,363 @@ +use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_table::{EnumerationOrder, TableSchema}; + +use garage_model::garage::Garage; +use garage_model::k2v::causality::*; +use garage_model::k2v::item_table::*; + +use crate::helpers::*; +use crate::k2v::error::*; +use crate::k2v::range::read_range; + +pub async fn handle_insert_batch( + garage: Arc<Garage>, + bucket_id: Uuid, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?; + + let mut items2 = vec![]; + for it in items { + let ct = it + .ct + .map(|s| CausalContext::parse(&s)) + .transpose() + .ok_or_bad_request("Invalid causality token")?; + let v = match it.v { + Some(vs) => { + DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?) + } + None => DvvsValue::Deleted, + }; + items2.push((it.pk, it.sk, ct, v)); + } + + garage.k2v.rpc.insert_batch(bucket_id, items2).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_read_batch( + garage: Arc<Garage>, + bucket_id: Uuid, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?; + + let resp_results = futures::future::join_all( + queries + .into_iter() + .map(|q| handle_read_batch_query(&garage, bucket_id, q)), + ) + .await; + + let mut resps: Vec<ReadBatchResponse> = vec![]; + for resp in resp_results { + resps.push(resp?); + } + + let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +async fn handle_read_batch_query( + garage: &Arc<Garage>, + bucket_id: Uuid, + query: ReadBatchQuery, +) -> Result<ReadBatchResponse, Error> { + let partition = K2VItemPartition { + bucket_id, + partition_key: query.partition_key.clone(), + }; + + let filter = ItemFilter { + exclude_only_tombstones: !query.tombstones, + conflicts_only: query.conflicts_only, + }; + + let (items, more, next_start) = if query.single_item { + if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse { + return Err(Error::bad_request("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.")); + } + let sk = query + .start + .as_ref() + .ok_or_bad_request("start should be specified if single_item is set")?; + let item = garage + .k2v + .item_table + .get(&partition, sk) + .await? + .filter(|e| K2VItemTable::matches_filter(e, &filter)); + match item { + Some(i) => (vec![ReadBatchResponseItem::from(i)], false, None), + None => (vec![], false, None), + } + } else { + let (items, more, next_start) = read_range( + &garage.k2v.item_table, + &partition, + &query.prefix, + &query.start, + &query.end, + query.limit, + Some(filter), + EnumerationOrder::from_reverse(query.reverse), + ) + .await?; + + let items = items + .into_iter() + .map(ReadBatchResponseItem::from) + .collect::<Vec<_>>(); + + (items, more, next_start) + }; + + Ok(ReadBatchResponse { + partition_key: query.partition_key, + prefix: query.prefix, + start: query.start, + end: query.end, + limit: query.limit, + reverse: query.reverse, + single_item: query.single_item, + conflicts_only: query.conflicts_only, + tombstones: query.tombstones, + items, + more, + next_start, + }) +} + +pub async fn handle_delete_batch( + garage: Arc<Garage>, + bucket_id: Uuid, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?; + + let resp_results = futures::future::join_all( + queries + .into_iter() + .map(|q| handle_delete_batch_query(&garage, bucket_id, q)), + ) + .await; + + let mut resps: Vec<DeleteBatchResponse> = vec![]; + for resp in resp_results { + resps.push(resp?); + } + + let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +async fn handle_delete_batch_query( + garage: &Arc<Garage>, + bucket_id: Uuid, + query: DeleteBatchQuery, +) -> Result<DeleteBatchResponse, Error> { + let partition = K2VItemPartition { + bucket_id, + partition_key: query.partition_key.clone(), + }; + + let filter = ItemFilter { + exclude_only_tombstones: true, + conflicts_only: false, + }; + + let deleted_items = if query.single_item { + if query.prefix.is_some() || query.end.is_some() { + return Err(Error::bad_request("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.")); + } + let sk = query + .start + .as_ref() + .ok_or_bad_request("start should be specified if single_item is set")?; + let item = garage + .k2v + .item_table + .get(&partition, sk) + .await? + .filter(|e| K2VItemTable::matches_filter(e, &filter)); + match item { + Some(i) => { + let cc = i.causal_context(); + garage + .k2v + .rpc + .insert( + bucket_id, + i.partition.partition_key, + i.sort_key, + Some(cc), + DvvsValue::Deleted, + ) + .await?; + 1 + } + None => 0, + } + } else { + let (items, more, _next_start) = read_range( + &garage.k2v.item_table, + &partition, + &query.prefix, + &query.start, + &query.end, + None, + Some(filter), + EnumerationOrder::Forward, + ) + .await?; + assert!(!more); + + // TODO delete items + let items = items + .into_iter() + .map(|i| { + let cc = i.causal_context(); + ( + i.partition.partition_key, + i.sort_key, + Some(cc), + DvvsValue::Deleted, + ) + }) + .collect::<Vec<_>>(); + let n = items.len(); + + garage.k2v.rpc.insert_batch(bucket_id, items).await?; + + n + }; + + Ok(DeleteBatchResponse { + partition_key: query.partition_key, + prefix: query.prefix, + start: query.start, + end: query.end, + single_item: query.single_item, + deleted_items, + }) +} + +#[derive(Deserialize)] +struct InsertBatchItem { + pk: String, + sk: String, + ct: Option<String>, + v: Option<String>, +} + +#[derive(Deserialize)] +struct ReadBatchQuery { + #[serde(rename = "partitionKey")] + partition_key: String, + #[serde(default)] + prefix: Option<String>, + #[serde(default)] + start: Option<String>, + #[serde(default)] + end: Option<String>, + #[serde(default)] + limit: Option<u64>, + #[serde(default)] + reverse: bool, + #[serde(default, rename = "singleItem")] + single_item: bool, + #[serde(default, rename = "conflictsOnly")] + conflicts_only: bool, + #[serde(default)] + tombstones: bool, +} + +#[derive(Serialize)] +struct ReadBatchResponse { + #[serde(rename = "partitionKey")] + partition_key: String, + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + limit: Option<u64>, + reverse: bool, + #[serde(rename = "singleItem")] + single_item: bool, + #[serde(rename = "conflictsOnly")] + conflicts_only: bool, + tombstones: bool, + + items: Vec<ReadBatchResponseItem>, + more: bool, + #[serde(rename = "nextStart")] + next_start: Option<String>, +} + +#[derive(Serialize)] +struct ReadBatchResponseItem { + sk: String, + ct: String, + v: Vec<Option<String>>, +} + +impl ReadBatchResponseItem { + fn from(i: K2VItem) -> Self { + let ct = i.causal_context().serialize(); + let v = i + .values() + .iter() + .map(|v| match v { + DvvsValue::Value(x) => Some(base64::encode(x)), + DvvsValue::Deleted => None, + }) + .collect::<Vec<_>>(); + Self { + sk: i.sort_key, + ct, + v, + } + } +} + +#[derive(Deserialize)] +struct DeleteBatchQuery { + #[serde(rename = "partitionKey")] + partition_key: String, + #[serde(default)] + prefix: Option<String>, + #[serde(default)] + start: Option<String>, + #[serde(default)] + end: Option<String>, + #[serde(default, rename = "singleItem")] + single_item: bool, +} + +#[derive(Serialize)] +struct DeleteBatchResponse { + #[serde(rename = "partitionKey")] + partition_key: String, + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + #[serde(rename = "singleItem")] + single_item: bool, + + #[serde(rename = "deletedItems")] + deleted_items: usize, +} diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs new file mode 100644 index 00000000..42491466 --- /dev/null +++ b/src/api/k2v/error.rs @@ -0,0 +1,135 @@ +use err_derive::Error; +use hyper::header::HeaderValue; +use hyper::{Body, HeaderMap, StatusCode}; + +use garage_model::helper::error::Error as HelperError; + +use crate::common_error::CommonError; +pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; +use crate::generic_server::ApiError; +use crate::helpers::CustomApiErrorBody; +use crate::signature::error::Error as SignatureError; + +/// Errors of this crate +#[derive(Debug, Error)] +pub enum Error { + #[error(display = "{}", _0)] + /// Error from common error + Common(CommonError), + + // Category: cannot process + /// Authorization Header Malformed + #[error(display = "Authorization header malformed, expected scope: {}", _0)] + AuthorizationHeaderMalformed(String), + + /// The object requested don't exists + #[error(display = "Key not found")] + NoSuchKey, + + /// Some base64 encoded data was badly encoded + #[error(display = "Invalid base64: {}", _0)] + InvalidBase64(#[error(source)] base64::DecodeError), + + /// The client sent a header with invalid value + #[error(display = "Invalid header value: {}", _0)] + InvalidHeader(#[error(source)] hyper::header::ToStrError), + + /// The client asked for an invalid return format (invalid Accept header) + #[error(display = "Not acceptable: {}", _0)] + NotAcceptable(String), + + /// The request contained an invalid UTF-8 sequence in its path or in other parameters + #[error(display = "Invalid UTF-8: {}", _0)] + InvalidUtf8Str(#[error(source)] std::str::Utf8Error), +} + +impl<T> From<T> for Error +where + CommonError: From<T>, +{ + fn from(err: T) -> Self { + Error::Common(CommonError::from(err)) + } +} + +impl CommonErrorDerivative for Error {} + +impl From<HelperError> for Error { + fn from(err: HelperError) -> Self { + match err { + HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), + HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), + HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), + HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), + e => Self::Common(CommonError::BadRequest(format!("{}", e))), + } + } +} + +impl From<SignatureError> for Error { + fn from(err: SignatureError) -> Self { + match err { + SignatureError::Common(c) => Self::Common(c), + SignatureError::AuthorizationHeaderMalformed(c) => { + Self::AuthorizationHeaderMalformed(c) + } + SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), + SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), + } + } +} + +impl Error { + /// This returns a keyword for the corresponding error. + /// Here, these keywords are not necessarily those from AWS S3, + /// as we are building a custom API + fn code(&self) -> &'static str { + match self { + Error::Common(c) => c.aws_code(), + Error::NoSuchKey => "NoSuchKey", + Error::NotAcceptable(_) => "NotAcceptable", + Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", + Error::InvalidBase64(_) => "InvalidBase64", + Error::InvalidHeader(_) => "InvalidHeaderValue", + Error::InvalidUtf8Str(_) => "InvalidUtf8String", + } + } +} + +impl ApiError for Error { + /// Get the HTTP status code that best represents the meaning of the error for the client + fn http_status_code(&self) -> StatusCode { + match self { + Error::Common(c) => c.http_status_code(), + Error::NoSuchKey => StatusCode::NOT_FOUND, + Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, + Error::AuthorizationHeaderMalformed(_) + | Error::InvalidBase64(_) + | Error::InvalidHeader(_) + | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST, + } + } + + fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>) { + use hyper::header; + header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); + } + + fn http_body(&self, garage_region: &str, path: &str) -> Body { + let error = CustomApiErrorBody { + code: self.code().to_string(), + message: format!("{}", self), + path: path.to_string(), + region: garage_region.to_string(), + }; + Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + r#" +{ + "code": "InternalError", + "message": "JSON encoding of error failed" +} + "# + .into() + })) + } +} diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs new file mode 100644 index 00000000..210950bf --- /dev/null +++ b/src/api/k2v/index.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use hyper::{Body, Response, StatusCode}; +use serde::Serialize; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_rpc::ring::Ring; +use garage_table::util::*; + +use garage_model::garage::Garage; +use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; + +use crate::k2v::error::*; +use crate::k2v::range::read_range; + +pub async fn handle_read_index( + garage: Arc<Garage>, + bucket_id: Uuid, + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + limit: Option<u64>, + reverse: Option<bool>, +) -> Result<Response<Body>, Error> { + let reverse = reverse.unwrap_or(false); + + let ring: Arc<Ring> = garage.system.ring.borrow().clone(); + + let (partition_keys, more, next_start) = read_range( + &garage.k2v.counter_table.table, + &bucket_id, + &prefix, + &start, + &end, + limit, + Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + EnumerationOrder::from_reverse(reverse), + ) + .await?; + + let s_entries = ENTRIES.to_string(); + let s_conflicts = CONFLICTS.to_string(); + let s_values = VALUES.to_string(); + let s_bytes = BYTES.to_string(); + + let resp = ReadIndexResponse { + prefix, + start, + end, + limit, + reverse, + partition_keys: partition_keys + .into_iter() + .map(|part| { + let vals = part.filtered_values(&ring); + ReadIndexResponseEntry { + pk: part.sk, + entries: *vals.get(&s_entries).unwrap_or(&0), + conflicts: *vals.get(&s_conflicts).unwrap_or(&0), + values: *vals.get(&s_values).unwrap_or(&0), + bytes: *vals.get(&s_bytes).unwrap_or(&0), + } + }) + .collect::<Vec<_>>(), + more, + next_start, + }; + + let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +#[derive(Serialize)] +struct ReadIndexResponse { + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + limit: Option<u64>, + reverse: bool, + + #[serde(rename = "partitionKeys")] + partition_keys: Vec<ReadIndexResponseEntry>, + + more: bool, + #[serde(rename = "nextStart")] + next_start: Option<String>, +} + +#[derive(Serialize)] +struct ReadIndexResponseEntry { + pk: String, + entries: i64, + conflicts: i64, + values: i64, + bytes: i64, +} diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs new file mode 100644 index 00000000..836d386f --- /dev/null +++ b/src/api/k2v/item.rs @@ -0,0 +1,230 @@ +use std::sync::Arc; + +use http::header; + +use hyper::{Body, Request, Response, StatusCode}; + +use garage_util::data::*; + +use garage_model::garage::Garage; +use garage_model::k2v::causality::*; +use garage_model::k2v::item_table::*; + +use crate::k2v::error::*; + +pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; + +pub enum ReturnFormat { + Json, + Binary, + Either, +} + +impl ReturnFormat { + pub fn from(req: &Request<Body>) -> Result<Self, Error> { + let accept = match req.headers().get(header::ACCEPT) { + Some(a) => a.to_str()?, + None => return Ok(Self::Json), + }; + + let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>(); + let accept_json = accept.contains(&"application/json") || accept.contains(&"*/*"); + let accept_binary = accept.contains(&"application/octet-stream") || accept.contains(&"*/*"); + + match (accept_json, accept_binary) { + (true, true) => Ok(Self::Either), + (true, false) => Ok(Self::Json), + (false, true) => Ok(Self::Binary), + (false, false) => Err(Error::NotAcceptable("Invalid Accept: header value, must contain either application/json or application/octet-stream (or both)".into())), + } + } + + pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> { + let vals = item.values(); + + if vals.is_empty() { + return Err(Error::NoSuchKey); + } + + let ct = item.causal_context().serialize(); + match self { + Self::Binary if vals.len() > 1 => Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .status(StatusCode::CONFLICT) + .body(Body::empty())?), + Self::Binary => { + assert!(vals.len() == 1); + Self::make_binary_response(ct, vals[0]) + } + Self::Either if vals.len() == 1 => Self::make_binary_response(ct, vals[0]), + _ => Self::make_json_response(ct, &vals[..]), + } + } + + fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> { + match v { + DvvsValue::Deleted => Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .header(header::CONTENT_TYPE, "application/octet-stream") + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?), + DvvsValue::Value(v) => Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .header(header::CONTENT_TYPE, "application/octet-stream") + .status(StatusCode::OK) + .body(Body::from(v.to_vec()))?), + } + } + + fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> { + let items = v + .iter() + .map(|v| match v { + DvvsValue::Deleted => serde_json::Value::Null, + DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)), + }) + .collect::<Vec<_>>(); + let json_body = + serde_json::to_string_pretty(&items).ok_or_internal_error("JSON encoding error")?; + Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .header(header::CONTENT_TYPE, "application/json") + .status(StatusCode::OK) + .body(Body::from(json_body))?) + } +} + +/// Handle ReadItem request +#[allow(clippy::ptr_arg)] +pub async fn handle_read_item( + garage: Arc<Garage>, + req: &Request<Body>, + bucket_id: Uuid, + partition_key: &str, + sort_key: &String, +) -> Result<Response<Body>, Error> { + let format = ReturnFormat::from(req)?; + + let item = garage + .k2v + .item_table + .get( + &K2VItemPartition { + bucket_id, + partition_key: partition_key.to_string(), + }, + sort_key, + ) + .await? + .ok_or(Error::NoSuchKey)?; + + format.make_response(&item) +} + +pub async fn handle_insert_item( + garage: Arc<Garage>, + req: Request<Body>, + bucket_id: Uuid, + partition_key: &str, + sort_key: &str, +) -> Result<Response<Body>, Error> { + let causal_context = req + .headers() + .get(X_GARAGE_CAUSALITY_TOKEN) + .map(|s| s.to_str()) + .transpose()? + .map(CausalContext::parse) + .transpose() + .ok_or_bad_request("Invalid causality token")?; + + let body = hyper::body::to_bytes(req.into_body()).await?; + let value = DvvsValue::Value(body.to_vec()); + + garage + .k2v + .rpc + .insert( + bucket_id, + partition_key.to_string(), + sort_key.to_string(), + causal_context, + value, + ) + .await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_delete_item( + garage: Arc<Garage>, + req: Request<Body>, + bucket_id: Uuid, + partition_key: &str, + sort_key: &str, +) -> Result<Response<Body>, Error> { + let causal_context = req + .headers() + .get(X_GARAGE_CAUSALITY_TOKEN) + .map(|s| s.to_str()) + .transpose()? + .map(CausalContext::parse) + .transpose() + .ok_or_bad_request("Invalid causality token")?; + + let value = DvvsValue::Deleted; + + garage + .k2v + .rpc + .insert( + bucket_id, + partition_key.to_string(), + sort_key.to_string(), + causal_context, + value, + ) + .await?; + + Ok(Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?) +} + +/// Handle ReadItem request +#[allow(clippy::ptr_arg)] +pub async fn handle_poll_item( + garage: Arc<Garage>, + req: &Request<Body>, + bucket_id: Uuid, + partition_key: String, + sort_key: String, + causality_token: String, + timeout_secs: Option<u64>, +) -> Result<Response<Body>, Error> { + let format = ReturnFormat::from(req)?; + + let causal_context = + CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; + + let item = garage + .k2v + .rpc + .poll( + bucket_id, + partition_key, + sort_key, + causal_context, + timeout_secs.unwrap_or(300) * 1000, + ) + .await?; + + if let Some(item) = item { + format.make_response(&item) + } else { + Ok(Response::builder() + .status(StatusCode::NOT_MODIFIED) + .body(Body::empty())?) + } +} diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs new file mode 100644 index 00000000..b6a8c5cf --- /dev/null +++ b/src/api/k2v/mod.rs @@ -0,0 +1,9 @@ +pub mod api_server; +mod error; +mod router; + +mod batch; +mod index; +mod item; + +mod range; diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs new file mode 100644 index 00000000..bb9d3be5 --- /dev/null +++ b/src/api/k2v/range.rs @@ -0,0 +1,100 @@ +//! Utility module for retrieving ranges of items in Garage tables +//! Implements parameters (prefix, start, end, limit) as specified +//! for endpoints ReadIndex, ReadBatch and DeleteBatch + +use std::sync::Arc; + +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::helpers::key_after_prefix; +use crate::k2v::error::*; + +/// Read range in a Garage table. +/// Returns (entries, more?, nextStart) +#[allow(clippy::too_many_arguments)] +pub(crate) async fn read_range<F>( + table: &Arc<Table<F, TableShardedReplication>>, + partition_key: &F::P, + prefix: &Option<String>, + start: &Option<String>, + end: &Option<String>, + limit: Option<u64>, + filter: Option<F::Filter>, + enumeration_order: EnumerationOrder, +) -> Result<(Vec<F::E>, bool, Option<String>), Error> +where + F: TableSchema<S = String> + 'static, +{ + let (mut start, mut start_ignore) = match (prefix, start) { + (None, None) => (None, false), + (None, Some(s)) => (Some(s.clone()), false), + (Some(p), Some(s)) => { + if !s.starts_with(p) { + return Err(Error::bad_request(format!( + "Start key '{}' does not start with prefix '{}'", + s, p + ))); + } + (Some(s.clone()), false) + } + (Some(p), None) if enumeration_order == EnumerationOrder::Reverse => { + let start = key_after_prefix(p) + .ok_or_internal_error("Sorry, can't list this prefix in reverse order")?; + (Some(start), true) + } + (Some(p), None) => (Some(p.clone()), false), + }; + + let mut entries = vec![]; + loop { + let n_get = std::cmp::min( + 1000, + limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2, + ); + let get_ret = table + .get_range( + partition_key, + start.clone(), + filter.clone(), + n_get, + enumeration_order, + ) + .await?; + + let get_ret_len = get_ret.len(); + + for entry in get_ret { + if start_ignore && Some(entry.sort_key()) == start.as_ref() { + continue; + } + if let Some(p) = prefix { + if !entry.sort_key().starts_with(p) { + return Ok((entries, false, None)); + } + } + if let Some(e) = end { + let is_finished = match enumeration_order { + EnumerationOrder::Forward => entry.sort_key() >= e, + EnumerationOrder::Reverse => entry.sort_key() <= e, + }; + if is_finished { + return Ok((entries, false, None)); + } + } + if let Some(l) = limit { + if entries.len() >= l as usize { + return Ok((entries, true, Some(entry.sort_key().clone()))); + } + } + entries.push(entry); + } + + if get_ret_len < n_get { + return Ok((entries, false, None)); + } + + start = Some(entries.last().unwrap().sort_key().clone()); + start_ignore = true; + } +} diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs new file mode 100644 index 00000000..50e6965b --- /dev/null +++ b/src/api/k2v/router.rs @@ -0,0 +1,252 @@ +use crate::k2v::error::*; + +use std::borrow::Cow; + +use hyper::{Method, Request}; + +use crate::helpers::Authorization; +use crate::router_macros::{generateQueryParameters, router_match}; + +router_match! {@func + + +/// List of all K2V API endpoints. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Endpoint { + DeleteBatch { + }, + DeleteItem { + partition_key: String, + sort_key: String, + }, + InsertBatch { + }, + InsertItem { + partition_key: String, + sort_key: String, + }, + Options, + PollItem { + partition_key: String, + sort_key: String, + causality_token: String, + timeout: Option<u64>, + }, + ReadBatch { + }, + ReadIndex { + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + limit: Option<u64>, + reverse: Option<bool>, + }, + ReadItem { + partition_key: String, + sort_key: String, + }, +}} + +impl Endpoint { + /// Determine which S3 endpoint a request is for using the request, and a bucket which was + /// possibly extracted from the Host header. + /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets + pub fn from_request<T>(req: &Request<T>) -> Result<(Self, String), Error> { + let uri = req.uri(); + let path = uri.path().trim_start_matches('/'); + let query = uri.query(); + + let (bucket, partition_key) = path + .split_once('/') + .map(|(b, p)| (b.to_owned(), p.trim_start_matches('/'))) + .unwrap_or((path.to_owned(), "")); + + if bucket.is_empty() { + return Err(Error::bad_request("Missing bucket name")); + } + + if *req.method() == Method::OPTIONS { + return Ok((Self::Options, bucket)); + } + + let partition_key = percent_encoding::percent_decode_str(partition_key) + .decode_utf8()? + .into_owned(); + + let mut query = QueryParameters::from_query(query.unwrap_or_default())?; + + let method_search = Method::from_bytes(b"SEARCH").unwrap(); + let res = match *req.method() { + Method::GET => Self::from_get(partition_key, &mut query)?, + //&Method::HEAD => Self::from_head(partition_key, &mut query)?, + Method::POST => Self::from_post(partition_key, &mut query)?, + Method::PUT => Self::from_put(partition_key, &mut query)?, + Method::DELETE => Self::from_delete(partition_key, &mut query)?, + _ if req.method() == method_search => Self::from_search(partition_key, &mut query)?, + _ => return Err(Error::bad_request("Unknown method")), + }; + + if let Some(message) = query.nonempty_message() { + debug!("Unused query parameter: {}", message) + } + Ok((res, bucket)) + } + + /// Determine which endpoint a request is for, knowing it is a GET. + fn from_get(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + EMPTY if causality_token => PollItem (query::sort_key, query::causality_token, opt_parse::timeout), + EMPTY => ReadItem (query::sort_key), + ], + no_key: [ + EMPTY => ReadIndex (query_opt::prefix, query_opt::start, query_opt::end, opt_parse::limit, opt_parse::reverse), + ] + } + } + + /// Determine which endpoint a request is for, knowing it is a SEARCH. + fn from_search(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + ], + no_key: [ + EMPTY => ReadBatch, + ] + } + } + + /* + /// Determine which endpoint a request is for, knowing it is a HEAD. + fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + EMPTY => HeadObject(opt_parse::part_number, query_opt::version_id), + ], + no_key: [ + EMPTY => HeadBucket, + ] + } + } + */ + + /// Determine which endpoint a request is for, knowing it is a POST. + fn from_post(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + ], + no_key: [ + EMPTY => InsertBatch, + DELETE => DeleteBatch, + SEARCH => ReadBatch, + ] + } + } + + /// Determine which endpoint a request is for, knowing it is a PUT. + fn from_put(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + EMPTY => InsertItem (query::sort_key), + + ], + no_key: [ + ] + } + } + + /// Determine which endpoint a request is for, knowing it is a DELETE. + fn from_delete(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + EMPTY => DeleteItem (query::sort_key), + ], + no_key: [ + ] + } + } + + /// Get the partition key the request target. Returns None for requests which don't use a partition key. + #[allow(dead_code)] + pub fn get_partition_key(&self) -> Option<&str> { + router_match! { + @extract + self, + partition_key, + [ + DeleteItem, + InsertItem, + PollItem, + ReadItem, + ] + } + } + + /// Get the sort key the request target. Returns None for requests which don't use a sort key. + #[allow(dead_code)] + pub fn get_sort_key(&self) -> Option<&str> { + router_match! { + @extract + self, + sort_key, + [ + DeleteItem, + InsertItem, + PollItem, + ReadItem, + ] + } + } + + /// Get the kind of authorization which is required to perform the operation. + pub fn authorization_type(&self) -> Authorization { + let readonly = router_match! { + @match + self, + [ + PollItem, + ReadBatch, + ReadIndex, + ReadItem, + ] + }; + if readonly { + Authorization::Read + } else { + Authorization::Write + } + } +} + +// parameter name => struct field +generateQueryParameters! { + "prefix" => prefix, + "start" => start, + "causality_token" => causality_token, + "end" => end, + "limit" => limit, + "reverse" => reverse, + "sort_key" => sort_key, + "timeout" => timeout +} + +mod keywords { + //! This module contain all query parameters with no associated value + //! used to differentiate endpoints. + pub const EMPTY: &str = ""; + + pub const DELETE: &str = "delete"; + pub const SEARCH: &str = "search"; +} diff --git a/src/api/lib.rs b/src/api/lib.rs index de60ec53..370dfd7a 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -2,26 +2,16 @@ #[macro_use] extern crate tracing; -pub mod error; -pub use error::Error; +pub mod common_error; mod encoding; - -mod api_server; -pub use api_server::run_api_server; - +pub mod generic_server; +pub mod helpers; +mod router_macros; /// This mode is public only to help testing. Don't expect stability here pub mod signature; -pub mod helpers; -mod s3_bucket; -mod s3_copy; -pub mod s3_cors; -mod s3_delete; -pub mod s3_get; -mod s3_list; -mod s3_post_object; -mod s3_put; -mod s3_router; -mod s3_website; -mod s3_xml; +pub mod admin; +#[cfg(feature = "k2v")] +pub mod k2v; +pub mod s3; diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs new file mode 100644 index 00000000..4c593300 --- /dev/null +++ b/src/api/router_macros.rs @@ -0,0 +1,213 @@ +/// This macro is used to generate very repetitive match {} blocks in this module +/// It is _not_ made to be used anywhere else +macro_rules! router_match { + (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{ + // usage: router_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] } + // returns true if the variant was one of the listed variants, false otherwise. + use Endpoint::*; + match $enum { + $( + $endpoint { .. } => true, + )* + _ => false + } + }}; + (@extract $enum:expr , $param:ident, [ $($endpoint:ident,)* ]) => {{ + // usage: router_match {@extract my_enum, field_name, [ VariantWithField1, VariantWithField2 ..] } + // returns Some(field_value), or None if the variant was not one of the listed variants. + use Endpoint::*; + match $enum { + $( + $endpoint {$param, ..} => Some($param), + )* + _ => None + } + }}; + (@gen_path_parser ($method:expr, $reqpath:expr, $query:expr) + [ + $($meth:ident $path:pat $(if $required:ident)? => $api:ident $(($($conv:ident :: $param:ident),*))?,)* + ]) => {{ + { + use Endpoint::*; + match ($method, $reqpath) { + $( + (&Method::$meth, $path) if true $(&& $query.$required.is_some())? => $api { + $($( + $param: router_match!(@@parse_param $query, $conv, $param), + )*)? + }, + )* + (m, p) => { + return Err(Error::bad_request(format!( + "Unknown API endpoint: {} {}", + m, p + ))) + } + } + } + }}; + (@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr), + key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*], + no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{ + // usage: router_match {@gen_parser (keyword, key, query, header), + // key: [ + // SOME_KEYWORD => VariantWithKey, + // ... + // ], + // no_key: [ + // SOME_KEYWORD => VariantWithoutKey, + // ... + // ] + // } + // See in from_{method} for more detailed usage. + use Endpoint::*; + use keywords::*; + match ($keyword, !$key.is_empty()){ + $( + ($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k { + $key, + $($( + $param_k: router_match!(@@parse_param $query, $conv_k, $param_k), + )*)? + }), + )* + $( + ($kw_nk, false) $(if $query.$required_nk.is_some())? $(if $header.contains($header_nk))? => Ok($api_nk { + $($( + $param_nk: router_match!(@@parse_param $query, $conv_nk, $param_nk), + )*)? + }), + )* + (kw, _) => Err(Error::bad_request(format!("Invalid endpoint: {}", kw))) + } + }}; + + (@@parse_param $query:expr, query_opt, $param:ident) => {{ + // extract optional query parameter + $query.$param.take().map(|param| param.into_owned()) + }}; + (@@parse_param $query:expr, query, $param:ident) => {{ + // extract mendatory query parameter + $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned() + }}; + (@@parse_param $query:expr, opt_parse, $param:ident) => {{ + // extract and parse optional query parameter + // missing parameter is file, however parse error is reported as an error + $query.$param + .take() + .map(|param| param.parse()) + .transpose() + .map_err(|_| Error::bad_request("Failed to parse query parameter"))? + }}; + (@@parse_param $query:expr, parse, $param:ident) => {{ + // extract and parse mandatory query parameter + // both missing and un-parseable parameters are reported as errors + $query.$param.take().ok_or_bad_request("Missing argument for endpoint")? + .parse() + .map_err(|_| Error::bad_request("Failed to parse query parameter"))? + }}; + (@func + $(#[$doc:meta])* + pub enum Endpoint { + $( + $(#[$outer:meta])* + $variant:ident $({ + $($name:ident: $ty:ty,)* + })?, + )* + }) => { + $(#[$doc])* + pub enum Endpoint { + $( + $(#[$outer])* + $variant $({ + $($name: $ty, )* + })?, + )* + } + impl Endpoint { + pub fn name(&self) -> &'static str { + match self { + $(Endpoint::$variant $({ $($name: _,)* .. })? => stringify!($variant),)* + } + } + } + }; + (@if ($($cond:tt)+) then ($($then:tt)*) else ($($else:tt)*)) => { + $($then)* + }; + (@if () then ($($then:tt)*) else ($($else:tt)*)) => { + $($else)* + }; +} + +/// This macro is used to generate part of the code in this module. It must be called only one, and +/// is useless outside of this module. +macro_rules! generateQueryParameters { + ( $($rest:expr => $name:ident),* ) => { + /// Struct containing all query parameters used in endpoints. Think of it as an HashMap, + /// but with keys statically known. + #[derive(Debug, Default)] + struct QueryParameters<'a> { + keyword: Option<Cow<'a, str>>, + $( + $name: Option<Cow<'a, str>>, + )* + } + + impl<'a> QueryParameters<'a> { + /// Build this struct from the query part of an URI. + fn from_query(query: &'a str) -> Result<Self, Error> { + let mut res: Self = Default::default(); + for (k, v) in url::form_urlencoded::parse(query.as_bytes()) { + let repeated = match k.as_ref() { + $( + $rest => if !v.is_empty() { + res.$name.replace(v).is_some() + } else { + false + }, + )* + _ => { + if k.starts_with("response-") || k.starts_with("X-Amz-") { + false + } else if v.as_ref().is_empty() { + if res.keyword.replace(k).is_some() { + return Err(Error::bad_request("Multiple keywords")); + } + continue; + } else { + debug!("Received an unknown query parameter: '{}'", k); + false + } + } + }; + if repeated { + return Err(Error::bad_request(format!( + "Query parameter repeated: '{}'", + k + ))); + } + } + Ok(res) + } + + /// Get an error message in case not all parameters where used when extracting them to + /// build an Enpoint variant + fn nonempty_message(&self) -> Option<&str> { + if self.keyword.is_some() { + Some("Keyword not used") + } $( + else if self.$name.is_some() { + Some(concat!("'", $rest, "'")) + } + )* else { + None + } + } + } + } +} + +pub(crate) use generateQueryParameters; +pub(crate) use router_match; diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs new file mode 100644 index 00000000..27837297 --- /dev/null +++ b/src/api/s3/api_server.rs @@ -0,0 +1,390 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; +use hyper::header; +use hyper::{Body, Request, Response}; + +use opentelemetry::{trace::SpanRef, KeyValue}; + +use garage_util::error::Error as GarageError; + +use garage_model::garage::Garage; +use garage_model::key_table::Key; + +use crate::generic_server::*; +use crate::s3::error::*; + +use crate::signature::payload::check_payload_signature; +use crate::signature::streaming::*; + +use crate::helpers::*; +use crate::s3::bucket::*; +use crate::s3::copy::*; +use crate::s3::cors::*; +use crate::s3::delete::*; +use crate::s3::get::*; +use crate::s3::list::*; +use crate::s3::post_object::handle_post_object; +use crate::s3::put::*; +use crate::s3::router::Endpoint; +use crate::s3::website::*; + +pub struct S3ApiServer { + garage: Arc<Garage>, +} + +pub(crate) struct S3ApiEndpoint { + bucket_name: Option<String>, + endpoint: Endpoint, +} + +impl S3ApiServer { + pub async fn run( + garage: Arc<Garage>, + addr: SocketAddr, + s3_region: String, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + ApiServer::new(s3_region, S3ApiServer { garage }) + .run_server(addr, shutdown_signal) + .await + } + + async fn handle_request_without_bucket( + &self, + _req: Request<Body>, + api_key: Key, + endpoint: Endpoint, + ) -> Result<Response<Body>, Error> { + match endpoint { + Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await, + endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), + } + } +} + +#[async_trait] +impl ApiHandler for S3ApiServer { + const API_NAME: &'static str = "s3"; + const API_NAME_DISPLAY: &'static str = "S3"; + + type Endpoint = S3ApiEndpoint; + type Error = Error; + + fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> { + let authority = req + .headers() + .get(header::HOST) + .ok_or_bad_request("Host header required")? + .to_str()?; + + let host = authority_to_host(authority)?; + + let bucket_name = self + .garage + .config + .s3_api + .root_domain + .as_ref() + .and_then(|root_domain| host_to_bucket(&host, root_domain)); + + let (endpoint, bucket_name) = + Endpoint::from_request(req, bucket_name.map(ToOwned::to_owned))?; + + Ok(S3ApiEndpoint { + bucket_name, + endpoint, + }) + } + + async fn handle( + &self, + req: Request<Body>, + endpoint: S3ApiEndpoint, + ) -> Result<Response<Body>, Error> { + let S3ApiEndpoint { + bucket_name, + endpoint, + } = endpoint; + let garage = self.garage.clone(); + + // Some endpoints are processed early, before we even check for an API key + if let Endpoint::PostObject = endpoint { + return handle_post_object(garage, req, bucket_name.unwrap()).await; + } + if let Endpoint::Options = endpoint { + return handle_options_s3api(garage, &req, bucket_name).await; + } + + let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?; + let api_key = api_key + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; + + let req = parse_streaming_body( + &api_key, + req, + &mut content_sha256, + &garage.config.s3_api.s3_region, + "s3", + )?; + + let bucket_name = match bucket_name { + None => { + return self + .handle_request_without_bucket(req, api_key, endpoint) + .await + } + Some(bucket) => bucket.to_string(), + }; + + // Special code path for CreateBucket API endpoint + if let Endpoint::CreateBucket {} = endpoint { + return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await; + } + + let bucket_id = garage + .bucket_helper() + .resolve_bucket(&bucket_name, &api_key) + .await?; + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let allowed = match endpoint.authorization_type() { + Authorization::Read => api_key.allow_read(&bucket_id), + Authorization::Write => api_key.allow_write(&bucket_id), + Authorization::Owner => api_key.allow_owner(&bucket_id), + _ => unreachable!(), + }; + + if !allowed { + return Err(Error::forbidden("Operation is not allowed for this key.")); + } + + let matching_cors_rule = find_matching_cors_rule(&bucket, &req)?; + + let resp = match endpoint { + Endpoint::HeadObject { + key, part_number, .. + } => handle_head(garage, &req, bucket_id, &key, part_number).await, + Endpoint::GetObject { + key, part_number, .. + } => handle_get(garage, &req, bucket_id, &key, part_number).await, + Endpoint::UploadPart { + key, + part_number, + upload_id, + } => { + handle_put_part( + garage, + req, + bucket_id, + &key, + part_number, + &upload_id, + content_sha256, + ) + .await + } + Endpoint::CopyObject { key } => { + handle_copy(garage, &api_key, &req, bucket_id, &key).await + } + Endpoint::UploadPartCopy { + key, + part_number, + upload_id, + } => { + handle_upload_part_copy( + garage, + &api_key, + &req, + bucket_id, + &key, + part_number, + &upload_id, + ) + .await + } + Endpoint::PutObject { key } => { + handle_put(garage, req, &bucket, &key, content_sha256).await + } + Endpoint::AbortMultipartUpload { key, upload_id } => { + handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await + } + Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await, + Endpoint::CreateMultipartUpload { key } => { + handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await + } + Endpoint::CompleteMultipartUpload { key, upload_id } => { + handle_complete_multipart_upload( + garage, + req, + &bucket_name, + &bucket, + &key, + &upload_id, + content_sha256, + ) + .await + } + Endpoint::CreateBucket {} => unreachable!(), + Endpoint::HeadBucket {} => { + let empty_body: Body = Body::from(vec![]); + let response = Response::builder().body(empty_body).unwrap(); + Ok(response) + } + Endpoint::DeleteBucket {} => { + handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await + } + Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage), + Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(), + Endpoint::ListObjects { + delimiter, + encoding_type, + marker, + max_keys, + prefix, + } => { + handle_list( + garage, + &ListObjectsQuery { + common: ListQueryCommon { + bucket_name, + bucket_id, + delimiter: delimiter.map(|d| d.to_string()), + page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + prefix: prefix.unwrap_or_default(), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), + }, + is_v2: false, + marker, + continuation_token: None, + start_after: None, + }, + ) + .await + } + Endpoint::ListObjectsV2 { + delimiter, + encoding_type, + max_keys, + prefix, + continuation_token, + start_after, + list_type, + .. + } => { + if list_type == "2" { + handle_list( + garage, + &ListObjectsQuery { + common: ListQueryCommon { + bucket_name, + bucket_id, + delimiter: delimiter.map(|d| d.to_string()), + page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), + prefix: prefix.unwrap_or_default(), + }, + is_v2: true, + marker: None, + continuation_token, + start_after, + }, + ) + .await + } else { + Err(Error::bad_request(format!( + "Invalid endpoint: list-type={}", + list_type + ))) + } + } + Endpoint::ListMultipartUploads { + delimiter, + encoding_type, + key_marker, + max_uploads, + prefix, + upload_id_marker, + } => { + handle_list_multipart_upload( + garage, + &ListMultipartUploadsQuery { + common: ListQueryCommon { + bucket_name, + bucket_id, + delimiter: delimiter.map(|d| d.to_string()), + page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + prefix: prefix.unwrap_or_default(), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), + }, + key_marker, + upload_id_marker, + }, + ) + .await + } + Endpoint::ListParts { + key, + max_parts, + part_number_marker, + upload_id, + } => { + handle_list_parts( + garage, + &ListPartsQuery { + bucket_name, + bucket_id, + key, + upload_id, + part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)), + max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + }, + ) + .await + } + Endpoint::DeleteObjects {} => { + handle_delete_objects(garage, bucket_id, req, content_sha256).await + } + Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await, + Endpoint::PutBucketWebsite {} => { + handle_put_website(garage, bucket_id, req, content_sha256).await + } + Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await, + Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await, + Endpoint::PutBucketCors {} => { + handle_put_cors(garage, bucket_id, req, content_sha256).await + } + Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await, + endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), + }; + + // If request was a success and we have a CORS rule that applies to it, + // add the corresponding CORS headers to the response + let mut resp_ok = resp?; + if let Some(rule) = matching_cors_rule { + add_cors_headers(&mut resp_ok, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + + Ok(resp_ok) + } +} + +impl ApiEndpoint for S3ApiEndpoint { + fn name(&self) -> &'static str { + self.endpoint.name() + } + + fn add_span_attributes(&self, span: SpanRef<'_>) { + span.set_attribute(KeyValue::new( + "bucket", + self.bucket_name.clone().unwrap_or_default(), + )); + } +} diff --git a/src/api/s3_bucket.rs b/src/api/s3/bucket.rs index 8a5407d3..3ac6a6ec 100644 --- a/src/api/s3_bucket.rs +++ b/src/api/s3/bucket.rs @@ -7,15 +7,15 @@ use garage_model::bucket_alias_table::*; use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; use garage_model::key_table::Key; -use garage_model::object_table::ObjectFilter; use garage_model::permission::BucketKeyPerm; use garage_table::util::*; use garage_util::crdt::*; use garage_util::data::*; use garage_util::time::*; -use crate::error::*; -use crate::s3_xml; +use crate::common_error::CommonError; +use crate::s3::error::*; +use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> { @@ -130,7 +130,7 @@ pub async fn handle_create_bucket( if let Some(location_constraint) = cmd { if location_constraint != garage.config.s3_api.s3_region { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Cannot satisfy location constraint `{}`: buckets can only be created in region `{}`", location_constraint, garage.config.s3_api.s3_region @@ -158,12 +158,12 @@ pub async fn handle_create_bucket( // otherwise return a forbidden error. let kp = api_key.bucket_permissions(&bucket_id); if !(kp.allow_write || kp.allow_owner) { - return Err(Error::BucketAlreadyExists); + return Err(CommonError::BucketAlreadyExists.into()); } } else { // Create the bucket! if !is_valid_bucket_name(&bucket_name) { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "{}: {}", bucket_name, INVALID_BUCKET_NAME_MESSAGE ))); @@ -228,12 +228,8 @@ pub async fn handle_delete_bucket( // Delete bucket // Check bucket is empty - let objects = garage - .object_table - .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10) - .await?; - if !objects.is_empty() { - return Err(Error::BucketNotEmpty); + if !garage.bucket_helper().is_bucket_empty(bucket_id).await? { + return Err(CommonError::BucketNotEmpty.into()); } // --- done checking, now commit --- @@ -299,7 +295,6 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> { let mut ret = None; for item in cbc.children() { - println!("{:?}", item); if item.has_tag_name("LocationConstraint") { if ret != None { return None; diff --git a/src/api/s3_copy.rs b/src/api/s3/copy.rs index fc4707e2..7eb6459d 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3/copy.rs @@ -5,23 +5,26 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; use md5::{Digest as Md5Digest, Md5}; +use bytes::Bytes; use hyper::{Body, Request, Response}; use serde::Serialize; +use garage_rpc::netapp::bytes_buf::BytesBuf; +use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; use garage_util::time::*; -use garage_model::block_ref_table::*; use garage_model::garage::Garage; use garage_model::key_table::Key; -use garage_model::object_table::*; -use garage_model::version_table::*; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; -use crate::api_server::{parse_bucket_key, resolve_bucket}; -use crate::error::*; -use crate::s3_put::{decode_upload_id, get_headers}; -use crate::s3_xml::{self, xmlns_tag}; +use crate::helpers::parse_bucket_key; +use crate::s3::error::*; +use crate::s3::put::{decode_upload_id, get_headers}; +use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( garage: Arc<Garage>, @@ -201,8 +204,8 @@ pub async fn handle_upload_part_copy( let mut ranges = http_range::HttpRange::parse(range_str, source_version_meta.size) .map_err(|e| (e, source_version_meta.size))?; if ranges.len() != 1 { - return Err(Error::BadRequest( - "Invalid x-amz-copy-source-range header: exactly 1 range must be given".into(), + return Err(Error::bad_request( + "Invalid x-amz-copy-source-range header: exactly 1 range must be given", )); } else { ranges.pop().unwrap() @@ -230,8 +233,8 @@ pub async fn handle_upload_part_copy( // This is only for small files, we don't bother handling this. // (in AWS UploadPartCopy works for parts at least 5MB which // is never the case of an inline object) - return Err(Error::BadRequest( - "Source object is too small (minimum part size is 5Mb)".into(), + return Err(Error::bad_request( + "Source object is too small (minimum part size is 5Mb)", )); } ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (), @@ -250,7 +253,7 @@ pub async fn handle_upload_part_copy( // Check this part number hasn't yet been uploaded if let Some(dv) = dest_version { if dv.has_part_number(part_number) { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Part number {} has already been uploaded", part_number ))); @@ -305,13 +308,18 @@ pub async fn handle_upload_part_copy( // if and only if the block returned is a block that already existed // in the Garage data store (thus we don't need to save it again). let garage2 = garage.clone(); + let order_stream = OrderTag::stream(); let source_blocks = stream::iter(blocks_to_copy) - .flat_map(|(block_hash, range_to_copy)| { + .enumerate() + .flat_map(|(i, (block_hash, range_to_copy))| { let garage3 = garage2.clone(); stream::once(async move { - let data = garage3.block_manager.rpc_get_block(&block_hash).await?; + let data = garage3 + .block_manager + .rpc_get_block(&block_hash, Some(order_stream.order(i as u64))) + .await?; match range_to_copy { - Some(r) => Ok((data[r].to_vec(), None)), + Some(r) => Ok((data.slice(r), None)), None => Ok((data, Some(block_hash))), } }) @@ -413,10 +421,13 @@ async fn get_copy_source( let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; let (source_bucket, source_key) = parse_bucket_key(©_source, None)?; - let source_bucket_id = resolve_bucket(garage, &source_bucket.to_string(), api_key).await?; + let source_bucket_id = garage + .bucket_helper() + .resolve_bucket(&source_bucket.to_string(), api_key) + .await?; if !api_key.allow_read(&source_bucket_id) { - return Err(Error::Forbidden(format!( + return Err(Error::forbidden(format!( "Reading from bucket {} not allowed for this key", source_bucket ))); @@ -536,8 +547,8 @@ impl CopyPreconditionHeaders { (None, None, None, Some(ims)) => v_date > *ims, (None, None, None, None) => true, _ => { - return Err(Error::BadRequest( - "Invalid combination of x-amz-copy-source-if-xxxxx headers".into(), + return Err(Error::bad_request( + "Invalid combination of x-amz-copy-source-if-xxxxx headers", )) } }; @@ -550,13 +561,13 @@ impl CopyPreconditionHeaders { } } -type BlockStreamItemOk = (Vec<u8>, Option<Hash>); +type BlockStreamItemOk = (Bytes, Option<Hash>); type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>; struct Defragmenter<S: Stream<Item = BlockStreamItem>> { block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>, - buffer: Vec<u8>, + buffer: BytesBuf, hash: Option<Hash>, } @@ -565,7 +576,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { Self { block_size, block_stream, - buffer: vec![], + buffer: BytesBuf::new(), hash: None, } } @@ -583,7 +594,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { if self.buffer.is_empty() { let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?; - self.buffer = next_block; + self.buffer.extend(next_block); self.hash = next_block_hash; } else if self.buffer.len() + peeked_next_block.len() > self.block_size { break; @@ -594,11 +605,11 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { } } - Ok((std::mem::take(&mut self.buffer), self.hash.take())) + Ok((self.buffer.take_all(), self.hash.take())) } } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct CopyObjectResult { #[serde(rename = "LastModified")] pub last_modified: s3_xml::Value, @@ -606,7 +617,7 @@ pub struct CopyObjectResult { pub etag: s3_xml::Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct CopyPartResult { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), @@ -619,7 +630,7 @@ pub struct CopyPartResult { #[cfg(test)] mod tests { use super::*; - use crate::s3_xml::to_xml_with_header; + use crate::s3::xml::to_xml_with_header; #[test] fn copy_object_result() -> Result<(), Error> { @@ -651,7 +662,6 @@ mod tests { last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()), etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".into()), }; - println!("{}", to_xml_with_header(&v)?); assert_eq!(to_xml_with_header(&v)?, expected_retval); diff --git a/src/api/s3_cors.rs b/src/api/s3/cors.rs index ab77e23a..c7273464 100644 --- a/src/api/s3_cors.rs +++ b/src/api/s3/cors.rs @@ -9,13 +9,12 @@ use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::error::*; -use crate::s3_xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; +use crate::s3::error::*; +use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; use garage_model::garage::Garage; -use garage_table::*; use garage_util::data::*; pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> { @@ -48,14 +47,11 @@ pub async fn handle_delete_cors( bucket_id: Uuid, ) -> Result<Response<Body>, Error> { let mut bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; + let param = bucket.params_mut().unwrap(); param.cors_config.update(None); garage.bucket_table.insert(&bucket).await?; @@ -78,14 +74,11 @@ pub async fn handle_put_cors( } let mut bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; + let param = bucket.params_mut().unwrap(); let conf: CorsConfiguration = from_reader(&body as &[u8])?; conf.validate()?; @@ -119,12 +112,7 @@ pub async fn handle_options_s3api( let helper = garage.bucket_helper(); let bucket_id = helper.resolve_global_bucket_name(&bn).await?; if let Some(id) = bucket_id { - let bucket = garage - .bucket_table - .get(&EmptyKey, &id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or(Error::NoSuchBucket)?; + let bucket = garage.bucket_helper().get_existing_bucket(id).await?; handle_options_for_bucket(req, &bucket) } else { // If there is a bucket name in the request, but that name @@ -185,7 +173,7 @@ pub fn handle_options_for_bucket( } } - Err(Error::Forbidden("This CORS request is not allowed.".into())) + Err(Error::forbidden("This CORS request is not allowed.")) } pub fn find_matching_cors_rule<'a>( diff --git a/src/api/s3_delete.rs b/src/api/s3/delete.rs index b243d982..b337155f 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3/delete.rs @@ -6,10 +6,10 @@ use garage_util::data::*; use garage_util::time::*; use garage_model::garage::Garage; -use garage_model::object_table::*; +use garage_model::s3::object_table::*; -use crate::error::*; -use crate::s3_xml; +use crate::s3::error::*; +use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; async fn handle_delete_internal( @@ -64,14 +64,13 @@ pub async fn handle_delete( bucket_id: Uuid, key: &str, ) -> Result<Response<Body>, Error> { - let (_deleted_version, delete_marker_version) = - handle_delete_internal(&garage, bucket_id, key).await?; - - Ok(Response::builder() - .header("x-amz-version-id", hex::encode(delete_marker_version)) - .status(StatusCode::NO_CONTENT) - .body(Body::from(vec![])) - .unwrap()) + match handle_delete_internal(&garage, bucket_id, key).await { + Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::from(vec![])) + .unwrap()), + Err(e) => Err(e), + } } pub async fn handle_delete_objects( diff --git a/src/api/error.rs b/src/api/s3/error.rs index f53ed1fd..67009d63 100644 --- a/src/api/error.rs +++ b/src/api/s3/error.rs @@ -2,34 +2,24 @@ use std::convert::TryInto; use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{HeaderMap, StatusCode}; +use hyper::{Body, HeaderMap, StatusCode}; use garage_model::helper::error::Error as HelperError; -use garage_util::error::Error as GarageError; -use crate::s3_xml; +use crate::common_error::CommonError; +pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; +use crate::generic_server::ApiError; +use crate::s3::xml as s3_xml; +use crate::signature::error::Error as SignatureError; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { - // Category: internal error - /// Error related to deeper parts of Garage - #[error(display = "Internal error: {}", _0)] - InternalError(#[error(source)] GarageError), - - /// Error related to Hyper - #[error(display = "Internal error (Hyper error): {}", _0)] - Hyper(#[error(source)] hyper::Error), - - /// Error related to HTTP - #[error(display = "Internal error (HTTP error): {}", _0)] - Http(#[error(source)] http::Error), + #[error(display = "{}", _0)] + /// Error from common error + Common(CommonError), // Category: cannot process - /// No proper api key was used, or the signature was invalid - #[error(display = "Forbidden: {}", _0)] - Forbidden(String), - /// Authorization Header Malformed #[error(display = "Authorization header malformed, expected scope: {}", _0)] AuthorizationHeaderMalformed(String), @@ -38,22 +28,10 @@ pub enum Error { #[error(display = "Key not found")] NoSuchKey, - /// The bucket requested don't exists - #[error(display = "Bucket not found")] - NoSuchBucket, - /// The multipart upload requested don't exists #[error(display = "Upload not found")] NoSuchUpload, - /// Tried to create a bucket that already exist - #[error(display = "Bucket already exists")] - BucketAlreadyExists, - - /// Tried to delete a non-empty bucket - #[error(display = "Tried to delete a non-empty bucket")] - BucketNotEmpty, - /// Precondition failed (e.g. x-amz-copy-source-if-match) #[error(display = "At least one of the preconditions you specified did not hold")] PreconditionFailed, @@ -80,10 +58,6 @@ pub enum Error { #[error(display = "Invalid UTF-8: {}", _0)] InvalidUtf8String(#[error(source)] std::string::FromUtf8Error), - /// Some base64 encoded data was badly encoded - #[error(display = "Invalid base64: {}", _0)] - InvalidBase64(#[error(source)] base64::DecodeError), - /// The client sent invalid XML data #[error(display = "Invalid XML: {}", _0)] InvalidXml(String), @@ -96,15 +70,34 @@ pub enum Error { #[error(display = "Invalid HTTP range: {:?}", _0)] InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)), - /// The client sent an invalid request - #[error(display = "Bad request: {}", _0)] - BadRequest(String), - /// The client sent a request for an action not supported by garage #[error(display = "Unimplemented action: {}", _0)] NotImplemented(String), } +impl<T> From<T> for Error +where + CommonError: From<T>, +{ + fn from(err: T) -> Self { + Error::Common(CommonError::from(err)) + } +} + +impl CommonErrorDerivative for Error {} + +impl From<HelperError> for Error { + fn from(err: HelperError) -> Self { + match err { + HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), + HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), + HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), + HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), + e => Self::bad_request(format!("{}", e)), + } + } +} + impl From<roxmltree::Error> for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) @@ -117,88 +110,71 @@ impl From<quick_xml::de::DeError> for Error { } } -impl From<HelperError> for Error { - fn from(err: HelperError) -> Self { +impl From<SignatureError> for Error { + fn from(err: SignatureError) -> Self { match err { - HelperError::Internal(i) => Self::InternalError(i), - HelperError::BadRequest(b) => Self::BadRequest(b), + SignatureError::Common(c) => Self::Common(c), + SignatureError::AuthorizationHeaderMalformed(c) => { + Self::AuthorizationHeaderMalformed(c) + } + SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), + SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), } } } impl From<multer::Error> for Error { fn from(err: multer::Error) -> Self { - Self::BadRequest(err.to_string()) + Self::bad_request(err) } } impl Error { - /// Get the HTTP status code that best represents the meaning of the error for the client - pub fn http_status_code(&self) -> StatusCode { - match self { - Error::NoSuchKey | Error::NoSuchBucket | Error::NoSuchUpload => StatusCode::NOT_FOUND, - Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT, - Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED, - Error::Forbidden(_) => StatusCode::FORBIDDEN, - Error::InternalError( - GarageError::Timeout - | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), - ) => StatusCode::SERVICE_UNAVAILABLE, - Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => { - StatusCode::INTERNAL_SERVER_ERROR - } - Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE, - Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED, - _ => StatusCode::BAD_REQUEST, - } - } - pub fn aws_code(&self) -> &'static str { match self { + Error::Common(c) => c.aws_code(), Error::NoSuchKey => "NoSuchKey", - Error::NoSuchBucket => "NoSuchBucket", Error::NoSuchUpload => "NoSuchUpload", - Error::BucketAlreadyExists => "BucketAlreadyExists", - Error::BucketNotEmpty => "BucketNotEmpty", Error::PreconditionFailed => "PreconditionFailed", Error::InvalidPart => "InvalidPart", Error::InvalidPartOrder => "InvalidPartOrder", Error::EntityTooSmall => "EntityTooSmall", - Error::Forbidden(_) => "AccessDenied", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::NotImplemented(_) => "NotImplemented", - Error::InternalError( - GarageError::Timeout - | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), - ) => "ServiceUnavailable", - Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError", - _ => "InvalidRequest", + Error::InvalidXml(_) => "MalformedXML", + Error::InvalidRange(_) => "InvalidRange", + Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => { + "InvalidRequest" + } } } +} - pub fn aws_xml(&self, garage_region: &str, path: &str) -> String { - let error = s3_xml::Error { - code: s3_xml::Value(self.aws_code().to_string()), - message: s3_xml::Value(format!("{}", self)), - resource: Some(s3_xml::Value(path.to_string())), - region: Some(s3_xml::Value(garage_region.to_string())), - }; - s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { - r#" -<?xml version="1.0" encoding="UTF-8"?> -<Error> - <Code>InternalError</Code> - <Message>XML encoding of error failed</Message> -</Error> - "# - .into() - }) +impl ApiError for Error { + /// Get the HTTP status code that best represents the meaning of the error for the client + fn http_status_code(&self) -> StatusCode { + match self { + Error::Common(c) => c.http_status_code(), + Error::NoSuchKey | Error::NoSuchUpload => StatusCode::NOT_FOUND, + Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED, + Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE, + Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED, + Error::AuthorizationHeaderMalformed(_) + | Error::InvalidPart + | Error::InvalidPartOrder + | Error::EntityTooSmall + | Error::InvalidXml(_) + | Error::InvalidUtf8Str(_) + | Error::InvalidUtf8String(_) + | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST, + } } - pub fn add_headers(&self, header_map: &mut HeaderMap<HeaderValue>) { + fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>) { use hyper::header; + + header_map.append(header::CONTENT_TYPE, "application/xml".parse().unwrap()); + #[allow(clippy::single_match)] match self { Error::InvalidRange((_, len)) => { @@ -212,68 +188,23 @@ impl Error { _ => (), } } -} - -/// Trait to map error to the Bad Request error code -pub trait OkOrBadRequest { - type S; - fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>; -} - -impl<T, E> OkOrBadRequest for Result<T, E> -where - E: std::fmt::Display, -{ - type S = T; - fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> { - match self { - Ok(x) => Ok(x), - Err(e) => Err(Error::BadRequest(format!("{}: {}", reason.as_ref(), e))), - } - } -} - -impl<T> OkOrBadRequest for Option<T> { - type S = T; - fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> { - match self { - Some(x) => Ok(x), - None => Err(Error::BadRequest(reason.as_ref().to_string())), - } - } -} - -/// Trait to map an error to an Internal Error code -pub trait OkOrInternalError { - type S; - fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>; -} - -impl<T, E> OkOrInternalError for Result<T, E> -where - E: std::fmt::Display, -{ - type S = T; - fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> { - match self { - Ok(x) => Ok(x), - Err(e) => Err(Error::InternalError(GarageError::Message(format!( - "{}: {}", - reason.as_ref(), - e - )))), - } - } -} -impl<T> OkOrInternalError for Option<T> { - type S = T; - fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> { - match self { - Some(x) => Ok(x), - None => Err(Error::InternalError(GarageError::Message( - reason.as_ref().to_string(), - ))), - } + fn http_body(&self, garage_region: &str, path: &str) -> Body { + let error = s3_xml::Error { + code: s3_xml::Value(self.aws_code().to_string()), + message: s3_xml::Value(format!("{}", self)), + resource: Some(s3_xml::Value(path.to_string())), + region: Some(s3_xml::Value(garage_region.to_string())), + }; + Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { + r#" +<?xml version="1.0" encoding="UTF-8"?> +<Error> + <Code>InternalError</Code> + <Message>XML encoding of error failed</Message> +</Error> + "# + .into() + })) } } diff --git a/src/api/s3_get.rs b/src/api/s3/get.rs index 7f647e15..2a99551a 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3/get.rs @@ -2,22 +2,25 @@ use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; -use futures::stream::*; +use futures::future; +use futures::stream::{self, StreamExt}; use http::header::{ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::body::Bytes; use hyper::{Body, Request, Response, StatusCode}; +use tokio::sync::mpsc; +use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; use garage_table::EmptyKey; use garage_util::data::*; +use garage_util::error::OkOrMessage; use garage_model::garage::Garage; -use garage_model::object_table::*; -use garage_model::version_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; -use crate::error::*; +use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -210,8 +213,8 @@ pub async fn handle_get( match (part_number, parse_range_header(req, last_v_meta.size)?) { (Some(_), Some(_)) => { - return Err(Error::BadRequest( - "Cannot specify both partNumber and Range header".into(), + return Err(Error::bad_request( + "Cannot specify both partNumber and Range header", )); } (Some(pn), None) => { @@ -242,36 +245,56 @@ pub async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let read_first_block = garage.block_manager.rpc_get_block(first_block_hash); - let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); - - let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; - let version = version.ok_or(Error::NoSuchKey)?; + let (tx, rx) = mpsc::channel(2); + + let order_stream = OrderTag::stream(); + let first_block_hash = *first_block_hash; + let version_uuid = last_v.uuid; + + tokio::spawn(async move { + match async { + let garage2 = garage.clone(); + let version_fut = tokio::spawn(async move { + garage2.version_table.get(&version_uuid, &EmptyKey).await + }); + + let stream_block_0 = garage + .block_manager + .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0))) + .await?; + tx.send(stream_block_0) + .await + .ok_or_message("channel closed")?; + + let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; + for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { + let stream_block_i = garage + .block_manager + .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64))) + .await?; + tx.send(stream_block_i) + .await + .ok_or_message("channel closed")?; + } - let mut blocks = version - .blocks - .items() - .iter() - .map(|(_, vb)| (vb.hash, None)) - .collect::<Vec<_>>(); - blocks[0].1 = Some(first_block); - - let body_stream = futures::stream::iter(blocks) - .map(move |(hash, data_opt)| { - let garage = garage.clone(); - async move { - if let Some(data) = data_opt { - Ok(Bytes::from(data)) - } else { - garage - .block_manager - .rpc_get_block(&hash) - .await - .map(Bytes::from) - } + Ok::<(), Error>(()) + } + .await + { + Ok(()) => (), + Err(e) => { + let err = std::io::Error::new( + std::io::ErrorKind::Other, + format!("Error while getting object data: {}", e), + ); + let _ = tx + .send(Box::pin(stream::once(future::ready(Err(err))))) + .await; } - }) - .buffered(2); + } + }); + + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) @@ -302,9 +325,9 @@ async fn handle_get_range( let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); Ok(resp_builder.body(body)?) } else { - None.ok_or_internal_error( + Err(Error::internal_error( "Requested range not present in inline bytes when it should have been", - ) + )) } } ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { @@ -422,40 +445,79 @@ fn body_from_blocks_range( all_blocks.len(), 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize, )); - let mut true_offset = 0; + let mut block_offset: u64 = 0; for (_, b) in all_blocks.iter() { - if true_offset >= end { + if block_offset >= end { break; } // Keep only blocks that have an intersection with the requested range - if true_offset < end && true_offset + b.size > begin { - blocks.push((*b, true_offset)); + if block_offset < end && block_offset + b.size > begin { + blocks.push((*b, block_offset)); } - true_offset += b.size; + block_offset += b.size as u64; } + let order_stream = OrderTag::stream(); let body_stream = futures::stream::iter(blocks) - .map(move |(block, true_offset)| { + .enumerate() + .map(move |(i, (block, block_offset))| { let garage = garage.clone(); async move { - let data = garage.block_manager.rpc_get_block(&block.hash).await?; - let data = Bytes::from(data); - let start_in_block = if true_offset > begin { - 0 - } else { - begin - true_offset - }; - let end_in_block = if true_offset + block.size < end { - block.size - } else { - end - true_offset - }; - Result::<Bytes, Error>::Ok( - data.slice(start_in_block as usize..end_in_block as usize), - ) + garage + .block_manager + .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) + .await + .unwrap_or_else(|e| error_stream(i, e)) + .scan(block_offset, move |chunk_offset, chunk| { + let r = match chunk { + Ok(chunk_bytes) => { + let chunk_len = chunk_bytes.len() as u64; + let r = if *chunk_offset >= end { + // The current chunk is after the part we want to read. + // Returning None here will stop the scan, the rest of the + // stream will be ignored + None + } else if *chunk_offset + chunk_len <= begin { + // The current chunk is before the part we want to read. + // We return a None that will be removed by the filter_map + // below. + Some(None) + } else { + // The chunk has an intersection with the requested range + let start_in_chunk = if *chunk_offset > begin { + 0 + } else { + begin - *chunk_offset + }; + let end_in_chunk = if *chunk_offset + chunk_len < end { + chunk_len + } else { + end - *chunk_offset + }; + Some(Some(Ok(chunk_bytes + .slice(start_in_chunk as usize..end_in_chunk as usize)))) + }; + *chunk_offset += chunk_bytes.len() as u64; + r + } + Err(e) => Some(Some(Err(e))), + }; + futures::future::ready(r) + }) + .filter_map(futures::future::ready) } }) - .buffered(2); + .buffered(2) + .flatten(); hyper::body::Body::wrap_stream(body_stream) } + +fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { + Box::pin(futures::stream::once(async move { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Could not get block {}: {}", i, e), + )) + })) +} diff --git a/src/api/s3_list.rs b/src/api/s3/list.rs index 5852fc1b..e5f486c8 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3/list.rs @@ -10,15 +10,16 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_model::garage::Garage; -use garage_model::object_table::*; -use garage_model::version_table::Version; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::Version; -use garage_table::EmptyKey; +use garage_table::{EmptyKey, EnumerationOrder}; use crate::encoding::*; -use crate::error::*; -use crate::s3_put; -use crate::s3_xml; +use crate::helpers::key_after_prefix; +use crate::s3::error::*; +use crate::s3::put as s3_put; +use crate::s3::xml as s3_xml; const DUMMY_NAME: &str = "Dummy Key"; const DUMMY_KEY: &str = "GKDummyKey"; @@ -66,8 +67,14 @@ pub async fn handle_list( let io = |bucket, key, count| { let t = &garage.object_table; async move { - t.get_range(&bucket, key, Some(ObjectFilter::IsData), count) - .await + t.get_range( + &bucket, + key, + Some(ObjectFilter::IsData), + count, + EnumerationOrder::Forward, + ) + .await } }; @@ -165,8 +172,14 @@ pub async fn handle_list_multipart_upload( let io = |bucket, key, count| { let t = &garage.object_table; async move { - t.get_range(&bucket, key, Some(ObjectFilter::IsUploading), count) - .await + t.get_range( + &bucket, + key, + Some(ObjectFilter::IsUploading), + count, + EnumerationOrder::Forward, + ) + .await } }; @@ -569,13 +582,19 @@ impl ListObjectsQuery { // representing the key to start with. (Some(token), _) => match &token[..1] { "[" => Ok(RangeBegin::IncludingKey { - key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?, + key: String::from_utf8( + base64::decode(token[1..].as_bytes()) + .ok_or_bad_request("Invalid continuation token")?, + )?, fallback_key: None, }), "]" => Ok(RangeBegin::AfterKey { - key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?, + key: String::from_utf8( + base64::decode(token[1..].as_bytes()) + .ok_or_bad_request("Invalid continuation token")?, + )?, }), - _ => Err(Error::BadRequest("Invalid continuation token".to_string())), + _ => Err(Error::bad_request("Invalid continuation token")), }, // StartAfter has defined semantics in the spec: @@ -923,39 +942,13 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value { } } -const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}'; - -/// Compute the key after the prefix -fn key_after_prefix(pfx: &str) -> Option<String> { - let mut next = pfx.to_string(); - while !next.is_empty() { - let tail = next.pop().unwrap(); - if tail >= char::MAX { - continue; - } - - // Circumvent a limitation of RangeFrom that overflow earlier than needed - // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html - let new_tail = if tail == UTF8_BEFORE_LAST_CHAR { - char::MAX - } else { - (tail..).nth(1).unwrap() - }; - - next.push(new_tail); - return Some(next); - } - - None -} - /* * Unit tests of this module */ #[cfg(test)] mod tests { use super::*; - use garage_model::version_table::*; + use garage_model::s3::version_table::*; use garage_util::*; use std::iter::FromIterator; @@ -1003,39 +996,6 @@ mod tests { } #[test] - fn test_key_after_prefix() { - assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1); - assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0"); - assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭"); - assert_eq!( - key_after_prefix("").unwrap().as_str(), - String::from(char::from_u32(0x10FFFE).unwrap()) - ); - - // When the last character is the biggest UTF8 char - let a = String::from_iter(['a', char::MAX].iter()); - assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b"); - - // When all characters are the biggest UTF8 char - let b = String::from_iter([char::MAX; 3].iter()); - assert!(key_after_prefix(b.as_str()).is_none()); - - // Check utf8 surrogates - let c = String::from('\u{D7FF}'); - assert_eq!( - key_after_prefix(c.as_str()).unwrap().as_str(), - String::from('\u{E000}') - ); - - // Check the character before the biggest one - let d = String::from('\u{10FFFE}'); - assert_eq!( - key_after_prefix(d.as_str()).unwrap().as_str(), - String::from(char::MAX) - ); - } - - #[test] fn test_common_prefixes() { let mut query = query(); let objs = objs(); diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs new file mode 100644 index 00000000..7b56d4d8 --- /dev/null +++ b/src/api/s3/mod.rs @@ -0,0 +1,15 @@ +pub mod api_server; +pub mod error; + +mod bucket; +mod copy; +pub mod cors; +mod delete; +pub mod get; +mod list; +mod post_object; +mod put; +mod website; + +mod router; +pub mod xml; diff --git a/src/api/s3_post_object.rs b/src/api/s3/post_object.rs index 585e0304..d063faa4 100644 --- a/src/api/s3_post_object.rs +++ b/src/api/s3/post_object.rs @@ -14,16 +14,15 @@ use serde::Deserialize; use garage_model::garage::Garage; -use crate::api_server::resolve_bucket; -use crate::error::*; -use crate::s3_put::{get_headers, save_stream}; -use crate::s3_xml; +use crate::s3::error::*; +use crate::s3::put::{get_headers, save_stream}; +use crate::s3::xml as s3_xml; use crate::signature::payload::{parse_date, verify_v4}; pub async fn handle_post_object( garage: Arc<Garage>, req: Request<Body>, - bucket: String, + bucket_name: String, ) -> Result<Response<Body>, Error> { let boundary = req .headers() @@ -48,9 +47,7 @@ pub async fn handle_post_object( let field = if let Some(field) = multipart.next_field().await? { field } else { - return Err(Error::BadRequest( - "Request did not contain a file".to_owned(), - )); + return Err(Error::bad_request("Request did not contain a file")); }; let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) { name @@ -66,14 +63,14 @@ pub async fn handle_post_object( "tag" => (/* tag need to be reencoded, but we don't support them yet anyway */), "acl" => { if params.insert("x-amz-acl", content).is_some() { - return Err(Error::BadRequest( - "Field 'acl' provided more than one time".to_string(), + return Err(Error::bad_request( + "Field 'acl' provided more than one time", )); } } _ => { if params.insert(&name, content).is_some() { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Field '{}' provided more than one time", name ))); @@ -90,9 +87,7 @@ pub async fn handle_post_object( .to_str()?; let credential = params .get("x-amz-credential") - .ok_or_else(|| { - Error::Forbidden("Garage does not support anonymous access yet".to_string()) - })? + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))? .to_str()?; let policy = params .get("policy") @@ -119,17 +114,31 @@ pub async fn handle_post_object( }; let date = parse_date(date)?; - let api_key = verify_v4(&garage, credential, &date, signature, policy.as_bytes()).await?; + let api_key = verify_v4( + &garage, + "s3", + credential, + &date, + signature, + policy.as_bytes(), + ) + .await?; - let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?; + let bucket_id = garage + .bucket_helper() + .resolve_bucket(&bucket_name, &api_key) + .await?; if !api_key.allow_write(&bucket_id) { - return Err(Error::Forbidden( - "Operation is not allowed for this key.".to_string(), - )); + return Err(Error::forbidden("Operation is not allowed for this key.")); } - let decoded_policy = base64::decode(&policy)?; + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; @@ -137,9 +146,7 @@ pub async fn handle_post_object( .ok_or_bad_request("Invalid expiration date")? .into(); if Utc::now() - expiration > Duration::zero() { - return Err(Error::BadRequest( - "Expiration date is in the paste".to_string(), - )); + return Err(Error::bad_request("Expiration date is in the paste")); } let mut conditions = decoded_policy.into_conditions()?; @@ -151,7 +158,7 @@ pub async fn handle_post_object( "policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields "content-type" => { let conds = conditions.params.remove("content-type").ok_or_else(|| { - Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { let ok = match cond { @@ -161,7 +168,7 @@ pub async fn handle_post_object( } }; if !ok { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' has value not allowed in policy", param_key ))); @@ -170,7 +177,7 @@ pub async fn handle_post_object( } "key" => { let conds = conditions.params.remove("key").ok_or_else(|| { - Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { let ok = match cond { @@ -178,7 +185,7 @@ pub async fn handle_post_object( Operation::StartsWith(s) => key.starts_with(&s), }; if !ok { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' has value not allowed in policy", param_key ))); @@ -193,7 +200,7 @@ pub async fn handle_post_object( continue; } let conds = conditions.params.remove(¶m_key).ok_or_else(|| { - Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { let ok = match cond { @@ -201,7 +208,7 @@ pub async fn handle_post_object( Operation::StartsWith(s) => value.to_str()?.starts_with(s.as_str()), }; if !ok { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' has value not allowed in policy", param_key ))); @@ -212,7 +219,7 @@ pub async fn handle_post_object( } if let Some((param_key, _)) = conditions.params.iter().next() { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' is required in policy, but no value was provided", param_key ))); @@ -225,7 +232,7 @@ pub async fn handle_post_object( garage, headers, StreamLimiter::new(stream, conditions.content_length), - bucket_id, + &bucket, &key, None, None, @@ -242,7 +249,7 @@ pub async fn handle_post_object( { target .query_pairs_mut() - .append_pair("bucket", &bucket) + .append_pair("bucket", &bucket_name) .append_pair("key", &key) .append_pair("etag", &etag); let target = target.to_string(); @@ -287,7 +294,7 @@ pub async fn handle_post_object( let xml = s3_xml::PostObject { xmlns: (), location: s3_xml::Value(location), - bucket: s3_xml::Value(bucket), + bucket: s3_xml::Value(bucket_name), key: s3_xml::Value(key), etag: s3_xml::Value(etag), }; @@ -318,7 +325,7 @@ impl Policy { match condition { PolicyCondition::Equal(map) => { if map.len() != 1 { - return Err(Error::BadRequest("Invalid policy item".to_owned())); + return Err(Error::bad_request("Invalid policy item")); } let (mut k, v) = map.into_iter().next().expect("size was verified"); k.make_ascii_lowercase(); @@ -326,7 +333,7 @@ impl Policy { } PolicyCondition::OtherOp([cond, mut key, value]) => { if key.remove(0) != '$' { - return Err(Error::BadRequest("Invalid policy item".to_owned())); + return Err(Error::bad_request("Invalid policy item")); } key.make_ascii_lowercase(); match cond.as_str() { @@ -339,7 +346,7 @@ impl Policy { .or_default() .push(Operation::StartsWith(value)); } - _ => return Err(Error::BadRequest("Invalid policy item".to_owned())), + _ => return Err(Error::bad_request("Invalid policy item")), } } PolicyCondition::SizeRange(key, min, max) => { @@ -347,7 +354,7 @@ impl Policy { length.0 = length.0.max(min); length.1 = length.1.min(max); } else { - return Err(Error::BadRequest("Invalid policy item".to_owned())); + return Err(Error::bad_request("Invalid policy item")); } } } @@ -412,15 +419,15 @@ where self.read += bytes.len() as u64; // optimization to fail early when we know before the end it's too long if self.length.end() < &self.read { - return Poll::Ready(Some(Err(Error::BadRequest( - "File size does not match policy".to_owned(), + return Poll::Ready(Some(Err(Error::bad_request( + "File size does not match policy", )))); } } Poll::Ready(None) => { if !self.length.contains(&self.read) { - return Poll::Ready(Some(Err(Error::BadRequest( - "File size does not match policy".to_owned(), + return Poll::Ready(Some(Err(Error::bad_request( + "File size does not match policy", )))); } } diff --git a/src/api/s3_put.rs b/src/api/s3/put.rs index ed0bf00b..97b8e4e3 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3/put.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use futures::prelude::*; @@ -8,25 +8,34 @@ use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use opentelemetry::{ + trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, + Context, +}; + +use garage_rpc::netapp::bytes_buf::BytesBuf; use garage_table::*; +use garage_util::async_hash::*; use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_block::manager::INLINE_THRESHOLD; -use garage_model::block_ref_table::*; +use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; -use garage_model::object_table::*; -use garage_model::version_table::*; +use garage_model::index_counter::CountedItem; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; -use crate::error::*; -use crate::s3_xml; +use crate::s3::error::*; +use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; pub async fn handle_put( garage: Arc<Garage>, req: Request<Body>, - bucket_id: Uuid, + bucket: &Bucket, key: &str, content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { @@ -46,7 +55,7 @@ pub async fn handle_put( garage, headers, body, - bucket_id, + bucket, key, content_md5, content_sha256, @@ -59,7 +68,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: Arc<Garage>, headers: ObjectVersionHeaders, body: S, - bucket_id: Uuid, + bucket: &Bucket, key: &str, content_md5: Option<String>, content_sha256: Option<FixedBytes32>, @@ -80,6 +89,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let data_md5sum_hex = hex::encode(data_md5sum); let data_sha256sum = sha256sum(&first_block[..]); + let size = first_block.len() as u64; ensure_checksum_matches( data_md5sum.as_slice(), @@ -88,20 +98,22 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( content_sha256, )?; + check_quotas(&garage, bucket, key, size).await?; + let object_version = ObjectVersion { uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( ObjectVersionMeta { headers, - size: first_block.len() as u64, + size, etag: data_md5sum_hex.clone(), }, - first_block, + first_block.to_vec(), )), }; - let object = Object::new(bucket_id, key.into(), vec![object_version]); + let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; return Ok((version_uuid, data_md5sum_hex)); @@ -114,36 +126,42 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( timestamp: version_timestamp, state: ObjectVersionState::Uploading(headers.clone()), }; - let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]); + let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; // Initialize corresponding entry in version table // Write this entry now, even with empty block list, // to prevent block_ref entries from being deleted (they can be deleted // if the reference a version that isn't found in the version table) - let version = Version::new(version_uuid, bucket_id, key.into(), false); + let version = Version::new(version_uuid, bucket.id, key.into(), false); garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block_hash = blake2sum(&first_block[..]); - let tx_result = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await - .and_then(|(total_size, data_md5sum, data_sha256sum)| { + let first_block_hash = async_blake2sum(first_block.clone()).await; + + let tx_result = (|| async { + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; + ensure_checksum_matches( data_md5sum.as_slice(), data_sha256sum, content_md5.as_deref(), content_sha256, - ) - .map(|()| (total_size, data_md5sum)) - }); + )?; + + check_quotas(&garage, bucket, key, total_size).await?; + + Ok((total_size, data_md5sum)) + })() + .await; // If something went wrong, clean up let (total_size, md5sum_arr) = match tx_result { @@ -151,7 +169,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( Err(e) => { // Mark object as aborted, this will free the blocks further down object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]); + let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; return Err(e); } @@ -167,7 +185,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( }, first_block_hash, )); - let object = Object::new(bucket_id, key.into(), vec![object_version]); + let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; Ok((version_uuid, md5sum_hex)) @@ -183,8 +201,8 @@ fn ensure_checksum_matches( ) -> Result<(), Error> { if let Some(expected_sha256) = content_sha256 { if expected_sha256 != data_sha256sum { - return Err(Error::BadRequest( - "Unable to validate x-amz-content-sha256".to_string(), + return Err(Error::bad_request( + "Unable to validate x-amz-content-sha256", )); } else { trace!("Successfully validated x-amz-content-sha256"); @@ -192,9 +210,7 @@ fn ensure_checksum_matches( } if let Some(expected_md5) = content_md5 { if expected_md5.trim_matches('"') != base64::encode(data_md5sum) { - return Err(Error::BadRequest( - "Unable to validate content-md5".to_string(), - )); + return Err(Error::bad_request("Unable to validate content-md5")); } else { trace!("Successfully validated content-md5"); } @@ -202,18 +218,85 @@ fn ensure_checksum_matches( Ok(()) } +/// Check that inserting this object with this size doesn't exceed bucket quotas +async fn check_quotas( + garage: &Arc<Garage>, + bucket: &Bucket, + key: &str, + size: u64, +) -> Result<(), Error> { + let quotas = bucket.state.as_option().unwrap().quotas.get(); + if quotas.max_objects.is_none() && quotas.max_size.is_none() { + return Ok(()); + }; + + let key = key.to_string(); + let (prev_object, counters) = futures::try_join!( + garage.object_table.get(&bucket.id, &key), + garage.object_counter_table.table.get(&bucket.id, &EmptyKey), + )?; + + let counters = counters + .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .unwrap_or_default(); + + let (prev_cnt_obj, prev_cnt_size) = match prev_object { + Some(o) => { + let prev_cnt = o.counts().into_iter().collect::<HashMap<_, _>>(); + ( + prev_cnt.get(OBJECTS).cloned().unwrap_or_default(), + prev_cnt.get(BYTES).cloned().unwrap_or_default(), + ) + } + None => (0, 0), + }; + let cnt_obj_diff = 1 - prev_cnt_obj; + let cnt_size_diff = size as i64 - prev_cnt_size; + + if let Some(mo) = quotas.max_objects { + let current_objects = counters.get(OBJECTS).cloned().unwrap_or_default(); + if cnt_obj_diff > 0 && current_objects + cnt_obj_diff > mo as i64 { + return Err(Error::forbidden(format!( + "Object quota is reached, maximum objects for this bucket: {}", + mo + ))); + } + } + + if let Some(ms) = quotas.max_size { + let current_size = counters.get(BYTES).cloned().unwrap_or_default(); + if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 { + return Err(Error::forbidden(format!( + "Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.", + ms, current_size, size + ))); + } + } + + Ok(()) +} + async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, part_number: u64, - first_block: Vec<u8>, + first_block: Bytes, first_block_hash: Hash, chunker: &mut StreamChunker<S>, ) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { - let mut md5hasher = Md5::new(); - let mut sha256hasher = Sha256::new(); - md5hasher.update(&first_block[..]); - sha256hasher.update(&first_block[..]); + let tracer = opentelemetry::global::tracer("garage"); + + let md5hasher = AsyncHasher::<Md5>::new(); + let sha256hasher = AsyncHasher::<Sha256>::new(); + + futures::future::join( + md5hasher.update(first_block.clone()), + sha256hasher.update(first_block.clone()), + ) + .with_context(Context::current_with_span( + tracer.start("Hash first block (md5, sha256)"), + )) + .await; let mut next_offset = first_block.len(); let mut put_curr_version_block = put_block_meta( @@ -235,9 +318,15 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( chunker.next(), )?; if let Some(block) = next_block { - md5hasher.update(&block[..]); - sha256hasher.update(&block[..]); - let block_hash = blake2sum(&block[..]); + let (_, _, block_hash) = futures::future::join3( + md5hasher.update(block.clone()), + sha256hasher.update(block.clone()), + async_blake2sum(block.clone()), + ) + .with_context(Context::current_with_span( + tracer.start("Hash block (md5, sha256, blake2)"), + )) + .await; let block_len = block.len(); put_curr_version_block = put_block_meta( garage, @@ -255,9 +344,9 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( } let total_size = next_offset as u64; - let data_md5sum = md5hasher.finalize(); + let data_md5sum = md5hasher.finalize().await; - let data_sha256sum = sha256hasher.finalize(); + let data_sha256sum = sha256hasher.finalize().await; let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); Ok((total_size, data_md5sum, data_sha256sum)) @@ -297,7 +386,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { stream: S, read_all: bool, block_size: usize, - buf: VecDeque<u8>, + buf: BytesBuf, } impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { @@ -306,11 +395,11 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { stream, read_all: false, block_size, - buf: VecDeque::with_capacity(2 * block_size), + buf: BytesBuf::new(), } } - async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { + async fn next(&mut self) -> Result<Option<Bytes>, Error> { while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; @@ -323,12 +412,8 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { if self.buf.is_empty() { Ok(None) - } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..).collect::<Vec<u8>>(); - Ok(Some(block)) } else { - let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>(); - Ok(Some(block)) + Ok(Some(self.buf.take_max(self.block_size))) } } } @@ -428,7 +513,7 @@ pub async fn handle_put_part( // Check part hasn't already been uploaded if let Some(v) = version { if v.has_part_number(part_number) { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Part number {} has already been uploaded", part_number ))); @@ -437,7 +522,9 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket_id, key, false); - let first_block_hash = blake2sum(&first_block[..]); + + let first_block_hash = async_blake2sum(first_block.clone()).await; + let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( &garage, &version, @@ -475,7 +562,7 @@ pub async fn handle_complete_multipart_upload( garage: Arc<Garage>, req: Request<Body>, bucket_name: &str, - bucket_id: Uuid, + bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option<Hash>, @@ -499,7 +586,7 @@ pub async fn handle_complete_multipart_upload( // Get object and version let key = key.to_string(); let (object, version) = futures::try_join!( - garage.object_table.get(&bucket_id, &key), + garage.object_table.get(&bucket.id, &key), garage.version_table.get(&version_uuid, &EmptyKey), )?; @@ -513,7 +600,7 @@ pub async fn handle_complete_multipart_upload( let version = version.ok_or(Error::NoSuchKey)?; if version.blocks.is_empty() { - return Err(Error::BadRequest("No data was uploaded".to_string())); + return Err(Error::bad_request("No data was uploaded")); } let headers = match object_version.state { @@ -574,8 +661,8 @@ pub async fn handle_complete_multipart_upload( .map(|x| x.part_number) .eq(block_parts.into_iter()); if !same_parts { - return Err(Error::BadRequest( - "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(), + return Err(Error::bad_request( + "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again." )); } @@ -592,6 +679,14 @@ pub async fn handle_complete_multipart_upload( // Calculate total size of final object let total_size = version.blocks.items().iter().map(|x| x.1.size).sum(); + if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { + object_version.state = ObjectVersionState::Aborted; + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + return Err(e); + } + // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { @@ -602,7 +697,7 @@ pub async fn handle_complete_multipart_upload( version.blocks.items()[0].1.hash, )); - let final_object = Object::new(bucket_id, key.clone(), vec![object_version]); + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; // Send response saying ok we're done diff --git a/src/api/s3_router.rs b/src/api/s3/router.rs index 95a7eceb..44f581ff 100644 --- a/src/api/s3_router.rs +++ b/src/api/s3/router.rs @@ -1,131 +1,13 @@ -use crate::error::{Error, OkOrBadRequest}; - use std::borrow::Cow; use hyper::header::HeaderValue; use hyper::{HeaderMap, Method, Request}; -/// This macro is used to generate very repetitive match {} blocks in this module -/// It is _not_ made to be used anywhere else -macro_rules! s3_match { - (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{ - // usage: s3_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] } - // returns true if the variant was one of the listed variants, false otherwise. - use Endpoint::*; - match $enum { - $( - $endpoint { .. } => true, - )* - _ => false - } - }}; - (@extract $enum:expr , $param:ident, [ $($endpoint:ident,)* ]) => {{ - // usage: s3_match {@extract my_enum, field_name, [ VariantWithField1, VariantWithField2 ..] } - // returns Some(field_value), or None if the variant was not one of the listed variants. - use Endpoint::*; - match $enum { - $( - $endpoint {$param, ..} => Some($param), - )* - _ => None - } - }}; - (@gen_parser ($keyword:expr, $key:expr, $query:expr, $header:expr), - key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*], - no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{ - // usage: s3_match {@gen_parser (keyword, key, query, header), - // key: [ - // SOME_KEYWORD => VariantWithKey, - // ... - // ], - // no_key: [ - // SOME_KEYWORD => VariantWithoutKey, - // ... - // ] - // } - // See in from_{method} for more detailed usage. - use Endpoint::*; - use keywords::*; - match ($keyword, !$key.is_empty()){ - $( - ($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k { - key: $key, - $($( - $param_k: s3_match!(@@parse_param $query, $conv_k, $param_k), - )*)? - }), - )* - $( - ($kw_nk, false) $(if $query.$required_nk.is_some())? $(if $header.contains($header_nk))? => Ok($api_nk { - $($( - $param_nk: s3_match!(@@parse_param $query, $conv_nk, $param_nk), - )*)? - }), - )* - (kw, _) => Err(Error::BadRequest(format!("Invalid endpoint: {}", kw))) - } - }}; +use crate::helpers::Authorization; +use crate::router_macros::{generateQueryParameters, router_match}; +use crate::s3::error::*; - (@@parse_param $query:expr, query_opt, $param:ident) => {{ - // extract optional query parameter - $query.$param.take().map(|param| param.into_owned()) - }}; - (@@parse_param $query:expr, query, $param:ident) => {{ - // extract mendatory query parameter - $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned() - }}; - (@@parse_param $query:expr, opt_parse, $param:ident) => {{ - // extract and parse optional query parameter - // missing parameter is file, however parse error is reported as an error - $query.$param - .take() - .map(|param| param.parse()) - .transpose() - .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))? - }}; - (@@parse_param $query:expr, parse, $param:ident) => {{ - // extract and parse mandatory query parameter - // both missing and un-parseable parameters are reported as errors - $query.$param.take().ok_or_bad_request("Missing argument for endpoint")? - .parse() - .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))? - }}; - (@func - $(#[$doc:meta])* - pub enum Endpoint { - $( - $(#[$outer:meta])* - $variant:ident $({ - $($name:ident: $ty:ty,)* - })?, - )* - }) => { - $(#[$doc])* - pub enum Endpoint { - $( - $(#[$outer])* - $variant $({ - $($name: $ty, )* - })?, - )* - } - impl Endpoint { - pub fn name(&self) -> &'static str { - match self { - $(Endpoint::$variant $({ $($name: _,)* .. })? => stringify!($variant),)* - } - } - } - }; - (@if ($($cond:tt)+) then ($($then:tt)*) else ($($else:tt)*)) => { - $($then)* - }; - (@if () then ($($then:tt)*) else ($($else:tt)*)) => { - $($else)* - }; -} - -s3_match! {@func +router_match! {@func /// List of all S3 API endpoints. /// @@ -460,7 +342,7 @@ impl Endpoint { Method::POST => Self::from_post(key, &mut query)?, Method::PUT => Self::from_put(key, &mut query, req.headers())?, Method::DELETE => Self::from_delete(key, &mut query)?, - _ => return Err(Error::BadRequest("Unknown method".to_owned())), + _ => return Err(Error::bad_request("Unknown method")), }; if let Some(message) = query.nonempty_message() { @@ -471,7 +353,7 @@ impl Endpoint { /// Determine which endpoint a request is for, knowing it is a GET. fn from_get(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, None), key: [ @@ -528,7 +410,7 @@ impl Endpoint { /// Determine which endpoint a request is for, knowing it is a HEAD. fn from_head(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, None), key: [ @@ -542,7 +424,7 @@ impl Endpoint { /// Determine which endpoint a request is for, knowing it is a POST. fn from_post(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, None), key: [ @@ -564,7 +446,7 @@ impl Endpoint { query: &mut QueryParameters<'_>, headers: &HeaderMap<HeaderValue>, ) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, headers), key: [ @@ -606,7 +488,7 @@ impl Endpoint { /// Determine which endpoint a request is for, knowing it is a DELETE. fn from_delete(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, None), key: [ @@ -636,7 +518,7 @@ impl Endpoint { /// Get the key the request target. Returns None for requests which don't use a key. #[allow(dead_code)] pub fn get_key(&self) -> Option<&str> { - s3_match! { + router_match! { @extract self, key, @@ -673,7 +555,7 @@ impl Endpoint { if let Endpoint::ListBuckets = self { return Authorization::None; }; - let readonly = s3_match! { + let readonly = router_match! { @match self, [ @@ -717,7 +599,7 @@ impl Endpoint { SelectObjectContent, ] }; - let owner = s3_match! { + let owner = router_match! { @match self, [ @@ -740,87 +622,6 @@ impl Endpoint { } } -/// What kind of authorization is required to perform a given action -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Authorization { - /// No authorization is required - None, - /// Having Read permission on bucket - Read, - /// Having Write permission on bucket - Write, - /// Having Owner permission on bucket - Owner, -} - -/// This macro is used to generate part of the code in this module. It must be called only one, and -/// is useless outside of this module. -macro_rules! generateQueryParameters { - ( $($rest:expr => $name:ident),* ) => { - /// Struct containing all query parameters used in endpoints. Think of it as an HashMap, - /// but with keys statically known. - #[derive(Debug, Default)] - struct QueryParameters<'a> { - keyword: Option<Cow<'a, str>>, - $( - $name: Option<Cow<'a, str>>, - )* - } - - impl<'a> QueryParameters<'a> { - /// Build this struct from the query part of an URI. - fn from_query(query: &'a str) -> Result<Self, Error> { - let mut res: Self = Default::default(); - for (k, v) in url::form_urlencoded::parse(query.as_bytes()) { - let repeated = match k.as_ref() { - $( - $rest => if !v.is_empty() { - res.$name.replace(v).is_some() - } else { - false - }, - )* - _ => { - if k.starts_with("response-") || k.starts_with("X-Amz-") { - false - } else if v.as_ref().is_empty() { - if res.keyword.replace(k).is_some() { - return Err(Error::BadRequest("Multiple keywords".to_owned())); - } - continue; - } else { - debug!("Received an unknown query parameter: '{}'", k); - false - } - } - }; - if repeated { - return Err(Error::BadRequest(format!( - "Query parameter repeated: '{}'", - k - ))); - } - } - Ok(res) - } - - /// Get an error message in case not all parameters where used when extracting them to - /// build an Enpoint variant - fn nonempty_message(&self) -> Option<&str> { - if self.keyword.is_some() { - Some("Keyword not used") - } $( - else if self.$name.is_some() { - Some(concat!("'", $rest, "'")) - } - )* else { - None - } - } - } - } -} - // parameter name => struct field generateQueryParameters! { "continuation-token" => continuation_token, diff --git a/src/api/s3_website.rs b/src/api/s3/website.rs index b464dd45..77738971 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3/website.rs @@ -4,13 +4,12 @@ use std::sync::Arc; use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::error::*; -use crate::s3_xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; +use crate::s3::error::*; +use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; use garage_model::bucket_table::*; use garage_model::garage::Garage; -use garage_table::*; use garage_util::data::*; pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> { @@ -47,14 +46,11 @@ pub async fn handle_delete_website( bucket_id: Uuid, ) -> Result<Response<Body>, Error> { let mut bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; + let param = bucket.params_mut().unwrap(); param.website_config.update(None); garage.bucket_table.insert(&bucket).await?; @@ -77,14 +73,11 @@ pub async fn handle_put_website( } let mut bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; + let param = bucket.params_mut().unwrap(); let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; @@ -176,8 +169,8 @@ impl WebsiteConfiguration { || self.index_document.is_some() || self.routing_rules.is_some()) { - return Err(Error::BadRequest( - "Bad XML: can't have RedirectAllRequestsTo and other fields".to_owned(), + return Err(Error::bad_request( + "Bad XML: can't have RedirectAllRequestsTo and other fields", )); } if let Some(ref ed) = self.error_document { @@ -222,8 +215,8 @@ impl WebsiteConfiguration { impl Key { pub fn validate(&self) -> Result<(), Error> { if self.key.0.is_empty() { - Err(Error::BadRequest( - "Bad XML: error document specified but empty".to_owned(), + Err(Error::bad_request( + "Bad XML: error document specified but empty", )) } else { Ok(()) @@ -234,8 +227,8 @@ impl Key { impl Suffix { pub fn validate(&self) -> Result<(), Error> { if self.suffix.0.is_empty() | self.suffix.0.contains('/') { - Err(Error::BadRequest( - "Bad XML: index document is empty or contains /".to_owned(), + Err(Error::bad_request( + "Bad XML: index document is empty or contains /", )) } else { Ok(()) @@ -247,7 +240,7 @@ impl Target { pub fn validate(&self) -> Result<(), Error> { if let Some(ref protocol) = self.protocol { if protocol.0 != "http" && protocol.0 != "https" { - return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned())); + return Err(Error::bad_request("Bad XML: invalid protocol")); } } Ok(()) @@ -269,19 +262,19 @@ impl Redirect { pub fn validate(&self, has_prefix: bool) -> Result<(), Error> { if self.replace_prefix.is_some() { if self.replace_full.is_some() { - return Err(Error::BadRequest( - "Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set".to_owned(), + return Err(Error::bad_request( + "Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set", )); } if !has_prefix { - return Err(Error::BadRequest( - "Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't".to_owned(), + return Err(Error::bad_request( + "Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't", )); } } if let Some(ref protocol) = self.protocol { if protocol.0 != "http" && protocol.0 != "https" { - return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned())); + return Err(Error::bad_request("Bad XML: invalid protocol")); } } // TODO there are probably more invalide cases, but which ones? diff --git a/src/api/s3_xml.rs b/src/api/s3/xml.rs index 75ec4559..06f11288 100644 --- a/src/api/s3_xml.rs +++ b/src/api/s3/xml.rs @@ -1,7 +1,7 @@ use quick_xml::se::to_string; use serde::{Deserialize, Serialize, Serializer}; -use crate::Error as ApiError; +use crate::s3::error::Error as ApiError; pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> { let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string(); @@ -25,7 +25,7 @@ impl From<&str> for Value { #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct IntValue(#[serde(rename = "$value")] pub i64); -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct Bucket { #[serde(rename = "CreationDate")] pub creation_date: Value, @@ -33,7 +33,7 @@ pub struct Bucket { pub name: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct Owner { #[serde(rename = "DisplayName")] pub display_name: Value, @@ -41,13 +41,13 @@ pub struct Owner { pub id: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct BucketList { #[serde(rename = "Bucket")] pub entries: Vec<Bucket>, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct ListAllMyBucketsResult { #[serde(rename = "Buckets")] pub buckets: BucketList, @@ -55,7 +55,7 @@ pub struct ListAllMyBucketsResult { pub owner: Owner, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct LocationConstraint { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), @@ -63,7 +63,7 @@ pub struct LocationConstraint { pub region: String, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct Deleted { #[serde(rename = "Key")] pub key: Value, @@ -73,7 +73,7 @@ pub struct Deleted { pub delete_marker_version_id: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct Error { #[serde(rename = "Code")] pub code: Value, @@ -85,7 +85,7 @@ pub struct Error { pub region: Option<Value>, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct DeleteError { #[serde(rename = "Code")] pub code: Value, @@ -97,7 +97,7 @@ pub struct DeleteError { pub version_id: Option<Value>, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct DeleteResult { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), @@ -107,7 +107,7 @@ pub struct DeleteResult { pub errors: Vec<DeleteError>, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct InitiateMultipartUploadResult { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), @@ -119,7 +119,7 @@ pub struct InitiateMultipartUploadResult { pub upload_id: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct CompleteMultipartUploadResult { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), @@ -133,7 +133,7 @@ pub struct CompleteMultipartUploadResult { pub etag: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct Initiator { #[serde(rename = "DisplayName")] pub display_name: Value, @@ -141,7 +141,7 @@ pub struct Initiator { pub id: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct ListMultipartItem { #[serde(rename = "Initiated")] pub initiated: Value, @@ -157,7 +157,7 @@ pub struct ListMultipartItem { pub storage_class: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct ListMultipartUploadsResult { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), @@ -187,7 +187,7 @@ pub struct ListMultipartUploadsResult { pub encoding_type: Option<Value>, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct PartItem { #[serde(rename = "ETag")] pub etag: Value, @@ -199,7 +199,7 @@ pub struct PartItem { pub size: IntValue, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct ListPartsResult { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), @@ -227,7 +227,7 @@ pub struct ListPartsResult { pub storage_class: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct ListBucketItem { #[serde(rename = "Key")] pub key: Value, @@ -241,13 +241,13 @@ pub struct ListBucketItem { pub storage_class: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct CommonPrefix { #[serde(rename = "Prefix")] pub prefix: Value, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct ListBucketResult { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), @@ -281,7 +281,7 @@ pub struct ListBucketResult { pub common_prefixes: Vec<CommonPrefix>, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct VersioningConfiguration { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), @@ -289,7 +289,7 @@ pub struct VersioningConfiguration { pub status: Option<Value>, } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct PostObject { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs new file mode 100644 index 00000000..f5a067bd --- /dev/null +++ b/src/api/signature/error.rs @@ -0,0 +1,36 @@ +use err_derive::Error; + +use crate::common_error::CommonError; +pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; + +/// Errors of this crate +#[derive(Debug, Error)] +pub enum Error { + #[error(display = "{}", _0)] + /// Error from common error + Common(CommonError), + + /// Authorization Header Malformed + #[error(display = "Authorization header malformed, expected scope: {}", _0)] + AuthorizationHeaderMalformed(String), + + // Category: bad request + /// The request contained an invalid UTF-8 sequence in its path or in other parameters + #[error(display = "Invalid UTF-8: {}", _0)] + InvalidUtf8Str(#[error(source)] std::str::Utf8Error), + + /// The client sent a header with invalid value + #[error(display = "Invalid header value: {}", _0)] + InvalidHeader(#[error(source)] hyper::header::ToStrError), +} + +impl<T> From<T> for Error +where + CommonError: From<T>, +{ + fn from(err: T) -> Self { + Error::Common(CommonError::from(err)) + } +} + +impl CommonErrorDerivative for Error {} diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs index ebdee6da..4b8b990f 100644 --- a/src/api/signature/mod.rs +++ b/src/api/signature/mod.rs @@ -1,14 +1,15 @@ use chrono::{DateTime, Utc}; -use hmac::{Hmac, Mac, NewMac}; +use hmac::{Hmac, Mac}; use sha2::Sha256; use garage_util::data::{sha256sum, Hash}; -use crate::error::*; - +pub mod error; pub mod payload; pub mod streaming; +use error::*; + pub const SHORT_DATE: &str = "%Y%m%d"; pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; @@ -16,7 +17,7 @@ type HmacSha256 = Hmac<Sha256>; pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { if expected_sha256 != sha256sum(body) { - return Err(Error::BadRequest( + return Err(Error::bad_request( "Request content hash does not match signed hash".to_string(), )); } @@ -28,20 +29,25 @@ pub fn signing_hmac( secret_key: &str, region: &str, service: &str, -) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> { +) -> Result<HmacSha256, crypto_common::InvalidLength> { let secret = String::from("AWS4") + secret_key; - let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; + let mut date_hmac = HmacSha256::new_from_slice(secret.as_bytes())?; date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); - let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; + let mut region_hmac = HmacSha256::new_from_slice(&date_hmac.finalize().into_bytes())?; region_hmac.update(region.as_bytes()); - let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; + let mut service_hmac = HmacSha256::new_from_slice(®ion_hmac.finalize().into_bytes())?; service_hmac.update(service.as_bytes()); - let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; + let mut signing_hmac = HmacSha256::new_from_slice(&service_hmac.finalize().into_bytes())?; signing_hmac.update(b"aws4_request"); - let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; + let hmac = HmacSha256::new_from_slice(&signing_hmac.finalize().into_bytes())?; Ok(hmac) } -pub fn compute_scope(datetime: &DateTime<Utc>, region: &str) -> String { - format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,) +pub fn compute_scope(datetime: &DateTime<Utc>, region: &str, service: &str) -> String { + format!( + "{}/{}/{}/aws4_request", + datetime.format(SHORT_DATE), + region, + service + ) } diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index 2a41b307..4c7934e5 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -11,14 +11,15 @@ use garage_util::data::Hash; use garage_model::garage::Garage; use garage_model::key_table::*; -use super::signing_hmac; -use super::{LONG_DATETIME, SHORT_DATE}; +use super::LONG_DATETIME; +use super::{compute_scope, signing_hmac}; use crate::encoding::uri_encode; -use crate::error::*; +use crate::signature::error::*; pub async fn check_payload_signature( garage: &Garage, + service: &str, request: &Request<Body>, ) -> Result<(Option<Key>, Option<Hash>), Error> { let mut headers = HashMap::new(); @@ -64,6 +65,7 @@ pub async fn check_payload_signature( let key = verify_v4( garage, + service, &authorization.credential, &authorization.date, &authorization.signature, @@ -103,7 +105,7 @@ fn parse_authorization( let (auth_kind, rest) = authorization.split_at(first_space); if auth_kind != "AWS4-HMAC-SHA256" { - return Err(Error::BadRequest("Unsupported authorization method".into())); + return Err(Error::bad_request("Unsupported authorization method")); } let mut auth_params = HashMap::new(); @@ -127,10 +129,11 @@ fn parse_authorization( let date = headers .get("x-amz-date") .ok_or_bad_request("Missing X-Amz-Date field") + .map_err(Error::from) .and_then(|d| parse_date(d))?; if Utc::now() - date > Duration::hours(24) { - return Err(Error::BadRequest("Date is too old".to_string())); + return Err(Error::bad_request("Date is too old".to_string())); } let auth = Authorization { @@ -154,7 +157,7 @@ fn parse_query_authorization( headers: &HashMap<String, String>, ) -> Result<Authorization, Error> { if algorithm != "AWS4-HMAC-SHA256" { - return Err(Error::BadRequest( + return Err(Error::bad_request( "Unsupported authorization method".to_string(), )); } @@ -177,10 +180,10 @@ fn parse_query_authorization( .get("x-amz-expires") .ok_or_bad_request("X-Amz-Expires not found in query parameters")? .parse() - .map_err(|_| Error::BadRequest("X-Amz-Expires is not a number".to_string()))?; + .map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?; if duration > 7 * 24 * 3600 { - return Err(Error::BadRequest( + return Err(Error::bad_request( "X-Amz-Exprires may not exceed a week".to_string(), )); } @@ -188,10 +191,11 @@ fn parse_query_authorization( let date = headers .get("x-amz-date") .ok_or_bad_request("Missing X-Amz-Date field") + .map_err(Error::from) .and_then(|d| parse_date(d))?; if Utc::now() - date > Duration::seconds(duration) { - return Err(Error::BadRequest("Date is too old".to_string())); + return Err(Error::bad_request("Date is too old".to_string())); } Ok(Authorization { @@ -281,6 +285,7 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> { pub async fn verify_v4( garage: &Garage, + service: &str, credential: &str, date: &DateTime<Utc>, signature: &str, @@ -288,11 +293,7 @@ pub async fn verify_v4( ) -> Result<Key, Error> { let (key_id, scope) = parse_credential(credential)?; - let scope_expected = format!( - "{}/{}/s3/aws4_request", - date.format(SHORT_DATE), - garage.config.s3_api.s3_region - ); + let scope_expected = compute_scope(date, &garage.config.s3_api.s3_region, service); if scope != scope_expected { return Err(Error::AuthorizationHeaderMalformed(scope.to_string())); } @@ -302,20 +303,20 @@ pub async fn verify_v4( .get(&EmptyKey, &key_id) .await? .filter(|k| !k.state.is_deleted()) - .ok_or_else(|| Error::Forbidden(format!("No such key: {}", &key_id)))?; + .ok_or_else(|| Error::forbidden(format!("No such key: {}", &key_id)))?; let key_p = key.params().unwrap(); let mut hmac = signing_hmac( date, &key_p.secret_key, &garage.config.s3_api.s3_region, - "s3", + service, ) .ok_or_internal_error("Unable to build signing HMAC")?; hmac.update(payload); let our_signature = hex::encode(hmac.finalize().into_bytes()); if signature != our_signature { - return Err(Error::Forbidden("Invalid signature".to_string())); + return Err(Error::forbidden("Invalid signature".to_string())); } Ok(key) diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 969a45d6..c8358c4f 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -1,18 +1,67 @@ use std::pin::Pin; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, Utc}; use futures::prelude::*; use futures::task; +use garage_model::key_table::Key; +use hmac::Mac; use hyper::body::Bytes; +use hyper::{Body, Request}; use garage_util::data::Hash; -use hmac::Mac; - -use super::sha256sum; -use super::HmacSha256; -use super::LONG_DATETIME; -use crate::error::*; +use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; + +use crate::signature::error::*; + +pub fn parse_streaming_body( + api_key: &Key, + req: Request<Body>, + content_sha256: &mut Option<Hash>, + region: &str, + service: &str, +) -> Result<Request<Body>, Error> { + match req.headers().get("x-amz-content-sha256") { + Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + let signature = content_sha256 + .take() + .ok_or_bad_request("No signature provided")?; + + let secret_key = &api_key + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key; + + let date = req + .headers() + .get("x-amz-date") + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) + .ok_or_bad_request("Invalid date")?; + let date: DateTime<Utc> = DateTime::from_utc(date, Utc); + + let scope = compute_scope(&date, region, service); + let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service) + .ok_or_internal_error("Unable to build signing HMAC")?; + + Ok(req.map(move |body| { + Body::wrap_stream( + SignedPayloadStream::new( + body.map_err(Error::from), + signing_hmac, + date, + &scope, + signature, + ) + .map_err(Error::from), + ) + })) + } + _ => Ok(req), + } +} /// Result of `sha256("")` const EMPTY_STRING_HEX_DIGEST: &str = @@ -38,7 +87,7 @@ fn compute_streaming_payload_signature( let mut hmac = signing_hmac.clone(); hmac.update(string_to_sign.as_bytes()); - Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature") + Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?) } mod payload { @@ -114,10 +163,10 @@ impl From<SignedPayloadStreamError> for Error { match err { SignedPayloadStreamError::Stream(e) => e, SignedPayloadStreamError::InvalidSignature => { - Error::BadRequest("Invalid payload signature".into()) + Error::bad_request("Invalid payload signature") } SignedPayloadStreamError::Message(e) => { - Error::BadRequest(format!("Chunk format error: {}", e)) + Error::bad_request(format!("Chunk format error: {}", e)) } } } @@ -295,7 +344,7 @@ mod tests { .with_timezone(&Utc); let secret_key = "test"; let region = "test"; - let scope = crate::signature::compute_scope(&datetime, region); + let scope = crate::signature::compute_scope(&datetime, region, "s3"); let signing_hmac = crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); |