author    Mendes <mendes.oulamara@pm.me>  2022-10-04 18:14:49 +0200
committer Mendes <mendes.oulamara@pm.me>  2022-10-04 18:14:49 +0200
commit    829f815a897b04986559910bbcbf53625adcdf20 (patch)
tree      6db3c27cff2aded754a641d1f2b05c83be701267 /src/api
parent    99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff)
parent    a096ced35562bd0a8877a1ee2f755be1edafe343 (diff)
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/api')
-rw-r--r--  src/api/Cargo.toml | 27
-rw-r--r--  src/api/admin/api_server.rs | 209
-rw-r--r--  src/api/admin/bucket.rs | 580
-rw-r--r--  src/api/admin/cluster.rs | 193
-rw-r--r--  src/api/admin/error.rs | 97
-rw-r--r--  src/api/admin/key.rs | 256
-rw-r--r--  src/api/admin/mod.rs | 7
-rw-r--r--  src/api/admin/router.rs | 145
-rw-r--r--  src/api/api_server.rs | 645
-rw-r--r--  src/api/common_error.rs | 177
-rw-r--r--  src/api/generic_server.rs | 211
-rw-r--r--  src/api/helpers.rs | 191
-rw-r--r--  src/api/k2v/api_server.rs | 190
-rw-r--r--  src/api/k2v/batch.rs | 363
-rw-r--r--  src/api/k2v/error.rs | 135
-rw-r--r--  src/api/k2v/index.rs | 100
-rw-r--r--  src/api/k2v/item.rs | 230
-rw-r--r--  src/api/k2v/mod.rs | 9
-rw-r--r--  src/api/k2v/range.rs | 100
-rw-r--r--  src/api/k2v/router.rs | 252
-rw-r--r--  src/api/lib.rs | 26
-rw-r--r--  src/api/router_macros.rs | 213
-rw-r--r--  src/api/s3/api_server.rs | 390
-rw-r--r--  src/api/s3/bucket.rs (renamed from src/api/s3_bucket.rs) | 21
-rw-r--r--  src/api/s3/copy.rs (renamed from src/api/s3_copy.rs) | 66
-rw-r--r--  src/api/s3/cors.rs (renamed from src/api/s3_cors.rs) | 36
-rw-r--r--  src/api/s3/delete.rs (renamed from src/api/s3_delete.rs) | 21
-rw-r--r--  src/api/s3/error.rs (renamed from src/api/error.rs) | 243
-rw-r--r--  src/api/s3/get.rs (renamed from src/api/s3_get.rs) | 180
-rw-r--r--  src/api/s3/list.rs (renamed from src/api/s3_list.rs) | 106
-rw-r--r--  src/api/s3/mod.rs | 15
-rw-r--r--  src/api/s3/post_object.rs (renamed from src/api/s3_post_object.rs) | 89
-rw-r--r--  src/api/s3/put.rs (renamed from src/api/s3_put.rs) | 217
-rw-r--r--  src/api/s3/router.rs (renamed from src/api/s3_router.rs) | 225
-rw-r--r--  src/api/s3/website.rs (renamed from src/api/s3_website.rs) | 51
-rw-r--r--  src/api/s3/xml.rs (renamed from src/api/s3_xml.rs) | 44
-rw-r--r--  src/api/signature/error.rs | 36
-rw-r--r--  src/api/signature/mod.rs | 30
-rw-r--r--  src/api/signature/payload.rs | 35
-rw-r--r--  src/api/signature/streaming.rs | 71
40 files changed, 4787 insertions(+), 1445 deletions(-)
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 5e96b081..7c3ed43b 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_api"
-version = "0.7.0"
+version = "0.8.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,28 +14,31 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_model = { version = "0.7.0", path = "../model" }
-garage_table = { version = "0.7.0", path = "../table" }
-garage_block = { version = "0.7.0", path = "../block" }
-garage_util = { version = "0.7.0", path = "../util" }
+garage_model = { version = "0.8.0", path = "../model" }
+garage_table = { version = "0.8.0", path = "../table" }
+garage_block = { version = "0.8.0", path = "../block" }
+garage_util = { version = "0.8.0", path = "../util" }
+garage_rpc = { version = "0.8.0", path = "../rpc" }
+async-trait = "0.1.7"
base64 = "0.13"
bytes = "1.0"
chrono = "0.4"
-crypto-mac = "0.10"
+crypto-common = "0.1"
err-derive = "0.3"
hex = "0.4"
-hmac = "0.10"
+hmac = "0.12"
idna = "0.2"
tracing = "0.1.30"
-md-5 = "0.9"
+md-5 = "0.10"
nom = "7.1"
-sha2 = "0.9"
+sha2 = "0.10"
futures = "0.3"
futures-util = "0.3"
pin-project = "1.0"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+tokio-stream = "0.1"
form_urlencoded = "1.0.0"
http = "0.2"
@@ -52,3 +55,9 @@ quick-xml = { version = "0.21", features = [ "serialize" ] }
url = "2.1"
opentelemetry = "0.17"
+opentelemetry-prometheus = { version = "0.10", optional = true }
+prometheus = { version = "0.13", optional = true }
+
+[features]
+k2v = [ "garage_util/k2v", "garage_model/k2v" ]
+metrics = [ "opentelemetry-prometheus", "prometheus" ]
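Beyond the version bump to 0.8.0, this manifest change pulls in garage_rpc, swaps crypto-mac for crypto-common, tracks newer RustCrypto releases (hmac 0.12, md-5 0.10, sha2 0.10), and introduces two opt-in cargo features. As a minimal sketch of what the `metrics` feature gates (the function name is hypothetical; the builder call is the opentelemetry-prometheus 0.10 API, not code from this diff):

// Sketch only: how a binary built with `--features metrics` might construct
// the Prometheus exporter that the new admin API server expects.
#[cfg(feature = "metrics")]
fn build_prometheus_exporter() -> opentelemetry_prometheus::PrometheusExporter {
    // exporter() returns an ExporterBuilder; init() registers the pipeline
    // globally and yields the PrometheusExporter handle.
    opentelemetry_prometheus::exporter().init()
}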
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
new file mode 100644
index 00000000..0816bda1
--- /dev/null
+++ b/src/api/admin/api_server.rs
@@ -0,0 +1,209 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
+use hyper::{Body, Request, Response};
+
+use opentelemetry::trace::SpanRef;
+
+#[cfg(feature = "metrics")]
+use opentelemetry_prometheus::PrometheusExporter;
+#[cfg(feature = "metrics")]
+use prometheus::{Encoder, TextEncoder};
+
+use garage_model::garage::Garage;
+use garage_util::error::Error as GarageError;
+
+use crate::generic_server::*;
+
+use crate::admin::bucket::*;
+use crate::admin::cluster::*;
+use crate::admin::error::*;
+use crate::admin::key::*;
+use crate::admin::router::{Authorization, Endpoint};
+
+pub struct AdminApiServer {
+ garage: Arc<Garage>,
+ #[cfg(feature = "metrics")]
+ exporter: PrometheusExporter,
+ metrics_token: Option<String>,
+ admin_token: Option<String>,
+}
+
+impl AdminApiServer {
+ pub fn new(
+ garage: Arc<Garage>,
+ #[cfg(feature = "metrics")] exporter: PrometheusExporter,
+ ) -> Self {
+ let cfg = &garage.config.admin;
+ let metrics_token = cfg
+ .metrics_token
+ .as_ref()
+ .map(|tok| format!("Bearer {}", tok));
+ let admin_token = cfg
+ .admin_token
+ .as_ref()
+ .map(|tok| format!("Bearer {}", tok));
+ Self {
+ garage,
+ #[cfg(feature = "metrics")]
+ exporter,
+ metrics_token,
+ admin_token,
+ }
+ }
+
+ pub async fn run(
+ self,
+ bind_addr: SocketAddr,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ let region = self.garage.config.s3_api.s3_region.clone();
+ ApiServer::new(region, self)
+ .run_server(bind_addr, shutdown_signal)
+ .await
+ }
+
+ fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
+ Ok(Response::builder()
+ .status(204)
+ .header(ALLOW, "OPTIONS, GET, POST")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .body(Body::empty())?)
+ }
+
+ fn handle_metrics(&self) -> Result<Response<Body>, Error> {
+ #[cfg(feature = "metrics")]
+ {
+ use opentelemetry::trace::Tracer;
+
+ let mut buffer = vec![];
+ let encoder = TextEncoder::new();
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let metric_families = tracer.in_span("admin/gather_metrics", |_| {
+ self.exporter.registry().gather()
+ });
+
+ encoder
+ .encode(&metric_families, &mut buffer)
+ .ok_or_internal_error("Could not serialize metrics")?;
+
+ Ok(Response::builder()
+ .status(200)
+ .header(http::header::CONTENT_TYPE, encoder.format_type())
+ .body(Body::from(buffer))?)
+ }
+ #[cfg(not(feature = "metrics"))]
+ Err(Error::bad_request(
+ "Garage was built without the metrics feature".to_string(),
+ ))
+ }
+}
+
+#[async_trait]
+impl ApiHandler for AdminApiServer {
+ const API_NAME: &'static str = "admin";
+ const API_NAME_DISPLAY: &'static str = "Admin";
+
+ type Endpoint = Endpoint;
+ type Error = Error;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> {
+ Endpoint::from_request(req)
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: Endpoint,
+ ) -> Result<Response<Body>, Error> {
+ let expected_auth_header =
+ match endpoint.authorization_type() {
+ Authorization::MetricsToken => self.metrics_token.as_ref(),
+ Authorization::AdminToken => match &self.admin_token {
+ None => return Err(Error::forbidden(
+					"Admin token isn't configured; admin API access is disabled for security.",
+ )),
+ Some(t) => Some(t),
+ },
+ };
+
+ if let Some(h) = expected_auth_header {
+ match req.headers().get("Authorization") {
+ None => return Err(Error::forbidden("Authorization token must be provided")),
+ Some(v) => {
+ let authorized = v.to_str().map(|hv| hv.trim() == h).unwrap_or(false);
+ if !authorized {
+ return Err(Error::forbidden("Invalid authorization token provided"));
+ }
+ }
+ }
+ }
+
+ match endpoint {
+ Endpoint::Options => self.handle_options(&req),
+ Endpoint::Metrics => self.handle_metrics(),
+ Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
+ Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await,
+ // Layout
+ Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
+ Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
+ Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
+ Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
+ // Keys
+ Endpoint::ListKeys => handle_list_keys(&self.garage).await,
+ Endpoint::GetKeyInfo { id, search } => {
+ handle_get_key_info(&self.garage, id, search).await
+ }
+ Endpoint::CreateKey => handle_create_key(&self.garage, req).await,
+ Endpoint::ImportKey => handle_import_key(&self.garage, req).await,
+ Endpoint::UpdateKey { id } => handle_update_key(&self.garage, id, req).await,
+ Endpoint::DeleteKey { id } => handle_delete_key(&self.garage, id).await,
+ // Buckets
+ Endpoint::ListBuckets => handle_list_buckets(&self.garage).await,
+ Endpoint::GetBucketInfo { id, global_alias } => {
+ handle_get_bucket_info(&self.garage, id, global_alias).await
+ }
+ Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await,
+ Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await,
+ Endpoint::UpdateBucket { id } => handle_update_bucket(&self.garage, id, req).await,
+ // Bucket-key permissions
+ Endpoint::BucketAllowKey => {
+ handle_bucket_change_key_perm(&self.garage, req, true).await
+ }
+ Endpoint::BucketDenyKey => {
+ handle_bucket_change_key_perm(&self.garage, req, false).await
+ }
+ // Bucket aliasing
+ Endpoint::GlobalAliasBucket { id, alias } => {
+ handle_global_alias_bucket(&self.garage, id, alias).await
+ }
+ Endpoint::GlobalUnaliasBucket { id, alias } => {
+ handle_global_unalias_bucket(&self.garage, id, alias).await
+ }
+ Endpoint::LocalAliasBucket {
+ id,
+ access_key_id,
+ alias,
+ } => handle_local_alias_bucket(&self.garage, id, access_key_id, alias).await,
+ Endpoint::LocalUnaliasBucket {
+ id,
+ access_key_id,
+ alias,
+ } => handle_local_unalias_bucket(&self.garage, id, access_key_id, alias).await,
+ }
+ }
+}
+
+impl ApiEndpoint for Endpoint {
+ fn name(&self) -> &'static str {
+ Endpoint::name(self)
+ }
+
+ fn add_span_attributes(&self, _span: SpanRef<'_>) {}
+}
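Note that the handler above compares the entire Authorization header value against the precomputed "Bearer {token}" string, so clients must send exactly that form. A hedged sketch of a matching client call using hyper 0.14; the bind address and token below are placeholders, not values taken from this diff:

use hyper::{Body, Client, Method, Request};

// Hypothetical client call against one of the admin endpoints defined above.
async fn get_cluster_status() -> Result<(), hyper::Error> {
    let req = Request::builder()
        .method(Method::GET)
        .uri("http://127.0.0.1:3903/v0/status")
        // Must equal the configured admin_token in "Bearer <token>" form,
        // since the server compares the whole header string.
        .header("Authorization", "Bearer some-admin-token")
        .body(Body::empty())
        .expect("valid request");
    let resp = Client::new().request(req).await?;
    println!("GetClusterStatus -> {}", resp.status());
    Ok(())
}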
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
new file mode 100644
index 00000000..ac8a8a40
--- /dev/null
+++ b/src/api/admin/bucket.rs
@@ -0,0 +1,580 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::time::*;
+
+use garage_table::*;
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::*;
+use garage_model::garage::Garage;
+use garage_model::permission::*;
+use garage_model::s3::object_table::*;
+
+use crate::admin::error::*;
+use crate::admin::key::ApiBucketKeyPerm;
+use crate::common_error::CommonError;
+use crate::helpers::{json_ok_response, parse_json_body};
+
+pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let buckets = garage
+ .bucket_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ let res = buckets
+ .into_iter()
+ .map(|b| {
+ let state = b.state.as_option().unwrap();
+ ListBucketResultItem {
+ id: hex::encode(b.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ local_aliases: state
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|((k, n), _, _)| BucketLocalAlias {
+ access_key_id: k.to_string(),
+ alias: n.to_string(),
+ })
+ .collect::<Vec<_>>(),
+ }
+ })
+ .collect::<Vec<_>>();
+
+ Ok(json_ok_response(&res)?)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct ListBucketResultItem {
+ id: String,
+ global_aliases: Vec<String>,
+ local_aliases: Vec<BucketLocalAlias>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct BucketLocalAlias {
+ access_key_id: String,
+ alias: String,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ApiBucketQuotas {
+ max_size: Option<u64>,
+ max_objects: Option<u64>,
+}
+
+pub async fn handle_get_bucket_info(
+ garage: &Arc<Garage>,
+ id: Option<String>,
+ global_alias: Option<String>,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = match (id, global_alias) {
+ (Some(id), None) => parse_bucket_id(&id)?,
+ (None, Some(ga)) => garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&ga)
+ .await?
+ .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
+ _ => {
+ return Err(Error::bad_request(
+ "Either id or globalAlias must be provided (but not both)",
+ ));
+ }
+ };
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+async fn bucket_info_results(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+) -> Result<Response<Body>, Error> {
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let counters = garage
+ .object_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .unwrap_or_default();
+
+ let mut relevant_keys = HashMap::new();
+ for (k, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_keys
+ .items()
+ .iter()
+ {
+ if let Some(key) = garage
+ .key_table
+ .get(&EmptyKey, k)
+ .await?
+ .filter(|k| !k.is_deleted())
+ {
+ if !key.state.is_deleted() {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ }
+ for ((k, _), _, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .items()
+ .iter()
+ {
+ if relevant_keys.contains_key(k) {
+ continue;
+ }
+ if let Some(key) = garage.key_table.get(&EmptyKey, k).await? {
+ if !key.state.is_deleted() {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ }
+
+ let state = bucket.state.as_option().unwrap();
+
+ let quotas = state.quotas.get();
+ let res =
+ GetBucketInfoResult {
+ id: hex::encode(&bucket.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ website_access: state.website_config.get().is_some(),
+ website_config: state.website_config.get().clone().map(|wsc| {
+ GetBucketInfoWebsiteResult {
+ index_document: wsc.index_document,
+ error_document: wsc.error_document,
+ }
+ }),
+ keys: relevant_keys
+ .into_iter()
+ .map(|(_, key)| {
+ let p = key.state.as_option().unwrap();
+ GetBucketInfoKey {
+ access_key_id: key.key_id,
+ name: p.name.get().to_string(),
+ permissions: p
+ .authorized_buckets
+ .get(&bucket.id)
+ .map(|p| ApiBucketKeyPerm {
+ read: p.allow_read,
+ write: p.allow_write,
+ owner: p.allow_owner,
+ })
+ .unwrap_or_default(),
+ bucket_local_aliases: p
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, b)| *b == Some(bucket.id))
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ }
+ })
+ .collect::<Vec<_>>(),
+ objects: counters.get(OBJECTS).cloned().unwrap_or_default(),
+ bytes: counters.get(BYTES).cloned().unwrap_or_default(),
+ unfinished_uploads: counters
+ .get(UNFINISHED_UPLOADS)
+ .cloned()
+ .unwrap_or_default(),
+ quotas: ApiBucketQuotas {
+ max_size: quotas.max_size,
+ max_objects: quotas.max_objects,
+ },
+ };
+
+ Ok(json_ok_response(&res)?)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetBucketInfoResult {
+ id: String,
+ global_aliases: Vec<String>,
+ website_access: bool,
+ #[serde(default)]
+ website_config: Option<GetBucketInfoWebsiteResult>,
+ keys: Vec<GetBucketInfoKey>,
+ objects: i64,
+ bytes: i64,
+ unfinished_uploads: i64,
+ quotas: ApiBucketQuotas,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetBucketInfoWebsiteResult {
+ index_document: String,
+ error_document: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetBucketInfoKey {
+ access_key_id: String,
+ name: String,
+ permissions: ApiBucketKeyPerm,
+ bucket_local_aliases: Vec<String>,
+}
+
+pub async fn handle_create_bucket(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<CreateBucketRequest>(req).await?;
+
+ if let Some(ga) = &req.global_alias {
+ if !is_valid_bucket_name(ga) {
+ return Err(Error::bad_request(format!(
+ "{}: {}",
+ ga, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? {
+ if alias.state.get().is_some() {
+ return Err(CommonError::BucketAlreadyExists.into());
+ }
+ }
+ }
+
+ if let Some(la) = &req.local_alias {
+ if !is_valid_bucket_name(&la.alias) {
+ return Err(Error::bad_request(format!(
+ "{}: {}",
+ la.alias, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ let key = garage
+ .key_helper()
+ .get_existing_key(&la.access_key_id)
+ .await?;
+ let state = key.state.as_option().unwrap();
+ if matches!(state.local_aliases.get(&la.alias), Some(_)) {
+ return Err(Error::bad_request("Local alias already exists"));
+ }
+ }
+
+ let bucket = Bucket::new();
+ garage.bucket_table.insert(&bucket).await?;
+
+ if let Some(ga) = &req.global_alias {
+ garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket.id, ga)
+ .await?;
+ }
+
+ if let Some(la) = &req.local_alias {
+ garage
+ .bucket_helper()
+ .set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias)
+ .await?;
+
+ if la.allow.read || la.allow.write || la.allow.owner {
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(
+ bucket.id,
+ &la.access_key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read: la.allow.read,
+ allow_write: la.allow.write,
+ allow_owner: la.allow.owner,
+ },
+ )
+ .await?;
+ }
+ }
+
+ bucket_info_results(garage, bucket.id).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateBucketRequest {
+ global_alias: Option<String>,
+ local_alias: Option<CreateBucketLocalAlias>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateBucketLocalAlias {
+ access_key_id: String,
+ alias: String,
+ #[serde(default)]
+ allow: ApiBucketKeyPerm,
+}
+
+pub async fn handle_delete_bucket(
+ garage: &Arc<Garage>,
+ id: String,
+) -> Result<Response<Body>, Error> {
+ let helper = garage.bucket_helper();
+
+ let bucket_id = parse_bucket_id(&id)?;
+
+ let mut bucket = helper.get_existing_bucket(bucket_id).await?;
+ let state = bucket.state.as_option().unwrap();
+
+ // Check bucket is empty
+ if !helper.is_bucket_empty(bucket_id).await? {
+ return Err(CommonError::BucketNotEmpty.into());
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, perm) in bucket.authorized_keys() {
+ if perm.is_any() {
+ helper
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+ }
+ // 2. delete all local aliases
+ for ((key_id, alias), _, active) in state.local_aliases.items().iter() {
+ if *active {
+ helper
+ .unset_local_bucket_alias(bucket.id, key_id, alias)
+ .await?;
+ }
+ }
+ // 3. delete all global aliases
+ for (alias, _, active) in state.aliases.items().iter() {
+ if *active {
+ helper.purge_global_bucket_alias(bucket.id, alias).await?;
+ }
+ }
+
+ // 4. delete bucket
+ bucket.state = Deletable::delete();
+ garage.bucket_table.insert(&bucket).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_update_bucket(
+ garage: &Arc<Garage>,
+ id: String,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<UpdateBucketRequest>(req).await?;
+ let bucket_id = parse_bucket_id(&id)?;
+
+ let mut bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let state = bucket.state.as_option_mut().unwrap();
+
+ if let Some(wa) = req.website_access {
+ if wa.enabled {
+ state.website_config.update(Some(WebsiteConfig {
+ index_document: wa.index_document.ok_or_bad_request(
+ "Please specify indexDocument when enabling website access.",
+ )?,
+ error_document: wa.error_document,
+ }));
+ } else {
+ if wa.index_document.is_some() || wa.error_document.is_some() {
+ return Err(Error::bad_request(
+ "Cannot specify indexDocument or errorDocument when disabling website access.",
+ ));
+ }
+ state.website_config.update(None);
+ }
+ }
+
+ if let Some(q) = req.quotas {
+ state.quotas.update(BucketQuotas {
+ max_size: q.max_size,
+ max_objects: q.max_objects,
+ });
+ }
+
+ garage.bucket_table.insert(&bucket).await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct UpdateBucketRequest {
+ website_access: Option<UpdateBucketWebsiteAccess>,
+ quotas: Option<ApiBucketQuotas>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct UpdateBucketWebsiteAccess {
+ enabled: bool,
+ index_document: Option<String>,
+ error_document: Option<String>,
+}
+
+// ---- BUCKET/KEY PERMISSIONS ----
+
+pub async fn handle_bucket_change_key_perm(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+ new_perm_flag: bool,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?;
+
+ let bucket_id = parse_bucket_id(&req.bucket_id)?;
+
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let state = bucket.state.as_option().unwrap();
+
+ let key = garage
+ .key_helper()
+ .get_existing_key(&req.access_key_id)
+ .await?;
+
+ let mut perm = state
+ .authorized_keys
+ .get(&key.key_id)
+ .cloned()
+ .unwrap_or(BucketKeyPerm::NO_PERMISSIONS);
+
+ if req.permissions.read {
+ perm.allow_read = new_perm_flag;
+ }
+ if req.permissions.write {
+ perm.allow_write = new_perm_flag;
+ }
+ if req.permissions.owner {
+ perm.allow_owner = new_perm_flag;
+ }
+
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(bucket.id, &key.key_id, perm)
+ .await?;
+
+ bucket_info_results(garage, bucket.id).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct BucketKeyPermChangeRequest {
+ bucket_id: String,
+ access_key_id: String,
+ permissions: ApiBucketKeyPerm,
+}
+
+// ---- BUCKET ALIASES ----
+
+pub async fn handle_global_alias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+pub async fn handle_global_unalias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .unset_global_bucket_alias(bucket_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+pub async fn handle_local_alias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ access_key_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .set_local_bucket_alias(bucket_id, &access_key_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+pub async fn handle_local_unalias_bucket(
+ garage: &Arc<Garage>,
+ bucket_id: String,
+ access_key_id: String,
+ alias: String,
+) -> Result<Response<Body>, Error> {
+ let bucket_id = parse_bucket_id(&bucket_id)?;
+
+ garage
+ .bucket_helper()
+ .unset_local_bucket_alias(bucket_id, &access_key_id, &alias)
+ .await?;
+
+ bucket_info_results(garage, bucket_id).await
+}
+
+// ---- HELPER ----
+
+fn parse_bucket_id(id: &str) -> Result<Uuid, Error> {
+ let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?;
+ Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?)
+}
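For reference, a request body accepted by the create-bucket handler above might look as follows. This is a sketch derived directly from the CreateBucketRequest/CreateBucketLocalAlias structs (field names are camelCase because of the serde rename attribute); the access key id is a placeholder:

// Sketch: a body for POST /v0/bucket, matching the structs above.
fn example_create_bucket_body() -> &'static str {
    r#"{
  "globalAlias": "my-bucket",
  "localAlias": {
    "accessKeyId": "GK_placeholder",
    "alias": "my-bucket",
    "allow": { "read": true, "write": true, "owner": false }
  }
}"#
}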
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
new file mode 100644
index 00000000..99c6e332
--- /dev/null
+++ b/src/api/admin/cluster.rs
@@ -0,0 +1,193 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::crdt::*;
+use garage_util::data::*;
+
+use garage_rpc::layout::*;
+
+use garage_model::garage::Garage;
+
+use crate::admin::error::*;
+use crate::helpers::{json_ok_response, parse_json_body};
+
+pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let res = GetClusterStatusResponse {
+ node: hex::encode(garage.system.id),
+ garage_version: garage_util::version::garage_version(),
+ garage_features: garage_util::version::garage_features(),
+ db_engine: garage.db.engine(),
+ known_nodes: garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|i| {
+ (
+ hex::encode(i.id),
+ KnownNodeResp {
+ addr: i.addr,
+ is_up: i.is_up,
+ last_seen_secs_ago: i.last_seen_secs_ago,
+ hostname: i.status.hostname,
+ },
+ )
+ })
+ .collect(),
+ layout: get_cluster_layout(garage),
+ };
+
+ Ok(json_ok_response(&res)?)
+}
+
+pub async fn handle_connect_cluster_nodes(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<Vec<String>>(req).await?;
+
+ let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node)))
+ .await
+ .into_iter()
+ .map(|r| match r {
+ Ok(()) => ConnectClusterNodesResponse {
+ success: true,
+ error: None,
+ },
+ Err(e) => ConnectClusterNodesResponse {
+ success: false,
+ error: Some(format!("{}", e)),
+ },
+ })
+ .collect::<Vec<_>>();
+
+ Ok(json_ok_response(&res)?)
+}
+
+pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let res = get_cluster_layout(garage);
+
+ Ok(json_ok_response(&res)?)
+}
+
+fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
+ let layout = garage.system.get_cluster_layout();
+
+ GetClusterLayoutResponse {
+ version: layout.version,
+ roles: layout
+ .roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| v.0.is_some())
+ .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
+ .collect(),
+ staged_role_changes: layout
+ .staging
+ .items()
+ .iter()
+ .filter(|(k, _, v)| layout.roles.get(k) != Some(v))
+ .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
+ .collect(),
+ }
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetClusterStatusResponse {
+ node: String,
+ garage_version: &'static str,
+ garage_features: Option<&'static [&'static str]>,
+ db_engine: String,
+ known_nodes: HashMap<String, KnownNodeResp>,
+ layout: GetClusterLayoutResponse,
+}
+
+#[derive(Serialize)]
+struct ConnectClusterNodesResponse {
+ success: bool,
+ error: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetClusterLayoutResponse {
+ version: u64,
+ roles: HashMap<String, Option<NodeRole>>,
+ staged_role_changes: HashMap<String, Option<NodeRole>>,
+}
+
+#[derive(Serialize)]
+struct KnownNodeResp {
+ addr: SocketAddr,
+ is_up: bool,
+ last_seen_secs_ago: Option<u64>,
+ hostname: String,
+}
+
+pub async fn handle_update_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
+
+ let mut layout = garage.system.get_cluster_layout();
+
+ let mut roles = layout.roles.clone();
+ roles.merge(&layout.staging);
+
+ for (node, role) in updates {
+ let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
+ let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
+
+ layout
+ .staging
+ .merge(&roles.update_mutator(node, NodeRoleV(role)));
+ }
+
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_apply_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.apply_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_revert_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.revert_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>;
+
+#[derive(Deserialize)]
+struct ApplyRevertLayoutRequest {
+ version: u64,
+}
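The three layout handlers above imply a two-step flow: stage role changes with UpdateClusterLayout, then commit (or revert) them at an explicit version so that concurrent modifications fail instead of being silently merged. A sketch of the corresponding request bodies; the node id is a placeholder, and the NodeRole field names (zone, capacity, tags) are assumptions based on garage_rpc::layout rather than content of this diff:

// Sketch: bodies for the staged-layout flow described above.
fn example_layout_bodies() -> (&'static str, &'static str) {
    // POST /v0/layout -- UpdateClusterLayoutRequest, a map from hex node id
    // to an optional NodeRole (null would remove the role).
    let stage = r#"{
  "acf5ae1938eb5bbe2e5e4e2d7e2b3f9a0000000000000000000000000000dead":
    { "zone": "dc1", "capacity": 100, "tags": ["gateway"] }
}"#;
    // POST /v0/layout/apply -- ApplyRevertLayoutRequest with the version
    // the caller expects to be committing.
    let apply = r#"{ "version": 2 }"#;
    (stage, apply)
}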
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
new file mode 100644
index 00000000..ed1a07bd
--- /dev/null
+++ b/src/api/admin/error.rs
@@ -0,0 +1,97 @@
+use err_derive::Error;
+use hyper::header::HeaderValue;
+use hyper::{Body, HeaderMap, StatusCode};
+
+pub use garage_model::helper::error::Error as HelperError;
+
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+use crate::generic_server::ApiError;
+use crate::helpers::CustomApiErrorBody;
+
+/// Errors of this crate
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
+
+ // Category: cannot process
+ /// The API access key does not exist
+ #[error(display = "Access key not found: {}", _0)]
+ NoSuchAccessKey(String),
+
+ /// In Import key, the key already exists
+ #[error(
+ display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
+ _0
+ )]
+ KeyAlreadyExists(String),
+}
+
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
+
+impl From<HelperError> for Error {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
+ HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
+ HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
+ HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
+ HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n),
+ }
+ }
+}
+
+impl Error {
+ fn code(&self) -> &'static str {
+ match self {
+ Error::Common(c) => c.aws_code(),
+ Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
+ Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
+ }
+ }
+}
+
+impl ApiError for Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
+ fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::Common(c) => c.http_status_code(),
+ Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
+ Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
+ }
+ }
+
+ fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>) {
+ use hyper::header;
+ header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
+ }
+
+ fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ let error = CustomApiErrorBody {
+ code: self.code().to_string(),
+ message: format!("{}", self),
+ path: path.to_string(),
+ region: garage_region.to_string(),
+ };
+ Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ r#"
+{
+ "code": "InternalError",
+ "message": "JSON encoding of error failed"
+}
+ "#
+ .into()
+ }))
+ }
+}
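Concretely, http_body() above renders admin errors as a small JSON document. A sketch of its shape for one of the new variants, following the CustomApiErrorBody fields (code, message, path, region); the values are illustrative only:

// Sketch: the JSON error body produced by the ApiError impl above.
fn example_admin_error_body() -> &'static str {
    r#"{
  "code": "NoSuchAccessKey",
  "message": "Access key not found: GK_placeholder",
  "path": "/v0/key",
  "region": "garage"
}"#
}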
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
new file mode 100644
index 00000000..2bbabb7b
--- /dev/null
+++ b/src/api/admin/key.rs
@@ -0,0 +1,256 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_table::*;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::*;
+
+use crate::admin::error::*;
+use crate::helpers::{json_ok_response, parse_json_body};
+
+pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let res = garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .iter()
+ .map(|k| ListKeyResultItem {
+ id: k.key_id.to_string(),
+ name: k.params().unwrap().name.get().clone(),
+ })
+ .collect::<Vec<_>>();
+
+ Ok(json_ok_response(&res)?)
+}
+
+#[derive(Serialize)]
+struct ListKeyResultItem {
+ id: String,
+ name: String,
+}
+
+pub async fn handle_get_key_info(
+ garage: &Arc<Garage>,
+ id: Option<String>,
+ search: Option<String>,
+) -> Result<Response<Body>, Error> {
+ let key = if let Some(id) = id {
+ garage.key_helper().get_existing_key(&id).await?
+ } else if let Some(search) = search {
+ garage
+ .key_helper()
+ .get_existing_matching_key(&search)
+ .await?
+ } else {
+ unreachable!();
+ };
+
+ key_info_results(garage, key).await
+}
+
+pub async fn handle_create_key(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<CreateKeyRequest>(req).await?;
+
+ let key = Key::new(&req.name);
+ garage.key_table.insert(&key).await?;
+
+ key_info_results(garage, key).await
+}
+
+#[derive(Deserialize)]
+struct CreateKeyRequest {
+ name: String,
+}
+
+pub async fn handle_import_key(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<ImportKeyRequest>(req).await?;
+
+ let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::KeyAlreadyExists(req.access_key_id.to_string()));
+ }
+
+ let imported_key = Key::import(&req.access_key_id, &req.secret_access_key, &req.name);
+ garage.key_table.insert(&imported_key).await?;
+
+ key_info_results(garage, imported_key).await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ImportKeyRequest {
+ access_key_id: String,
+ secret_access_key: String,
+ name: String,
+}
+
+pub async fn handle_update_key(
+ garage: &Arc<Garage>,
+ id: String,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let req = parse_json_body::<UpdateKeyRequest>(req).await?;
+
+ let mut key = garage.key_helper().get_existing_key(&id).await?;
+
+ let key_state = key.state.as_option_mut().unwrap();
+
+ if let Some(new_name) = req.name {
+ key_state.name.update(new_name);
+ }
+ if let Some(allow) = req.allow {
+ if allow.create_bucket {
+ key_state.allow_create_bucket.update(true);
+ }
+ }
+ if let Some(deny) = req.deny {
+ if deny.create_bucket {
+ key_state.allow_create_bucket.update(false);
+ }
+ }
+
+ garage.key_table.insert(&key).await?;
+
+ key_info_results(garage, key).await
+}
+
+#[derive(Deserialize)]
+struct UpdateKeyRequest {
+ name: Option<String>,
+ allow: Option<KeyPerm>,
+ deny: Option<KeyPerm>,
+}
+
+pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Response<Body>, Error> {
+ let mut key = garage.key_helper().get_existing_key(&id).await?;
+
+ // Assert the key is not already deleted: as_option() returns None for a
+ // deleted key, so this unwrap panics rather than double-deleting.
+ key.state.as_option().unwrap();
+
+ garage.key_helper().delete_key(&mut key).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+async fn key_info_results(garage: &Arc<Garage>, key: Key) -> Result<Response<Body>, Error> {
+ let mut relevant_buckets = HashMap::new();
+
+ let key_state = key.state.as_option().unwrap();
+
+ for id in key_state
+ .authorized_buckets
+ .items()
+ .iter()
+ .map(|(id, _)| id)
+ .chain(
+ key_state
+ .local_aliases
+ .items()
+ .iter()
+ .filter_map(|(_, _, v)| v.as_ref()),
+ ) {
+ if !relevant_buckets.contains_key(id) {
+ if let Some(b) = garage.bucket_table.get(&EmptyKey, id).await? {
+ if b.state.as_option().is_some() {
+ relevant_buckets.insert(*id, b);
+ }
+ }
+ }
+ }
+
+ let res = GetKeyInfoResult {
+ name: key_state.name.get().clone(),
+ access_key_id: key.key_id.clone(),
+ secret_access_key: key_state.secret_key.clone(),
+ permissions: KeyPerm {
+ create_bucket: *key_state.allow_create_bucket.get(),
+ },
+ buckets: relevant_buckets
+ .into_iter()
+ .map(|(_, bucket)| {
+ let state = bucket.state.as_option().unwrap();
+ KeyInfoBucketResult {
+ id: hex::encode(bucket.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ local_aliases: state
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|((k, _), _, a)| *a && *k == key.key_id)
+ .map(|((_, n), _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ permissions: key_state
+ .authorized_buckets
+ .get(&bucket.id)
+ .map(|p| ApiBucketKeyPerm {
+ read: p.allow_read,
+ write: p.allow_write,
+ owner: p.allow_owner,
+ })
+ .unwrap_or_default(),
+ }
+ })
+ .collect::<Vec<_>>(),
+ };
+
+ Ok(json_ok_response(&res)?)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct GetKeyInfoResult {
+ name: String,
+ access_key_id: String,
+ secret_access_key: String,
+ permissions: KeyPerm,
+ buckets: Vec<KeyInfoBucketResult>,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct KeyPerm {
+ #[serde(default)]
+ create_bucket: bool,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct KeyInfoBucketResult {
+ id: String,
+ global_aliases: Vec<String>,
+ local_aliases: Vec<String>,
+ permissions: ApiBucketKeyPerm,
+}
+
+#[derive(Serialize, Deserialize, Default)]
+pub(crate) struct ApiBucketKeyPerm {
+ #[serde(default)]
+ pub(crate) read: bool,
+ #[serde(default)]
+ pub(crate) write: bool,
+ #[serde(default)]
+ pub(crate) owner: bool,
+}
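As a usage sketch, the body below would be accepted by the update-key handler above. It follows UpdateKeyRequest, where `allow` and `deny` share the KeyPerm shape, whose only flag so far is createBucket (camelCase per the serde attribute); the values are illustrative:

// Sketch: a body for POST /v0/key?id=<key id>, matching UpdateKeyRequest.
fn example_update_key_body() -> &'static str {
    r#"{
  "name": "renamed-key",
  "allow": { "createBucket": true }
}"#
}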
diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs
new file mode 100644
index 00000000..c4857c10
--- /dev/null
+++ b/src/api/admin/mod.rs
@@ -0,0 +1,7 @@
+pub mod api_server;
+mod error;
+mod router;
+
+mod bucket;
+mod cluster;
+mod key;
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
new file mode 100644
index 00000000..3eee8b67
--- /dev/null
+++ b/src/api/admin/router.rs
@@ -0,0 +1,145 @@
+use std::borrow::Cow;
+
+use hyper::{Method, Request};
+
+use crate::admin::error::*;
+use crate::router_macros::*;
+
+pub enum Authorization {
+ MetricsToken,
+ AdminToken,
+}
+
+router_match! {@func
+
+/// List of all Admin API endpoints.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Endpoint {
+ Options,
+ Metrics,
+ GetClusterStatus,
+ ConnectClusterNodes,
+ // Layout
+ GetClusterLayout,
+ UpdateClusterLayout,
+ ApplyClusterLayout,
+ RevertClusterLayout,
+ // Keys
+ ListKeys,
+ CreateKey,
+ ImportKey,
+ GetKeyInfo {
+ id: Option<String>,
+ search: Option<String>,
+ },
+ DeleteKey {
+ id: String,
+ },
+ UpdateKey {
+ id: String,
+ },
+ // Buckets
+ ListBuckets,
+ CreateBucket,
+ GetBucketInfo {
+ id: Option<String>,
+ global_alias: Option<String>,
+ },
+ DeleteBucket {
+ id: String,
+ },
+ UpdateBucket {
+ id: String,
+ },
+ // Bucket-Key Permissions
+ BucketAllowKey,
+ BucketDenyKey,
+ // Bucket aliases
+ GlobalAliasBucket {
+ id: String,
+ alias: String,
+ },
+ GlobalUnaliasBucket {
+ id: String,
+ alias: String,
+ },
+ LocalAliasBucket {
+ id: String,
+ access_key_id: String,
+ alias: String,
+ },
+ LocalUnaliasBucket {
+ id: String,
+ access_key_id: String,
+ alias: String,
+ },
+}}
+
+impl Endpoint {
+ /// Determine which admin API endpoint a request is for, based on its
+ /// method, path and query parameters.
+ pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> {
+ let uri = req.uri();
+ let path = uri.path();
+ let query = uri.query();
+
+ let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
+
+ let res = router_match!(@gen_path_parser (req.method(), path, query) [
+ OPTIONS _ => Options,
+ GET "/metrics" => Metrics,
+ GET "/v0/status" => GetClusterStatus,
+ POST "/v0/connect" => ConnectClusterNodes,
+ // Layout endpoints
+ GET "/v0/layout" => GetClusterLayout,
+ POST "/v0/layout" => UpdateClusterLayout,
+ POST "/v0/layout/apply" => ApplyClusterLayout,
+ POST "/v0/layout/revert" => RevertClusterLayout,
+ // API key endpoints
+ GET "/v0/key" if id => GetKeyInfo (query_opt::id, query_opt::search),
+ GET "/v0/key" if search => GetKeyInfo (query_opt::id, query_opt::search),
+ POST "/v0/key" if id => UpdateKey (query::id),
+ POST "/v0/key" => CreateKey,
+ POST "/v0/key/import" => ImportKey,
+ DELETE "/v0/key" if id => DeleteKey (query::id),
+ GET "/v0/key" => ListKeys,
+ // Bucket endpoints
+ GET "/v0/bucket" if id => GetBucketInfo (query_opt::id, query_opt::global_alias),
+ GET "/v0/bucket" if global_alias => GetBucketInfo (query_opt::id, query_opt::global_alias),
+ GET "/v0/bucket" => ListBuckets,
+ POST "/v0/bucket" => CreateBucket,
+ DELETE "/v0/bucket" if id => DeleteBucket (query::id),
+ PUT "/v0/bucket" if id => UpdateBucket (query::id),
+ // Bucket-key permissions
+ POST "/v0/bucket/allow" => BucketAllowKey,
+ POST "/v0/bucket/deny" => BucketDenyKey,
+ // Bucket aliases
+ PUT "/v0/bucket/alias/global" => GlobalAliasBucket (query::id, query::alias),
+ DELETE "/v0/bucket/alias/global" => GlobalUnaliasBucket (query::id, query::alias),
+ PUT "/v0/bucket/alias/local" => LocalAliasBucket (query::id, query::access_key_id, query::alias),
+ DELETE "/v0/bucket/alias/local" => LocalUnaliasBucket (query::id, query::access_key_id, query::alias),
+ ]);
+
+ if let Some(message) = query.nonempty_message() {
+ debug!("Unused query parameter: {}", message)
+ }
+
+ Ok(res)
+ }
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ match self {
+ Self::Metrics => Authorization::MetricsToken,
+ _ => Authorization::AdminToken,
+ }
+ }
+}
+
+generateQueryParameters! {
+ "id" => id,
+ "search" => search,
+ "globalAlias" => global_alias,
+ "alias" => alias,
+ "accessKeyId" => access_key_id
+}
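Because all parameters are query-string based, this routing is easy to exercise in isolation. A test-style sketch, assuming it lives inside this module so that Endpoint and Error are in scope (hyper 0.14 request builder; the URI is an example):

use hyper::{Body, Method, Request};

// Hypothetical check that a query-parameter route resolves as described above.
fn demo_parse() -> Result<(), Error> {
    let req = Request::builder()
        .method(Method::GET)
        .uri("http://localhost:3903/v0/key?search=prod")
        .body(Body::empty())
        .unwrap();
    let endpoint = Endpoint::from_request(&req)?;
    assert_eq!(
        endpoint,
        Endpoint::GetKeyInfo { id: None, search: Some("prod".to_string()) }
    );
    Ok(())
}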
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
deleted file mode 100644
index e7b86d9e..00000000
--- a/src/api/api_server.rs
+++ /dev/null
@@ -1,645 +0,0 @@
-use std::net::SocketAddr;
-use std::sync::Arc;
-
-use chrono::{DateTime, NaiveDateTime, Utc};
-use futures::future::Future;
-use futures::prelude::*;
-use hyper::header;
-use hyper::server::conn::AddrStream;
-use hyper::service::{make_service_fn, service_fn};
-use hyper::{Body, Method, Request, Response, Server};
-
-use opentelemetry::{
- global,
- metrics::{Counter, ValueRecorder},
- trace::{FutureExt, TraceContextExt, Tracer},
- Context, KeyValue,
-};
-
-use garage_util::data::*;
-use garage_util::error::Error as GarageError;
-use garage_util::metrics::{gen_trace_id, RecordDuration};
-
-use garage_model::garage::Garage;
-use garage_model::key_table::Key;
-
-use garage_table::util::*;
-
-use crate::error::*;
-use crate::signature::compute_scope;
-use crate::signature::payload::check_payload_signature;
-use crate::signature::streaming::SignedPayloadStream;
-use crate::signature::LONG_DATETIME;
-
-use crate::helpers::*;
-use crate::s3_bucket::*;
-use crate::s3_copy::*;
-use crate::s3_cors::*;
-use crate::s3_delete::*;
-use crate::s3_get::*;
-use crate::s3_list::*;
-use crate::s3_post_object::handle_post_object;
-use crate::s3_put::*;
-use crate::s3_router::{Authorization, Endpoint};
-use crate::s3_website::*;
-
-struct ApiMetrics {
- request_counter: Counter<u64>,
- error_counter: Counter<u64>,
- request_duration: ValueRecorder<f64>,
-}
-
-impl ApiMetrics {
- fn new() -> Self {
- let meter = global::meter("garage/api");
- Self {
- request_counter: meter
- .u64_counter("api.request_counter")
- .with_description("Number of API calls to the various S3 API endpoints")
- .init(),
- error_counter: meter
- .u64_counter("api.error_counter")
- .with_description(
- "Number of API calls to the various S3 API endpoints that resulted in errors",
- )
- .init(),
- request_duration: meter
- .f64_value_recorder("api.request_duration")
- .with_description("Duration of API calls to the various S3 API endpoints")
- .init(),
- }
- }
-}
-
-/// Run the S3 API server
-pub async fn run_api_server(
- garage: Arc<Garage>,
- shutdown_signal: impl Future<Output = ()>,
-) -> Result<(), GarageError> {
- let addr = &garage.config.s3_api.api_bind_addr;
-
- let metrics = Arc::new(ApiMetrics::new());
-
- let service = make_service_fn(|conn: &AddrStream| {
- let garage = garage.clone();
- let metrics = metrics.clone();
-
- let client_addr = conn.remote_addr();
- async move {
- Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
- let garage = garage.clone();
- let metrics = metrics.clone();
-
- handler(garage, metrics, req, client_addr)
- }))
- }
- });
-
- let server = Server::bind(addr).serve(service);
-
- let graceful = server.with_graceful_shutdown(shutdown_signal);
- info!("API server listening on http://{}", addr);
-
- graceful.await?;
- Ok(())
-}
-
-async fn handler(
- garage: Arc<Garage>,
- metrics: Arc<ApiMetrics>,
- req: Request<Body>,
- addr: SocketAddr,
-) -> Result<Response<Body>, GarageError> {
- let uri = req.uri().clone();
- info!("{} {} {}", addr, req.method(), uri);
- debug!("{:?}", req);
-
- let tracer = opentelemetry::global::tracer("garage");
- let span = tracer
- .span_builder("S3 API call (unknown)")
- .with_trace_id(gen_trace_id())
- .with_attributes(vec![
- KeyValue::new("method", format!("{}", req.method())),
- KeyValue::new("uri", req.uri().to_string()),
- ])
- .start(&tracer);
-
- let res = handler_stage2(garage.clone(), metrics, req)
- .with_context(Context::current_with_span(span))
- .await;
-
- match res {
- Ok(x) => {
- debug!("{} {:?}", x.status(), x.headers());
- Ok(x)
- }
- Err(e) => {
- let body: Body = Body::from(e.aws_xml(&garage.config.s3_api.s3_region, uri.path()));
- let mut http_error_builder = Response::builder()
- .status(e.http_status_code())
- .header("Content-Type", "application/xml");
-
- if let Some(header_map) = http_error_builder.headers_mut() {
- e.add_headers(header_map)
- }
-
- let http_error = http_error_builder.body(body)?;
-
- if e.http_status_code().is_server_error() {
- warn!("Response: error {}, {}", e.http_status_code(), e);
- } else {
- info!("Response: error {}, {}", e.http_status_code(), e);
- }
- Ok(http_error)
- }
- }
-}
-
-async fn handler_stage2(
- garage: Arc<Garage>,
- metrics: Arc<ApiMetrics>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let authority = req
- .headers()
- .get(header::HOST)
- .ok_or_bad_request("Host header required")?
- .to_str()?;
-
- let host = authority_to_host(authority)?;
-
- let bucket_name = garage
- .config
- .s3_api
- .root_domain
- .as_ref()
- .and_then(|root_domain| host_to_bucket(&host, root_domain));
-
- let (endpoint, bucket_name) = Endpoint::from_request(&req, bucket_name.map(ToOwned::to_owned))?;
- debug!("Endpoint: {:?}", endpoint);
-
- let current_context = Context::current();
- let current_span = current_context.span();
- current_span.update_name::<String>(format!("S3 API {}", endpoint.name()));
- current_span.set_attribute(KeyValue::new("endpoint", endpoint.name()));
- current_span.set_attribute(KeyValue::new(
- "bucket",
- bucket_name.clone().unwrap_or_default(),
- ));
-
- let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
-
- let res = handler_stage3(garage, req, endpoint, bucket_name)
- .record_duration(&metrics.request_duration, &metrics_tags[..])
- .await;
-
- metrics.request_counter.add(1, &metrics_tags[..]);
-
- let status_code = match &res {
- Ok(r) => r.status(),
- Err(e) => e.http_status_code(),
- };
- if status_code.is_client_error() || status_code.is_server_error() {
- metrics.error_counter.add(
- 1,
- &[
- metrics_tags[0].clone(),
- KeyValue::new("status_code", status_code.as_str().to_string()),
- ],
- );
- }
-
- res
-}
-
-async fn handler_stage3(
- garage: Arc<Garage>,
- req: Request<Body>,
- endpoint: Endpoint,
- bucket_name: Option<String>,
-) -> Result<Response<Body>, Error> {
- // Some endpoints are processed early, before we even check for an API key
- if let Endpoint::PostObject = endpoint {
- return handle_post_object(garage, req, bucket_name.unwrap()).await;
- }
- if let Endpoint::Options = endpoint {
- return handle_options_s3api(garage, &req, bucket_name).await;
- }
-
- let (api_key, mut content_sha256) = check_payload_signature(&garage, &req).await?;
- let api_key = api_key.ok_or_else(|| {
- Error::Forbidden("Garage does not support anonymous access yet".to_string())
- })?;
-
- let req = match req.headers().get("x-amz-content-sha256") {
- Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
- let signature = content_sha256
- .take()
- .ok_or_bad_request("No signature provided")?;
-
- let secret_key = &api_key
- .state
- .as_option()
- .ok_or_internal_error("Deleted key state")?
- .secret_key;
-
- let date = req
- .headers()
- .get("x-amz-date")
- .ok_or_bad_request("Missing X-Amz-Date field")?
- .to_str()?;
- let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
- .ok_or_bad_request("Invalid date")?;
- let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
-
- let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
- let signing_hmac = crate::signature::signing_hmac(
- &date,
- secret_key,
- &garage.config.s3_api.s3_region,
- "s3",
- )
- .ok_or_internal_error("Unable to build signing HMAC")?;
-
- req.map(move |body| {
- Body::wrap_stream(
- SignedPayloadStream::new(
- body.map_err(Error::from),
- signing_hmac,
- date,
- &scope,
- signature,
- )
- .map_err(Error::from),
- )
- })
- }
- _ => req,
- };
-
- let bucket_name = match bucket_name {
- None => return handle_request_without_bucket(garage, req, api_key, endpoint).await,
- Some(bucket) => bucket.to_string(),
- };
-
- // Special code path for CreateBucket API endpoint
- if let Endpoint::CreateBucket {} = endpoint {
- return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await;
- }
-
- let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
- let bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .filter(|b| !b.state.is_deleted())
- .ok_or(Error::NoSuchBucket)?;
-
- let allowed = match endpoint.authorization_type() {
- Authorization::Read => api_key.allow_read(&bucket_id),
- Authorization::Write => api_key.allow_write(&bucket_id),
- Authorization::Owner => api_key.allow_owner(&bucket_id),
- _ => unreachable!(),
- };
-
- if !allowed {
- return Err(Error::Forbidden(
- "Operation is not allowed for this key.".to_string(),
- ));
- }
-
- // Look up what CORS rule might apply to response.
- // Requests for methods different than GET, HEAD or POST
- // are always preflighted, i.e. the browser should make
- // an OPTIONS call before to check it is allowed
- let matching_cors_rule = match *req.method() {
- Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
- _ => None,
- };
-
- let resp = match endpoint {
- Endpoint::HeadObject {
- key, part_number, ..
- } => handle_head(garage, &req, bucket_id, &key, part_number).await,
- Endpoint::GetObject {
- key, part_number, ..
- } => handle_get(garage, &req, bucket_id, &key, part_number).await,
- Endpoint::UploadPart {
- key,
- part_number,
- upload_id,
- } => {
- handle_put_part(
- garage,
- req,
- bucket_id,
- &key,
- part_number,
- &upload_id,
- content_sha256,
- )
- .await
- }
- Endpoint::CopyObject { key } => handle_copy(garage, &api_key, &req, bucket_id, &key).await,
- Endpoint::UploadPartCopy {
- key,
- part_number,
- upload_id,
- } => {
- handle_upload_part_copy(
- garage,
- &api_key,
- &req,
- bucket_id,
- &key,
- part_number,
- &upload_id,
- )
- .await
- }
- Endpoint::PutObject { key } => {
- handle_put(garage, req, bucket_id, &key, content_sha256).await
- }
- Endpoint::AbortMultipartUpload { key, upload_id } => {
- handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
- }
- Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
- Endpoint::CreateMultipartUpload { key } => {
- handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
- }
- Endpoint::CompleteMultipartUpload { key, upload_id } => {
- handle_complete_multipart_upload(
- garage,
- req,
- &bucket_name,
- bucket_id,
- &key,
- &upload_id,
- content_sha256,
- )
- .await
- }
- Endpoint::CreateBucket {} => unreachable!(),
- Endpoint::HeadBucket {} => {
- let empty_body: Body = Body::from(vec![]);
- let response = Response::builder().body(empty_body).unwrap();
- Ok(response)
- }
- Endpoint::DeleteBucket {} => {
- handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await
- }
- Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
- Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
- Endpoint::ListObjects {
- delimiter,
- encoding_type,
- marker,
- max_keys,
- prefix,
- } => {
- handle_list(
- garage,
- &ListObjectsQuery {
- common: ListQueryCommon {
- bucket_name,
- bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
- page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
- prefix: prefix.unwrap_or_default(),
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
- },
- is_v2: false,
- marker,
- continuation_token: None,
- start_after: None,
- },
- )
- .await
- }
- Endpoint::ListObjectsV2 {
- delimiter,
- encoding_type,
- max_keys,
- prefix,
- continuation_token,
- start_after,
- list_type,
- ..
- } => {
- if list_type == "2" {
- handle_list(
- garage,
- &ListObjectsQuery {
- common: ListQueryCommon {
- bucket_name,
- bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
- page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
- prefix: prefix.unwrap_or_default(),
- },
- is_v2: true,
- marker: None,
- continuation_token,
- start_after,
- },
- )
- .await
- } else {
- Err(Error::BadRequest(format!(
- "Invalid endpoint: list-type={}",
- list_type
- )))
- }
- }
- Endpoint::ListMultipartUploads {
- delimiter,
- encoding_type,
- key_marker,
- max_uploads,
- prefix,
- upload_id_marker,
- } => {
- handle_list_multipart_upload(
- garage,
- &ListMultipartUploadsQuery {
- common: ListQueryCommon {
- bucket_name,
- bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
- page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
- prefix: prefix.unwrap_or_default(),
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
- },
- key_marker,
- upload_id_marker,
- },
- )
- .await
- }
- Endpoint::ListParts {
- key,
- max_parts,
- part_number_marker,
- upload_id,
- } => {
- handle_list_parts(
- garage,
- &ListPartsQuery {
- bucket_name,
- bucket_id,
- key,
- upload_id,
- part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
- max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
- },
- )
- .await
- }
- Endpoint::DeleteObjects {} => {
- handle_delete_objects(garage, bucket_id, req, content_sha256).await
- }
- Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
- Endpoint::PutBucketWebsite {} => {
- handle_put_website(garage, bucket_id, req, content_sha256).await
- }
- Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await,
- Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
- Endpoint::PutBucketCors {} => handle_put_cors(garage, bucket_id, req, content_sha256).await,
- Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await,
- endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
- };
-
- // If request was a success and we have a CORS rule that applies to it,
- // add the corresponding CORS headers to the response
- let mut resp_ok = resp?;
- if let Some(rule) = matching_cors_rule {
- add_cors_headers(&mut resp_ok, rule)
- .ok_or_internal_error("Invalid bucket CORS configuration")?;
- }
-
- Ok(resp_ok)
-}
-
-async fn handle_request_without_bucket(
- garage: Arc<Garage>,
- _req: Request<Body>,
- api_key: Key,
- endpoint: Endpoint,
-) -> Result<Response<Body>, Error> {
- match endpoint {
- Endpoint::ListBuckets => handle_list_buckets(&garage, &api_key).await,
- endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
- }
-}
-
-#[allow(clippy::ptr_arg)]
-pub async fn resolve_bucket(
- garage: &Garage,
- bucket_name: &String,
- api_key: &Key,
-) -> Result<Uuid, Error> {
- let api_key_params = api_key
- .state
- .as_option()
- .ok_or_internal_error("Key should not be deleted at this point")?;
-
- if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
- Ok(*bucket_id)
- } else {
- Ok(garage
- .bucket_helper()
- .resolve_global_bucket_name(bucket_name)
- .await?
- .ok_or(Error::NoSuchBucket)?)
- }
-}
-
-/// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in
-/// the host header of the request
-///
-/// S3 internally manages only buckets and keys. This function splits
-/// an HTTP path to get the corresponding bucket name and key.
-pub fn parse_bucket_key<'a>(
- path: &'a str,
- host_bucket: Option<&'a str>,
-) -> Result<(&'a str, Option<&'a str>), Error> {
- let path = path.trim_start_matches('/');
-
- if let Some(bucket) = host_bucket {
- if !path.is_empty() {
- return Ok((bucket, Some(path)));
- } else {
- return Ok((bucket, None));
- }
- }
-
- let (bucket, key) = match path.find('/') {
- Some(i) => {
- let key = &path[i + 1..];
- if !key.is_empty() {
- (&path[..i], Some(key))
- } else {
- (&path[..i], None)
- }
- }
- None => (path, None),
- };
- if bucket.is_empty() {
- return Err(Error::BadRequest("No bucket specified".to_string()));
- }
- Ok((bucket, key))
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn parse_bucket_containing_a_key() -> Result<(), Error> {
- let (bucket, key) = parse_bucket_key("/my_bucket/a/super/file.jpg", None)?;
- assert_eq!(bucket, "my_bucket");
- assert_eq!(key.expect("key must be set"), "a/super/file.jpg");
- Ok(())
- }
-
- #[test]
- fn parse_bucket_containing_no_key() -> Result<(), Error> {
- let (bucket, key) = parse_bucket_key("/my_bucket/", None)?;
- assert_eq!(bucket, "my_bucket");
- assert!(key.is_none());
- let (bucket, key) = parse_bucket_key("/my_bucket", None)?;
- assert_eq!(bucket, "my_bucket");
- assert!(key.is_none());
- Ok(())
- }
-
- #[test]
- fn parse_bucket_containing_no_bucket() {
- let parsed = parse_bucket_key("", None);
- assert!(parsed.is_err());
- let parsed = parse_bucket_key("/", None);
- assert!(parsed.is_err());
- let parsed = parse_bucket_key("////", None);
- assert!(parsed.is_err());
- }
-
- #[test]
- fn parse_bucket_with_vhost_and_key() -> Result<(), Error> {
- let (bucket, key) = parse_bucket_key("/a/super/file.jpg", Some("my-bucket"))?;
- assert_eq!(bucket, "my-bucket");
- assert_eq!(key.expect("key must be set"), "a/super/file.jpg");
- Ok(())
- }
-
- #[test]
- fn parse_bucket_with_vhost_no_key() -> Result<(), Error> {
- let (bucket, key) = parse_bucket_key("", Some("my-bucket"))?;
- assert_eq!(bucket, "my-bucket");
- assert!(key.is_none());
- let (bucket, key) = parse_bucket_key("/", Some("my-bucket"))?;
- assert_eq!(bucket, "my-bucket");
- assert!(key.is_none());
- Ok(())
- }
-}
diff --git a/src/api/common_error.rs b/src/api/common_error.rs
new file mode 100644
index 00000000..20f9f266
--- /dev/null
+++ b/src/api/common_error.rs
@@ -0,0 +1,177 @@
+use err_derive::Error;
+use hyper::StatusCode;
+
+use garage_util::error::Error as GarageError;
+
+/// Errors common to all the APIs of this crate
+#[derive(Debug, Error)]
+pub enum CommonError {
+ // ---- INTERNAL ERRORS ----
+ /// Error related to deeper parts of Garage
+ #[error(display = "Internal error: {}", _0)]
+ InternalError(#[error(source)] GarageError),
+
+ /// Error related to Hyper
+ #[error(display = "Internal error (Hyper error): {}", _0)]
+ Hyper(#[error(source)] hyper::Error),
+
+ /// Error related to HTTP
+ #[error(display = "Internal error (HTTP error): {}", _0)]
+ Http(#[error(source)] http::Error),
+
+ // ---- GENERIC CLIENT ERRORS ----
+ /// Proper authentication was not provided
+ #[error(display = "Forbidden: {}", _0)]
+ Forbidden(String),
+
+ /// Generic bad request response with custom message
+ #[error(display = "Bad request: {}", _0)]
+ BadRequest(String),
+
+ // ---- SPECIFIC ERROR CONDITIONS ----
+ // These have to be error codes referenced in the S3 spec here:
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
+	/// The requested bucket doesn't exist
+ #[error(display = "Bucket not found: {}", _0)]
+ NoSuchBucket(String),
+
+	/// Tried to create a bucket that already exists
+ #[error(display = "Bucket already exists")]
+ BucketAlreadyExists,
+
+ /// Tried to delete a non-empty bucket
+ #[error(display = "Tried to delete a non-empty bucket")]
+ BucketNotEmpty,
+
+ // Category: bad request
+ /// Bucket name is not valid according to AWS S3 specs
+ #[error(display = "Invalid bucket name: {}", _0)]
+ InvalidBucketName(String),
+}
+
+impl CommonError {
+ pub fn http_status_code(&self) -> StatusCode {
+ match self {
+ CommonError::InternalError(
+ GarageError::Timeout
+ | GarageError::RemoteError(_)
+ | GarageError::Quorum(_, _, _, _),
+ ) => StatusCode::SERVICE_UNAVAILABLE,
+ CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => {
+ StatusCode::INTERNAL_SERVER_ERROR
+ }
+ CommonError::BadRequest(_) => StatusCode::BAD_REQUEST,
+ CommonError::Forbidden(_) => StatusCode::FORBIDDEN,
+ CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND,
+ CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT,
+ CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST,
+ }
+ }
+
+ pub fn aws_code(&self) -> &'static str {
+ match self {
+ CommonError::Forbidden(_) => "AccessDenied",
+ CommonError::InternalError(
+ GarageError::Timeout
+ | GarageError::RemoteError(_)
+ | GarageError::Quorum(_, _, _, _),
+ ) => "ServiceUnavailable",
+ CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => {
+ "InternalError"
+ }
+ CommonError::BadRequest(_) => "InvalidRequest",
+ CommonError::NoSuchBucket(_) => "NoSuchBucket",
+ CommonError::BucketAlreadyExists => "BucketAlreadyExists",
+ CommonError::BucketNotEmpty => "BucketNotEmpty",
+ CommonError::InvalidBucketName(_) => "InvalidBucketName",
+ }
+ }
+
+ pub fn bad_request<M: ToString>(msg: M) -> Self {
+ CommonError::BadRequest(msg.to_string())
+ }
+}
+
+pub trait CommonErrorDerivative: From<CommonError> {
+ fn internal_error<M: ToString>(msg: M) -> Self {
+ Self::from(CommonError::InternalError(GarageError::Message(
+ msg.to_string(),
+ )))
+ }
+
+ fn bad_request<M: ToString>(msg: M) -> Self {
+ Self::from(CommonError::BadRequest(msg.to_string()))
+ }
+
+ fn forbidden<M: ToString>(msg: M) -> Self {
+ Self::from(CommonError::Forbidden(msg.to_string()))
+ }
+}
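+
+// Illustrative sketch: an API-specific error type that is `From<CommonError>`
+// opts in to these constructors with an empty impl (as the K2V error type
+// does below), e.g. for a hypothetical `MyApiError` enum:
+//
+//     impl CommonErrorDerivative for MyApiError {}
+//     let err = MyApiError::forbidden("operation not allowed");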
+
+/// Trait to map an error to the Bad Request error code
+pub trait OkOrBadRequest {
+ type S;
+ fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, CommonError>;
+}
+
+impl<T, E> OkOrBadRequest for Result<T, E>
+where
+ E: std::fmt::Display,
+{
+ type S = T;
+ fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> {
+ match self {
+ Ok(x) => Ok(x),
+ Err(e) => Err(CommonError::BadRequest(format!(
+ "{}: {}",
+ reason.as_ref(),
+ e
+ ))),
+ }
+ }
+}
+
+impl<T> OkOrBadRequest for Option<T> {
+ type S = T;
+ fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> {
+ match self {
+ Some(x) => Ok(x),
+ None => Err(CommonError::BadRequest(reason.as_ref().to_string())),
+ }
+ }
+}
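+
+// Typical usage (illustrative): turn a missing value into a 400 response,
+// e.g. `query.start.as_ref().ok_or_bad_request("start must be set")?`.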
+
+/// Trait to map an error to an Internal Error code
+pub trait OkOrInternalError {
+ type S;
+ fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<Self::S, CommonError>;
+}
+
+impl<T, E> OkOrInternalError for Result<T, E>
+where
+ E: std::fmt::Display,
+{
+ type S = T;
+ fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> {
+ match self {
+ Ok(x) => Ok(x),
+ Err(e) => Err(CommonError::InternalError(GarageError::Message(format!(
+ "{}: {}",
+ reason.as_ref(),
+ e
+ )))),
+ }
+ }
+}
+
+impl<T> OkOrInternalError for Option<T> {
+ type S = T;
+ fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, CommonError> {
+ match self {
+ Some(x) => Ok(x),
+ None => Err(CommonError::InternalError(GarageError::Message(
+ reason.as_ref().to_string(),
+ ))),
+ }
+ }
+}
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
new file mode 100644
index 00000000..62fe4e5a
--- /dev/null
+++ b/src/api/generic_server.rs
@@ -0,0 +1,211 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+
+use hyper::header::HeaderValue;
+use hyper::server::conn::AddrStream;
+use hyper::service::{make_service_fn, service_fn};
+use hyper::{Body, Request, Response, Server};
+use hyper::{HeaderMap, StatusCode};
+
+use opentelemetry::{
+ global,
+ metrics::{Counter, ValueRecorder},
+ trace::{FutureExt, SpanRef, TraceContextExt, Tracer},
+ Context, KeyValue,
+};
+
+use garage_util::error::Error as GarageError;
+use garage_util::metrics::{gen_trace_id, RecordDuration};
+
+pub(crate) trait ApiEndpoint: Send + Sync + 'static {
+ fn name(&self) -> &'static str;
+ fn add_span_attributes(&self, span: SpanRef<'_>);
+}
+
+pub trait ApiError: std::error::Error + Send + Sync + 'static {
+ fn http_status_code(&self) -> StatusCode;
+ fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>);
+ fn http_body(&self, garage_region: &str, path: &str) -> Body;
+}
+
+#[async_trait]
+pub(crate) trait ApiHandler: Send + Sync + 'static {
+ const API_NAME: &'static str;
+ const API_NAME_DISPLAY: &'static str;
+
+ type Endpoint: ApiEndpoint;
+ type Error: ApiError;
+
+ fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Self::Error>;
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: Self::Endpoint,
+ ) -> Result<Response<Body>, Self::Error>;
+}
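+
+// K2VApiServer (in k2v/api_server.rs, below) is one concrete ApiHandler:
+// it parses each request into a K2V Endpoint and dispatches it to the
+// matching handler function.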
+
+pub(crate) struct ApiServer<A: ApiHandler> {
+ region: String,
+ api_handler: A,
+
+ // Metrics
+ request_counter: Counter<u64>,
+ error_counter: Counter<u64>,
+ request_duration: ValueRecorder<f64>,
+}
+
+impl<A: ApiHandler> ApiServer<A> {
+ pub fn new(region: String, api_handler: A) -> Arc<Self> {
+ let meter = global::meter("garage/api");
+ Arc::new(Self {
+ region,
+ api_handler,
+ request_counter: meter
+ .u64_counter(format!("api.{}.request_counter", A::API_NAME))
+ .with_description(format!(
+ "Number of API calls to the various {} API endpoints",
+ A::API_NAME_DISPLAY
+ ))
+ .init(),
+ error_counter: meter
+ .u64_counter(format!("api.{}.error_counter", A::API_NAME))
+ .with_description(format!(
+ "Number of API calls to the various {} API endpoints that resulted in errors",
+ A::API_NAME_DISPLAY
+ ))
+ .init(),
+ request_duration: meter
+ .f64_value_recorder(format!("api.{}.request_duration", A::API_NAME))
+ .with_description(format!(
+ "Duration of API calls to the various {} API endpoints",
+ A::API_NAME_DISPLAY
+ ))
+ .init(),
+ })
+ }
+
+ pub async fn run_server(
+ self: Arc<Self>,
+ bind_addr: SocketAddr,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ let service = make_service_fn(|conn: &AddrStream| {
+ let this = self.clone();
+
+ let client_addr = conn.remote_addr();
+ async move {
+ Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
+ let this = this.clone();
+
+ this.handler(req, client_addr)
+ }))
+ }
+ });
+
+ let server = Server::bind(&bind_addr).serve(service);
+
+ let graceful = server.with_graceful_shutdown(shutdown_signal);
+ info!(
+ "{} API server listening on http://{}",
+ A::API_NAME_DISPLAY,
+ bind_addr
+ );
+
+ graceful.await?;
+ Ok(())
+ }
+
+ async fn handler(
+ self: Arc<Self>,
+ req: Request<Body>,
+ addr: SocketAddr,
+ ) -> Result<Response<Body>, GarageError> {
+ let uri = req.uri().clone();
+ info!("{} {} {}", addr, req.method(), uri);
+ debug!("{:?}", req);
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let span = tracer
+ .span_builder(format!("{} API call (unknown)", A::API_NAME_DISPLAY))
+ .with_trace_id(gen_trace_id())
+ .with_attributes(vec![
+ KeyValue::new("method", format!("{}", req.method())),
+ KeyValue::new("uri", req.uri().to_string()),
+ ])
+ .start(&tracer);
+
+ let res = self
+ .handler_stage2(req)
+ .with_context(Context::current_with_span(span))
+ .await;
+
+ match res {
+ Ok(x) => {
+ debug!("{} {:?}", x.status(), x.headers());
+ Ok(x)
+ }
+ Err(e) => {
+ let body: Body = e.http_body(&self.region, uri.path());
+ let mut http_error_builder = Response::builder().status(e.http_status_code());
+
+ if let Some(header_map) = http_error_builder.headers_mut() {
+ e.add_http_headers(header_map)
+ }
+
+ let http_error = http_error_builder.body(body)?;
+
+ if e.http_status_code().is_server_error() {
+ warn!("Response: error {}, {}", e.http_status_code(), e);
+ } else {
+ info!("Response: error {}, {}", e.http_status_code(), e);
+ }
+ Ok(http_error)
+ }
+ }
+ }
+
+ async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, A::Error> {
+ let endpoint = self.api_handler.parse_endpoint(&req)?;
+ debug!("Endpoint: {}", endpoint.name());
+
+ let current_context = Context::current();
+ let current_span = current_context.span();
+ current_span.update_name::<String>(format!(
+ "{} API {}",
+ A::API_NAME_DISPLAY,
+ endpoint.name()
+ ));
+ current_span.set_attribute(KeyValue::new("endpoint", endpoint.name()));
+ endpoint.add_span_attributes(current_span);
+
+ let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
+
+ let res = self
+ .api_handler
+ .handle(req, endpoint)
+ .record_duration(&self.request_duration, &metrics_tags[..])
+ .await;
+
+ self.request_counter.add(1, &metrics_tags[..]);
+
+ let status_code = match &res {
+ Ok(r) => r.status(),
+ Err(e) => e.http_status_code(),
+ };
+ if status_code.is_client_error() || status_code.is_server_error() {
+ self.error_counter.add(
+ 1,
+ &[
+ metrics_tags[0].clone(),
+ KeyValue::new("status_code", status_code.as_str().to_string()),
+ ],
+ );
+ }
+
+ res
+ }
+}
diff --git a/src/api/helpers.rs b/src/api/helpers.rs
index c2709bb3..642dbc42 100644
--- a/src/api/helpers.rs
+++ b/src/api/helpers.rs
@@ -1,5 +1,21 @@
-use crate::Error;
+use hyper::{Body, Request, Response};
use idna::domain_to_unicode;
+use serde::{Deserialize, Serialize};
+
+use crate::common_error::{CommonError as Error, *};
+
+/// What kind of authorization is required to perform a given action
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Authorization {
+ /// No authorization is required
+ None,
+ /// Having Read permission on bucket
+ Read,
+ /// Having Write permission on bucket
+ Write,
+ /// Having Owner permission on bucket
+ Owner,
+}
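+
+// Each API router maps its endpoints to one of these variants through
+// Endpoint::authorization_type(); for K2V (see k2v/router.rs below),
+// read-only endpoints require Read and all others require Write.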
/// Host to bucket
///
@@ -31,7 +47,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> {
let mut iter = authority.chars().enumerate();
let (_, first_char) = iter
.next()
- .ok_or_else(|| Error::BadRequest("Authority is empty".to_string()))?;
+ .ok_or_else(|| Error::bad_request("Authority is empty".to_string()))?;
let split = match first_char {
'[' => {
@@ -39,7 +55,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> {
match iter.next() {
Some((_, ']')) => iter.next(),
_ => {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Authority {} has an illegal format",
authority
)))
@@ -52,7 +68,7 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> {
let authority = match split {
Some((i, ':')) => Ok(&authority[..i]),
None => Ok(authority),
- Some((_, _)) => Err(Error::BadRequest(format!(
+ Some((_, _)) => Err(Error::bad_request(format!(
"Authority {} has an illegal format",
authority
))),
@@ -60,11 +76,135 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> {
authority.map(|h| domain_to_unicode(h).0)
}
+/// Extract the bucket name and the key name from an HTTP path, and possibly from a
+/// bucket name provided in the Host header of the request
+///
+/// S3 internally manages only buckets and keys. This function splits
+/// an HTTP path to get the corresponding bucket name and key.
+pub fn parse_bucket_key<'a>(
+ path: &'a str,
+ host_bucket: Option<&'a str>,
+) -> Result<(&'a str, Option<&'a str>), Error> {
+ let path = path.trim_start_matches('/');
+
+ if let Some(bucket) = host_bucket {
+ if !path.is_empty() {
+ return Ok((bucket, Some(path)));
+ } else {
+ return Ok((bucket, None));
+ }
+ }
+
+ let (bucket, key) = match path.find('/') {
+ Some(i) => {
+ let key = &path[i + 1..];
+ if !key.is_empty() {
+ (&path[..i], Some(key))
+ } else {
+ (&path[..i], None)
+ }
+ }
+ None => (path, None),
+ };
+ if bucket.is_empty() {
+ return Err(Error::bad_request("No bucket specified"));
+ }
+ Ok((bucket, key))
+}
+
+const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}';
+
+/// Compute the first key that sorts strictly after all keys starting with the
+/// given prefix, for use as an exclusive upper bound when enumerating the
+/// prefix; returns `None` if no such key exists (the prefix is all `char::MAX`)
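+///
+/// For example, `key_after_prefix("a/b/")` returns `Some("a/b0")`, since `'0'`
+/// is the character just after `'/'` (see the tests below).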
+pub fn key_after_prefix(pfx: &str) -> Option<String> {
+ let mut next = pfx.to_string();
+ while !next.is_empty() {
+ let tail = next.pop().unwrap();
+ if tail >= char::MAX {
+ continue;
+ }
+
+		// Circumvent a limitation of RangeFrom that overflows earlier than needed
+ // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html
+ let new_tail = if tail == UTF8_BEFORE_LAST_CHAR {
+ char::MAX
+ } else {
+ (tail..).nth(1).unwrap()
+ };
+
+ next.push(new_tail);
+ return Some(next);
+ }
+
+ None
+}
+
+pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ Ok(resp)
+}
+
+pub fn json_ok_response<T: Serialize>(res: &T) -> Result<Response<Body>, Error> {
+ let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?;
+ Ok(Response::builder()
+ .status(hyper::StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, "application/json")
+ .body(Body::from(resp_json))?)
+}
+
#[cfg(test)]
mod tests {
use super::*;
#[test]
+ fn parse_bucket_containing_a_key() -> Result<(), Error> {
+ let (bucket, key) = parse_bucket_key("/my_bucket/a/super/file.jpg", None)?;
+ assert_eq!(bucket, "my_bucket");
+ assert_eq!(key.expect("key must be set"), "a/super/file.jpg");
+ Ok(())
+ }
+
+ #[test]
+ fn parse_bucket_containing_no_key() -> Result<(), Error> {
+ let (bucket, key) = parse_bucket_key("/my_bucket/", None)?;
+ assert_eq!(bucket, "my_bucket");
+ assert!(key.is_none());
+ let (bucket, key) = parse_bucket_key("/my_bucket", None)?;
+ assert_eq!(bucket, "my_bucket");
+ assert!(key.is_none());
+ Ok(())
+ }
+
+ #[test]
+ fn parse_bucket_containing_no_bucket() {
+ let parsed = parse_bucket_key("", None);
+ assert!(parsed.is_err());
+ let parsed = parse_bucket_key("/", None);
+ assert!(parsed.is_err());
+ let parsed = parse_bucket_key("////", None);
+ assert!(parsed.is_err());
+ }
+
+ #[test]
+ fn parse_bucket_with_vhost_and_key() -> Result<(), Error> {
+ let (bucket, key) = parse_bucket_key("/a/super/file.jpg", Some("my-bucket"))?;
+ assert_eq!(bucket, "my-bucket");
+ assert_eq!(key.expect("key must be set"), "a/super/file.jpg");
+ Ok(())
+ }
+
+ #[test]
+ fn parse_bucket_with_vhost_no_key() -> Result<(), Error> {
+ let (bucket, key) = parse_bucket_key("", Some("my-bucket"))?;
+ assert_eq!(bucket, "my-bucket");
+ assert!(key.is_none());
+ let (bucket, key) = parse_bucket_key("/", Some("my-bucket"))?;
+ assert_eq!(bucket, "my-bucket");
+ assert!(key.is_none());
+ Ok(())
+ }
+
+ #[test]
fn authority_to_host_with_port() -> Result<(), Error> {
let domain = authority_to_host("[::1]:3902")?;
assert_eq!(domain, "[::1]");
@@ -111,4 +251,47 @@ mod tests {
assert_eq!(host_to_bucket("not-garage.tld", "garage.tld"), None);
assert_eq!(host_to_bucket("not-garage.tld", ".garage.tld"), None);
}
+
+ #[test]
+ fn test_key_after_prefix() {
+ use std::iter::FromIterator;
+
+ assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1);
+ assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0");
+ assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭");
+ assert_eq!(
+ key_after_prefix("􏿽").unwrap().as_str(),
+ String::from(char::from_u32(0x10FFFE).unwrap())
+ );
+
+ // When the last character is the biggest UTF8 char
+ let a = String::from_iter(['a', char::MAX].iter());
+ assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b");
+
+ // When all characters are the biggest UTF8 char
+ let b = String::from_iter([char::MAX; 3].iter());
+ assert!(key_after_prefix(b.as_str()).is_none());
+
+ // Check utf8 surrogates
+ let c = String::from('\u{D7FF}');
+ assert_eq!(
+ key_after_prefix(c.as_str()).unwrap().as_str(),
+ String::from('\u{E000}')
+ );
+
+ // Check the character before the biggest one
+ let d = String::from('\u{10FFFE}');
+ assert_eq!(
+ key_after_prefix(d.as_str()).unwrap().as_str(),
+ String::from(char::MAX)
+ );
+ }
+}
+
+#[derive(Serialize)]
+pub(crate) struct CustomApiErrorBody {
+ pub(crate) code: String,
+ pub(crate) message: String,
+ pub(crate) region: String,
+ pub(crate) path: String,
}
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
new file mode 100644
index 00000000..084867b5
--- /dev/null
+++ b/src/api/k2v/api_server.rs
@@ -0,0 +1,190 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use hyper::{Body, Method, Request, Response};
+
+use opentelemetry::{trace::SpanRef, KeyValue};
+
+use garage_util::error::Error as GarageError;
+
+use garage_model::garage::Garage;
+
+use crate::generic_server::*;
+use crate::k2v::error::*;
+
+use crate::signature::payload::check_payload_signature;
+use crate::signature::streaming::*;
+
+use crate::helpers::*;
+use crate::k2v::batch::*;
+use crate::k2v::index::*;
+use crate::k2v::item::*;
+use crate::k2v::router::Endpoint;
+use crate::s3::cors::*;
+
+pub struct K2VApiServer {
+ garage: Arc<Garage>,
+}
+
+pub(crate) struct K2VApiEndpoint {
+ bucket_name: String,
+ endpoint: Endpoint,
+}
+
+impl K2VApiServer {
+ pub async fn run(
+ garage: Arc<Garage>,
+ bind_addr: SocketAddr,
+ s3_region: String,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ ApiServer::new(s3_region, K2VApiServer { garage })
+ .run_server(bind_addr, shutdown_signal)
+ .await
+ }
+}
+
+#[async_trait]
+impl ApiHandler for K2VApiServer {
+ const API_NAME: &'static str = "k2v";
+ const API_NAME_DISPLAY: &'static str = "K2V";
+
+ type Endpoint = K2VApiEndpoint;
+ type Error = Error;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
+ let (endpoint, bucket_name) = Endpoint::from_request(req)?;
+
+ Ok(K2VApiEndpoint {
+ bucket_name,
+ endpoint,
+ })
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: K2VApiEndpoint,
+ ) -> Result<Response<Body>, Error> {
+ let K2VApiEndpoint {
+ bucket_name,
+ endpoint,
+ } = endpoint;
+ let garage = self.garage.clone();
+
+		// The OPTIONS method is processed early, before we even check for an API key
+ if let Endpoint::Options = endpoint {
+ return Ok(handle_options_s3api(garage, &req, Some(bucket_name))
+ .await
+ .ok_or_bad_request("Error handling OPTIONS")?);
+ }
+
+ let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
+ let api_key = api_key
+ .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
+
+ let req = parse_streaming_body(
+ &api_key,
+ req,
+ &mut content_sha256,
+ &garage.config.s3_api.s3_region,
+ "k2v",
+ )?;
+
+ let bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&bucket_name, &api_key)
+ .await?;
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let allowed = match endpoint.authorization_type() {
+ Authorization::Read => api_key.allow_read(&bucket_id),
+ Authorization::Write => api_key.allow_write(&bucket_id),
+ Authorization::Owner => api_key.allow_owner(&bucket_id),
+ _ => unreachable!(),
+ };
+
+ if !allowed {
+ return Err(Error::forbidden("Operation is not allowed for this key."));
+ }
+
+		// Look up what CORS rule might apply to the response.
+		// Requests with methods other than GET, HEAD or POST
+		// are always preflighted, i.e. the browser makes an
+		// OPTIONS call first to check that the request is allowed
+ let matching_cors_rule = match *req.method() {
+ Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)
+ .ok_or_internal_error("Error looking up CORS rule")?,
+ _ => None,
+ };
+
+ let resp = match endpoint {
+ Endpoint::DeleteItem {
+ partition_key,
+ sort_key,
+ } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::InsertItem {
+ partition_key,
+ sort_key,
+ } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::ReadItem {
+ partition_key,
+ sort_key,
+ } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::PollItem {
+ partition_key,
+ sort_key,
+ causality_token,
+ timeout,
+ } => {
+ handle_poll_item(
+ garage,
+ &req,
+ bucket_id,
+ partition_key,
+ sort_key,
+ causality_token,
+ timeout,
+ )
+ .await
+ }
+ Endpoint::ReadIndex {
+ prefix,
+ start,
+ end,
+ limit,
+ reverse,
+ } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
+ Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
+ Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
+ Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
+ Endpoint::Options => unreachable!(),
+ };
+
+		// If the request was a success and a CORS rule applies to it,
+		// add the corresponding CORS headers to the response
+ let mut resp_ok = resp?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp_ok, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
+ Ok(resp_ok)
+ }
+}
+
+impl ApiEndpoint for K2VApiEndpoint {
+ fn name(&self) -> &'static str {
+ self.endpoint.name()
+ }
+
+ fn add_span_attributes(&self, span: SpanRef<'_>) {
+ span.set_attribute(KeyValue::new("bucket", self.bucket_name.clone()));
+ }
+}
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
new file mode 100644
index 00000000..db9901cf
--- /dev/null
+++ b/src/api/k2v/batch.rs
@@ -0,0 +1,363 @@
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::{EnumerationOrder, TableSchema};
+
+use garage_model::garage::Garage;
+use garage_model::k2v::causality::*;
+use garage_model::k2v::item_table::*;
+
+use crate::helpers::*;
+use crate::k2v::error::*;
+use crate::k2v::range::read_range;
+
+pub async fn handle_insert_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
+
+ let mut items2 = vec![];
+ for it in items {
+ let ct = it
+ .ct
+ .map(|s| CausalContext::parse(&s))
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+ let v = match it.v {
+ Some(vs) => {
+ DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)
+ }
+ None => DvvsValue::Deleted,
+ };
+ items2.push((it.pk, it.sk, ct, v));
+ }
+
+ garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
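+
+// An illustrative InsertBatch request body matching the InsertBatchItem
+// struct defined below (ct and v are optional; v is base64-encoded and
+// omitting it encodes a deletion):
+//
+//     [{"pk": "mailbox", "sk": "msg1", "v": "aGVsbG8="}]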
+
+pub async fn handle_read_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
+
+ let resp_results = futures::future::join_all(
+ queries
+ .into_iter()
+ .map(|q| handle_read_batch_query(&garage, bucket_id, q)),
+ )
+ .await;
+
+ let mut resps: Vec<ReadBatchResponse> = vec![];
+ for resp in resp_results {
+ resps.push(resp?);
+ }
+
+ let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+async fn handle_read_batch_query(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+ query: ReadBatchQuery,
+) -> Result<ReadBatchResponse, Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key: query.partition_key.clone(),
+ };
+
+ let filter = ItemFilter {
+ exclude_only_tombstones: !query.tombstones,
+ conflicts_only: query.conflicts_only,
+ };
+
+ let (items, more, next_start) = if query.single_item {
+ if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse {
+ return Err(Error::bad_request("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true."));
+ }
+ let sk = query
+ .start
+ .as_ref()
+ .ok_or_bad_request("start should be specified if single_item is set")?;
+ let item = garage
+ .k2v
+ .item_table
+ .get(&partition, sk)
+ .await?
+ .filter(|e| K2VItemTable::matches_filter(e, &filter));
+ match item {
+ Some(i) => (vec![ReadBatchResponseItem::from(i)], false, None),
+ None => (vec![], false, None),
+ }
+ } else {
+ let (items, more, next_start) = read_range(
+ &garage.k2v.item_table,
+ &partition,
+ &query.prefix,
+ &query.start,
+ &query.end,
+ query.limit,
+ Some(filter),
+ EnumerationOrder::from_reverse(query.reverse),
+ )
+ .await?;
+
+ let items = items
+ .into_iter()
+ .map(ReadBatchResponseItem::from)
+ .collect::<Vec<_>>();
+
+ (items, more, next_start)
+ };
+
+ Ok(ReadBatchResponse {
+ partition_key: query.partition_key,
+ prefix: query.prefix,
+ start: query.start,
+ end: query.end,
+ limit: query.limit,
+ reverse: query.reverse,
+ single_item: query.single_item,
+ conflicts_only: query.conflicts_only,
+ tombstones: query.tombstones,
+ items,
+ more,
+ next_start,
+ })
+}
+
+pub async fn handle_delete_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
+
+ let resp_results = futures::future::join_all(
+ queries
+ .into_iter()
+ .map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
+ )
+ .await;
+
+ let mut resps: Vec<DeleteBatchResponse> = vec![];
+ for resp in resp_results {
+ resps.push(resp?);
+ }
+
+ let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+async fn handle_delete_batch_query(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+ query: DeleteBatchQuery,
+) -> Result<DeleteBatchResponse, Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key: query.partition_key.clone(),
+ };
+
+ let filter = ItemFilter {
+ exclude_only_tombstones: true,
+ conflicts_only: false,
+ };
+
+ let deleted_items = if query.single_item {
+ if query.prefix.is_some() || query.end.is_some() {
+ return Err(Error::bad_request("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true."));
+ }
+ let sk = query
+ .start
+ .as_ref()
+ .ok_or_bad_request("start should be specified if single_item is set")?;
+ let item = garage
+ .k2v
+ .item_table
+ .get(&partition, sk)
+ .await?
+ .filter(|e| K2VItemTable::matches_filter(e, &filter));
+ match item {
+ Some(i) => {
+ let cc = i.causal_context();
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ i.partition.partition_key,
+ i.sort_key,
+ Some(cc),
+ DvvsValue::Deleted,
+ )
+ .await?;
+ 1
+ }
+ None => 0,
+ }
+ } else {
+ let (items, more, _next_start) = read_range(
+ &garage.k2v.item_table,
+ &partition,
+ &query.prefix,
+ &query.start,
+ &query.end,
+ None,
+ Some(filter),
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ assert!(!more);
+
+ // TODO delete items
+ let items = items
+ .into_iter()
+ .map(|i| {
+ let cc = i.causal_context();
+ (
+ i.partition.partition_key,
+ i.sort_key,
+ Some(cc),
+ DvvsValue::Deleted,
+ )
+ })
+ .collect::<Vec<_>>();
+ let n = items.len();
+
+ garage.k2v.rpc.insert_batch(bucket_id, items).await?;
+
+ n
+ };
+
+ Ok(DeleteBatchResponse {
+ partition_key: query.partition_key,
+ prefix: query.prefix,
+ start: query.start,
+ end: query.end,
+ single_item: query.single_item,
+ deleted_items,
+ })
+}
+
+#[derive(Deserialize)]
+struct InsertBatchItem {
+ pk: String,
+ sk: String,
+ ct: Option<String>,
+ v: Option<String>,
+}
+
+#[derive(Deserialize)]
+struct ReadBatchQuery {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default)]
+ limit: Option<u64>,
+ #[serde(default)]
+ reverse: bool,
+ #[serde(default, rename = "singleItem")]
+ single_item: bool,
+ #[serde(default, rename = "conflictsOnly")]
+ conflicts_only: bool,
+ #[serde(default)]
+ tombstones: bool,
+}
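+
+// An illustrative ReadBatch request body matching the serde attributes
+// above (only partitionKey is required):
+//
+//     [{"partitionKey": "mailbox", "prefix": "msg:", "limit": 100}]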
+
+#[derive(Serialize)]
+struct ReadBatchResponse {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: bool,
+ #[serde(rename = "singleItem")]
+ single_item: bool,
+ #[serde(rename = "conflictsOnly")]
+ conflicts_only: bool,
+ tombstones: bool,
+
+ items: Vec<ReadBatchResponseItem>,
+ more: bool,
+ #[serde(rename = "nextStart")]
+ next_start: Option<String>,
+}
+
+#[derive(Serialize)]
+struct ReadBatchResponseItem {
+ sk: String,
+ ct: String,
+ v: Vec<Option<String>>,
+}
+
+impl ReadBatchResponseItem {
+ fn from(i: K2VItem) -> Self {
+ let ct = i.causal_context().serialize();
+ let v = i
+ .values()
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Value(x) => Some(base64::encode(x)),
+ DvvsValue::Deleted => None,
+ })
+ .collect::<Vec<_>>();
+ Self {
+ sk: i.sort_key,
+ ct,
+ v,
+ }
+ }
+}
+
+#[derive(Deserialize)]
+struct DeleteBatchQuery {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default, rename = "singleItem")]
+ single_item: bool,
+}
+
+#[derive(Serialize)]
+struct DeleteBatchResponse {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ #[serde(rename = "singleItem")]
+ single_item: bool,
+
+ #[serde(rename = "deletedItems")]
+ deleted_items: usize,
+}
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
new file mode 100644
index 00000000..42491466
--- /dev/null
+++ b/src/api/k2v/error.rs
@@ -0,0 +1,135 @@
+use err_derive::Error;
+use hyper::header::HeaderValue;
+use hyper::{Body, HeaderMap, StatusCode};
+
+use garage_model::helper::error::Error as HelperError;
+
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+use crate::generic_server::ApiError;
+use crate::helpers::CustomApiErrorBody;
+use crate::signature::error::Error as SignatureError;
+
+/// Errors of the K2V API
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
+
+ // Category: cannot process
+ /// Authorization Header Malformed
+ #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ AuthorizationHeaderMalformed(String),
+
+	/// The requested object doesn't exist
+ #[error(display = "Key not found")]
+ NoSuchKey,
+
+ /// Some base64 encoded data was badly encoded
+ #[error(display = "Invalid base64: {}", _0)]
+ InvalidBase64(#[error(source)] base64::DecodeError),
+
+ /// The client sent a header with invalid value
+ #[error(display = "Invalid header value: {}", _0)]
+ InvalidHeader(#[error(source)] hyper::header::ToStrError),
+
+ /// The client asked for an invalid return format (invalid Accept header)
+ #[error(display = "Not acceptable: {}", _0)]
+ NotAcceptable(String),
+
+ /// The request contained an invalid UTF-8 sequence in its path or in other parameters
+ #[error(display = "Invalid UTF-8: {}", _0)]
+ InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
+}
+
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
+
+impl From<HelperError> for Error {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
+ HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
+ HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
+ HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
+ e => Self::Common(CommonError::BadRequest(format!("{}", e))),
+ }
+ }
+}
+
+impl From<SignatureError> for Error {
+ fn from(err: SignatureError) -> Self {
+ match err {
+ SignatureError::Common(c) => Self::Common(c),
+ SignatureError::AuthorizationHeaderMalformed(c) => {
+ Self::AuthorizationHeaderMalformed(c)
+ }
+ SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
+ SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
+ }
+ }
+}
+
+impl Error {
+ /// This returns a keyword for the corresponding error.
+ /// Here, these keywords are not necessarily those from AWS S3,
+ /// as we are building a custom API
+ fn code(&self) -> &'static str {
+ match self {
+ Error::Common(c) => c.aws_code(),
+ Error::NoSuchKey => "NoSuchKey",
+ Error::NotAcceptable(_) => "NotAcceptable",
+ Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
+ Error::InvalidBase64(_) => "InvalidBase64",
+ Error::InvalidHeader(_) => "InvalidHeaderValue",
+ Error::InvalidUtf8Str(_) => "InvalidUtf8String",
+ }
+ }
+}
+
+impl ApiError for Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
+ fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::Common(c) => c.http_status_code(),
+ Error::NoSuchKey => StatusCode::NOT_FOUND,
+ Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
+ Error::AuthorizationHeaderMalformed(_)
+ | Error::InvalidBase64(_)
+ | Error::InvalidHeader(_)
+ | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST,
+ }
+ }
+
+ fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>) {
+ use hyper::header;
+ header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
+ }
+
+ fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ let error = CustomApiErrorBody {
+ code: self.code().to_string(),
+ message: format!("{}", self),
+ path: path.to_string(),
+ region: garage_region.to_string(),
+ };
+ Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ r#"
+{
+ "code": "InternalError",
+ "message": "JSON encoding of error failed"
+}
+ "#
+ .into()
+ }))
+ }
+}
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
new file mode 100644
index 00000000..210950bf
--- /dev/null
+++ b/src/api/k2v/index.rs
@@ -0,0 +1,100 @@
+use std::sync::Arc;
+
+use hyper::{Body, Response, StatusCode};
+use serde::Serialize;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_rpc::ring::Ring;
+use garage_table::util::*;
+
+use garage_model::garage::Garage;
+use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
+
+use crate::k2v::error::*;
+use crate::k2v::range::read_range;
+
+pub async fn handle_read_index(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: Option<bool>,
+) -> Result<Response<Body>, Error> {
+ let reverse = reverse.unwrap_or(false);
+
+ let ring: Arc<Ring> = garage.system.ring.borrow().clone();
+
+ let (partition_keys, more, next_start) = read_range(
+ &garage.k2v.counter_table.table,
+ &bucket_id,
+ &prefix,
+ &start,
+ &end,
+ limit,
+ Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ EnumerationOrder::from_reverse(reverse),
+ )
+ .await?;
+
+ let s_entries = ENTRIES.to_string();
+ let s_conflicts = CONFLICTS.to_string();
+ let s_values = VALUES.to_string();
+ let s_bytes = BYTES.to_string();
+
+ let resp = ReadIndexResponse {
+ prefix,
+ start,
+ end,
+ limit,
+ reverse,
+ partition_keys: partition_keys
+ .into_iter()
+ .map(|part| {
+ let vals = part.filtered_values(&ring);
+ ReadIndexResponseEntry {
+ pk: part.sk,
+ entries: *vals.get(&s_entries).unwrap_or(&0),
+ conflicts: *vals.get(&s_conflicts).unwrap_or(&0),
+ values: *vals.get(&s_values).unwrap_or(&0),
+ bytes: *vals.get(&s_bytes).unwrap_or(&0),
+ }
+ })
+ .collect::<Vec<_>>(),
+ more,
+ next_start,
+ };
+
+ let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+struct ReadIndexResponse {
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: bool,
+
+ #[serde(rename = "partitionKeys")]
+ partition_keys: Vec<ReadIndexResponseEntry>,
+
+ more: bool,
+ #[serde(rename = "nextStart")]
+ next_start: Option<String>,
+}
+
+#[derive(Serialize)]
+struct ReadIndexResponseEntry {
+ pk: String,
+ entries: i64,
+ conflicts: i64,
+ values: i64,
+ bytes: i64,
+}
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
new file mode 100644
index 00000000..836d386f
--- /dev/null
+++ b/src/api/k2v/item.rs
@@ -0,0 +1,230 @@
+use std::sync::Arc;
+
+use http::header;
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_util::data::*;
+
+use garage_model::garage::Garage;
+use garage_model::k2v::causality::*;
+use garage_model::k2v::item_table::*;
+
+use crate::k2v::error::*;
+
+pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
+
+pub enum ReturnFormat {
+ Json,
+ Binary,
+ Either,
+}
+
+impl ReturnFormat {
+ pub fn from(req: &Request<Body>) -> Result<Self, Error> {
+ let accept = match req.headers().get(header::ACCEPT) {
+ Some(a) => a.to_str()?,
+ None => return Ok(Self::Json),
+ };
+
+ let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>();
+ let accept_json = accept.contains(&"application/json") || accept.contains(&"*/*");
+ let accept_binary = accept.contains(&"application/octet-stream") || accept.contains(&"*/*");
+
+ match (accept_json, accept_binary) {
+ (true, true) => Ok(Self::Either),
+ (true, false) => Ok(Self::Json),
+ (false, true) => Ok(Self::Binary),
+ (false, false) => Err(Error::NotAcceptable("Invalid Accept: header value, must contain either application/json or application/octet-stream (or both)".into())),
+ }
+ }
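+
+	// For instance, a missing Accept header or `Accept: application/json`
+	// selects Json, `Accept: application/octet-stream` selects Binary, and
+	// `Accept: */*` matches both content types and selects Either.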
+
+ pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> {
+ let vals = item.values();
+
+ if vals.is_empty() {
+ return Err(Error::NoSuchKey);
+ }
+
+ let ct = item.causal_context().serialize();
+ match self {
+ Self::Binary if vals.len() > 1 => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .status(StatusCode::CONFLICT)
+ .body(Body::empty())?),
+ Self::Binary => {
+ assert!(vals.len() == 1);
+ Self::make_binary_response(ct, vals[0])
+ }
+ Self::Either if vals.len() == 1 => Self::make_binary_response(ct, vals[0]),
+ _ => Self::make_json_response(ct, &vals[..]),
+ }
+ }
+
+ fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> {
+ match v {
+ DvvsValue::Deleted => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?),
+ DvvsValue::Value(v) => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .status(StatusCode::OK)
+ .body(Body::from(v.to_vec()))?),
+ }
+ }
+
+ fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> {
+ let items = v
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Deleted => serde_json::Value::Null,
+ DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)),
+ })
+ .collect::<Vec<_>>();
+ let json_body =
+ serde_json::to_string_pretty(&items).ok_or_internal_error("JSON encoding error")?;
+ Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/json")
+ .status(StatusCode::OK)
+ .body(Body::from(json_body))?)
+ }
+}
+
+/// Handle ReadItem request
+#[allow(clippy::ptr_arg)]
+pub async fn handle_read_item(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &String,
+) -> Result<Response<Body>, Error> {
+ let format = ReturnFormat::from(req)?;
+
+ let item = garage
+ .k2v
+ .item_table
+ .get(
+ &K2VItemPartition {
+ bucket_id,
+ partition_key: partition_key.to_string(),
+ },
+ sort_key,
+ )
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ format.make_response(&item)
+}
+
+pub async fn handle_insert_item(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &str,
+) -> Result<Response<Body>, Error> {
+ let causal_context = req
+ .headers()
+ .get(X_GARAGE_CAUSALITY_TOKEN)
+ .map(|s| s.to_str())
+ .transpose()?
+ .map(CausalContext::parse)
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let value = DvvsValue::Value(body.to_vec());
+
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ partition_key.to_string(),
+ sort_key.to_string(),
+ causal_context,
+ value,
+ )
+ .await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_delete_item(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &str,
+) -> Result<Response<Body>, Error> {
+ let causal_context = req
+ .headers()
+ .get(X_GARAGE_CAUSALITY_TOKEN)
+ .map(|s| s.to_str())
+ .transpose()?
+ .map(CausalContext::parse)
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+
+ let value = DvvsValue::Deleted;
+
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ partition_key.to_string(),
+ sort_key.to_string(),
+ causal_context,
+ value,
+ )
+ .await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+/// Handle PollItem request
+#[allow(clippy::ptr_arg)]
+pub async fn handle_poll_item(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ partition_key: String,
+ sort_key: String,
+ causality_token: String,
+ timeout_secs: Option<u64>,
+) -> Result<Response<Body>, Error> {
+ let format = ReturnFormat::from(req)?;
+
+ let causal_context =
+ CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
+
+ let item = garage
+ .k2v
+ .rpc
+ .poll(
+ bucket_id,
+ partition_key,
+ sort_key,
+ causal_context,
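+			// the RPC layer expects the timeout in milliseconds (default 300 s)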
+ timeout_secs.unwrap_or(300) * 1000,
+ )
+ .await?;
+
+ if let Some(item) = item {
+ format.make_response(&item)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_MODIFIED)
+ .body(Body::empty())?)
+ }
+}
diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs
new file mode 100644
index 00000000..b6a8c5cf
--- /dev/null
+++ b/src/api/k2v/mod.rs
@@ -0,0 +1,9 @@
+pub mod api_server;
+mod error;
+mod router;
+
+mod batch;
+mod index;
+mod item;
+
+mod range;
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
new file mode 100644
index 00000000..bb9d3be5
--- /dev/null
+++ b/src/api/k2v/range.rs
@@ -0,0 +1,100 @@
+//! Utility module for retrieving ranges of items in Garage tables
+//! Implements parameters (prefix, start, end, limit) as specified
+//! for endpoints ReadIndex, ReadBatch and DeleteBatch
+
+use std::sync::Arc;
+
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+use crate::helpers::key_after_prefix;
+use crate::k2v::error::*;
+
+/// Read a range of entries in a Garage table.
+/// Returns (entries, more?, nextStart)
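+/// `more` is true when the enumeration stopped because `limit` was reached;
+/// `nextStart` then holds the sort key of the first entry that was not
+/// returned, and can be passed back as `start` to resume the listing.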
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn read_range<F>(
+ table: &Arc<Table<F, TableShardedReplication>>,
+ partition_key: &F::P,
+ prefix: &Option<String>,
+ start: &Option<String>,
+ end: &Option<String>,
+ limit: Option<u64>,
+ filter: Option<F::Filter>,
+ enumeration_order: EnumerationOrder,
+) -> Result<(Vec<F::E>, bool, Option<String>), Error>
+where
+ F: TableSchema<S = String> + 'static,
+{
+ let (mut start, mut start_ignore) = match (prefix, start) {
+ (None, None) => (None, false),
+ (None, Some(s)) => (Some(s.clone()), false),
+ (Some(p), Some(s)) => {
+ if !s.starts_with(p) {
+ return Err(Error::bad_request(format!(
+ "Start key '{}' does not start with prefix '{}'",
+ s, p
+ )));
+ }
+ (Some(s.clone()), false)
+ }
+ (Some(p), None) if enumeration_order == EnumerationOrder::Reverse => {
+ let start = key_after_prefix(p)
+ .ok_or_internal_error("Sorry, can't list this prefix in reverse order")?;
+ (Some(start), true)
+ }
+ (Some(p), None) => (Some(p.clone()), false),
+ };
+
+ let mut entries = vec![];
+ loop {
+ let n_get = std::cmp::min(
+ 1000,
+ limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2,
+ );
+ let get_ret = table
+ .get_range(
+ partition_key,
+ start.clone(),
+ filter.clone(),
+ n_get,
+ enumeration_order,
+ )
+ .await?;
+
+ let get_ret_len = get_ret.len();
+
+ for entry in get_ret {
+ if start_ignore && Some(entry.sort_key()) == start.as_ref() {
+ continue;
+ }
+ if let Some(p) = prefix {
+ if !entry.sort_key().starts_with(p) {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(e) = end {
+ let is_finished = match enumeration_order {
+ EnumerationOrder::Forward => entry.sort_key() >= e,
+ EnumerationOrder::Reverse => entry.sort_key() <= e,
+ };
+ if is_finished {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(l) = limit {
+ if entries.len() >= l as usize {
+ return Ok((entries, true, Some(entry.sort_key().clone())));
+ }
+ }
+ entries.push(entry);
+ }
+
+ if get_ret_len < n_get {
+ return Ok((entries, false, None));
+ }
+
+ start = Some(entries.last().unwrap().sort_key().clone());
+ start_ignore = true;
+ }
+}
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
new file mode 100644
index 00000000..50e6965b
--- /dev/null
+++ b/src/api/k2v/router.rs
@@ -0,0 +1,252 @@
+use crate::k2v::error::*;
+
+use std::borrow::Cow;
+
+use hyper::{Method, Request};
+
+use crate::helpers::Authorization;
+use crate::router_macros::{generateQueryParameters, router_match};
+
+router_match! {@func
+
+
+/// List of all K2V API endpoints.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Endpoint {
+ DeleteBatch {
+ },
+ DeleteItem {
+ partition_key: String,
+ sort_key: String,
+ },
+ InsertBatch {
+ },
+ InsertItem {
+ partition_key: String,
+ sort_key: String,
+ },
+ Options,
+ PollItem {
+ partition_key: String,
+ sort_key: String,
+ causality_token: String,
+ timeout: Option<u64>,
+ },
+ ReadBatch {
+ },
+ ReadIndex {
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: Option<bool>,
+ },
+ ReadItem {
+ partition_key: String,
+ sort_key: String,
+ },
+}}
+
+impl Endpoint {
+	/// Determine which K2V endpoint a request is for, extracting the bucket
+	/// name from the first segment of the request path.
+	/// Returns Self plus the bucket name.
+ pub fn from_request<T>(req: &Request<T>) -> Result<(Self, String), Error> {
+ let uri = req.uri();
+ let path = uri.path().trim_start_matches('/');
+ let query = uri.query();
+
+ let (bucket, partition_key) = path
+ .split_once('/')
+ .map(|(b, p)| (b.to_owned(), p.trim_start_matches('/')))
+ .unwrap_or((path.to_owned(), ""));
+
+ if bucket.is_empty() {
+ return Err(Error::bad_request("Missing bucket name"));
+ }
+
+ if *req.method() == Method::OPTIONS {
+ return Ok((Self::Options, bucket));
+ }
+
+ let partition_key = percent_encoding::percent_decode_str(partition_key)
+ .decode_utf8()?
+ .into_owned();
+
+ let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
+
+ let method_search = Method::from_bytes(b"SEARCH").unwrap();
+ let res = match *req.method() {
+ Method::GET => Self::from_get(partition_key, &mut query)?,
+ //&Method::HEAD => Self::from_head(partition_key, &mut query)?,
+ Method::POST => Self::from_post(partition_key, &mut query)?,
+ Method::PUT => Self::from_put(partition_key, &mut query)?,
+ Method::DELETE => Self::from_delete(partition_key, &mut query)?,
+ _ if req.method() == method_search => Self::from_search(partition_key, &mut query)?,
+ _ => return Err(Error::bad_request("Unknown method")),
+ };
+
+ if let Some(message) = query.nonempty_message() {
+ debug!("Unused query parameter: {}", message)
+ }
+ Ok((res, bucket))
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a GET.
+ fn from_get(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY if causality_token => PollItem (query::sort_key, query::causality_token, opt_parse::timeout),
+ EMPTY => ReadItem (query::sort_key),
+ ],
+ no_key: [
+ EMPTY => ReadIndex (query_opt::prefix, query_opt::start, query_opt::end, opt_parse::limit, opt_parse::reverse),
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a SEARCH.
+ fn from_search(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ ],
+ no_key: [
+ EMPTY => ReadBatch,
+ ]
+ }
+ }
+
+ /*
+ /// Determine which endpoint a request is for, knowing it is a HEAD.
+ fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => HeadObject(opt_parse::part_number, query_opt::version_id),
+ ],
+ no_key: [
+ EMPTY => HeadBucket,
+ ]
+ }
+ }
+ */
+
+ /// Determine which endpoint a request is for, knowing it is a POST.
+ fn from_post(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ ],
+ no_key: [
+ EMPTY => InsertBatch,
+ DELETE => DeleteBatch,
+ SEARCH => ReadBatch,
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a PUT.
+ fn from_put(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => InsertItem (query::sort_key),
+
+ ],
+ no_key: [
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a DELETE.
+ fn from_delete(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => DeleteItem (query::sort_key),
+ ],
+ no_key: [
+ ]
+ }
+ }
+
+	/// Get the partition key the request targets. Returns None for requests which don't use a partition key.
+ #[allow(dead_code)]
+ pub fn get_partition_key(&self) -> Option<&str> {
+ router_match! {
+ @extract
+ self,
+ partition_key,
+ [
+ DeleteItem,
+ InsertItem,
+ PollItem,
+ ReadItem,
+ ]
+ }
+ }
+
+	/// Get the sort key the request targets. Returns None for requests which don't use a sort key.
+ #[allow(dead_code)]
+ pub fn get_sort_key(&self) -> Option<&str> {
+ router_match! {
+ @extract
+ self,
+ sort_key,
+ [
+ DeleteItem,
+ InsertItem,
+ PollItem,
+ ReadItem,
+ ]
+ }
+ }
+
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ let readonly = router_match! {
+ @match
+ self,
+ [
+ PollItem,
+ ReadBatch,
+ ReadIndex,
+ ReadItem,
+ ]
+ };
+ if readonly {
+ Authorization::Read
+ } else {
+ Authorization::Write
+ }
+ }
+}
+
+// parameter name => struct field
+generateQueryParameters! {
+ "prefix" => prefix,
+ "start" => start,
+ "causality_token" => causality_token,
+ "end" => end,
+ "limit" => limit,
+ "reverse" => reverse,
+ "sort_key" => sort_key,
+ "timeout" => timeout
+}
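+
+// For example, `GET /mybucket/mailbox?sort_key=msg1` parses to
+// Endpoint::ReadItem, while the same request with an additional
+// `causality_token=...` parameter parses to Endpoint::PollItem
+// (see from_get above).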
+
+mod keywords {
+	//! This module contains all query parameters with no associated value
+ //! used to differentiate endpoints.
+ pub const EMPTY: &str = "";
+
+ pub const DELETE: &str = "delete";
+ pub const SEARCH: &str = "search";
+}
diff --git a/src/api/lib.rs b/src/api/lib.rs
index de60ec53..370dfd7a 100644
--- a/src/api/lib.rs
+++ b/src/api/lib.rs
@@ -2,26 +2,16 @@
#[macro_use]
extern crate tracing;
-pub mod error;
-pub use error::Error;
+pub mod common_error;
mod encoding;
-
-mod api_server;
-pub use api_server::run_api_server;
-
+pub mod generic_server;
+pub mod helpers;
+mod router_macros;
 /// This module is public only to help testing. Don't expect stability here
pub mod signature;
-pub mod helpers;
-mod s3_bucket;
-mod s3_copy;
-pub mod s3_cors;
-mod s3_delete;
-pub mod s3_get;
-mod s3_list;
-mod s3_post_object;
-mod s3_put;
-mod s3_router;
-mod s3_website;
-mod s3_xml;
+pub mod admin;
+#[cfg(feature = "k2v")]
+pub mod k2v;
+pub mod s3;
diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs
new file mode 100644
index 00000000..4c593300
--- /dev/null
+++ b/src/api/router_macros.rs
@@ -0,0 +1,213 @@
+/// This macro is used to generate very repetitive match {} blocks in this module.
+/// It is _not_ meant to be used anywhere else.
+macro_rules! router_match {
+ (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{
+ // usage: router_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] }
+ // returns true if the variant was one of the listed variants, false otherwise.
+ use Endpoint::*;
+ match $enum {
+ $(
+ $endpoint { .. } => true,
+ )*
+ _ => false
+ }
+ }};
+ (@extract $enum:expr , $param:ident, [ $($endpoint:ident,)* ]) => {{
+ // usage: router_match {@extract my_enum, field_name, [ VariantWithField1, VariantWithField2 ..] }
+ // returns Some(field_value), or None if the variant was not one of the listed variants.
+ use Endpoint::*;
+ match $enum {
+ $(
+ $endpoint {$param, ..} => Some($param),
+ )*
+ _ => None
+ }
+ }};
+ (@gen_path_parser ($method:expr, $reqpath:expr, $query:expr)
+ [
+ $($meth:ident $path:pat $(if $required:ident)? => $api:ident $(($($conv:ident :: $param:ident),*))?,)*
+ ]) => {{
+ {
+ use Endpoint::*;
+ match ($method, $reqpath) {
+ $(
+ (&Method::$meth, $path) if true $(&& $query.$required.is_some())? => $api {
+ $($(
+ $param: router_match!(@@parse_param $query, $conv, $param),
+ )*)?
+ },
+ )*
+ (m, p) => {
+ return Err(Error::bad_request(format!(
+ "Unknown API endpoint: {} {}",
+ m, p
+ )))
+ }
+ }
+ }
+ }};
+ (@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr),
+ key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*],
+ no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{
+ // usage: router_match {@gen_parser (keyword, key, query, header),
+ // key: [
+ // SOME_KEYWORD => VariantWithKey,
+ // ...
+ // ],
+ // no_key: [
+ // SOME_KEYWORD => VariantWithoutKey,
+ // ...
+ // ]
+ // }
+ // See in from_{method} for more detailed usage.
+ use Endpoint::*;
+ use keywords::*;
+ match ($keyword, !$key.is_empty()){
+ $(
+ ($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k {
+ $key,
+ $($(
+ $param_k: router_match!(@@parse_param $query, $conv_k, $param_k),
+ )*)?
+ }),
+ )*
+ $(
+ ($kw_nk, false) $(if $query.$required_nk.is_some())? $(if $header.contains($header_nk))? => Ok($api_nk {
+ $($(
+ $param_nk: router_match!(@@parse_param $query, $conv_nk, $param_nk),
+ )*)?
+ }),
+ )*
+ (kw, _) => Err(Error::bad_request(format!("Invalid endpoint: {}", kw)))
+ }
+ }};
+
+ (@@parse_param $query:expr, query_opt, $param:ident) => {{
+ // extract optional query parameter
+ $query.$param.take().map(|param| param.into_owned())
+ }};
+ (@@parse_param $query:expr, query, $param:ident) => {{
+		// extract mandatory query parameter
+ $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned()
+ }};
+ (@@parse_param $query:expr, opt_parse, $param:ident) => {{
+ // extract and parse optional query parameter
+		// a missing parameter is fine; however, a parse error is reported as an error
+ $query.$param
+ .take()
+ .map(|param| param.parse())
+ .transpose()
+ .map_err(|_| Error::bad_request("Failed to parse query parameter"))?
+ }};
+ (@@parse_param $query:expr, parse, $param:ident) => {{
+ // extract and parse mandatory query parameter
+ // both missing and un-parseable parameters are reported as errors
+ $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?
+ .parse()
+ .map_err(|_| Error::bad_request("Failed to parse query parameter"))?
+ }};
+ (@func
+ $(#[$doc:meta])*
+ pub enum Endpoint {
+ $(
+ $(#[$outer:meta])*
+ $variant:ident $({
+ $($name:ident: $ty:ty,)*
+ })?,
+ )*
+ }) => {
+ $(#[$doc])*
+ pub enum Endpoint {
+ $(
+ $(#[$outer])*
+ $variant $({
+ $($name: $ty, )*
+ })?,
+ )*
+ }
+ impl Endpoint {
+ pub fn name(&self) -> &'static str {
+ match self {
+ $(Endpoint::$variant $({ $($name: _,)* .. })? => stringify!($variant),)*
+ }
+ }
+ }
+ };
+ (@if ($($cond:tt)+) then ($($then:tt)*) else ($($else:tt)*)) => {
+ $($then)*
+ };
+ (@if () then ($($then:tt)*) else ($($else:tt)*)) => {
+ $($else)*
+ };
+}
+
+/// This macro is used to generate part of the code in this module. It must be called only once, and
+/// is useless outside of this module.
+macro_rules! generateQueryParameters {
+ ( $($rest:expr => $name:ident),* ) => {
+		/// Struct containing all query parameters used in endpoints. Think of it as a HashMap,
+ /// but with keys statically known.
+ #[derive(Debug, Default)]
+ struct QueryParameters<'a> {
+ keyword: Option<Cow<'a, str>>,
+ $(
+ $name: Option<Cow<'a, str>>,
+ )*
+ }
+
+ impl<'a> QueryParameters<'a> {
+			/// Build this struct from the query part of a URI.
+ fn from_query(query: &'a str) -> Result<Self, Error> {
+ let mut res: Self = Default::default();
+ for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
+ let repeated = match k.as_ref() {
+ $(
+ $rest => if !v.is_empty() {
+ res.$name.replace(v).is_some()
+ } else {
+ false
+ },
+ )*
+ _ => {
+ if k.starts_with("response-") || k.starts_with("X-Amz-") {
+ false
+ } else if v.as_ref().is_empty() {
+ if res.keyword.replace(k).is_some() {
+ return Err(Error::bad_request("Multiple keywords"));
+ }
+ continue;
+ } else {
+ debug!("Received an unknown query parameter: '{}'", k);
+ false
+ }
+ }
+ };
+ if repeated {
+ return Err(Error::bad_request(format!(
+ "Query parameter repeated: '{}'",
+ k
+ )));
+ }
+ }
+ Ok(res)
+ }
+
+			/// Get an error message in case not all parameters were used when extracting them to
+			/// build an Endpoint variant
+ fn nonempty_message(&self) -> Option<&str> {
+ if self.keyword.is_some() {
+ Some("Keyword not used")
+ } $(
+ else if self.$name.is_some() {
+ Some(concat!("'", $rest, "'"))
+ }
+ )* else {
+ None
+ }
+ }
+ }
+ }
+}
+
+pub(crate) use generateQueryParameters;
+pub(crate) use router_match;
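
To make the macro output concrete, here is a hand-expanded sketch of what router_match!(@extract ...) produces for a toy three-variant enum (illustrative only; the real expansion lists every endpoint variant given in the brackets):

#[allow(dead_code)] // toy enum; only `sort_key` is read in this sketch
enum Endpoint {
    ReadItem { sort_key: String },
    InsertItem { sort_key: String },
    ReadIndex { prefix: Option<String> },
}

impl Endpoint {
    // What `router_match!(@extract self, sort_key, [ReadItem, InsertItem,])`
    // expands to: one arm per listed variant, `None` for everything else.
    fn get_sort_key(&self) -> Option<&str> {
        match self {
            Endpoint::ReadItem { sort_key, .. } => Some(sort_key),
            Endpoint::InsertItem { sort_key, .. } => Some(sort_key),
            _ => None,
        }
    }
}

fn main() {
    let e = Endpoint::ReadItem { sort_key: "k0".into() };
    assert_eq!(e.get_sort_key(), Some("k0"));
    assert_eq!(Endpoint::ReadIndex { prefix: None }.get_sort_key(), None);
}
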
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
new file mode 100644
index 00000000..27837297
--- /dev/null
+++ b/src/api/s3/api_server.rs
@@ -0,0 +1,390 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use hyper::header;
+use hyper::{Body, Request, Response};
+
+use opentelemetry::{trace::SpanRef, KeyValue};
+
+use garage_util::error::Error as GarageError;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+
+use crate::generic_server::*;
+use crate::s3::error::*;
+
+use crate::signature::payload::check_payload_signature;
+use crate::signature::streaming::*;
+
+use crate::helpers::*;
+use crate::s3::bucket::*;
+use crate::s3::copy::*;
+use crate::s3::cors::*;
+use crate::s3::delete::*;
+use crate::s3::get::*;
+use crate::s3::list::*;
+use crate::s3::post_object::handle_post_object;
+use crate::s3::put::*;
+use crate::s3::router::Endpoint;
+use crate::s3::website::*;
+
+pub struct S3ApiServer {
+ garage: Arc<Garage>,
+}
+
+pub(crate) struct S3ApiEndpoint {
+ bucket_name: Option<String>,
+ endpoint: Endpoint,
+}
+
+impl S3ApiServer {
+ pub async fn run(
+ garage: Arc<Garage>,
+ addr: SocketAddr,
+ s3_region: String,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ ApiServer::new(s3_region, S3ApiServer { garage })
+ .run_server(addr, shutdown_signal)
+ .await
+ }
+
+ async fn handle_request_without_bucket(
+ &self,
+ _req: Request<Body>,
+ api_key: Key,
+ endpoint: Endpoint,
+ ) -> Result<Response<Body>, Error> {
+ match endpoint {
+ Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await,
+ endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ }
+ }
+}
+
+#[async_trait]
+impl ApiHandler for S3ApiServer {
+ const API_NAME: &'static str = "s3";
+ const API_NAME_DISPLAY: &'static str = "S3";
+
+ type Endpoint = S3ApiEndpoint;
+ type Error = Error;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> {
+ let authority = req
+ .headers()
+ .get(header::HOST)
+ .ok_or_bad_request("Host header required")?
+ .to_str()?;
+
+ let host = authority_to_host(authority)?;
+
+ let bucket_name = self
+ .garage
+ .config
+ .s3_api
+ .root_domain
+ .as_ref()
+ .and_then(|root_domain| host_to_bucket(&host, root_domain));
+
+ let (endpoint, bucket_name) =
+ Endpoint::from_request(req, bucket_name.map(ToOwned::to_owned))?;
+
+ Ok(S3ApiEndpoint {
+ bucket_name,
+ endpoint,
+ })
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: S3ApiEndpoint,
+ ) -> Result<Response<Body>, Error> {
+ let S3ApiEndpoint {
+ bucket_name,
+ endpoint,
+ } = endpoint;
+ let garage = self.garage.clone();
+
+ // Some endpoints are processed early, before we even check for an API key
+ if let Endpoint::PostObject = endpoint {
+ return handle_post_object(garage, req, bucket_name.unwrap()).await;
+ }
+ if let Endpoint::Options = endpoint {
+ return handle_options_s3api(garage, &req, bucket_name).await;
+ }
+
+ let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
+ let api_key = api_key
+ .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
+
+ let req = parse_streaming_body(
+ &api_key,
+ req,
+ &mut content_sha256,
+ &garage.config.s3_api.s3_region,
+ "s3",
+ )?;
+
+ let bucket_name = match bucket_name {
+ None => {
+ return self
+ .handle_request_without_bucket(req, api_key, endpoint)
+ .await
+ }
+ Some(bucket) => bucket.to_string(),
+ };
+
+ // Special code path for CreateBucket API endpoint
+ if let Endpoint::CreateBucket {} = endpoint {
+ return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await;
+ }
+
+ let bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&bucket_name, &api_key)
+ .await?;
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let allowed = match endpoint.authorization_type() {
+ Authorization::Read => api_key.allow_read(&bucket_id),
+ Authorization::Write => api_key.allow_write(&bucket_id),
+ Authorization::Owner => api_key.allow_owner(&bucket_id),
+ _ => unreachable!(),
+ };
+
+ if !allowed {
+ return Err(Error::forbidden("Operation is not allowed for this key."));
+ }
+
+ let matching_cors_rule = find_matching_cors_rule(&bucket, &req)?;
+
+ let resp = match endpoint {
+ Endpoint::HeadObject {
+ key, part_number, ..
+ } => handle_head(garage, &req, bucket_id, &key, part_number).await,
+ Endpoint::GetObject {
+ key, part_number, ..
+ } => handle_get(garage, &req, bucket_id, &key, part_number).await,
+ Endpoint::UploadPart {
+ key,
+ part_number,
+ upload_id,
+ } => {
+ handle_put_part(
+ garage,
+ req,
+ bucket_id,
+ &key,
+ part_number,
+ &upload_id,
+ content_sha256,
+ )
+ .await
+ }
+ Endpoint::CopyObject { key } => {
+ handle_copy(garage, &api_key, &req, bucket_id, &key).await
+ }
+ Endpoint::UploadPartCopy {
+ key,
+ part_number,
+ upload_id,
+ } => {
+ handle_upload_part_copy(
+ garage,
+ &api_key,
+ &req,
+ bucket_id,
+ &key,
+ part_number,
+ &upload_id,
+ )
+ .await
+ }
+ Endpoint::PutObject { key } => {
+ handle_put(garage, req, &bucket, &key, content_sha256).await
+ }
+ Endpoint::AbortMultipartUpload { key, upload_id } => {
+ handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
+ }
+ Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
+ Endpoint::CreateMultipartUpload { key } => {
+ handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
+ }
+ Endpoint::CompleteMultipartUpload { key, upload_id } => {
+ handle_complete_multipart_upload(
+ garage,
+ req,
+ &bucket_name,
+ &bucket,
+ &key,
+ &upload_id,
+ content_sha256,
+ )
+ .await
+ }
+ Endpoint::CreateBucket {} => unreachable!(),
+ Endpoint::HeadBucket {} => {
+ let empty_body: Body = Body::from(vec![]);
+ let response = Response::builder().body(empty_body).unwrap();
+ Ok(response)
+ }
+ Endpoint::DeleteBucket {} => {
+ handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await
+ }
+ Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
+ Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
+ Endpoint::ListObjects {
+ delimiter,
+ encoding_type,
+ marker,
+ max_keys,
+ prefix,
+ } => {
+ handle_list(
+ garage,
+ &ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
+ is_v2: false,
+ marker,
+ continuation_token: None,
+ start_after: None,
+ },
+ )
+ .await
+ }
+ Endpoint::ListObjectsV2 {
+ delimiter,
+ encoding_type,
+ max_keys,
+ prefix,
+ continuation_token,
+ start_after,
+ list_type,
+ ..
+ } => {
+ if list_type == "2" {
+ handle_list(
+ garage,
+ &ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ prefix: prefix.unwrap_or_default(),
+ },
+ is_v2: true,
+ marker: None,
+ continuation_token,
+ start_after,
+ },
+ )
+ .await
+ } else {
+ Err(Error::bad_request(format!(
+ "Invalid endpoint: list-type={}",
+ list_type
+ )))
+ }
+ }
+ Endpoint::ListMultipartUploads {
+ delimiter,
+ encoding_type,
+ key_marker,
+ max_uploads,
+ prefix,
+ upload_id_marker,
+ } => {
+ handle_list_multipart_upload(
+ garage,
+ &ListMultipartUploadsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
+ key_marker,
+ upload_id_marker,
+ },
+ )
+ .await
+ }
+ Endpoint::ListParts {
+ key,
+ max_parts,
+ part_number_marker,
+ upload_id,
+ } => {
+ handle_list_parts(
+ garage,
+ &ListPartsQuery {
+ bucket_name,
+ bucket_id,
+ key,
+ upload_id,
+ part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
+ max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ },
+ )
+ .await
+ }
+ Endpoint::DeleteObjects {} => {
+ handle_delete_objects(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
+ Endpoint::PutBucketWebsite {} => {
+ handle_put_website(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await,
+ Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
+ Endpoint::PutBucketCors {} => {
+ handle_put_cors(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await,
+ endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ };
+
+ // If request was a success and we have a CORS rule that applies to it,
+ // add the corresponding CORS headers to the response
+ let mut resp_ok = resp?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp_ok, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
+ Ok(resp_ok)
+ }
+}
+
+impl ApiEndpoint for S3ApiEndpoint {
+ fn name(&self) -> &'static str {
+ self.endpoint.name()
+ }
+
+ fn add_span_attributes(&self, span: SpanRef<'_>) {
+ span.set_attribute(KeyValue::new(
+ "bucket",
+ self.bucket_name.clone().unwrap_or_default(),
+ ));
+ }
+}
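
As a usage sketch (assuming the crate is consumed as garage_api, per the module layout above, and that a configured Arc<Garage> is already at hand), starting the S3 server with a Ctrl-C shutdown signal could look like this:

use std::net::SocketAddr;
use std::sync::Arc;

use garage_api::s3::api_server::S3ApiServer;
use garage_model::garage::Garage;

async fn serve_s3(garage: Arc<Garage>) -> Result<(), garage_util::error::Error> {
    let addr: SocketAddr = "[::]:3900".parse().unwrap();
    // Resolve the shutdown future when the process receives Ctrl-C.
    let shutdown = async {
        let _ = tokio::signal::ctrl_c().await;
    };
    // The region name is illustrative; use the s3_region from your config.
    S3ApiServer::run(garage, addr, "garage".to_string(), shutdown).await
}
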
diff --git a/src/api/s3_bucket.rs b/src/api/s3/bucket.rs
index 8a5407d3..3ac6a6ec 100644
--- a/src/api/s3_bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -7,15 +7,15 @@ use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
-use garage_model::object_table::ObjectFilter;
use garage_model::permission::BucketKeyPerm;
use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::time::*;
-use crate::error::*;
-use crate::s3_xml;
+use crate::common_error::CommonError;
+use crate::s3::error::*;
+use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> {
@@ -130,7 +130,7 @@ pub async fn handle_create_bucket(
if let Some(location_constraint) = cmd {
if location_constraint != garage.config.s3_api.s3_region {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Cannot satisfy location constraint `{}`: buckets can only be created in region `{}`",
location_constraint,
garage.config.s3_api.s3_region
@@ -158,12 +158,12 @@ pub async fn handle_create_bucket(
// otherwise return a forbidden error.
let kp = api_key.bucket_permissions(&bucket_id);
if !(kp.allow_write || kp.allow_owner) {
- return Err(Error::BucketAlreadyExists);
+ return Err(CommonError::BucketAlreadyExists.into());
}
} else {
// Create the bucket!
if !is_valid_bucket_name(&bucket_name) {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"{}: {}",
bucket_name, INVALID_BUCKET_NAME_MESSAGE
)));
@@ -228,12 +228,8 @@ pub async fn handle_delete_bucket(
// Delete bucket
// Check bucket is empty
- let objects = garage
- .object_table
- .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
- .await?;
- if !objects.is_empty() {
- return Err(Error::BucketNotEmpty);
+ if !garage.bucket_helper().is_bucket_empty(bucket_id).await? {
+ return Err(CommonError::BucketNotEmpty.into());
}
// --- done checking, now commit ---
@@ -299,7 +295,6 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
let mut ret = None;
for item in cbc.children() {
- println!("{:?}", item);
if item.has_tag_name("LocationConstraint") {
if ret != None {
return None;
diff --git a/src/api/s3_copy.rs b/src/api/s3/copy.rs
index fc4707e2..7eb6459d 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3/copy.rs
@@ -5,23 +5,26 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5};
+use bytes::Bytes;
use hyper::{Body, Request, Response};
use serde::Serialize;
+use garage_rpc::netapp::bytes_buf::BytesBuf;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
-use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
-use garage_model::object_table::*;
-use garage_model::version_table::*;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
-use crate::api_server::{parse_bucket_key, resolve_bucket};
-use crate::error::*;
-use crate::s3_put::{decode_upload_id, get_headers};
-use crate::s3_xml::{self, xmlns_tag};
+use crate::helpers::parse_bucket_key;
+use crate::s3::error::*;
+use crate::s3::put::{decode_upload_id, get_headers};
+use crate::s3::xml::{self as s3_xml, xmlns_tag};
pub async fn handle_copy(
garage: Arc<Garage>,
@@ -201,8 +204,8 @@ pub async fn handle_upload_part_copy(
let mut ranges = http_range::HttpRange::parse(range_str, source_version_meta.size)
.map_err(|e| (e, source_version_meta.size))?;
if ranges.len() != 1 {
- return Err(Error::BadRequest(
- "Invalid x-amz-copy-source-range header: exactly 1 range must be given".into(),
+ return Err(Error::bad_request(
+ "Invalid x-amz-copy-source-range header: exactly 1 range must be given",
));
} else {
ranges.pop().unwrap()
@@ -230,8 +233,8 @@ pub async fn handle_upload_part_copy(
// This is only for small files; we don't bother handling this.
// (in AWS, UploadPartCopy works for parts of at least 5 MB, which
// is never the case for an inline object)
- return Err(Error::BadRequest(
- "Source object is too small (minimum part size is 5Mb)".into(),
+ return Err(Error::bad_request(
+ "Source object is too small (minimum part size is 5Mb)",
));
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (),
@@ -250,7 +253,7 @@ pub async fn handle_upload_part_copy(
// Check this part number hasn't yet been uploaded
if let Some(dv) = dest_version {
if dv.has_part_number(part_number) {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Part number {} has already been uploaded",
part_number
)));
@@ -305,13 +308,18 @@ pub async fn handle_upload_part_copy(
// if and only if the block returned is a block that already existed
// in the Garage data store (thus we don't need to save it again).
let garage2 = garage.clone();
+ let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy)
- .flat_map(|(block_hash, range_to_copy)| {
+ .enumerate()
+ .flat_map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone();
stream::once(async move {
- let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
+ let data = garage3
+ .block_manager
+ .rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
+ .await?;
match range_to_copy {
- Some(r) => Ok((data[r].to_vec(), None)),
+ Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))),
}
})
@@ -413,10 +421,13 @@ async fn get_copy_source(
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
- let source_bucket_id = resolve_bucket(garage, &source_bucket.to_string(), api_key).await?;
+ let source_bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&source_bucket.to_string(), api_key)
+ .await?;
if !api_key.allow_read(&source_bucket_id) {
- return Err(Error::Forbidden(format!(
+ return Err(Error::forbidden(format!(
"Reading from bucket {} not allowed for this key",
source_bucket
)));
@@ -536,8 +547,8 @@ impl CopyPreconditionHeaders {
(None, None, None, Some(ims)) => v_date > *ims,
(None, None, None, None) => true,
_ => {
- return Err(Error::BadRequest(
- "Invalid combination of x-amz-copy-source-if-xxxxx headers".into(),
+ return Err(Error::bad_request(
+ "Invalid combination of x-amz-copy-source-if-xxxxx headers",
))
}
};
@@ -550,13 +561,13 @@ impl CopyPreconditionHeaders {
}
}
-type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItemOk = (Bytes, Option<Hash>);
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
block_size: usize,
block_stream: Pin<Box<stream::Peekable<S>>>,
- buffer: Vec<u8>,
+ buffer: BytesBuf,
hash: Option<Hash>,
}
@@ -565,7 +576,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
Self {
block_size,
block_stream,
- buffer: vec![],
+ buffer: BytesBuf::new(),
hash: None,
}
}
@@ -583,7 +594,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
if self.buffer.is_empty() {
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
- self.buffer = next_block;
+ self.buffer.extend(next_block);
self.hash = next_block_hash;
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
break;
@@ -594,11 +605,11 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
}
}
- Ok((std::mem::take(&mut self.buffer), self.hash.take()))
+ Ok((self.buffer.take_all(), self.hash.take()))
}
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CopyObjectResult {
#[serde(rename = "LastModified")]
pub last_modified: s3_xml::Value,
@@ -606,7 +617,7 @@ pub struct CopyObjectResult {
pub etag: s3_xml::Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CopyPartResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@@ -619,7 +630,7 @@ pub struct CopyPartResult {
#[cfg(test)]
mod tests {
use super::*;
- use crate::s3_xml::to_xml_with_header;
+ use crate::s3::xml::to_xml_with_header;
#[test]
fn copy_object_result() -> Result<(), Error> {
@@ -651,7 +662,6 @@ mod tests {
last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()),
etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".into()),
};
- println!("{}", to_xml_with_header(&v)?);
assert_eq!(to_xml_with_header(&v)?, expected_retval);
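
The Defragmenter above coalesces consecutive source blocks until adding the next one would overflow block_size, so a part copied from many small blocks is re-written as few large ones. A pure-Vec sketch of that rule (ignoring the streaming machinery and the hash passthrough for blocks copied whole):

fn defragment(blocks: Vec<Vec<u8>>, block_size: usize) -> Vec<Vec<u8>> {
    let mut out: Vec<Vec<u8>> = Vec::new();
    let mut buf: Vec<u8> = Vec::new();
    for b in blocks {
        // Flush the buffer when the next block would make it exceed block_size.
        if !buf.is_empty() && buf.len() + b.len() > block_size {
            out.push(std::mem::take(&mut buf));
        }
        buf.extend_from_slice(&b);
    }
    if !buf.is_empty() {
        out.push(buf);
    }
    out
}

fn main() {
    let parts = vec![vec![0u8; 400], vec![0u8; 400], vec![0u8; 400]];
    // With a 1024-byte target, the first two 400-byte blocks merge.
    let sizes: Vec<usize> = defragment(parts, 1024).iter().map(Vec::len).collect();
    assert_eq!(sizes, vec![800, 400]);
}
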
diff --git a/src/api/s3_cors.rs b/src/api/s3/cors.rs
index ab77e23a..c7273464 100644
--- a/src/api/s3_cors.rs
+++ b/src/api/s3/cors.rs
@@ -9,13 +9,12 @@ use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use crate::error::*;
-use crate::s3_xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+use crate::s3::error::*;
+use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_model::garage::Garage;
-use garage_table::*;
use garage_util::data::*;
pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
@@ -48,14 +47,11 @@ pub async fn handle_delete_cors(
bucket_id: Uuid,
) -> Result<Response<Body>, Error> {
let mut bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
+ let param = bucket.params_mut().unwrap();
param.cors_config.update(None);
garage.bucket_table.insert(&bucket).await?;
@@ -78,14 +74,11 @@ pub async fn handle_put_cors(
}
let mut bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
+ let param = bucket.params_mut().unwrap();
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
@@ -119,12 +112,7 @@ pub async fn handle_options_s3api(
let helper = garage.bucket_helper();
let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
if let Some(id) = bucket_id {
- let bucket = garage
- .bucket_table
- .get(&EmptyKey, &id)
- .await?
- .filter(|b| !b.state.is_deleted())
- .ok_or(Error::NoSuchBucket)?;
+ let bucket = garage.bucket_helper().get_existing_bucket(id).await?;
handle_options_for_bucket(req, &bucket)
} else {
// If there is a bucket name in the request, but that name
@@ -185,7 +173,7 @@ pub fn handle_options_for_bucket(
}
}
- Err(Error::Forbidden("This CORS request is not allowed.".into()))
+ Err(Error::forbidden("This CORS request is not allowed."))
}
pub fn find_matching_cors_rule<'a>(
diff --git a/src/api/s3_delete.rs b/src/api/s3/delete.rs
index b243d982..b337155f 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3/delete.rs
@@ -6,10 +6,10 @@ use garage_util::data::*;
use garage_util::time::*;
use garage_model::garage::Garage;
-use garage_model::object_table::*;
+use garage_model::s3::object_table::*;
-use crate::error::*;
-use crate::s3_xml;
+use crate::s3::error::*;
+use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
async fn handle_delete_internal(
@@ -64,14 +64,13 @@ pub async fn handle_delete(
bucket_id: Uuid,
key: &str,
) -> Result<Response<Body>, Error> {
- let (_deleted_version, delete_marker_version) =
- handle_delete_internal(&garage, bucket_id, key).await?;
-
- Ok(Response::builder()
- .header("x-amz-version-id", hex::encode(delete_marker_version))
- .status(StatusCode::NO_CONTENT)
- .body(Body::from(vec![]))
- .unwrap())
+ match handle_delete_internal(&garage, bucket_id, key).await {
+ Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::from(vec![]))
+ .unwrap()),
+ Err(e) => Err(e),
+ }
}
pub async fn handle_delete_objects(
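
The rewritten handler above makes DeleteObject idempotent, as in AWS S3: deleting a key that never existed still answers 204 No Content. A toy sketch of the status mapping:

fn delete_status(result: Result<(), &'static str>) -> u16 {
    match result {
        // Success and "already gone" are indistinguishable to the client.
        Ok(()) | Err("NoSuchKey") => 204,
        Err(_) => 500,
    }
}

fn main() {
    assert_eq!(delete_status(Err("NoSuchKey")), 204);
}
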
diff --git a/src/api/error.rs b/src/api/s3/error.rs
index f53ed1fd..67009d63 100644
--- a/src/api/error.rs
+++ b/src/api/s3/error.rs
@@ -2,34 +2,24 @@ use std::convert::TryInto;
use err_derive::Error;
use hyper::header::HeaderValue;
-use hyper::{HeaderMap, StatusCode};
+use hyper::{Body, HeaderMap, StatusCode};
use garage_model::helper::error::Error as HelperError;
-use garage_util::error::Error as GarageError;
-use crate::s3_xml;
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+use crate::generic_server::ApiError;
+use crate::s3::xml as s3_xml;
+use crate::signature::error::Error as SignatureError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
- // Category: internal error
- /// Error related to deeper parts of Garage
- #[error(display = "Internal error: {}", _0)]
- InternalError(#[error(source)] GarageError),
-
- /// Error related to Hyper
- #[error(display = "Internal error (Hyper error): {}", _0)]
- Hyper(#[error(source)] hyper::Error),
-
- /// Error related to HTTP
- #[error(display = "Internal error (HTTP error): {}", _0)]
- Http(#[error(source)] http::Error),
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
// Category: cannot process
- /// No proper api key was used, or the signature was invalid
- #[error(display = "Forbidden: {}", _0)]
- Forbidden(String),
-
/// Authorization Header Malformed
#[error(display = "Authorization header malformed, expected scope: {}", _0)]
AuthorizationHeaderMalformed(String),
@@ -38,22 +28,10 @@ pub enum Error {
#[error(display = "Key not found")]
NoSuchKey,
- /// The bucket requested don't exists
- #[error(display = "Bucket not found")]
- NoSuchBucket,
-
/// The requested multipart upload doesn't exist
#[error(display = "Upload not found")]
NoSuchUpload,
- /// Tried to create a bucket that already exist
- #[error(display = "Bucket already exists")]
- BucketAlreadyExists,
-
- /// Tried to delete a non-empty bucket
- #[error(display = "Tried to delete a non-empty bucket")]
- BucketNotEmpty,
-
/// Precondition failed (e.g. x-amz-copy-source-if-match)
#[error(display = "At least one of the preconditions you specified did not hold")]
PreconditionFailed,
@@ -80,10 +58,6 @@ pub enum Error {
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUtf8String(#[error(source)] std::string::FromUtf8Error),
- /// Some base64 encoded data was badly encoded
- #[error(display = "Invalid base64: {}", _0)]
- InvalidBase64(#[error(source)] base64::DecodeError),
-
/// The client sent invalid XML data
#[error(display = "Invalid XML: {}", _0)]
InvalidXml(String),
@@ -96,15 +70,34 @@ pub enum Error {
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
- /// The client sent an invalid request
- #[error(display = "Bad request: {}", _0)]
- BadRequest(String),
-
/// The client sent a request for an action not supported by garage
#[error(display = "Unimplemented action: {}", _0)]
NotImplemented(String),
}
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
+
+impl From<HelperError> for Error {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
+ HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
+ HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
+ HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
+ e => Self::bad_request(format!("{}", e)),
+ }
+ }
+}
+
impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self {
Self::InvalidXml(format!("{}", err))
@@ -117,88 +110,71 @@ impl From<quick_xml::de::DeError> for Error {
}
}
-impl From<HelperError> for Error {
- fn from(err: HelperError) -> Self {
+impl From<SignatureError> for Error {
+ fn from(err: SignatureError) -> Self {
match err {
- HelperError::Internal(i) => Self::InternalError(i),
- HelperError::BadRequest(b) => Self::BadRequest(b),
+ SignatureError::Common(c) => Self::Common(c),
+ SignatureError::AuthorizationHeaderMalformed(c) => {
+ Self::AuthorizationHeaderMalformed(c)
+ }
+ SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
+ SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
}
}
}
impl From<multer::Error> for Error {
fn from(err: multer::Error) -> Self {
- Self::BadRequest(err.to_string())
+ Self::bad_request(err)
}
}
impl Error {
- /// Get the HTTP status code that best represents the meaning of the error for the client
- pub fn http_status_code(&self) -> StatusCode {
- match self {
- Error::NoSuchKey | Error::NoSuchBucket | Error::NoSuchUpload => StatusCode::NOT_FOUND,
- Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT,
- Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
- Error::Forbidden(_) => StatusCode::FORBIDDEN,
- Error::InternalError(
- GarageError::Timeout
- | GarageError::RemoteError(_)
- | GarageError::Quorum(_, _, _, _),
- ) => StatusCode::SERVICE_UNAVAILABLE,
- Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => {
- StatusCode::INTERNAL_SERVER_ERROR
- }
- Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE,
- Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED,
- _ => StatusCode::BAD_REQUEST,
- }
- }
-
pub fn aws_code(&self) -> &'static str {
match self {
+ Error::Common(c) => c.aws_code(),
Error::NoSuchKey => "NoSuchKey",
- Error::NoSuchBucket => "NoSuchBucket",
Error::NoSuchUpload => "NoSuchUpload",
- Error::BucketAlreadyExists => "BucketAlreadyExists",
- Error::BucketNotEmpty => "BucketNotEmpty",
Error::PreconditionFailed => "PreconditionFailed",
Error::InvalidPart => "InvalidPart",
Error::InvalidPartOrder => "InvalidPartOrder",
Error::EntityTooSmall => "EntityTooSmall",
- Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::NotImplemented(_) => "NotImplemented",
- Error::InternalError(
- GarageError::Timeout
- | GarageError::RemoteError(_)
- | GarageError::Quorum(_, _, _, _),
- ) => "ServiceUnavailable",
- Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError",
- _ => "InvalidRequest",
+ Error::InvalidXml(_) => "MalformedXML",
+ Error::InvalidRange(_) => "InvalidRange",
+ Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => {
+ "InvalidRequest"
+ }
}
}
+}
- pub fn aws_xml(&self, garage_region: &str, path: &str) -> String {
- let error = s3_xml::Error {
- code: s3_xml::Value(self.aws_code().to_string()),
- message: s3_xml::Value(format!("{}", self)),
- resource: Some(s3_xml::Value(path.to_string())),
- region: Some(s3_xml::Value(garage_region.to_string())),
- };
- s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
- r#"
-<?xml version="1.0" encoding="UTF-8"?>
-<Error>
- <Code>InternalError</Code>
- <Message>XML encoding of error failed</Message>
-</Error>
- "#
- .into()
- })
+impl ApiError for Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
+ fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::Common(c) => c.http_status_code(),
+ Error::NoSuchKey | Error::NoSuchUpload => StatusCode::NOT_FOUND,
+ Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
+ Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE,
+ Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED,
+ Error::AuthorizationHeaderMalformed(_)
+ | Error::InvalidPart
+ | Error::InvalidPartOrder
+ | Error::EntityTooSmall
+ | Error::InvalidXml(_)
+ | Error::InvalidUtf8Str(_)
+ | Error::InvalidUtf8String(_)
+ | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST,
+ }
}
- pub fn add_headers(&self, header_map: &mut HeaderMap<HeaderValue>) {
+ fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>) {
use hyper::header;
+
+ header_map.append(header::CONTENT_TYPE, "application/xml".parse().unwrap());
+
#[allow(clippy::single_match)]
match self {
Error::InvalidRange((_, len)) => {
@@ -212,68 +188,23 @@ impl Error {
_ => (),
}
}
-}
-
-/// Trait to map error to the Bad Request error code
-pub trait OkOrBadRequest {
- type S;
- fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>;
-}
-
-impl<T, E> OkOrBadRequest for Result<T, E>
-where
- E: std::fmt::Display,
-{
- type S = T;
- fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
- match self {
- Ok(x) => Ok(x),
- Err(e) => Err(Error::BadRequest(format!("{}: {}", reason.as_ref(), e))),
- }
- }
-}
-
-impl<T> OkOrBadRequest for Option<T> {
- type S = T;
- fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
- match self {
- Some(x) => Ok(x),
- None => Err(Error::BadRequest(reason.as_ref().to_string())),
- }
- }
-}
-
-/// Trait to map an error to an Internal Error code
-pub trait OkOrInternalError {
- type S;
- fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>;
-}
-
-impl<T, E> OkOrInternalError for Result<T, E>
-where
- E: std::fmt::Display,
-{
- type S = T;
- fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
- match self {
- Ok(x) => Ok(x),
- Err(e) => Err(Error::InternalError(GarageError::Message(format!(
- "{}: {}",
- reason.as_ref(),
- e
- )))),
- }
- }
-}
-impl<T> OkOrInternalError for Option<T> {
- type S = T;
- fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
- match self {
- Some(x) => Ok(x),
- None => Err(Error::InternalError(GarageError::Message(
- reason.as_ref().to_string(),
- ))),
- }
+ fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ let error = s3_xml::Error {
+ code: s3_xml::Value(self.aws_code().to_string()),
+ message: s3_xml::Value(format!("{}", self)),
+ resource: Some(s3_xml::Value(path.to_string())),
+ region: Some(s3_xml::Value(garage_region.to_string())),
+ };
+ Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
+ r#"
+<?xml version="1.0" encoding="UTF-8"?>
+<Error>
+ <Code>InternalError</Code>
+ <Message>XML encoding of error failed</Message>
+</Error>
+ "#
+ .into()
+ }))
}
}
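
The layering above hinges on one blanket impl: anything convertible into CommonError becomes an s3::Error automatically, so `?` keeps working in handlers without per-type boilerplate. A minimal self-contained reproduction with toy types (not Garage's definitions):

#[derive(Debug)]
enum CommonError { BadRequest(String) }

#[derive(Debug)]
enum S3Error { Common(CommonError) }

impl From<std::num::ParseIntError> for CommonError {
    fn from(e: std::num::ParseIntError) -> Self {
        CommonError::BadRequest(e.to_string())
    }
}

// The single blanket impl that replaces one `From` impl per error kind.
impl<T> From<T> for S3Error where CommonError: From<T> {
    fn from(err: T) -> Self { S3Error::Common(CommonError::from(err)) }
}

fn parse_part_number(s: &str) -> Result<u64, S3Error> {
    Ok(s.parse()?) // ParseIntError -> CommonError -> S3Error
}

fn main() {
    assert!(parse_part_number("5").is_ok());
    assert!(matches!(
        parse_part_number("nope"),
        Err(S3Error::Common(CommonError::BadRequest(_)))
    ));
}
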
diff --git a/src/api/s3_get.rs b/src/api/s3/get.rs
index 7f647e15..2a99551a 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3/get.rs
@@ -2,22 +2,25 @@
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
-use futures::stream::*;
+use futures::future;
+use futures::stream::{self, StreamExt};
use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, RANGE,
};
-use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode};
+use tokio::sync::mpsc;
+use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
use garage_table::EmptyKey;
use garage_util::data::*;
+use garage_util::error::OkOrMessage;
use garage_model::garage::Garage;
-use garage_model::object_table::*;
-use garage_model::version_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
-use crate::error::*;
+use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
@@ -210,8 +213,8 @@ pub async fn handle_get(
match (part_number, parse_range_header(req, last_v_meta.size)?) {
(Some(_), Some(_)) => {
- return Err(Error::BadRequest(
- "Cannot specify both partNumber and Range header".into(),
+ return Err(Error::bad_request(
+ "Cannot specify both partNumber and Range header",
));
}
(Some(pn), None) => {
@@ -242,36 +245,56 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let read_first_block = garage.block_manager.rpc_get_block(first_block_hash);
- let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
-
- let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
- let version = version.ok_or(Error::NoSuchKey)?;
+ let (tx, rx) = mpsc::channel(2);
+
+ let order_stream = OrderTag::stream();
+ let first_block_hash = *first_block_hash;
+ let version_uuid = last_v.uuid;
+
+ tokio::spawn(async move {
+ match async {
+ let garage2 = garage.clone();
+ let version_fut = tokio::spawn(async move {
+ garage2.version_table.get(&version_uuid, &EmptyKey).await
+ });
+
+ let stream_block_0 = garage
+ .block_manager
+ .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
+ .await?;
+ tx.send(stream_block_0)
+ .await
+ .ok_or_message("channel closed")?;
+
+ let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
+ for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
+ let stream_block_i = garage
+ .block_manager
+ .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
+ .await?;
+ tx.send(stream_block_i)
+ .await
+ .ok_or_message("channel closed")?;
+ }
- let mut blocks = version
- .blocks
- .items()
- .iter()
- .map(|(_, vb)| (vb.hash, None))
- .collect::<Vec<_>>();
- blocks[0].1 = Some(first_block);
-
- let body_stream = futures::stream::iter(blocks)
- .map(move |(hash, data_opt)| {
- let garage = garage.clone();
- async move {
- if let Some(data) = data_opt {
- Ok(Bytes::from(data))
- } else {
- garage
- .block_manager
- .rpc_get_block(&hash)
- .await
- .map(Bytes::from)
- }
+ Ok::<(), Error>(())
+ }
+ .await
+ {
+ Ok(()) => (),
+ Err(e) => {
+ let err = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Error while getting object data: {}", e),
+ );
+ let _ = tx
+ .send(Box::pin(stream::once(future::ready(Err(err)))))
+ .await;
}
- })
- .buffered(2);
+ }
+ });
+
+ let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)
@@ -302,9 +325,9 @@ async fn handle_get_range(
let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec());
Ok(resp_builder.body(body)?)
} else {
- None.ok_or_internal_error(
+ Err(Error::internal_error(
"Requested range not present in inline bytes when it should have been",
- )
+ ))
}
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
@@ -422,40 +445,79 @@ fn body_from_blocks_range(
all_blocks.len(),
4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
));
- let mut true_offset = 0;
+ let mut block_offset: u64 = 0;
for (_, b) in all_blocks.iter() {
- if true_offset >= end {
+ if block_offset >= end {
break;
}
// Keep only blocks that have an intersection with the requested range
- if true_offset < end && true_offset + b.size > begin {
- blocks.push((*b, true_offset));
+ if block_offset < end && block_offset + b.size > begin {
+ blocks.push((*b, block_offset));
}
- true_offset += b.size;
+ block_offset += b.size as u64;
}
+ let order_stream = OrderTag::stream();
let body_stream = futures::stream::iter(blocks)
- .map(move |(block, true_offset)| {
+ .enumerate()
+ .map(move |(i, (block, block_offset))| {
let garage = garage.clone();
async move {
- let data = garage.block_manager.rpc_get_block(&block.hash).await?;
- let data = Bytes::from(data);
- let start_in_block = if true_offset > begin {
- 0
- } else {
- begin - true_offset
- };
- let end_in_block = if true_offset + block.size < end {
- block.size
- } else {
- end - true_offset
- };
- Result::<Bytes, Error>::Ok(
- data.slice(start_in_block as usize..end_in_block as usize),
- )
+ garage
+ .block_manager
+ .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
+ .await
+ .unwrap_or_else(|e| error_stream(i, e))
+ .scan(block_offset, move |chunk_offset, chunk| {
+ let r = match chunk {
+ Ok(chunk_bytes) => {
+ let chunk_len = chunk_bytes.len() as u64;
+ let r = if *chunk_offset >= end {
+ // The current chunk is after the part we want to read.
+ // Returning None here will stop the scan, the rest of the
+ // stream will be ignored
+ None
+ } else if *chunk_offset + chunk_len <= begin {
+ // The current chunk is before the part we want to read.
+ // We return a None that will be removed by the filter_map
+ // below.
+ Some(None)
+ } else {
+ // The chunk has an intersection with the requested range
+ let start_in_chunk = if *chunk_offset > begin {
+ 0
+ } else {
+ begin - *chunk_offset
+ };
+ let end_in_chunk = if *chunk_offset + chunk_len < end {
+ chunk_len
+ } else {
+ end - *chunk_offset
+ };
+ Some(Some(Ok(chunk_bytes
+ .slice(start_in_chunk as usize..end_in_chunk as usize))))
+ };
+ *chunk_offset += chunk_bytes.len() as u64;
+ r
+ }
+ Err(e) => Some(Some(Err(e))),
+ };
+ futures::future::ready(r)
+ })
+ .filter_map(futures::future::ready)
}
})
- .buffered(2);
+ .buffered(2)
+ .flatten();
hyper::body::Body::wrap_stream(body_stream)
}
+
+fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
+ Box::pin(futures::stream::once(async move {
+ Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Could not get block {}: {}", i, e),
+ ))
+ }))
+}
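
The scan closure above reduces to one piece of arithmetic per chunk: intersect [chunk_offset, chunk_offset + chunk_len) with the requested [begin, end). A pure-function sketch of that computation (the real code additionally distinguishes "past end" so it can stop the stream early):

fn slice_for_range(chunk_offset: u64, chunk_len: u64, begin: u64, end: u64) -> Option<(usize, usize)> {
    if chunk_offset >= end || chunk_offset + chunk_len <= begin {
        return None; // chunk lies entirely outside the requested range
    }
    let start = begin.saturating_sub(chunk_offset); // 0 if the chunk starts inside the range
    let stop = (end - chunk_offset).min(chunk_len);
    Some((start as usize, stop as usize))
}

fn main() {
    // Request bytes 100..250 over 100-byte chunks at offsets 0, 100, 200.
    assert_eq!(slice_for_range(0, 100, 100, 250), None);
    assert_eq!(slice_for_range(100, 100, 100, 250), Some((0, 100)));
    assert_eq!(slice_for_range(200, 100, 100, 250), Some((0, 50)));
}
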
diff --git a/src/api/s3_list.rs b/src/api/s3/list.rs
index 5852fc1b..e5f486c8 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3/list.rs
@@ -10,15 +10,16 @@ use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::garage::Garage;
-use garage_model::object_table::*;
-use garage_model::version_table::Version;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::Version;
-use garage_table::EmptyKey;
+use garage_table::{EmptyKey, EnumerationOrder};
use crate::encoding::*;
-use crate::error::*;
-use crate::s3_put;
-use crate::s3_xml;
+use crate::helpers::key_after_prefix;
+use crate::s3::error::*;
+use crate::s3::put as s3_put;
+use crate::s3::xml as s3_xml;
const DUMMY_NAME: &str = "Dummy Key";
const DUMMY_KEY: &str = "GKDummyKey";
@@ -66,8 +67,14 @@ pub async fn handle_list(
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
- t.get_range(&bucket, key, Some(ObjectFilter::IsData), count)
- .await
+ t.get_range(
+ &bucket,
+ key,
+ Some(ObjectFilter::IsData),
+ count,
+ EnumerationOrder::Forward,
+ )
+ .await
}
};
@@ -165,8 +172,14 @@ pub async fn handle_list_multipart_upload(
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
- t.get_range(&bucket, key, Some(ObjectFilter::IsUploading), count)
- .await
+ t.get_range(
+ &bucket,
+ key,
+ Some(ObjectFilter::IsUploading),
+ count,
+ EnumerationOrder::Forward,
+ )
+ .await
}
};
@@ -569,13 +582,19 @@ impl ListObjectsQuery {
// representing the key to start with.
(Some(token), _) => match &token[..1] {
"[" => Ok(RangeBegin::IncludingKey {
- key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
+ key: String::from_utf8(
+ base64::decode(token[1..].as_bytes())
+ .ok_or_bad_request("Invalid continuation token")?,
+ )?,
fallback_key: None,
}),
"]" => Ok(RangeBegin::AfterKey {
- key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
+ key: String::from_utf8(
+ base64::decode(token[1..].as_bytes())
+ .ok_or_bad_request("Invalid continuation token")?,
+ )?,
}),
- _ => Err(Error::BadRequest("Invalid continuation token".to_string())),
+ _ => Err(Error::bad_request("Invalid continuation token")),
},
// StartAfter has defined semantics in the spec:
@@ -923,39 +942,13 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
}
}
-const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}';
-
-/// Compute the key after the prefix
-fn key_after_prefix(pfx: &str) -> Option<String> {
- let mut next = pfx.to_string();
- while !next.is_empty() {
- let tail = next.pop().unwrap();
- if tail >= char::MAX {
- continue;
- }
-
- // Circumvent a limitation of RangeFrom that overflow earlier than needed
- // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html
- let new_tail = if tail == UTF8_BEFORE_LAST_CHAR {
- char::MAX
- } else {
- (tail..).nth(1).unwrap()
- };
-
- next.push(new_tail);
- return Some(next);
- }
-
- None
-}
-
/*
* Unit tests of this module
*/
#[cfg(test)]
mod tests {
use super::*;
- use garage_model::version_table::*;
+ use garage_model::s3::version_table::*;
use garage_util::*;
use std::iter::FromIterator;
@@ -1003,39 +996,6 @@ mod tests {
}
#[test]
- fn test_key_after_prefix() {
- assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1);
- assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0");
- assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭");
- assert_eq!(
- key_after_prefix("􏿽").unwrap().as_str(),
- String::from(char::from_u32(0x10FFFE).unwrap())
- );
-
- // When the last character is the biggest UTF8 char
- let a = String::from_iter(['a', char::MAX].iter());
- assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b");
-
- // When all characters are the biggest UTF8 char
- let b = String::from_iter([char::MAX; 3].iter());
- assert!(key_after_prefix(b.as_str()).is_none());
-
- // Check utf8 surrogates
- let c = String::from('\u{D7FF}');
- assert_eq!(
- key_after_prefix(c.as_str()).unwrap().as_str(),
- String::from('\u{E000}')
- );
-
- // Check the character before the biggest one
- let d = String::from('\u{10FFFE}');
- assert_eq!(
- key_after_prefix(d.as_str()).unwrap().as_str(),
- String::from(char::MAX)
- );
- }
-
- #[test]
fn test_common_prefixes() {
let mut query = query();
let objs = objs();
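
For reference, the continuation tokens decoded above are one marker character ('[' = resume including the key, ']' = resume strictly after it) followed by the base64-encoded key. A standalone sketch of the codec (assuming base64 0.13, as in this crate's Cargo.toml):

#[derive(Debug, PartialEq)]
enum RangeBegin {
    IncludingKey(String),
    AfterKey(String),
}

fn decode_token(token: &str) -> Option<RangeBegin> {
    if token.len() < 2 {
        return None;
    }
    let key = String::from_utf8(base64::decode(&token[1..]).ok()?).ok()?;
    match &token[..1] {
        "[" => Some(RangeBegin::IncludingKey(key)),
        "]" => Some(RangeBegin::AfterKey(key)),
        _ => None,
    }
}

fn main() {
    let token = format!("]{}", base64::encode("photos/cat.jpg"));
    assert_eq!(
        decode_token(&token),
        Some(RangeBegin::AfterKey("photos/cat.jpg".into()))
    );
}
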
diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs
new file mode 100644
index 00000000..7b56d4d8
--- /dev/null
+++ b/src/api/s3/mod.rs
@@ -0,0 +1,15 @@
+pub mod api_server;
+pub mod error;
+
+mod bucket;
+mod copy;
+pub mod cors;
+mod delete;
+pub mod get;
+mod list;
+mod post_object;
+mod put;
+mod website;
+
+mod router;
+pub mod xml;
diff --git a/src/api/s3_post_object.rs b/src/api/s3/post_object.rs
index 585e0304..d063faa4 100644
--- a/src/api/s3_post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -14,16 +14,15 @@ use serde::Deserialize;
use garage_model::garage::Garage;
-use crate::api_server::resolve_bucket;
-use crate::error::*;
-use crate::s3_put::{get_headers, save_stream};
-use crate::s3_xml;
+use crate::s3::error::*;
+use crate::s3::put::{get_headers, save_stream};
+use crate::s3::xml as s3_xml;
use crate::signature::payload::{parse_date, verify_v4};
pub async fn handle_post_object(
garage: Arc<Garage>,
req: Request<Body>,
- bucket: String,
+ bucket_name: String,
) -> Result<Response<Body>, Error> {
let boundary = req
.headers()
@@ -48,9 +47,7 @@ pub async fn handle_post_object(
let field = if let Some(field) = multipart.next_field().await? {
field
} else {
- return Err(Error::BadRequest(
- "Request did not contain a file".to_owned(),
- ));
+ return Err(Error::bad_request("Request did not contain a file"));
};
let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) {
name
@@ -66,14 +63,14 @@ pub async fn handle_post_object(
"tag" => (/* tag need to be reencoded, but we don't support them yet anyway */),
"acl" => {
if params.insert("x-amz-acl", content).is_some() {
- return Err(Error::BadRequest(
- "Field 'acl' provided more than one time".to_string(),
+ return Err(Error::bad_request(
+ "Field 'acl' provided more than one time",
));
}
}
_ => {
if params.insert(&name, content).is_some() {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Field '{}' provided more than one time",
name
)));
@@ -90,9 +87,7 @@ pub async fn handle_post_object(
.to_str()?;
let credential = params
.get("x-amz-credential")
- .ok_or_else(|| {
- Error::Forbidden("Garage does not support anonymous access yet".to_string())
- })?
+ .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?
.to_str()?;
let policy = params
.get("policy")
@@ -119,17 +114,31 @@ pub async fn handle_post_object(
};
let date = parse_date(date)?;
- let api_key = verify_v4(&garage, credential, &date, signature, policy.as_bytes()).await?;
+ let api_key = verify_v4(
+ &garage,
+ "s3",
+ credential,
+ &date,
+ signature,
+ policy.as_bytes(),
+ )
+ .await?;
- let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?;
+ let bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&bucket_name, &api_key)
+ .await?;
if !api_key.allow_write(&bucket_id) {
- return Err(Error::Forbidden(
- "Operation is not allowed for this key.".to_string(),
- ));
+ return Err(Error::forbidden("Operation is not allowed for this key."));
}
- let decoded_policy = base64::decode(&policy)?;
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?;
let decoded_policy: Policy =
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
@@ -137,9 +146,7 @@ pub async fn handle_post_object(
.ok_or_bad_request("Invalid expiration date")?
.into();
if Utc::now() - expiration > Duration::zero() {
- return Err(Error::BadRequest(
- "Expiration date is in the paste".to_string(),
- ));
+ return Err(Error::bad_request("Expiration date is in the paste"));
}
let mut conditions = decoded_policy.into_conditions()?;
@@ -151,7 +158,7 @@ pub async fn handle_post_object(
"policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields
"content-type" => {
let conds = conditions.params.remove("content-type").ok_or_else(|| {
- Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
+ Error::bad_request(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
@@ -161,7 +168,7 @@ pub async fn handle_post_object(
}
};
if !ok {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
@@ -170,7 +177,7 @@ pub async fn handle_post_object(
}
"key" => {
let conds = conditions.params.remove("key").ok_or_else(|| {
- Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
+ Error::bad_request(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
@@ -178,7 +185,7 @@ pub async fn handle_post_object(
Operation::StartsWith(s) => key.starts_with(&s),
};
if !ok {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
@@ -193,7 +200,7 @@ pub async fn handle_post_object(
continue;
}
let conds = conditions.params.remove(&param_key).ok_or_else(|| {
- Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
+ Error::bad_request(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
@@ -201,7 +208,7 @@ pub async fn handle_post_object(
Operation::StartsWith(s) => value.to_str()?.starts_with(s.as_str()),
};
if !ok {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
@@ -212,7 +219,7 @@ pub async fn handle_post_object(
}
if let Some((param_key, _)) = conditions.params.iter().next() {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Key '{}' is required in policy, but no value was provided",
param_key
)));
@@ -225,7 +232,7 @@ pub async fn handle_post_object(
garage,
headers,
StreamLimiter::new(stream, conditions.content_length),
- bucket_id,
+ &bucket,
&key,
None,
None,
@@ -242,7 +249,7 @@ pub async fn handle_post_object(
{
target
.query_pairs_mut()
- .append_pair("bucket", &bucket)
+ .append_pair("bucket", &bucket_name)
.append_pair("key", &key)
.append_pair("etag", &etag);
let target = target.to_string();
@@ -287,7 +294,7 @@ pub async fn handle_post_object(
let xml = s3_xml::PostObject {
xmlns: (),
location: s3_xml::Value(location),
- bucket: s3_xml::Value(bucket),
+ bucket: s3_xml::Value(bucket_name),
key: s3_xml::Value(key),
etag: s3_xml::Value(etag),
};
@@ -318,7 +325,7 @@ impl Policy {
match condition {
PolicyCondition::Equal(map) => {
if map.len() != 1 {
- return Err(Error::BadRequest("Invalid policy item".to_owned()));
+ return Err(Error::bad_request("Invalid policy item"));
}
let (mut k, v) = map.into_iter().next().expect("size was verified");
k.make_ascii_lowercase();
@@ -326,7 +333,7 @@ impl Policy {
}
PolicyCondition::OtherOp([cond, mut key, value]) => {
if key.remove(0) != '$' {
- return Err(Error::BadRequest("Invalid policy item".to_owned()));
+ return Err(Error::bad_request("Invalid policy item"));
}
key.make_ascii_lowercase();
match cond.as_str() {
@@ -339,7 +346,7 @@ impl Policy {
.or_default()
.push(Operation::StartsWith(value));
}
- _ => return Err(Error::BadRequest("Invalid policy item".to_owned())),
+ _ => return Err(Error::bad_request("Invalid policy item")),
}
}
PolicyCondition::SizeRange(key, min, max) => {
@@ -347,7 +354,7 @@ impl Policy {
length.0 = length.0.max(min);
length.1 = length.1.min(max);
} else {
- return Err(Error::BadRequest("Invalid policy item".to_owned()));
+ return Err(Error::bad_request("Invalid policy item"));
}
}
}
@@ -412,15 +419,15 @@ where
self.read += bytes.len() as u64;
// optimization: fail early when we already know the payload is too long
if self.length.end() < &self.read {
- return Poll::Ready(Some(Err(Error::BadRequest(
- "File size does not match policy".to_owned(),
+ return Poll::Ready(Some(Err(Error::bad_request(
+ "File size does not match policy",
))));
}
}
Poll::Ready(None) => {
if !self.length.contains(&self.read) {
- return Poll::Ready(Some(Err(Error::BadRequest(
- "File size does not match policy".to_owned(),
+ return Poll::Ready(Some(Err(Error::bad_request(
+ "File size does not match policy",
))));
}
}
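
The StreamLimiter logic above boils down to two checks against a RangeInclusive<u64> taken from the policy's content-length-range: bail out as soon as the byte count passes the maximum, and verify the minimum once the stream ends. A standalone sketch of just that arithmetic (names illustrative):

    use std::ops::RangeInclusive;

    fn check_policy_length(
        length: &RangeInclusive<u64>,
        read: u64,
        at_eof: bool,
    ) -> Result<(), &'static str> {
        if length.end() < &read {
            // fail early: already past the maximum, no point reading the rest
            return Err("File size does not match policy");
        }
        if at_eof && !length.contains(&read) {
            // once the stream ends, the total must also reach the minimum
            return Err("File size does not match policy");
        }
        Ok(())
    }
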
diff --git a/src/api/s3_put.rs b/src/api/s3/put.rs
index ed0bf00b..97b8e4e3 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3/put.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, BTreeSet, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use futures::prelude::*;
@@ -8,25 +8,34 @@ use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
+use opentelemetry::{
+ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
+ Context,
+};
+
+use garage_rpc::netapp::bytes_buf::BytesBuf;
use garage_table::*;
+use garage_util::async_hash::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_block::manager::INLINE_THRESHOLD;
-use garage_model::block_ref_table::*;
+use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
-use garage_model::object_table::*;
-use garage_model::version_table::*;
+use garage_model::index_counter::CountedItem;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
-use crate::error::*;
-use crate::s3_xml;
+use crate::s3::error::*;
+use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
- bucket_id: Uuid,
+ bucket: &Bucket,
key: &str,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
@@ -46,7 +55,7 @@ pub async fn handle_put(
garage,
headers,
body,
- bucket_id,
+ bucket,
key,
content_md5,
content_sha256,
@@ -59,7 +68,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: Arc<Garage>,
headers: ObjectVersionHeaders,
body: S,
- bucket_id: Uuid,
+ bucket: &Bucket,
key: &str,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
@@ -80,6 +89,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let data_md5sum_hex = hex::encode(data_md5sum);
let data_sha256sum = sha256sum(&first_block[..]);
+ let size = first_block.len() as u64;
ensure_checksum_matches(
data_md5sum.as_slice(),
@@ -88,20 +98,22 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
+ check_quotas(&garage, bucket, key, size).await?;
+
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: version_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::Inline(
ObjectVersionMeta {
headers,
- size: first_block.len() as u64,
+ size,
etag: data_md5sum_hex.clone(),
},
- first_block,
+ first_block.to_vec(),
)),
};
- let object = Object::new(bucket_id, key.into(), vec![object_version]);
+ let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok((version_uuid, data_md5sum_hex));
@@ -114,36 +126,42 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
timestamp: version_timestamp,
state: ObjectVersionState::Uploading(headers.clone()),
};
- let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
+ let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
// Write this entry now, even with empty block list,
// to prevent block_ref entries from being deleted (they can be deleted
// if they reference a version that isn't found in the version table)
- let version = Version::new(version_uuid, bucket_id, key.into(), false);
+ let version = Version::new(version_uuid, bucket.id, key.into(), false);
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
- let first_block_hash = blake2sum(&first_block[..]);
- let tx_result = read_and_put_blocks(
- &garage,
- &version,
- 1,
- first_block,
- first_block_hash,
- &mut chunker,
- )
- .await
- .and_then(|(total_size, data_md5sum, data_sha256sum)| {
+ let first_block_hash = async_blake2sum(first_block.clone()).await;
+
+ let tx_result = (|| async {
+ let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
+ &garage,
+ &version,
+ 1,
+ first_block,
+ first_block_hash,
+ &mut chunker,
+ )
+ .await?;
+
ensure_checksum_matches(
data_md5sum.as_slice(),
data_sha256sum,
content_md5.as_deref(),
content_sha256,
- )
- .map(|()| (total_size, data_md5sum))
- });
+ )?;
+
+ check_quotas(&garage, bucket, key, total_size).await?;
+
+ Ok((total_size, data_md5sum))
+ })()
+ .await;
// If something went wrong, clean up
let (total_size, md5sum_arr) = match tx_result {
@@ -151,7 +169,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
Err(e) => {
// Mark object as aborted, this will free the blocks further down
object_version.state = ObjectVersionState::Aborted;
- let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
+ let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
return Err(e);
}
@@ -167,7 +185,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
first_block_hash,
));
- let object = Object::new(bucket_id, key.into(), vec![object_version]);
+ let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
Ok((version_uuid, md5sum_hex))
@@ -183,8 +201,8 @@ fn ensure_checksum_matches(
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != data_sha256sum {
- return Err(Error::BadRequest(
- "Unable to validate x-amz-content-sha256".to_string(),
+ return Err(Error::bad_request(
+ "Unable to validate x-amz-content-sha256",
));
} else {
trace!("Successfully validated x-amz-content-sha256");
@@ -192,9 +210,7 @@ fn ensure_checksum_matches(
}
if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
- return Err(Error::BadRequest(
- "Unable to validate content-md5".to_string(),
- ));
+ return Err(Error::bad_request("Unable to validate content-md5"));
} else {
trace!("Successfully validated content-md5");
}
@@ -202,18 +218,85 @@ fn ensure_checksum_matches(
Ok(())
}
+/// Check that inserting this object with this size doesn't exceed bucket quotas
+async fn check_quotas(
+ garage: &Arc<Garage>,
+ bucket: &Bucket,
+ key: &str,
+ size: u64,
+) -> Result<(), Error> {
+ let quotas = bucket.state.as_option().unwrap().quotas.get();
+ if quotas.max_objects.is_none() && quotas.max_size.is_none() {
+ return Ok(());
+ };
+
+ let key = key.to_string();
+ let (prev_object, counters) = futures::try_join!(
+ garage.object_table.get(&bucket.id, &key),
+ garage.object_counter_table.table.get(&bucket.id, &EmptyKey),
+ )?;
+
+ let counters = counters
+ .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .unwrap_or_default();
+
+ let (prev_cnt_obj, prev_cnt_size) = match prev_object {
+ Some(o) => {
+ let prev_cnt = o.counts().into_iter().collect::<HashMap<_, _>>();
+ (
+ prev_cnt.get(OBJECTS).cloned().unwrap_or_default(),
+ prev_cnt.get(BYTES).cloned().unwrap_or_default(),
+ )
+ }
+ None => (0, 0),
+ };
+ let cnt_obj_diff = 1 - prev_cnt_obj;
+ let cnt_size_diff = size as i64 - prev_cnt_size;
+
+ if let Some(mo) = quotas.max_objects {
+ let current_objects = counters.get(OBJECTS).cloned().unwrap_or_default();
+ if cnt_obj_diff > 0 && current_objects + cnt_obj_diff > mo as i64 {
+ return Err(Error::forbidden(format!(
+ "Object quota is reached, maximum objects for this bucket: {}",
+ mo
+ )));
+ }
+ }
+
+ if let Some(ms) = quotas.max_size {
+ let current_size = counters.get(BYTES).cloned().unwrap_or_default();
+ if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
+ return Err(Error::forbidden(format!(
+ "Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
+ ms, current_size, size
+ )));
+ }
+ }
+
+ Ok(())
+}
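
A note on the arithmetic in check_quotas: overwriting an existing key must only count the delta against the quotas, which is why the previous object's OBJECTS and BYTES counters are subtracted first. A minimal sketch of that computation (illustrative helper, not part of the patch):

    fn quota_deltas(prev_counts: Option<(i64, i64)>, new_size: u64) -> (i64, i64) {
        let (prev_objects, prev_bytes) = prev_counts.unwrap_or((0, 0));
        // an overwrite adds 0 to the object count, a fresh key adds 1
        (1 - prev_objects, new_size as i64 - prev_bytes)
    }

Only positive deltas can trip a quota, matching the cnt_obj_diff > 0 and cnt_size_diff > 0 guards above, so an overwrite that shrinks an object is always allowed through.
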
+
async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
part_number: u64,
- first_block: Vec<u8>,
+ first_block: Bytes,
first_block_hash: Hash,
chunker: &mut StreamChunker<S>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
- let mut md5hasher = Md5::new();
- let mut sha256hasher = Sha256::new();
- md5hasher.update(&first_block[..]);
- sha256hasher.update(&first_block[..]);
+ let tracer = opentelemetry::global::tracer("garage");
+
+ let md5hasher = AsyncHasher::<Md5>::new();
+ let sha256hasher = AsyncHasher::<Sha256>::new();
+
+ futures::future::join(
+ md5hasher.update(first_block.clone()),
+ sha256hasher.update(first_block.clone()),
+ )
+ .with_context(Context::current_with_span(
+ tracer.start("Hash first block (md5, sha256)"),
+ ))
+ .await;
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
@@ -235,9 +318,15 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
chunker.next(),
)?;
if let Some(block) = next_block {
- md5hasher.update(&block[..]);
- sha256hasher.update(&block[..]);
- let block_hash = blake2sum(&block[..]);
+ let (_, _, block_hash) = futures::future::join3(
+ md5hasher.update(block.clone()),
+ sha256hasher.update(block.clone()),
+ async_blake2sum(block.clone()),
+ )
+ .with_context(Context::current_with_span(
+ tracer.start("Hash block (md5, sha256, blake2)"),
+ ))
+ .await;
let block_len = block.len();
put_curr_version_block = put_block_meta(
garage,
@@ -255,9 +344,9 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
}
let total_size = next_offset as u64;
- let data_md5sum = md5hasher.finalize();
+ let data_md5sum = md5hasher.finalize().await;
- let data_sha256sum = sha256hasher.finalize();
+ let data_sha256sum = sha256hasher.finalize().await;
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
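
The move from inline Md5/Sha256 updates to AsyncHasher reflects that hashing large blocks is CPU-bound and would otherwise stall the async executor while it runs. A minimal sketch of the idea, assuming tokio and using spawn_blocking instead of the channel-based worker that garage_util::async_hash actually provides:

    use md5::{Digest, Md5};

    async fn md5_off_executor(block: bytes::Bytes) -> Vec<u8> {
        // run the CPU-bound digest on the blocking pool, keeping the
        // executor free to poll other requests in the meantime
        tokio::task::spawn_blocking(move || {
            let mut hasher = Md5::new();
            hasher.update(&block[..]);
            hasher.finalize().to_vec()
        })
        .await
        .expect("hashing task panicked")
    }

The patch goes one step further and overlaps the three digests of each block (md5, sha256, blake2) with futures::future::join3, under a dedicated tracing span.
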
@@ -297,7 +386,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
stream: S,
read_all: bool,
block_size: usize,
- buf: VecDeque<u8>,
+ buf: BytesBuf,
}
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
@@ -306,11 +395,11 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
stream,
read_all: false,
block_size,
- buf: VecDeque::with_capacity(2 * block_size),
+ buf: BytesBuf::new(),
}
}
- async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
+ async fn next(&mut self) -> Result<Option<Bytes>, Error> {
while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.stream.next().await {
let bytes = block?;
@@ -323,12 +412,8 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
if self.buf.is_empty() {
Ok(None)
- } else if self.buf.len() <= self.block_size {
- let block = self.buf.drain(..).collect::<Vec<u8>>();
- Ok(Some(block))
} else {
- let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
- Ok(Some(block))
+ Ok(Some(self.buf.take_max(self.block_size)))
}
}
}
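
The buffer swap in StreamChunker (VecDeque<u8> to BytesBuf) removes a per-chunk copy: draining a VecDeque rebuilds a fresh Vec byte by byte, whereas Bytes handles are reference-counted slices that can be handed out as-is. A small illustration of the zero-copy property being relied on:

    use bytes::Bytes;

    fn main() {
        let block = Bytes::from(vec![0u8; 1 << 20]);
        // slicing Bytes is O(1): both handles share the same allocation
        let head = block.slice(..512 * 1024);
        assert_eq!(head.len(), 512 * 1024);
        assert_eq!(block.len(), 1 << 20);
    }
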
@@ -428,7 +513,7 @@ pub async fn handle_put_part(
// Check part hasn't already been uploaded
if let Some(v) = version {
if v.has_part_number(part_number) {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Part number {} has already been uploaded",
part_number
)));
@@ -437,7 +522,9 @@ pub async fn handle_put_part(
// Copy block to store
let version = Version::new(version_uuid, bucket_id, key, false);
- let first_block_hash = blake2sum(&first_block[..]);
+
+ let first_block_hash = async_blake2sum(first_block.clone()).await;
+
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
&garage,
&version,
@@ -475,7 +562,7 @@ pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
req: Request<Body>,
bucket_name: &str,
- bucket_id: Uuid,
+ bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
@@ -499,7 +586,7 @@ pub async fn handle_complete_multipart_upload(
// Get object and version
let key = key.to_string();
let (object, version) = futures::try_join!(
- garage.object_table.get(&bucket_id, &key),
+ garage.object_table.get(&bucket.id, &key),
garage.version_table.get(&version_uuid, &EmptyKey),
)?;
@@ -513,7 +600,7 @@ pub async fn handle_complete_multipart_upload(
let version = version.ok_or(Error::NoSuchKey)?;
if version.blocks.is_empty() {
- return Err(Error::BadRequest("No data was uploaded".to_string()));
+ return Err(Error::bad_request("No data was uploaded"));
}
let headers = match object_version.state {
@@ -574,8 +661,8 @@ pub async fn handle_complete_multipart_upload(
.map(|x| x.part_number)
.eq(block_parts.into_iter());
if !same_parts {
- return Err(Error::BadRequest(
- "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(),
+ return Err(Error::bad_request(
+ "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again."
));
}
@@ -592,6 +679,14 @@ pub async fn handle_complete_multipart_upload(
// Calculate total size of final object
let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
+ if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await {
+ object_version.state = ObjectVersionState::Aborted;
+ let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+ garage.object_table.insert(&final_object).await?;
+
+ return Err(e);
+ }
+
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
@@ -602,7 +697,7 @@ pub async fn handle_complete_multipart_upload(
version.blocks.items()[0].1.hash,
));
- let final_object = Object::new(bucket_id, key.clone(), vec![object_version]);
+ let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
// Send response saying ok we're done
diff --git a/src/api/s3_router.rs b/src/api/s3/router.rs
index 95a7eceb..44f581ff 100644
--- a/src/api/s3_router.rs
+++ b/src/api/s3/router.rs
@@ -1,131 +1,13 @@
-use crate::error::{Error, OkOrBadRequest};
-
use std::borrow::Cow;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, Method, Request};
-/// This macro is used to generate very repetitive match {} blocks in this module
-/// It is _not_ made to be used anywhere else
-macro_rules! s3_match {
- (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{
- // usage: s3_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] }
- // returns true if the variant was one of the listed variants, false otherwise.
- use Endpoint::*;
- match $enum {
- $(
- $endpoint { .. } => true,
- )*
- _ => false
- }
- }};
- (@extract $enum:expr , $param:ident, [ $($endpoint:ident,)* ]) => {{
- // usage: s3_match {@extract my_enum, field_name, [ VariantWithField1, VariantWithField2 ..] }
- // returns Some(field_value), or None if the variant was not one of the listed variants.
- use Endpoint::*;
- match $enum {
- $(
- $endpoint {$param, ..} => Some($param),
- )*
- _ => None
- }
- }};
- (@gen_parser ($keyword:expr, $key:expr, $query:expr, $header:expr),
- key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*],
- no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{
- // usage: s3_match {@gen_parser (keyword, key, query, header),
- // key: [
- // SOME_KEYWORD => VariantWithKey,
- // ...
- // ],
- // no_key: [
- // SOME_KEYWORD => VariantWithoutKey,
- // ...
- // ]
- // }
- // See in from_{method} for more detailed usage.
- use Endpoint::*;
- use keywords::*;
- match ($keyword, !$key.is_empty()){
- $(
- ($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k {
- key: $key,
- $($(
- $param_k: s3_match!(@@parse_param $query, $conv_k, $param_k),
- )*)?
- }),
- )*
- $(
- ($kw_nk, false) $(if $query.$required_nk.is_some())? $(if $header.contains($header_nk))? => Ok($api_nk {
- $($(
- $param_nk: s3_match!(@@parse_param $query, $conv_nk, $param_nk),
- )*)?
- }),
- )*
- (kw, _) => Err(Error::BadRequest(format!("Invalid endpoint: {}", kw)))
- }
- }};
+use crate::helpers::Authorization;
+use crate::router_macros::{generateQueryParameters, router_match};
+use crate::s3::error::*;
- (@@parse_param $query:expr, query_opt, $param:ident) => {{
- // extract optional query parameter
- $query.$param.take().map(|param| param.into_owned())
- }};
- (@@parse_param $query:expr, query, $param:ident) => {{
-    // extract mandatory query parameter
- $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned()
- }};
- (@@parse_param $query:expr, opt_parse, $param:ident) => {{
- // extract and parse optional query parameter
-    // missing parameter is fine, however a parse error is reported as an error
- $query.$param
- .take()
- .map(|param| param.parse())
- .transpose()
- .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
- }};
- (@@parse_param $query:expr, parse, $param:ident) => {{
- // extract and parse mandatory query parameter
- // both missing and un-parseable parameters are reported as errors
- $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?
- .parse()
- .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
- }};
- (@func
- $(#[$doc:meta])*
- pub enum Endpoint {
- $(
- $(#[$outer:meta])*
- $variant:ident $({
- $($name:ident: $ty:ty,)*
- })?,
- )*
- }) => {
- $(#[$doc])*
- pub enum Endpoint {
- $(
- $(#[$outer])*
- $variant $({
- $($name: $ty, )*
- })?,
- )*
- }
- impl Endpoint {
- pub fn name(&self) -> &'static str {
- match self {
- $(Endpoint::$variant $({ $($name: _,)* .. })? => stringify!($variant),)*
- }
- }
- }
- };
- (@if ($($cond:tt)+) then ($($then:tt)*) else ($($else:tt)*)) => {
- $($then)*
- };
- (@if () then ($($then:tt)*) else ($($else:tt)*)) => {
- $($else)*
- };
-}
-
-s3_match! {@func
+router_match! {@func
/// List of all S3 API endpoints.
///
@@ -460,7 +342,7 @@ impl Endpoint {
Method::POST => Self::from_post(key, &mut query)?,
Method::PUT => Self::from_put(key, &mut query, req.headers())?,
Method::DELETE => Self::from_delete(key, &mut query)?,
- _ => return Err(Error::BadRequest("Unknown method".to_owned())),
+ _ => return Err(Error::bad_request("Unknown method")),
};
if let Some(message) = query.nonempty_message() {
@@ -471,7 +353,7 @@ impl Endpoint {
/// Determine which endpoint a request is for, knowing it is a GET.
fn from_get(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
key: [
@@ -528,7 +410,7 @@ impl Endpoint {
/// Determine which endpoint a request is for, knowing it is a HEAD.
fn from_head(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
key: [
@@ -542,7 +424,7 @@ impl Endpoint {
/// Determine which endpoint a request is for, knowing it is a POST.
fn from_post(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
key: [
@@ -564,7 +446,7 @@ impl Endpoint {
query: &mut QueryParameters<'_>,
headers: &HeaderMap<HeaderValue>,
) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, headers),
key: [
@@ -606,7 +488,7 @@ impl Endpoint {
/// Determine which endpoint a request is for, knowing it is a DELETE.
fn from_delete(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
key: [
@@ -636,7 +518,7 @@ impl Endpoint {
/// Get the key the request targets. Returns None for requests that don't use a key.
#[allow(dead_code)]
pub fn get_key(&self) -> Option<&str> {
- s3_match! {
+ router_match! {
@extract
self,
key,
@@ -673,7 +555,7 @@ impl Endpoint {
if let Endpoint::ListBuckets = self {
return Authorization::None;
};
- let readonly = s3_match! {
+ let readonly = router_match! {
@match
self,
[
@@ -717,7 +599,7 @@ impl Endpoint {
SelectObjectContent,
]
};
- let owner = s3_match! {
+ let owner = router_match! {
@match
self,
[
@@ -740,87 +622,6 @@ impl Endpoint {
}
}
-/// What kind of authorization is required to perform a given action
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum Authorization {
- /// No authorization is required
- None,
- /// Having Read permission on bucket
- Read,
- /// Having Write permission on bucket
- Write,
- /// Having Owner permission on bucket
- Owner,
-}
-
-/// This macro is used to generate part of the code in this module. It must be called only once, and
-/// is useless outside of this module.
-macro_rules! generateQueryParameters {
- ( $($rest:expr => $name:ident),* ) => {
- /// Struct containing all query parameters used in endpoints. Think of it as an HashMap,
- /// but with keys statically known.
- #[derive(Debug, Default)]
- struct QueryParameters<'a> {
- keyword: Option<Cow<'a, str>>,
- $(
- $name: Option<Cow<'a, str>>,
- )*
- }
-
- impl<'a> QueryParameters<'a> {
- /// Build this struct from the query part of an URI.
- fn from_query(query: &'a str) -> Result<Self, Error> {
- let mut res: Self = Default::default();
- for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
- let repeated = match k.as_ref() {
- $(
- $rest => if !v.is_empty() {
- res.$name.replace(v).is_some()
- } else {
- false
- },
- )*
- _ => {
- if k.starts_with("response-") || k.starts_with("X-Amz-") {
- false
- } else if v.as_ref().is_empty() {
- if res.keyword.replace(k).is_some() {
- return Err(Error::BadRequest("Multiple keywords".to_owned()));
- }
- continue;
- } else {
- debug!("Received an unknown query parameter: '{}'", k);
- false
- }
- }
- };
- if repeated {
- return Err(Error::BadRequest(format!(
- "Query parameter repeated: '{}'",
- k
- )));
- }
- }
- Ok(res)
- }
-
-        /// Get an error message in case not all parameters were used when extracting them to
-        /// build an Endpoint variant
- fn nonempty_message(&self) -> Option<&str> {
- if self.keyword.is_some() {
- Some("Keyword not used")
- } $(
- else if self.$name.is_some() {
- Some(concat!("'", $rest, "'"))
- }
- )* else {
- None
- }
- }
- }
- }
-}
-
// parameter name => struct field
generateQueryParameters! {
"continuation-token" => continuation_token,
diff --git a/src/api/s3_website.rs b/src/api/s3/website.rs
index b464dd45..77738971 100644
--- a/src/api/s3_website.rs
+++ b/src/api/s3/website.rs
@@ -4,13 +4,12 @@ use std::sync::Arc;
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use crate::error::*;
-use crate::s3_xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+use crate::s3::error::*;
+use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
-use garage_table::*;
use garage_util::data::*;
pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> {
@@ -47,14 +46,11 @@ pub async fn handle_delete_website(
bucket_id: Uuid,
) -> Result<Response<Body>, Error> {
let mut bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
+ let param = bucket.params_mut().unwrap();
param.website_config.update(None);
garage.bucket_table.insert(&bucket).await?;
@@ -77,14 +73,11 @@ pub async fn handle_put_website(
}
let mut bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
+ let param = bucket.params_mut().unwrap();
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
@@ -176,8 +169,8 @@ impl WebsiteConfiguration {
|| self.index_document.is_some()
|| self.routing_rules.is_some())
{
- return Err(Error::BadRequest(
- "Bad XML: can't have RedirectAllRequestsTo and other fields".to_owned(),
+ return Err(Error::bad_request(
+ "Bad XML: can't have RedirectAllRequestsTo and other fields",
));
}
if let Some(ref ed) = self.error_document {
@@ -222,8 +215,8 @@ impl WebsiteConfiguration {
impl Key {
pub fn validate(&self) -> Result<(), Error> {
if self.key.0.is_empty() {
- Err(Error::BadRequest(
- "Bad XML: error document specified but empty".to_owned(),
+ Err(Error::bad_request(
+ "Bad XML: error document specified but empty",
))
} else {
Ok(())
@@ -234,8 +227,8 @@ impl Key {
impl Suffix {
pub fn validate(&self) -> Result<(), Error> {
if self.suffix.0.is_empty() | self.suffix.0.contains('/') {
- Err(Error::BadRequest(
- "Bad XML: index document is empty or contains /".to_owned(),
+ Err(Error::bad_request(
+ "Bad XML: index document is empty or contains /",
))
} else {
Ok(())
@@ -247,7 +240,7 @@ impl Target {
pub fn validate(&self) -> Result<(), Error> {
if let Some(ref protocol) = self.protocol {
if protocol.0 != "http" && protocol.0 != "https" {
- return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned()));
+ return Err(Error::bad_request("Bad XML: invalid protocol"));
}
}
Ok(())
@@ -269,19 +262,19 @@ impl Redirect {
pub fn validate(&self, has_prefix: bool) -> Result<(), Error> {
if self.replace_prefix.is_some() {
if self.replace_full.is_some() {
- return Err(Error::BadRequest(
- "Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set".to_owned(),
+ return Err(Error::bad_request(
+ "Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set",
));
}
if !has_prefix {
- return Err(Error::BadRequest(
- "Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't".to_owned(),
+ return Err(Error::bad_request(
+ "Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't",
));
}
}
if let Some(ref protocol) = self.protocol {
if protocol.0 != "http" && protocol.0 != "https" {
- return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned()));
+ return Err(Error::bad_request("Bad XML: invalid protocol"));
}
}
// TODO there are probably more invalid cases, but which ones?
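
The Redirect rules above reduce to: ReplaceKeyPrefixWith excludes ReplaceKeyWith, and it requires KeyPrefixEquals. A standalone sketch of that check, with the struct fields simplified to booleans:

    fn validate_redirect(
        replace_prefix: bool,
        replace_full: bool,
        has_prefix: bool,
    ) -> Result<(), &'static str> {
        if replace_prefix {
            if replace_full {
                return Err("Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set");
            }
            if !has_prefix {
                return Err("Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't");
            }
        }
        Ok(())
    }
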
diff --git a/src/api/s3_xml.rs b/src/api/s3/xml.rs
index 75ec4559..06f11288 100644
--- a/src/api/s3_xml.rs
+++ b/src/api/s3/xml.rs
@@ -1,7 +1,7 @@
use quick_xml::se::to_string;
use serde::{Deserialize, Serialize, Serializer};
-use crate::Error as ApiError;
+use crate::s3::error::Error as ApiError;
pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> {
let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string();
@@ -25,7 +25,7 @@ impl From<&str> for Value {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct IntValue(#[serde(rename = "$value")] pub i64);
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Bucket {
#[serde(rename = "CreationDate")]
pub creation_date: Value,
@@ -33,7 +33,7 @@ pub struct Bucket {
pub name: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Owner {
#[serde(rename = "DisplayName")]
pub display_name: Value,
@@ -41,13 +41,13 @@ pub struct Owner {
pub id: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct BucketList {
#[serde(rename = "Bucket")]
pub entries: Vec<Bucket>,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListAllMyBucketsResult {
#[serde(rename = "Buckets")]
pub buckets: BucketList,
@@ -55,7 +55,7 @@ pub struct ListAllMyBucketsResult {
pub owner: Owner,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct LocationConstraint {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@@ -63,7 +63,7 @@ pub struct LocationConstraint {
pub region: String,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Deleted {
#[serde(rename = "Key")]
pub key: Value,
@@ -73,7 +73,7 @@ pub struct Deleted {
pub delete_marker_version_id: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Error {
#[serde(rename = "Code")]
pub code: Value,
@@ -85,7 +85,7 @@ pub struct Error {
pub region: Option<Value>,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct DeleteError {
#[serde(rename = "Code")]
pub code: Value,
@@ -97,7 +97,7 @@ pub struct DeleteError {
pub version_id: Option<Value>,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct DeleteResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@@ -107,7 +107,7 @@ pub struct DeleteResult {
pub errors: Vec<DeleteError>,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct InitiateMultipartUploadResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@@ -119,7 +119,7 @@ pub struct InitiateMultipartUploadResult {
pub upload_id: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CompleteMultipartUploadResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@@ -133,7 +133,7 @@ pub struct CompleteMultipartUploadResult {
pub etag: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Initiator {
#[serde(rename = "DisplayName")]
pub display_name: Value,
@@ -141,7 +141,7 @@ pub struct Initiator {
pub id: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListMultipartItem {
#[serde(rename = "Initiated")]
pub initiated: Value,
@@ -157,7 +157,7 @@ pub struct ListMultipartItem {
pub storage_class: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListMultipartUploadsResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@@ -187,7 +187,7 @@ pub struct ListMultipartUploadsResult {
pub encoding_type: Option<Value>,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct PartItem {
#[serde(rename = "ETag")]
pub etag: Value,
@@ -199,7 +199,7 @@ pub struct PartItem {
pub size: IntValue,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListPartsResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@@ -227,7 +227,7 @@ pub struct ListPartsResult {
pub storage_class: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListBucketItem {
#[serde(rename = "Key")]
pub key: Value,
@@ -241,13 +241,13 @@ pub struct ListBucketItem {
pub storage_class: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CommonPrefix {
#[serde(rename = "Prefix")]
pub prefix: Value,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListBucketResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@@ -281,7 +281,7 @@ pub struct ListBucketResult {
pub common_prefixes: Vec<CommonPrefix>,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct VersioningConfiguration {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@@ -289,7 +289,7 @@ pub struct VersioningConfiguration {
pub status: Option<Value>,
}
-#[derive(Debug, Serialize, PartialEq)]
+#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct PostObject {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs
new file mode 100644
index 00000000..f5a067bd
--- /dev/null
+++ b/src/api/signature/error.rs
@@ -0,0 +1,36 @@
+use err_derive::Error;
+
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+
+/// Errors of this crate
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
+
+ /// Authorization Header Malformed
+ #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ AuthorizationHeaderMalformed(String),
+
+ // Category: bad request
+ /// The request contained an invalid UTF-8 sequence in its path or in other parameters
+ #[error(display = "Invalid UTF-8: {}", _0)]
+ InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
+
+ /// The client sent a header with invalid value
+ #[error(display = "Invalid header value: {}", _0)]
+ InvalidHeader(#[error(source)] hyper::header::ToStrError),
+}
+
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
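
The blanket impl at the end of the new file is what lets signature code use ? on any error type already convertible to CommonError, without a dedicated variant per type. A self-contained model of the pattern (toy types, not the real garage definitions):

    #[derive(Debug)]
    struct CommonError(String);

    impl From<std::num::ParseIntError> for CommonError {
        fn from(e: std::num::ParseIntError) -> Self {
            CommonError(e.to_string())
        }
    }

    #[derive(Debug)]
    enum Error {
        Common(CommonError),
    }

    impl<T> From<T> for Error
    where
        CommonError: From<T>,
    {
        fn from(err: T) -> Self {
            Error::Common(CommonError::from(err))
        }
    }

    fn parse_len(s: &str) -> Result<u64, Error> {
        Ok(s.parse::<u64>()?) // ParseIntError -> CommonError -> Error, via `?`
    }
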
diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs
index ebdee6da..4b8b990f 100644
--- a/src/api/signature/mod.rs
+++ b/src/api/signature/mod.rs
@@ -1,14 +1,15 @@
use chrono::{DateTime, Utc};
-use hmac::{Hmac, Mac, NewMac};
+use hmac::{Hmac, Mac};
use sha2::Sha256;
use garage_util::data::{sha256sum, Hash};
-use crate::error::*;
-
+pub mod error;
pub mod payload;
pub mod streaming;
+use error::*;
+
pub const SHORT_DATE: &str = "%Y%m%d";
pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
@@ -16,7 +17,7 @@ type HmacSha256 = Hmac<Sha256>;
pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
if expected_sha256 != sha256sum(body) {
- return Err(Error::BadRequest(
+ return Err(Error::bad_request(
"Request content hash does not match signed hash".to_string(),
));
}
@@ -28,20 +29,25 @@ pub fn signing_hmac(
secret_key: &str,
region: &str,
service: &str,
-) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
+) -> Result<HmacSha256, crypto_common::InvalidLength> {
let secret = String::from("AWS4") + secret_key;
- let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
+ let mut date_hmac = HmacSha256::new_from_slice(secret.as_bytes())?;
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
- let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
+ let mut region_hmac = HmacSha256::new_from_slice(&date_hmac.finalize().into_bytes())?;
region_hmac.update(region.as_bytes());
- let mut service_hmac = HmacSha256::new_varkey(&region_hmac.finalize().into_bytes())?;
+ let mut service_hmac = HmacSha256::new_from_slice(&region_hmac.finalize().into_bytes())?;
service_hmac.update(service.as_bytes());
- let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
+ let mut signing_hmac = HmacSha256::new_from_slice(&service_hmac.finalize().into_bytes())?;
signing_hmac.update(b"aws4_request");
- let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
+ let hmac = HmacSha256::new_from_slice(&signing_hmac.finalize().into_bytes())?;
Ok(hmac)
}
-pub fn compute_scope(datetime: &DateTime<Utc>, region: &str) -> String {
- format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,)
+pub fn compute_scope(datetime: &DateTime<Utc>, region: &str, service: &str) -> String {
+ format!(
+ "{}/{}/{}/aws4_request",
+ datetime.format(SHORT_DATE),
+ region,
+ service
+ )
}
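
With the new service parameter, the credential scope follows the full SigV4 layout <date>/<region>/<service>/aws4_request, so S3 and K2V requests sign under distinct scopes. A quick check of the produced string (region name illustrative, date pre-formatted):

    fn scope(date: &str, region: &str, service: &str) -> String {
        format!("{}/{}/{}/aws4_request", date, region, service)
    }

    fn main() {
        assert_eq!(scope("20220101", "garage", "k2v"), "20220101/garage/k2v/aws4_request");
    }
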
diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs
index 2a41b307..4c7934e5 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/signature/payload.rs
@@ -11,14 +11,15 @@ use garage_util::data::Hash;
use garage_model::garage::Garage;
use garage_model::key_table::*;
-use super::signing_hmac;
-use super::{LONG_DATETIME, SHORT_DATE};
+use super::LONG_DATETIME;
+use super::{compute_scope, signing_hmac};
use crate::encoding::uri_encode;
-use crate::error::*;
+use crate::signature::error::*;
pub async fn check_payload_signature(
garage: &Garage,
+ service: &str,
request: &Request<Body>,
) -> Result<(Option<Key>, Option<Hash>), Error> {
let mut headers = HashMap::new();
@@ -64,6 +65,7 @@ pub async fn check_payload_signature(
let key = verify_v4(
garage,
+ service,
&authorization.credential,
&authorization.date,
&authorization.signature,
@@ -103,7 +105,7 @@ fn parse_authorization(
let (auth_kind, rest) = authorization.split_at(first_space);
if auth_kind != "AWS4-HMAC-SHA256" {
- return Err(Error::BadRequest("Unsupported authorization method".into()));
+ return Err(Error::bad_request("Unsupported authorization method"));
}
let mut auth_params = HashMap::new();
@@ -127,10 +129,11 @@ fn parse_authorization(
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")
+ .map_err(Error::from)
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::hours(24) {
- return Err(Error::BadRequest("Date is too old".to_string()));
+ return Err(Error::bad_request("Date is too old".to_string()));
}
let auth = Authorization {
@@ -154,7 +157,7 @@ fn parse_query_authorization(
headers: &HashMap<String, String>,
) -> Result<Authorization, Error> {
if algorithm != "AWS4-HMAC-SHA256" {
- return Err(Error::BadRequest(
+ return Err(Error::bad_request(
"Unsupported authorization method".to_string(),
));
}
@@ -177,10 +180,10 @@ fn parse_query_authorization(
.get("x-amz-expires")
.ok_or_bad_request("X-Amz-Expires not found in query parameters")?
.parse()
- .map_err(|_| Error::BadRequest("X-Amz-Expires is not a number".to_string()))?;
+ .map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?;
if duration > 7 * 24 * 3600 {
- return Err(Error::BadRequest(
+ return Err(Error::bad_request(
"X-Amz-Exprires may not exceed a week".to_string(),
));
}
@@ -188,10 +191,11 @@ fn parse_query_authorization(
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")
+ .map_err(Error::from)
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::seconds(duration) {
- return Err(Error::BadRequest("Date is too old".to_string()));
+ return Err(Error::bad_request("Date is too old".to_string()));
}
Ok(Authorization {
@@ -281,6 +285,7 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
pub async fn verify_v4(
garage: &Garage,
+ service: &str,
credential: &str,
date: &DateTime<Utc>,
signature: &str,
@@ -288,11 +293,7 @@ pub async fn verify_v4(
) -> Result<Key, Error> {
let (key_id, scope) = parse_credential(credential)?;
- let scope_expected = format!(
- "{}/{}/s3/aws4_request",
- date.format(SHORT_DATE),
- garage.config.s3_api.s3_region
- );
+ let scope_expected = compute_scope(date, &garage.config.s3_api.s3_region, service);
if scope != scope_expected {
return Err(Error::AuthorizationHeaderMalformed(scope.to_string()));
}
@@ -302,20 +303,20 @@ pub async fn verify_v4(
.get(&EmptyKey, &key_id)
.await?
.filter(|k| !k.state.is_deleted())
- .ok_or_else(|| Error::Forbidden(format!("No such key: {}", &key_id)))?;
+ .ok_or_else(|| Error::forbidden(format!("No such key: {}", &key_id)))?;
let key_p = key.params().unwrap();
let mut hmac = signing_hmac(
date,
&key_p.secret_key,
&garage.config.s3_api.s3_region,
- "s3",
+ service,
)
.ok_or_internal_error("Unable to build signing HMAC")?;
hmac.update(payload);
let our_signature = hex::encode(hmac.finalize().into_bytes());
if signature != our_signature {
- return Err(Error::Forbidden("Invalid signature".to_string()));
+ return Err(Error::forbidden("Invalid signature".to_string()));
}
Ok(key)
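
For reference, the HMAC chain that signing_hmac builds and verify_v4 consumes is the standard SigV4 key derivation. A condensed sketch using the hmac 0.12 API adopted by this change (runnable given the hmac and sha2 crates):

    use hmac::{Hmac, Mac};
    use sha2::Sha256;

    type HmacSha256 = Hmac<Sha256>;

    fn derive_signing_key(secret_key: &str, date: &str, region: &str, service: &str) -> Vec<u8> {
        // key = HMAC(...HMAC(HMAC("AWS4" + secret, date), region)..., "aws4_request")
        let mut key = format!("AWS4{}", secret_key).into_bytes();
        for part in [date, region, service, "aws4_request"] {
            let mut mac = HmacSha256::new_from_slice(&key).expect("any key length is valid");
            mac.update(part.as_bytes());
            key = mac.finalize().into_bytes().to_vec();
        }
        key
    }
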
diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs
index 969a45d6..c8358c4f 100644
--- a/src/api/signature/streaming.rs
+++ b/src/api/signature/streaming.rs
@@ -1,18 +1,67 @@
use std::pin::Pin;
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, NaiveDateTime, Utc};
use futures::prelude::*;
use futures::task;
+use garage_model::key_table::Key;
+use hmac::Mac;
use hyper::body::Bytes;
+use hyper::{Body, Request};
use garage_util::data::Hash;
-use hmac::Mac;
-
-use super::sha256sum;
-use super::HmacSha256;
-use super::LONG_DATETIME;
-use crate::error::*;
+use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME};
+
+use crate::signature::error::*;
+
+pub fn parse_streaming_body(
+ api_key: &Key,
+ req: Request<Body>,
+ content_sha256: &mut Option<Hash>,
+ region: &str,
+ service: &str,
+) -> Result<Request<Body>, Error> {
+ match req.headers().get("x-amz-content-sha256") {
+ Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
+ let signature = content_sha256
+ .take()
+ .ok_or_bad_request("No signature provided")?;
+
+ let secret_key = &api_key
+ .state
+ .as_option()
+ .ok_or_internal_error("Deleted key state")?
+ .secret_key;
+
+ let date = req
+ .headers()
+ .get("x-amz-date")
+ .ok_or_bad_request("Missing X-Amz-Date field")?
+ .to_str()?;
+ let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
+ .ok_or_bad_request("Invalid date")?;
+ let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
+
+ let scope = compute_scope(&date, region, service);
+ let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service)
+ .ok_or_internal_error("Unable to build signing HMAC")?;
+
+ Ok(req.map(move |body| {
+ Body::wrap_stream(
+ SignedPayloadStream::new(
+ body.map_err(Error::from),
+ signing_hmac,
+ date,
+ &scope,
+ signature,
+ )
+ .map_err(Error::from),
+ )
+ }))
+ }
+ _ => Ok(req),
+ }
+}
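
parse_streaming_body wraps the request body in a SignedPayloadStream, which decodes AWS's STREAMING-AWS4-HMAC-SHA256-PAYLOAD framing: each chunk is "<size-hex>;chunk-signature=<hex sig>\r\n<data>\r\n", terminated by a zero-size chunk. A minimal, hedged parser for just the chunk header (the real decoder lives in the payload module below):

    fn parse_chunk_header(line: &str) -> Option<(usize, &str)> {
        let (size_hex, rest) = line.split_once(';')?;
        let size = usize::from_str_radix(size_hex, 16).ok()?;
        let signature = rest.strip_prefix("chunk-signature=")?;
        Some((size, signature))
    }

    fn main() {
        let (size, sig) = parse_chunk_header("400;chunk-signature=0123abcd").unwrap();
        assert_eq!((size, sig), (0x400, "0123abcd"));
    }
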
/// Result of `sha256("")`
const EMPTY_STRING_HEX_DIGEST: &str =
@@ -38,7 +87,7 @@ fn compute_streaming_payload_signature(
let mut hmac = signing_hmac.clone();
hmac.update(string_to_sign.as_bytes());
- Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")
+ Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?)
}
mod payload {
@@ -114,10 +163,10 @@ impl From<SignedPayloadStreamError> for Error {
match err {
SignedPayloadStreamError::Stream(e) => e,
SignedPayloadStreamError::InvalidSignature => {
- Error::BadRequest("Invalid payload signature".into())
+ Error::bad_request("Invalid payload signature")
}
SignedPayloadStreamError::Message(e) => {
- Error::BadRequest(format!("Chunk format error: {}", e))
+ Error::bad_request(format!("Chunk format error: {}", e))
}
}
}
@@ -295,7 +344,7 @@ mod tests {
.with_timezone(&Utc);
let secret_key = "test";
let region = "test";
- let scope = crate::signature::compute_scope(&datetime, region);
+ let scope = crate::signature::compute_scope(&datetime, region, "s3");
let signing_hmac =
crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap();