aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/Cargo.toml5
-rw-r--r--src/api/api_server.rs645
-rw-r--r--src/api/error.rs7
-rw-r--r--src/api/generic_server.rs202
-rw-r--r--src/api/helpers.rs188
-rw-r--r--src/api/k2v/api_server.rs195
-rw-r--r--src/api/k2v/batch.rs368
-rw-r--r--src/api/k2v/index.rs100
-rw-r--r--src/api/k2v/item.rs230
-rw-r--r--src/api/k2v/mod.rs8
-rw-r--r--src/api/k2v/range.rs96
-rw-r--r--src/api/k2v/router.rs252
-rw-r--r--src/api/lib.rs22
-rw-r--r--src/api/router_macros.rs190
-rw-r--r--src/api/s3/api_server.rs401
-rw-r--r--src/api/s3/bucket.rs (renamed from src/api/s3_bucket.rs)12
-rw-r--r--src/api/s3/copy.rs (renamed from src/api/s3_copy.rs)14
-rw-r--r--src/api/s3/cors.rs (renamed from src/api/s3_cors.rs)2
-rw-r--r--src/api/s3/delete.rs (renamed from src/api/s3_delete.rs)4
-rw-r--r--src/api/s3/get.rs (renamed from src/api/s3_get.rs)4
-rw-r--r--src/api/s3/list.rs (renamed from src/api/s3_list.rs)92
-rw-r--r--src/api/s3/mod.rs14
-rw-r--r--src/api/s3/post_object.rs (renamed from src/api/s3_post_object.rs)16
-rw-r--r--src/api/s3/put.rs (renamed from src/api/s3_put.rs)8
-rw-r--r--src/api/s3/router.rs (renamed from src/api/s3_router.rs)220
-rw-r--r--src/api/s3/website.rs (renamed from src/api/s3_website.rs)2
-rw-r--r--src/api/s3/xml.rs (renamed from src/api/s3_xml.rs)0
-rw-r--r--src/api/signature/mod.rs9
-rw-r--r--src/api/signature/payload.rs15
-rw-r--r--src/api/signature/streaming.rs61
30 files changed, 2401 insertions, 981 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 5e96b081..29b26e5e 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -18,7 +18,9 @@ garage_model = { version = "0.7.0", path = "../model" }
garage_table = { version = "0.7.0", path = "../table" }
garage_block = { version = "0.7.0", path = "../block" }
garage_util = { version = "0.7.0", path = "../util" }
+garage_rpc = { version = "0.7.0", path = "../rpc" }
+async-trait = "0.1.7"
base64 = "0.13"
bytes = "1.0"
chrono = "0.4"
@@ -52,3 +54,6 @@ quick-xml = { version = "0.21", features = [ "serialize" ] }
url = "2.1"
opentelemetry = "0.17"
+
+[features]
+k2v = [ "garage_util/k2v", "garage_model/k2v" ]
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
deleted file mode 100644
index e7b86d9e..00000000
--- a/src/api/api_server.rs
+++ /dev/null
@@ -1,645 +0,0 @@
-use std::net::SocketAddr;
-use std::sync::Arc;
-
-use chrono::{DateTime, NaiveDateTime, Utc};
-use futures::future::Future;
-use futures::prelude::*;
-use hyper::header;
-use hyper::server::conn::AddrStream;
-use hyper::service::{make_service_fn, service_fn};
-use hyper::{Body, Method, Request, Response, Server};
-
-use opentelemetry::{
- global,
- metrics::{Counter, ValueRecorder},
- trace::{FutureExt, TraceContextExt, Tracer},
- Context, KeyValue,
-};
-
-use garage_util::data::*;
-use garage_util::error::Error as GarageError;
-use garage_util::metrics::{gen_trace_id, RecordDuration};
-
-use garage_model::garage::Garage;
-use garage_model::key_table::Key;
-
-use garage_table::util::*;
-
-use crate::error::*;
-use crate::signature::compute_scope;
-use crate::signature::payload::check_payload_signature;
-use crate::signature::streaming::SignedPayloadStream;
-use crate::signature::LONG_DATETIME;
-
-use crate::helpers::*;
-use crate::s3_bucket::*;
-use crate::s3_copy::*;
-use crate::s3_cors::*;
-use crate::s3_delete::*;
-use crate::s3_get::*;
-use crate::s3_list::*;
-use crate::s3_post_object::handle_post_object;
-use crate::s3_put::*;
-use crate::s3_router::{Authorization, Endpoint};
-use crate::s3_website::*;
-
-struct ApiMetrics {
- request_counter: Counter<u64>,
- error_counter: Counter<u64>,
- request_duration: ValueRecorder<f64>,
-}
-
-impl ApiMetrics {
- fn new() -> Self {
- let meter = global::meter("garage/api");
- Self {
- request_counter: meter
- .u64_counter("api.request_counter")
- .with_description("Number of API calls to the various S3 API endpoints")
- .init(),
- error_counter: meter
- .u64_counter("api.error_counter")
- .with_description(
- "Number of API calls to the various S3 API endpoints that resulted in errors",
- )
- .init(),
- request_duration: meter
- .f64_value_recorder("api.request_duration")
- .with_description("Duration of API calls to the various S3 API endpoints")
- .init(),
- }
- }
-}
-
-/// Run the S3 API server
-pub async fn run_api_server(
- garage: Arc<Garage>,
- shutdown_signal: impl Future<Output = ()>,
-) -> Result<(), GarageError> {
- let addr = &garage.config.s3_api.api_bind_addr;
-
- let metrics = Arc::new(ApiMetrics::new());
-
- let service = make_service_fn(|conn: &AddrStream| {
- let garage = garage.clone();
- let metrics = metrics.clone();
-
- let client_addr = conn.remote_addr();
- async move {
- Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
- let garage = garage.clone();
- let metrics = metrics.clone();
-
- handler(garage, metrics, req, client_addr)
- }))
- }
- });
-
- let server = Server::bind(addr).serve(service);
-
- let graceful = server.with_graceful_shutdown(shutdown_signal);
- info!("API server listening on http://{}", addr);
-
- graceful.await?;
- Ok(())
-}
-
-async fn handler(
- garage: Arc<Garage>,
- metrics: Arc<ApiMetrics>,
- req: Request<Body>,
- addr: SocketAddr,
-) -> Result<Response<Body>, GarageError> {
- let uri = req.uri().clone();
- info!("{} {} {}", addr, req.method(), uri);
- debug!("{:?}", req);
-
- let tracer = opentelemetry::global::tracer("garage");
- let span = tracer
- .span_builder("S3 API call (unknown)")
- .with_trace_id(gen_trace_id())
- .with_attributes(vec![
- KeyValue::new("method", format!("{}", req.method())),
- KeyValue::new("uri", req.uri().to_string()),
- ])
- .start(&tracer);
-
- let res = handler_stage2(garage.clone(), metrics, req)
- .with_context(Context::current_with_span(span))
- .await;
-
- match res {
- Ok(x) => {
- debug!("{} {:?}", x.status(), x.headers());
- Ok(x)
- }
- Err(e) => {
- let body: Body = Body::from(e.aws_xml(&garage.config.s3_api.s3_region, uri.path()));
- let mut http_error_builder = Response::builder()
- .status(e.http_status_code())
- .header("Content-Type", "application/xml");
-
- if let Some(header_map) = http_error_builder.headers_mut() {
- e.add_headers(header_map)
- }
-
- let http_error = http_error_builder.body(body)?;
-
- if e.http_status_code().is_server_error() {
- warn!("Response: error {}, {}", e.http_status_code(), e);
- } else {
- info!("Response: error {}, {}", e.http_status_code(), e);
- }
- Ok(http_error)
- }
- }
-}
-
-async fn handler_stage2(
- garage: Arc<Garage>,
- metrics: Arc<ApiMetrics>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let authority = req
- .headers()
- .get(header::HOST)
- .ok_or_bad_request("Host header required")?
- .to_str()?;
-
- let host = authority_to_host(authority)?;
-
- let bucket_name = garage
- .config
- .s3_api
- .root_domain
- .as_ref()
- .and_then(|root_domain| host_to_bucket(&host, root_domain));
-
- let (endpoint, bucket_name) = Endpoint::from_request(&req, bucket_name.map(ToOwned::to_owned))?;
- debug!("Endpoint: {:?}", endpoint);
-
- let current_context = Context::current();
- let current_span = current_context.span();
- current_span.update_name::<String>(format!("S3 API {}", endpoint.name()));
- current_span.set_attribute(KeyValue::new("endpoint", endpoint.name()));
- current_span.set_attribute(KeyValue::new(
- "bucket",
- bucket_name.clone().unwrap_or_default(),
- ));
-
- let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
-
- let res = handler_stage3(garage, req, endpoint, bucket_name)
- .record_duration(&metrics.request_duration, &metrics_tags[..])
- .await;
-
- metrics.request_counter.add(1, &metrics_tags[..]);
-
- let status_code = match &res {
- Ok(r) => r.status(),
- Err(e) => e.http_status_code(),
- };
- if status_code.is_client_error() || status_code.is_server_error() {
- metrics.error_counter.add(
- 1,
- &[
- metrics_tags[0].clone(),
- KeyValue::new("status_code", status_code.as_str().to_string()),
- ],
- );
- }
-
- res
-}
-
-async fn handler_stage3(
- garage: Arc<Garage>,
- req: Request<Body>,
- endpoint: Endpoint,
- bucket_name: Option<String>,
-) -> Result<Response<Body>, Error> {
- // Some endpoints are processed early, before we even check for an API key
- if let Endpoint::PostObject = endpoint {
- return handle_post_object(garage, req, bucket_name.unwrap()).await;
- }
- if let Endpoint::Options = endpoint {
- return handle_options_s3api(garage, &req, bucket_name).await;
- }
-
- let (api_key, mut content_sha256) = check_payload_signature(&garage, &req).await?;
- let api_key = api_key.ok_or_else(|| {
- Error::Forbidden("Garage does not support anonymous access yet".to_string())
- })?;
-
- let req = match req.headers().get("x-amz-content-sha256") {
- Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
- let signature = content_sha256
- .take()
- .ok_or_bad_request("No signature provided")?;
-
- let secret_key = &api_key
- .state
- .as_option()
- .ok_or_internal_error("Deleted key state")?
- .secret_key;
-
- let date = req
- .headers()
- .get("x-amz-date")
- .ok_or_bad_request("Missing X-Amz-Date field")?
- .to_str()?;
- let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
- .ok_or_bad_request("Invalid date")?;
- let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
-
- let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
- let signing_hmac = crate::signature::signing_hmac(
- &date,
- secret_key,
- &garage.config.s3_api.s3_region,
- "s3",
- )
- .ok_or_internal_error("Unable to build signing HMAC")?;
-
- req.map(move |body| {
- Body::wrap_stream(
- SignedPayloadStream::new(
- body.map_err(Error::from),
- signing_hmac,
- date,
- &scope,
- signature,
- )
- .map_err(Error::from),
- )
- })
- }
- _ => req,
- };
-
- let bucket_name = match bucket_name {
- None => return handle_request_without_bucket(garage, req, api_key, endpoint).await,
- Some(bucket) => bucket.to_string(),
- };
-
- // Special code path for CreateBucket API endpoint
- if let Endpoint::CreateBucket {} = endpoint {
- return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await;
- }
-
- let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
- let bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .filter(|b| !b.state.is_deleted())
- .ok_or(Error::NoSuchBucket)?;
-
- let allowed = match endpoint.authorization_type() {
- Authorization::Read => api_key.allow_read(&bucket_id),
- Authorization::Write => api_key.allow_write(&bucket_id),
- Authorization::Owner => api_key.allow_owner(&bucket_id),
- _ => unreachable!(),
- };
-
- if !allowed {
- return Err(Error::Forbidden(
- "Operation is not allowed for this key.".to_string(),
- ));
- }
-
- // Look up what CORS rule might apply to response.
- // Requests for methods different than GET, HEAD or POST
- // are always preflighted, i.e. the browser should make
- // an OPTIONS call before to check it is allowed
- let matching_cors_rule = match *req.method() {
- Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
- _ => None,
- };
-
- let resp = match endpoint {
- Endpoint::HeadObject {
- key, part_number, ..
- } => handle_head(garage, &req, bucket_id, &key, part_number).await,
- Endpoint::GetObject {
- key, part_number, ..
- } => handle_get(garage, &req, bucket_id, &key, part_number).await,
- Endpoint::UploadPart {
- key,
- part_number,
- upload_id,
- } => {
- handle_put_part(
- garage,
- req,
- bucket_id,
- &key,
- part_number,
- &upload_id,
- content_sha256,
- )
- .await
- }
- Endpoint::CopyObject { key } => handle_copy(garage, &api_key, &req, bucket_id, &key).await,
- Endpoint::UploadPartCopy {
- key,
- part_number,
- upload_id,
- } => {
- handle_upload_part_copy(
- garage,
- &api_key,
- &req,
- bucket_id,
- &key,
- part_number,
- &upload_id,
- )
- .await
- }
- Endpoint::PutObject { key } => {
- handle_put(garage, req, bucket_id, &key, content_sha256).await
- }
- Endpoint::AbortMultipartUpload { key, upload_id } => {
- handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
- }
- Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
- Endpoint::CreateMultipartUpload { key } => {
- handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
- }
- Endpoint::CompleteMultipartUpload { key, upload_id } => {
- handle_complete_multipart_upload(
- garage,
- req,
- &bucket_name,
- bucket_id,
- &key,
- &upload_id,
- content_sha256,
- )
- .await
- }
- Endpoint::CreateBucket {} => unreachable!(),
- Endpoint::HeadBucket {} => {
- let empty_body: Body = Body::from(vec![]);
- let response = Response::builder().body(empty_body).unwrap();
- Ok(response)
- }
- Endpoint::DeleteBucket {} => {
- handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await
- }
- Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
- Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
- Endpoint::ListObjects {
- delimiter,
- encoding_type,
- marker,
- max_keys,
- prefix,
- } => {
- handle_list(
- garage,
- &ListObjectsQuery {
- common: ListQueryCommon {
- bucket_name,
- bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
- page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
- prefix: prefix.unwrap_or_default(),
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
- },
- is_v2: false,
- marker,
- continuation_token: None,
- start_after: None,
- },
- )
- .await
- }
- Endpoint::ListObjectsV2 {
- delimiter,
- encoding_type,
- max_keys,
- prefix,
- continuation_token,
- start_after,
- list_type,
- ..
- } => {
- if list_type == "2" {
- handle_list(
- garage,
- &ListObjectsQuery {
- common: ListQueryCommon {
- bucket_name,
- bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
- page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
- prefix: prefix.unwrap_or_default(),
- },
- is_v2: true,
- marker: None,
- continuation_token,
- start_after,
- },
- )
- .await
- } else {
- Err(Error::BadRequest(format!(
- "Invalid endpoint: list-type={}",
- list_type
- )))
- }
- }
- Endpoint::ListMultipartUploads {
- delimiter,
- encoding_type,
- key_marker,
- max_uploads,
- prefix,
- upload_id_marker,
- } => {
- handle_list_multipart_upload(
- garage,
- &ListMultipartUploadsQuery {
- common: ListQueryCommon {
- bucket_name,
- bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
- page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
- prefix: prefix.unwrap_or_default(),
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
- },
- key_marker,
- upload_id_marker,
- },
- )
- .await
- }
- Endpoint::ListParts {
- key,
- max_parts,
- part_number_marker,
- upload_id,
- } => {
- handle_list_parts(
- garage,
- &ListPartsQuery {
- bucket_name,
- bucket_id,
- key,
- upload_id,
- part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
- max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
- },
- )
- .await
- }
- Endpoint::DeleteObjects {} => {
- handle_delete_objects(garage, bucket_id, req, content_sha256).await
- }
- Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
- Endpoint::PutBucketWebsite {} => {
- handle_put_website(garage, bucket_id, req, content_sha256).await
- }
- Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await,
- Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
- Endpoint::PutBucketCors {} => handle_put_cors(garage, bucket_id, req, content_sha256).await,
- Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await,
- endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
- };
-
- // If request was a success and we have a CORS rule that applies to it,
- // add the corresponding CORS headers to the response
- let mut resp_ok = resp?;
- if let Some(rule) = matching_cors_rule {
- add_cors_headers(&mut resp_ok, rule)
- .ok_or_internal_error("Invalid bucket CORS configuration")?;
- }
-
- Ok(resp_ok)
-}
-
-async fn handle_request_without_bucket(
- garage: Arc<Garage>,
- _req: Request<Body>,
- api_key: Key,
- endpoint: Endpoint,
-) -> Result<Response<Body>, Error> {
- match endpoint {
- Endpoint::ListBuckets => handle_list_buckets(&garage, &api_key).await,
- endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
- }
-}
-
-#[allow(clippy::ptr_arg)]
-pub async fn resolve_bucket(
- garage: &Garage,
- bucket_name: &String,
- api_key: &Key,
-) -> Result<Uuid, Error> {
- let api_key_params = api_key
- .state
- .as_option()
- .ok_or_internal_error("Key should not be deleted at this point")?;
-
- if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
- Ok(*bucket_id)
- } else {
- Ok(garage
- .bucket_helper()
- .resolve_global_bucket_name(bucket_name)
- .await?
- .ok_or(Error::NoSuchBucket)?)
- }
-}
-
-/// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in
-/// the host header of the request
-///
-/// S3 internally manages only buckets and keys. This function splits
-/// an HTTP path to get the corresponding bucket name and key.
-pub fn parse_bucket_key<'a>(
- path: &'a str,
- host_bucket: Option<&'a str>,
-) -> Result<(&'a str, Option<&'a str>), Error> {
- let path = path.trim_start_matches('/');
-
- if let Some(bucket) = host_bucket {
- if !path.is_empty() {
- return Ok((bucket, Some(path)));
- } else {
- return Ok((bucket, None));
- }
- }
-
- let (bucket, key) = match path.find('/') {
- Some(i) => {
- let key = &path[i + 1..];
- if !key.is_empty() {
- (&path[..i], Some(key))
- } else {
- (&path[..i], None)
- }
- }
- None => (path, None),
- };
- if bucket.is_empty() {
- return Err(Error::BadRequest("No bucket specified".to_string()));
- }
- Ok((bucket, key))
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn parse_bucket_containing_a_key() -> Result<(), Error> {
- let (bucket, key) = parse_bucket_key("/my_bucket/a/super/file.jpg", None)?;
- assert_eq!(bucket, "my_bucket");
- assert_eq!(key.expect("key must be set"), "a/super/file.jpg");
- Ok(())
- }
-
- #[test]
- fn parse_bucket_containing_no_key() -> Result<(), Error> {
- let (bucket, key) = parse_bucket_key("/my_bucket/", None)?;
- assert_eq!(bucket, "my_bucket");
- assert!(key.is_none());
- let (bucket, key) = parse_bucket_key("/my_bucket", None)?;
- assert_eq!(bucket, "my_bucket");
- assert!(key.is_none());
- Ok(())
- }
-
- #[test]
- fn parse_bucket_containing_no_bucket() {
- let parsed = parse_bucket_key("", None);
- assert!(parsed.is_err());
- let parsed = parse_bucket_key("/", None);
- assert!(parsed.is_err());
- let parsed = parse_bucket_key("////", None);
- assert!(parsed.is_err());
- }
-
- #[test]
- fn parse_bucket_with_vhost_and_key() -> Result<(), Error> {
- let (bucket, key) = parse_bucket_key("/a/super/file.jpg", Some("my-bucket"))?;
- assert_eq!(bucket, "my-bucket");
- assert_eq!(key.expect("key must be set"), "a/super/file.jpg");
- Ok(())
- }
-
- #[test]
- fn parse_bucket_with_vhost_no_key() -> Result<(), Error> {
- let (bucket, key) = parse_bucket_key("", Some("my-bucket"))?;
- assert_eq!(bucket, "my-bucket");
- assert!(key.is_none());
- let (bucket, key) = parse_bucket_key("/", Some("my-bucket"))?;
- assert_eq!(bucket, "my-bucket");
- assert!(key.is_none());
- Ok(())
- }
-}
diff --git a/src/api/error.rs b/src/api/error.rs
index f53ed1fd..4b7254d2 100644
--- a/src/api/error.rs
+++ b/src/api/error.rs
@@ -7,7 +7,7 @@ use hyper::{HeaderMap, StatusCode};
use garage_model::helper::error::Error as HelperError;
use garage_util::error::Error as GarageError;
-use crate::s3_xml;
+use crate::s3::xml as s3_xml;
/// Errors of this crate
#[derive(Debug, Error)]
@@ -100,6 +100,10 @@ pub enum Error {
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
+ /// The client asked for an invalid return format (invalid Accept header)
+ #[error(display = "Not acceptable: {}", _0)]
+ NotAcceptable(String),
+
/// The client sent a request for an action not supported by garage
#[error(display = "Unimplemented action: {}", _0)]
NotImplemented(String),
@@ -140,6 +144,7 @@ impl Error {
Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT,
Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
Error::Forbidden(_) => StatusCode::FORBIDDEN,
+ Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
Error::InternalError(
GarageError::Timeout
| GarageError::RemoteError(_)
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
new file mode 100644
index 00000000..9281e596
--- /dev/null
+++ b/src/api/generic_server.rs
@@ -0,0 +1,202 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+
+use hyper::server::conn::AddrStream;
+use hyper::service::{make_service_fn, service_fn};
+use hyper::{Body, Request, Response, Server};
+
+use opentelemetry::{
+ global,
+ metrics::{Counter, ValueRecorder},
+ trace::{FutureExt, SpanRef, TraceContextExt, Tracer},
+ Context, KeyValue,
+};
+
+use garage_util::error::Error as GarageError;
+use garage_util::metrics::{gen_trace_id, RecordDuration};
+
+use crate::error::*;
+
+pub(crate) trait ApiEndpoint: Send + Sync + 'static {
+ fn name(&self) -> &'static str;
+ fn add_span_attributes(&self, span: SpanRef<'_>);
+}
+
+#[async_trait]
+pub(crate) trait ApiHandler: Send + Sync + 'static {
+ const API_NAME: &'static str;
+ const API_NAME_DISPLAY: &'static str;
+
+ type Endpoint: ApiEndpoint;
+
+ fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Error>;
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: Self::Endpoint,
+ ) -> Result<Response<Body>, Error>;
+}
+
+pub(crate) struct ApiServer<A: ApiHandler> {
+ region: String,
+ api_handler: A,
+
+ // Metrics
+ request_counter: Counter<u64>,
+ error_counter: Counter<u64>,
+ request_duration: ValueRecorder<f64>,
+}
+
+impl<A: ApiHandler> ApiServer<A> {
+ pub fn new(region: String, api_handler: A) -> Arc<Self> {
+ let meter = global::meter("garage/api");
+ Arc::new(Self {
+ region,
+ api_handler,
+ request_counter: meter
+ .u64_counter(format!("api.{}.request_counter", A::API_NAME))
+ .with_description(format!(
+ "Number of API calls to the various {} API endpoints",
+ A::API_NAME_DISPLAY
+ ))
+ .init(),
+ error_counter: meter
+ .u64_counter(format!("api.{}.error_counter", A::API_NAME))
+ .with_description(format!(
+ "Number of API calls to the various {} API endpoints that resulted in errors",
+ A::API_NAME_DISPLAY
+ ))
+ .init(),
+ request_duration: meter
+ .f64_value_recorder(format!("api.{}.request_duration", A::API_NAME))
+ .with_description(format!(
+ "Duration of API calls to the various {} API endpoints",
+ A::API_NAME_DISPLAY
+ ))
+ .init(),
+ })
+ }
+
+ pub async fn run_server(
+ self: Arc<Self>,
+ bind_addr: SocketAddr,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ let service = make_service_fn(|conn: &AddrStream| {
+ let this = self.clone();
+
+ let client_addr = conn.remote_addr();
+ async move {
+ Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
+ let this = this.clone();
+
+ this.handler(req, client_addr)
+ }))
+ }
+ });
+
+ let server = Server::bind(&bind_addr).serve(service);
+
+ let graceful = server.with_graceful_shutdown(shutdown_signal);
+ info!(
+ "{} API server listening on http://{}",
+ A::API_NAME_DISPLAY,
+ bind_addr
+ );
+
+ graceful.await?;
+ Ok(())
+ }
+
+ async fn handler(
+ self: Arc<Self>,
+ req: Request<Body>,
+ addr: SocketAddr,
+ ) -> Result<Response<Body>, GarageError> {
+ let uri = req.uri().clone();
+ info!("{} {} {}", addr, req.method(), uri);
+ debug!("{:?}", req);
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let span = tracer
+ .span_builder(format!("{} API call (unknown)", A::API_NAME_DISPLAY))
+ .with_trace_id(gen_trace_id())
+ .with_attributes(vec![
+ KeyValue::new("method", format!("{}", req.method())),
+ KeyValue::new("uri", req.uri().to_string()),
+ ])
+ .start(&tracer);
+
+ let res = self
+ .handler_stage2(req)
+ .with_context(Context::current_with_span(span))
+ .await;
+
+ match res {
+ Ok(x) => {
+ debug!("{} {:?}", x.status(), x.headers());
+ Ok(x)
+ }
+ Err(e) => {
+ let body: Body = Body::from(e.aws_xml(&self.region, uri.path()));
+ let mut http_error_builder = Response::builder()
+ .status(e.http_status_code())
+ .header("Content-Type", "application/xml");
+
+ if let Some(header_map) = http_error_builder.headers_mut() {
+ e.add_headers(header_map)
+ }
+
+ let http_error = http_error_builder.body(body)?;
+
+ if e.http_status_code().is_server_error() {
+ warn!("Response: error {}, {}", e.http_status_code(), e);
+ } else {
+ info!("Response: error {}, {}", e.http_status_code(), e);
+ }
+ Ok(http_error)
+ }
+ }
+ }
+
+ async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
+ let endpoint = self.api_handler.parse_endpoint(&req)?;
+ debug!("Endpoint: {}", endpoint.name());
+
+ let current_context = Context::current();
+ let current_span = current_context.span();
+ current_span.update_name::<String>(format!("S3 API {}", endpoint.name()));
+ current_span.set_attribute(KeyValue::new("endpoint", endpoint.name()));
+ endpoint.add_span_attributes(current_span);
+
+ let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
+
+ let res = self
+ .api_handler
+ .handle(req, endpoint)
+ .record_duration(&self.request_duration, &metrics_tags[..])
+ .await;
+
+ self.request_counter.add(1, &metrics_tags[..]);
+
+ let status_code = match &res {
+ Ok(r) => r.status(),
+ Err(e) => e.http_status_code(),
+ };
+ if status_code.is_client_error() || status_code.is_server_error() {
+ self.error_counter.add(
+ 1,
+ &[
+ metrics_tags[0].clone(),
+ KeyValue::new("status_code", status_code.as_str().to_string()),
+ ],
+ );
+ }
+
+ res
+ }
+}
diff --git a/src/api/helpers.rs b/src/api/helpers.rs
index c2709bb3..a994b82f 100644
--- a/src/api/helpers.rs
+++ b/src/api/helpers.rs
@@ -1,6 +1,25 @@
-use crate::Error;
use idna::domain_to_unicode;
+use garage_util::data::*;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+
+use crate::error::*;
+
+/// What kind of authorization is required to perform a given action
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Authorization {
+ /// No authorization is required
+ None,
+ /// Having Read permission on bucket
+ Read,
+ /// Having Write permission on bucket
+ Write,
+ /// Having Owner permission on bucket
+ Owner,
+}
+
/// Host to bucket
///
/// Convert a host, like "bucket.garage-site.tld" to the corresponding bucket "bucket",
@@ -60,11 +79,143 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> {
authority.map(|h| domain_to_unicode(h).0)
}
+#[allow(clippy::ptr_arg)]
+pub async fn resolve_bucket(
+ garage: &Garage,
+ bucket_name: &String,
+ api_key: &Key,
+) -> Result<Uuid, Error> {
+ let api_key_params = api_key
+ .state
+ .as_option()
+ .ok_or_internal_error("Key should not be deleted at this point")?;
+
+ if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
+ Ok(*bucket_id)
+ } else {
+ Ok(garage
+ .bucket_helper()
+ .resolve_global_bucket_name(bucket_name)
+ .await?
+ .ok_or(Error::NoSuchBucket)?)
+ }
+}
+
+/// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in
+/// the host header of the request
+///
+/// S3 internally manages only buckets and keys. This function splits
+/// an HTTP path to get the corresponding bucket name and key.
+pub fn parse_bucket_key<'a>(
+ path: &'a str,
+ host_bucket: Option<&'a str>,
+) -> Result<(&'a str, Option<&'a str>), Error> {
+ let path = path.trim_start_matches('/');
+
+ if let Some(bucket) = host_bucket {
+ if !path.is_empty() {
+ return Ok((bucket, Some(path)));
+ } else {
+ return Ok((bucket, None));
+ }
+ }
+
+ let (bucket, key) = match path.find('/') {
+ Some(i) => {
+ let key = &path[i + 1..];
+ if !key.is_empty() {
+ (&path[..i], Some(key))
+ } else {
+ (&path[..i], None)
+ }
+ }
+ None => (path, None),
+ };
+ if bucket.is_empty() {
+ return Err(Error::BadRequest("No bucket specified".to_string()));
+ }
+ Ok((bucket, key))
+}
+
+const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}';
+
+/// Compute the key after the prefix
+pub fn key_after_prefix(pfx: &str) -> Option<String> {
+ let mut next = pfx.to_string();
+ while !next.is_empty() {
+ let tail = next.pop().unwrap();
+ if tail >= char::MAX {
+ continue;
+ }
+
+ // Circumvent a limitation of RangeFrom that overflow earlier than needed
+ // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html
+ let new_tail = if tail == UTF8_BEFORE_LAST_CHAR {
+ char::MAX
+ } else {
+ (tail..).nth(1).unwrap()
+ };
+
+ next.push(new_tail);
+ return Some(next);
+ }
+
+ None
+}
+
#[cfg(test)]
mod tests {
use super::*;
#[test]
+ fn parse_bucket_containing_a_key() -> Result<(), Error> {
+ let (bucket, key) = parse_bucket_key("/my_bucket/a/super/file.jpg", None)?;
+ assert_eq!(bucket, "my_bucket");
+ assert_eq!(key.expect("key must be set"), "a/super/file.jpg");
+ Ok(())
+ }
+
+ #[test]
+ fn parse_bucket_containing_no_key() -> Result<(), Error> {
+ let (bucket, key) = parse_bucket_key("/my_bucket/", None)?;
+ assert_eq!(bucket, "my_bucket");
+ assert!(key.is_none());
+ let (bucket, key) = parse_bucket_key("/my_bucket", None)?;
+ assert_eq!(bucket, "my_bucket");
+ assert!(key.is_none());
+ Ok(())
+ }
+
+ #[test]
+ fn parse_bucket_containing_no_bucket() {
+ let parsed = parse_bucket_key("", None);
+ assert!(parsed.is_err());
+ let parsed = parse_bucket_key("/", None);
+ assert!(parsed.is_err());
+ let parsed = parse_bucket_key("////", None);
+ assert!(parsed.is_err());
+ }
+
+ #[test]
+ fn parse_bucket_with_vhost_and_key() -> Result<(), Error> {
+ let (bucket, key) = parse_bucket_key("/a/super/file.jpg", Some("my-bucket"))?;
+ assert_eq!(bucket, "my-bucket");
+ assert_eq!(key.expect("key must be set"), "a/super/file.jpg");
+ Ok(())
+ }
+
+ #[test]
+ fn parse_bucket_with_vhost_no_key() -> Result<(), Error> {
+ let (bucket, key) = parse_bucket_key("", Some("my-bucket"))?;
+ assert_eq!(bucket, "my-bucket");
+ assert!(key.is_none());
+ let (bucket, key) = parse_bucket_key("/", Some("my-bucket"))?;
+ assert_eq!(bucket, "my-bucket");
+ assert!(key.is_none());
+ Ok(())
+ }
+
+ #[test]
fn authority_to_host_with_port() -> Result<(), Error> {
let domain = authority_to_host("[::1]:3902")?;
assert_eq!(domain, "[::1]");
@@ -111,4 +262,39 @@ mod tests {
assert_eq!(host_to_bucket("not-garage.tld", "garage.tld"), None);
assert_eq!(host_to_bucket("not-garage.tld", ".garage.tld"), None);
}
+
+ #[test]
+ fn test_key_after_prefix() {
+ use std::iter::FromIterator;
+
+ assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1);
+ assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0");
+ assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭");
+ assert_eq!(
+ key_after_prefix("􏿽").unwrap().as_str(),
+ String::from(char::from_u32(0x10FFFE).unwrap())
+ );
+
+ // When the last character is the biggest UTF8 char
+ let a = String::from_iter(['a', char::MAX].iter());
+ assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b");
+
+ // When all characters are the biggest UTF8 char
+ let b = String::from_iter([char::MAX; 3].iter());
+ assert!(key_after_prefix(b.as_str()).is_none());
+
+ // Check utf8 surrogates
+ let c = String::from('\u{D7FF}');
+ assert_eq!(
+ key_after_prefix(c.as_str()).unwrap().as_str(),
+ String::from('\u{E000}')
+ );
+
+ // Check the character before the biggest one
+ let d = String::from('\u{10FFFE}');
+ assert_eq!(
+ key_after_prefix(d.as_str()).unwrap().as_str(),
+ String::from(char::MAX)
+ );
+ }
}
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
new file mode 100644
index 00000000..5f5e9030
--- /dev/null
+++ b/src/api/k2v/api_server.rs
@@ -0,0 +1,195 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use hyper::{Body, Method, Request, Response};
+
+use opentelemetry::{trace::SpanRef, KeyValue};
+
+use garage_table::util::*;
+use garage_util::error::Error as GarageError;
+
+use garage_model::garage::Garage;
+
+use crate::error::*;
+use crate::generic_server::*;
+
+use crate::signature::payload::check_payload_signature;
+use crate::signature::streaming::*;
+
+use crate::helpers::*;
+use crate::k2v::batch::*;
+use crate::k2v::index::*;
+use crate::k2v::item::*;
+use crate::k2v::router::Endpoint;
+use crate::s3::cors::*;
+
+pub struct K2VApiServer {
+ garage: Arc<Garage>,
+}
+
+pub(crate) struct K2VApiEndpoint {
+ bucket_name: String,
+ endpoint: Endpoint,
+}
+
+impl K2VApiServer {
+ pub async fn run(
+ garage: Arc<Garage>,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ if let Some(cfg) = &garage.config.k2v_api {
+ let bind_addr = cfg.api_bind_addr;
+
+ ApiServer::new(
+ garage.config.s3_api.s3_region.clone(),
+ K2VApiServer { garage },
+ )
+ .run_server(bind_addr, shutdown_signal)
+ .await
+ } else {
+ Ok(())
+ }
+ }
+}
+
+#[async_trait]
+impl ApiHandler for K2VApiServer {
+ const API_NAME: &'static str = "k2v";
+ const API_NAME_DISPLAY: &'static str = "K2V";
+
+ type Endpoint = K2VApiEndpoint;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
+ let (endpoint, bucket_name) = Endpoint::from_request(req)?;
+
+ Ok(K2VApiEndpoint {
+ bucket_name,
+ endpoint,
+ })
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: K2VApiEndpoint,
+ ) -> Result<Response<Body>, Error> {
+ let K2VApiEndpoint {
+ bucket_name,
+ endpoint,
+ } = endpoint;
+ let garage = self.garage.clone();
+
+ // The OPTIONS method is procesed early, before we even check for an API key
+ if let Endpoint::Options = endpoint {
+ return handle_options_s3api(garage, &req, Some(bucket_name)).await;
+ }
+
+ let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
+ let api_key = api_key.ok_or_else(|| {
+ Error::Forbidden("Garage does not support anonymous access yet".to_string())
+ })?;
+
+ let req = parse_streaming_body(
+ &api_key,
+ req,
+ &mut content_sha256,
+ &garage.config.s3_api.s3_region,
+ "k2v",
+ )?;
+
+ let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
+ let bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .filter(|b| !b.state.is_deleted())
+ .ok_or(Error::NoSuchBucket)?;
+
+ let allowed = match endpoint.authorization_type() {
+ Authorization::Read => api_key.allow_read(&bucket_id),
+ Authorization::Write => api_key.allow_write(&bucket_id),
+ Authorization::Owner => api_key.allow_owner(&bucket_id),
+ _ => unreachable!(),
+ };
+
+ if !allowed {
+ return Err(Error::Forbidden(
+ "Operation is not allowed for this key.".to_string(),
+ ));
+ }
+
+ // Look up what CORS rule might apply to response.
+ // Requests for methods different than GET, HEAD or POST
+ // are always preflighted, i.e. the browser should make
+ // an OPTIONS call before to check it is allowed
+ let matching_cors_rule = match *req.method() {
+ Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
+ _ => None,
+ };
+
+ let resp = match endpoint {
+ Endpoint::DeleteItem {
+ partition_key,
+ sort_key,
+ } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::InsertItem {
+ partition_key,
+ sort_key,
+ } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::ReadItem {
+ partition_key,
+ sort_key,
+ } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::PollItem {
+ partition_key,
+ sort_key,
+ causality_token,
+ timeout,
+ } => {
+ handle_poll_item(
+ garage,
+ &req,
+ bucket_id,
+ partition_key,
+ sort_key,
+ causality_token,
+ timeout,
+ )
+ .await
+ }
+ Endpoint::ReadIndex {
+ prefix,
+ start,
+ end,
+ limit,
+ reverse,
+ } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
+ Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
+ Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
+ Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
+ Endpoint::Options => unreachable!(),
+ };
+
+ // If request was a success and we have a CORS rule that applies to it,
+ // add the corresponding CORS headers to the response
+ let mut resp_ok = resp?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp_ok, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
+ Ok(resp_ok)
+ }
+}
+
+impl ApiEndpoint for K2VApiEndpoint {
+ fn name(&self) -> &'static str {
+ self.endpoint.name()
+ }
+
+ fn add_span_attributes(&self, span: SpanRef<'_>) {
+ span.set_attribute(KeyValue::new("bucket", self.bucket_name.clone()));
+ }
+}
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
new file mode 100644
index 00000000..4ecddeb9
--- /dev/null
+++ b/src/api/k2v/batch.rs
@@ -0,0 +1,368 @@
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::{EnumerationOrder, TableSchema};
+
+use garage_model::garage::Garage;
+use garage_model::k2v::causality::*;
+use garage_model::k2v::item_table::*;
+
+use crate::error::*;
+use crate::k2v::range::read_range;
+
+pub async fn handle_insert_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let items: Vec<InsertBatchItem> =
+ serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+
+ let mut items2 = vec![];
+ for it in items {
+ let ct = it
+ .ct
+ .map(|s| CausalContext::parse(&s))
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+ let v = match it.v {
+ Some(vs) => {
+ DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)
+ }
+ None => DvvsValue::Deleted,
+ };
+ items2.push((it.pk, it.sk, ct, v));
+ }
+
+ garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_read_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let queries: Vec<ReadBatchQuery> =
+ serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+
+ let resp_results = futures::future::join_all(
+ queries
+ .into_iter()
+ .map(|q| handle_read_batch_query(&garage, bucket_id, q)),
+ )
+ .await;
+
+ let mut resps: Vec<ReadBatchResponse> = vec![];
+ for resp in resp_results {
+ resps.push(resp?);
+ }
+
+ let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+async fn handle_read_batch_query(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+ query: ReadBatchQuery,
+) -> Result<ReadBatchResponse, Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key: query.partition_key.clone(),
+ };
+
+ let filter = ItemFilter {
+ exclude_only_tombstones: !query.tombstones,
+ conflicts_only: query.conflicts_only,
+ };
+
+ let (items, more, next_start) = if query.single_item {
+ if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse {
+ return Err(Error::BadRequest("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.".into()));
+ }
+ let sk = query
+ .start
+ .as_ref()
+ .ok_or_bad_request("start should be specified if single_item is set")?;
+ let item = garage
+ .k2v
+ .item_table
+ .get(&partition, sk)
+ .await?
+ .filter(|e| K2VItemTable::matches_filter(e, &filter));
+ match item {
+ Some(i) => (vec![ReadBatchResponseItem::from(i)], false, None),
+ None => (vec![], false, None),
+ }
+ } else {
+ let (items, more, next_start) = read_range(
+ &garage.k2v.item_table,
+ &partition,
+ &query.prefix,
+ &query.start,
+ &query.end,
+ query.limit,
+ Some(filter),
+ EnumerationOrder::from_reverse(query.reverse),
+ )
+ .await?;
+
+ let items = items
+ .into_iter()
+ .map(ReadBatchResponseItem::from)
+ .collect::<Vec<_>>();
+
+ (items, more, next_start)
+ };
+
+ Ok(ReadBatchResponse {
+ partition_key: query.partition_key,
+ prefix: query.prefix,
+ start: query.start,
+ end: query.end,
+ limit: query.limit,
+ reverse: query.reverse,
+ single_item: query.single_item,
+ conflicts_only: query.conflicts_only,
+ tombstones: query.tombstones,
+ items,
+ more,
+ next_start,
+ })
+}
+
+pub async fn handle_delete_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let queries: Vec<DeleteBatchQuery> =
+ serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+
+ let resp_results = futures::future::join_all(
+ queries
+ .into_iter()
+ .map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
+ )
+ .await;
+
+ let mut resps: Vec<DeleteBatchResponse> = vec![];
+ for resp in resp_results {
+ resps.push(resp?);
+ }
+
+ let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+async fn handle_delete_batch_query(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+ query: DeleteBatchQuery,
+) -> Result<DeleteBatchResponse, Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key: query.partition_key.clone(),
+ };
+
+ let filter = ItemFilter {
+ exclude_only_tombstones: true,
+ conflicts_only: false,
+ };
+
+ let deleted_items = if query.single_item {
+ if query.prefix.is_some() || query.end.is_some() {
+ return Err(Error::BadRequest("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.".into()));
+ }
+ let sk = query
+ .start
+ .as_ref()
+ .ok_or_bad_request("start should be specified if single_item is set")?;
+ let item = garage
+ .k2v
+ .item_table
+ .get(&partition, sk)
+ .await?
+ .filter(|e| K2VItemTable::matches_filter(e, &filter));
+ match item {
+ Some(i) => {
+ let cc = i.causal_context();
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ i.partition.partition_key,
+ i.sort_key,
+ Some(cc),
+ DvvsValue::Deleted,
+ )
+ .await?;
+ 1
+ }
+ None => 0,
+ }
+ } else {
+ let (items, more, _next_start) = read_range(
+ &garage.k2v.item_table,
+ &partition,
+ &query.prefix,
+ &query.start,
+ &query.end,
+ None,
+ Some(filter),
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ assert!(!more);
+
+ // TODO delete items
+ let items = items
+ .into_iter()
+ .map(|i| {
+ let cc = i.causal_context();
+ (
+ i.partition.partition_key,
+ i.sort_key,
+ Some(cc),
+ DvvsValue::Deleted,
+ )
+ })
+ .collect::<Vec<_>>();
+ let n = items.len();
+
+ garage.k2v.rpc.insert_batch(bucket_id, items).await?;
+
+ n
+ };
+
+ Ok(DeleteBatchResponse {
+ partition_key: query.partition_key,
+ prefix: query.prefix,
+ start: query.start,
+ end: query.end,
+ single_item: query.single_item,
+ deleted_items,
+ })
+}
+
+#[derive(Deserialize)]
+struct InsertBatchItem {
+ pk: String,
+ sk: String,
+ ct: Option<String>,
+ v: Option<String>,
+}
+
+#[derive(Deserialize)]
+struct ReadBatchQuery {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default)]
+ limit: Option<u64>,
+ #[serde(default)]
+ reverse: bool,
+ #[serde(default, rename = "singleItem")]
+ single_item: bool,
+ #[serde(default, rename = "conflictsOnly")]
+ conflicts_only: bool,
+ #[serde(default)]
+ tombstones: bool,
+}
+
+#[derive(Serialize)]
+struct ReadBatchResponse {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: bool,
+ #[serde(rename = "singleItem")]
+ single_item: bool,
+ #[serde(rename = "conflictsOnly")]
+ conflicts_only: bool,
+ tombstones: bool,
+
+ items: Vec<ReadBatchResponseItem>,
+ more: bool,
+ #[serde(rename = "nextStart")]
+ next_start: Option<String>,
+}
+
+#[derive(Serialize)]
+struct ReadBatchResponseItem {
+ sk: String,
+ ct: String,
+ v: Vec<Option<String>>,
+}
+
+impl ReadBatchResponseItem {
+ fn from(i: K2VItem) -> Self {
+ let ct = i.causal_context().serialize();
+ let v = i
+ .values()
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Value(x) => Some(base64::encode(x)),
+ DvvsValue::Deleted => None,
+ })
+ .collect::<Vec<_>>();
+ Self {
+ sk: i.sort_key,
+ ct,
+ v,
+ }
+ }
+}
+
+#[derive(Deserialize)]
+struct DeleteBatchQuery {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default, rename = "singleItem")]
+ single_item: bool,
+}
+
+#[derive(Serialize)]
+struct DeleteBatchResponse {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ #[serde(rename = "singleItem")]
+ single_item: bool,
+
+ #[serde(rename = "deletedItems")]
+ deleted_items: usize,
+}
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
new file mode 100644
index 00000000..896dbcf0
--- /dev/null
+++ b/src/api/k2v/index.rs
@@ -0,0 +1,100 @@
+use std::sync::Arc;
+
+use hyper::{Body, Response, StatusCode};
+use serde::Serialize;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_rpc::ring::Ring;
+use garage_table::util::*;
+
+use garage_model::garage::Garage;
+use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
+
+use crate::error::*;
+use crate::k2v::range::read_range;
+
+pub async fn handle_read_index(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: Option<bool>,
+) -> Result<Response<Body>, Error> {
+ let reverse = reverse.unwrap_or(false);
+
+ let ring: Arc<Ring> = garage.system.ring.borrow().clone();
+
+ let (partition_keys, more, next_start) = read_range(
+ &garage.k2v.counter_table.table,
+ &bucket_id,
+ &prefix,
+ &start,
+ &end,
+ limit,
+ Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ EnumerationOrder::from_reverse(reverse),
+ )
+ .await?;
+
+ let s_entries = ENTRIES.to_string();
+ let s_conflicts = CONFLICTS.to_string();
+ let s_values = VALUES.to_string();
+ let s_bytes = BYTES.to_string();
+
+ let resp = ReadIndexResponse {
+ prefix,
+ start,
+ end,
+ limit,
+ reverse,
+ partition_keys: partition_keys
+ .into_iter()
+ .map(|part| {
+ let vals = part.filtered_values(&ring);
+ ReadIndexResponseEntry {
+ pk: part.sk,
+ entries: *vals.get(&s_entries).unwrap_or(&0),
+ conflicts: *vals.get(&s_conflicts).unwrap_or(&0),
+ values: *vals.get(&s_values).unwrap_or(&0),
+ bytes: *vals.get(&s_bytes).unwrap_or(&0),
+ }
+ })
+ .collect::<Vec<_>>(),
+ more,
+ next_start,
+ };
+
+ let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+struct ReadIndexResponse {
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: bool,
+
+ #[serde(rename = "partitionKeys")]
+ partition_keys: Vec<ReadIndexResponseEntry>,
+
+ more: bool,
+ #[serde(rename = "nextStart")]
+ next_start: Option<String>,
+}
+
+#[derive(Serialize)]
+struct ReadIndexResponseEntry {
+ pk: String,
+ entries: i64,
+ conflicts: i64,
+ values: i64,
+ bytes: i64,
+}
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
new file mode 100644
index 00000000..1860863e
--- /dev/null
+++ b/src/api/k2v/item.rs
@@ -0,0 +1,230 @@
+use std::sync::Arc;
+
+use http::header;
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_util::data::*;
+
+use garage_model::garage::Garage;
+use garage_model::k2v::causality::*;
+use garage_model::k2v::item_table::*;
+
+use crate::error::*;
+
+pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
+
+pub enum ReturnFormat {
+ Json,
+ Binary,
+ Either,
+}
+
+impl ReturnFormat {
+ pub fn from(req: &Request<Body>) -> Result<Self, Error> {
+ let accept = match req.headers().get(header::ACCEPT) {
+ Some(a) => a.to_str()?,
+ None => return Ok(Self::Json),
+ };
+
+ let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>();
+ let accept_json = accept.contains(&"application/json") || accept.contains(&"*/*");
+ let accept_binary = accept.contains(&"application/octet-stream") || accept.contains(&"*/*");
+
+ match (accept_json, accept_binary) {
+ (true, true) => Ok(Self::Either),
+ (true, false) => Ok(Self::Json),
+ (false, true) => Ok(Self::Binary),
+ (false, false) => Err(Error::NotAcceptable("Invalid Accept: header value, must contain either application/json or application/octet-stream (or both)".into())),
+ }
+ }
+
+ pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> {
+ let vals = item.values();
+
+ if vals.is_empty() {
+ return Err(Error::NoSuchKey);
+ }
+
+ let ct = item.causal_context().serialize();
+ match self {
+ Self::Binary if vals.len() > 1 => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .status(StatusCode::CONFLICT)
+ .body(Body::empty())?),
+ Self::Binary => {
+ assert!(vals.len() == 1);
+ Self::make_binary_response(ct, vals[0])
+ }
+ Self::Either if vals.len() == 1 => Self::make_binary_response(ct, vals[0]),
+ _ => Self::make_json_response(ct, &vals[..]),
+ }
+ }
+
+ fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> {
+ match v {
+ DvvsValue::Deleted => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?),
+ DvvsValue::Value(v) => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .status(StatusCode::OK)
+ .body(Body::from(v.to_vec()))?),
+ }
+ }
+
+ fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> {
+ let items = v
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Deleted => serde_json::Value::Null,
+ DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)),
+ })
+ .collect::<Vec<_>>();
+ let json_body =
+ serde_json::to_string_pretty(&items).ok_or_internal_error("JSON encoding error")?;
+ Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/json")
+ .status(StatusCode::OK)
+ .body(Body::from(json_body))?)
+ }
+}
+
+/// Handle ReadItem request
+#[allow(clippy::ptr_arg)]
+pub async fn handle_read_item(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &String,
+) -> Result<Response<Body>, Error> {
+ let format = ReturnFormat::from(req)?;
+
+ let item = garage
+ .k2v
+ .item_table
+ .get(
+ &K2VItemPartition {
+ bucket_id,
+ partition_key: partition_key.to_string(),
+ },
+ sort_key,
+ )
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ format.make_response(&item)
+}
+
+pub async fn handle_insert_item(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &str,
+) -> Result<Response<Body>, Error> {
+ let causal_context = req
+ .headers()
+ .get(X_GARAGE_CAUSALITY_TOKEN)
+ .map(|s| s.to_str())
+ .transpose()?
+ .map(CausalContext::parse)
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let value = DvvsValue::Value(body.to_vec());
+
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ partition_key.to_string(),
+ sort_key.to_string(),
+ causal_context,
+ value,
+ )
+ .await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_delete_item(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &str,
+) -> Result<Response<Body>, Error> {
+ let causal_context = req
+ .headers()
+ .get(X_GARAGE_CAUSALITY_TOKEN)
+ .map(|s| s.to_str())
+ .transpose()?
+ .map(CausalContext::parse)
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+
+ let value = DvvsValue::Deleted;
+
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ partition_key.to_string(),
+ sort_key.to_string(),
+ causal_context,
+ value,
+ )
+ .await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+/// Handle ReadItem request
+#[allow(clippy::ptr_arg)]
+pub async fn handle_poll_item(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ partition_key: String,
+ sort_key: String,
+ causality_token: String,
+ timeout_secs: Option<u64>,
+) -> Result<Response<Body>, Error> {
+ let format = ReturnFormat::from(req)?;
+
+ let causal_context =
+ CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
+
+ let item = garage
+ .k2v
+ .rpc
+ .poll(
+ bucket_id,
+ partition_key,
+ sort_key,
+ causal_context,
+ timeout_secs.unwrap_or(300) * 1000,
+ )
+ .await?;
+
+ if let Some(item) = item {
+ format.make_response(&item)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_MODIFIED)
+ .body(Body::empty())?)
+ }
+}
diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs
new file mode 100644
index 00000000..ee210ad5
--- /dev/null
+++ b/src/api/k2v/mod.rs
@@ -0,0 +1,8 @@
+pub mod api_server;
+mod router;
+
+mod batch;
+mod index;
+mod item;
+
+mod range;
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
new file mode 100644
index 00000000..cd019723
--- /dev/null
+++ b/src/api/k2v/range.rs
@@ -0,0 +1,96 @@
+//! Utility module for retrieving ranges of items in Garage tables
+//! Implements parameters (prefix, start, end, limit) as specified
+//! for endpoints ReadIndex, ReadBatch and DeleteBatch
+
+use std::sync::Arc;
+
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+use crate::error::*;
+use crate::helpers::key_after_prefix;
+
+/// Read range in a Garage table.
+/// Returns (entries, more?, nextStart)
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn read_range<F>(
+ table: &Arc<Table<F, TableShardedReplication>>,
+ partition_key: &F::P,
+ prefix: &Option<String>,
+ start: &Option<String>,
+ end: &Option<String>,
+ limit: Option<u64>,
+ filter: Option<F::Filter>,
+ enumeration_order: EnumerationOrder,
+) -> Result<(Vec<F::E>, bool, Option<String>), Error>
+where
+ F: TableSchema<S = String> + 'static,
+{
+ let (mut start, mut start_ignore) = match (prefix, start) {
+ (None, None) => (None, false),
+ (None, Some(s)) => (Some(s.clone()), false),
+ (Some(p), Some(s)) => {
+ if !s.starts_with(p) {
+ return Err(Error::BadRequest(format!(
+ "Start key '{}' does not start with prefix '{}'",
+ s, p
+ )));
+ }
+ (Some(s.clone()), false)
+ }
+ (Some(p), None) if enumeration_order == EnumerationOrder::Reverse => {
+ let start = key_after_prefix(p)
+ .ok_or_internal_error("Sorry, can't list this prefix in reverse order")?;
+ (Some(start), true)
+ }
+ (Some(p), None) => (Some(p.clone()), false),
+ };
+
+ let mut entries = vec![];
+ loop {
+ let n_get = std::cmp::min(
+ 1000,
+ limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2,
+ );
+ let get_ret = table
+ .get_range(
+ partition_key,
+ start.clone(),
+ filter.clone(),
+ n_get,
+ enumeration_order,
+ )
+ .await?;
+
+ let get_ret_len = get_ret.len();
+
+ for entry in get_ret {
+ if start_ignore && Some(entry.sort_key()) == start.as_ref() {
+ continue;
+ }
+ if let Some(p) = prefix {
+ if !entry.sort_key().starts_with(p) {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(e) = end {
+ if entry.sort_key() == e {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(l) = limit {
+ if entries.len() >= l as usize {
+ return Ok((entries, true, Some(entry.sort_key().clone())));
+ }
+ }
+ entries.push(entry);
+ }
+
+ if get_ret_len < n_get {
+ return Ok((entries, false, None));
+ }
+
+ start = Some(entries.last().unwrap().sort_key().clone());
+ start_ignore = true;
+ }
+}
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
new file mode 100644
index 00000000..f948ffce
--- /dev/null
+++ b/src/api/k2v/router.rs
@@ -0,0 +1,252 @@
+use crate::error::*;
+
+use std::borrow::Cow;
+
+use hyper::{Method, Request};
+
+use crate::helpers::Authorization;
+use crate::router_macros::{generateQueryParameters, router_match};
+
+router_match! {@func
+
+
+/// List of all K2V API endpoints.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Endpoint {
+ DeleteBatch {
+ },
+ DeleteItem {
+ partition_key: String,
+ sort_key: String,
+ },
+ InsertBatch {
+ },
+ InsertItem {
+ partition_key: String,
+ sort_key: String,
+ },
+ Options,
+ PollItem {
+ partition_key: String,
+ sort_key: String,
+ causality_token: String,
+ timeout: Option<u64>,
+ },
+ ReadBatch {
+ },
+ ReadIndex {
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: Option<bool>,
+ },
+ ReadItem {
+ partition_key: String,
+ sort_key: String,
+ },
+}}
+
+impl Endpoint {
+ /// Determine which S3 endpoint a request is for using the request, and a bucket which was
+ /// possibly extracted from the Host header.
+ /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+ pub fn from_request<T>(req: &Request<T>) -> Result<(Self, String), Error> {
+ let uri = req.uri();
+ let path = uri.path().trim_start_matches('/');
+ let query = uri.query();
+
+ let (bucket, partition_key) = path
+ .split_once('/')
+ .map(|(b, p)| (b.to_owned(), p.trim_start_matches('/')))
+ .unwrap_or((path.to_owned(), ""));
+
+ if bucket.is_empty() {
+ return Err(Error::BadRequest("Missing bucket name".to_owned()));
+ }
+
+ if *req.method() == Method::OPTIONS {
+ return Ok((Self::Options, bucket));
+ }
+
+ let partition_key = percent_encoding::percent_decode_str(partition_key)
+ .decode_utf8()?
+ .into_owned();
+
+ let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
+
+ let method_search = Method::from_bytes(b"SEARCH").unwrap();
+ let res = match *req.method() {
+ Method::GET => Self::from_get(partition_key, &mut query)?,
+ //&Method::HEAD => Self::from_head(partition_key, &mut query)?,
+ Method::POST => Self::from_post(partition_key, &mut query)?,
+ Method::PUT => Self::from_put(partition_key, &mut query)?,
+ Method::DELETE => Self::from_delete(partition_key, &mut query)?,
+ _ if req.method() == method_search => Self::from_search(partition_key, &mut query)?,
+ _ => return Err(Error::BadRequest("Unknown method".to_owned())),
+ };
+
+ if let Some(message) = query.nonempty_message() {
+ debug!("Unused query parameter: {}", message)
+ }
+ Ok((res, bucket))
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a GET.
+ fn from_get(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY if causality_token => PollItem (query::sort_key, query::causality_token, opt_parse::timeout),
+ EMPTY => ReadItem (query::sort_key),
+ ],
+ no_key: [
+ EMPTY => ReadIndex (query_opt::prefix, query_opt::start, query_opt::end, opt_parse::limit, opt_parse::reverse),
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a SEARCH.
+ fn from_search(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ ],
+ no_key: [
+ EMPTY => ReadBatch,
+ ]
+ }
+ }
+
+ /*
+ /// Determine which endpoint a request is for, knowing it is a HEAD.
+ fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => HeadObject(opt_parse::part_number, query_opt::version_id),
+ ],
+ no_key: [
+ EMPTY => HeadBucket,
+ ]
+ }
+ }
+ */
+
+ /// Determine which endpoint a request is for, knowing it is a POST.
+ fn from_post(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ ],
+ no_key: [
+ EMPTY => InsertBatch,
+ DELETE => DeleteBatch,
+ SEARCH => ReadBatch,
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a PUT.
+ fn from_put(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => InsertItem (query::sort_key),
+
+ ],
+ no_key: [
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a DELETE.
+ fn from_delete(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => DeleteItem (query::sort_key),
+ ],
+ no_key: [
+ ]
+ }
+ }
+
+ /// Get the partition key the request target. Returns None for requests which don't use a partition key.
+ #[allow(dead_code)]
+ pub fn get_partition_key(&self) -> Option<&str> {
+ router_match! {
+ @extract
+ self,
+ partition_key,
+ [
+ DeleteItem,
+ InsertItem,
+ PollItem,
+ ReadItem,
+ ]
+ }
+ }
+
+ /// Get the sort key the request target. Returns None for requests which don't use a sort key.
+ #[allow(dead_code)]
+ pub fn get_sort_key(&self) -> Option<&str> {
+ router_match! {
+ @extract
+ self,
+ sort_key,
+ [
+ DeleteItem,
+ InsertItem,
+ PollItem,
+ ReadItem,
+ ]
+ }
+ }
+
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ let readonly = router_match! {
+ @match
+ self,
+ [
+ PollItem,
+ ReadBatch,
+ ReadIndex,
+ ReadItem,
+ ]
+ };
+ if readonly {
+ Authorization::Read
+ } else {
+ Authorization::Write
+ }
+ }
+}
+
+// parameter name => struct field
+generateQueryParameters! {
+ "prefix" => prefix,
+ "start" => start,
+ "causality_token" => causality_token,
+ "end" => end,
+ "limit" => limit,
+ "reverse" => reverse,
+ "sort_key" => sort_key,
+ "timeout" => timeout
+}
+
+mod keywords {
+ //! This module contain all query parameters with no associated value
+ //! used to differentiate endpoints.
+ pub const EMPTY: &str = "";
+
+ pub const DELETE: &str = "delete";
+ pub const SEARCH: &str = "search";
+}
diff --git a/src/api/lib.rs b/src/api/lib.rs
index de60ec53..0078f7b5 100644
--- a/src/api/lib.rs
+++ b/src/api/lib.rs
@@ -6,22 +6,12 @@ pub mod error;
pub use error::Error;
mod encoding;
-
-mod api_server;
-pub use api_server::run_api_server;
-
+mod generic_server;
+pub mod helpers;
+mod router_macros;
/// This mode is public only to help testing. Don't expect stability here
pub mod signature;
-pub mod helpers;
-mod s3_bucket;
-mod s3_copy;
-pub mod s3_cors;
-mod s3_delete;
-pub mod s3_get;
-mod s3_list;
-mod s3_post_object;
-mod s3_put;
-mod s3_router;
-mod s3_website;
-mod s3_xml;
+#[cfg(feature = "k2v")]
+pub mod k2v;
+pub mod s3;
diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs
new file mode 100644
index 00000000..8471407c
--- /dev/null
+++ b/src/api/router_macros.rs
@@ -0,0 +1,190 @@
+/// This macro is used to generate very repetitive match {} blocks in this module
+/// It is _not_ made to be used anywhere else
+macro_rules! router_match {
+ (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{
+ // usage: router_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] }
+ // returns true if the variant was one of the listed variants, false otherwise.
+ use Endpoint::*;
+ match $enum {
+ $(
+ $endpoint { .. } => true,
+ )*
+ _ => false
+ }
+ }};
+ (@extract $enum:expr , $param:ident, [ $($endpoint:ident,)* ]) => {{
+ // usage: router_match {@extract my_enum, field_name, [ VariantWithField1, VariantWithField2 ..] }
+ // returns Some(field_value), or None if the variant was not one of the listed variants.
+ use Endpoint::*;
+ match $enum {
+ $(
+ $endpoint {$param, ..} => Some($param),
+ )*
+ _ => None
+ }
+ }};
+ (@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr),
+ key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*],
+ no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{
+ // usage: router_match {@gen_parser (keyword, key, query, header),
+ // key: [
+ // SOME_KEYWORD => VariantWithKey,
+ // ...
+ // ],
+ // no_key: [
+ // SOME_KEYWORD => VariantWithoutKey,
+ // ...
+ // ]
+ // }
+ // See in from_{method} for more detailed usage.
+ use Endpoint::*;
+ use keywords::*;
+ match ($keyword, !$key.is_empty()){
+ $(
+ ($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k {
+ $key,
+ $($(
+ $param_k: router_match!(@@parse_param $query, $conv_k, $param_k),
+ )*)?
+ }),
+ )*
+ $(
+ ($kw_nk, false) $(if $query.$required_nk.is_some())? $(if $header.contains($header_nk))? => Ok($api_nk {
+ $($(
+ $param_nk: router_match!(@@parse_param $query, $conv_nk, $param_nk),
+ )*)?
+ }),
+ )*
+ (kw, _) => Err(Error::BadRequest(format!("Invalid endpoint: {}", kw)))
+ }
+ }};
+
+ (@@parse_param $query:expr, query_opt, $param:ident) => {{
+ // extract optional query parameter
+ $query.$param.take().map(|param| param.into_owned())
+ }};
+ (@@parse_param $query:expr, query, $param:ident) => {{
+ // extract mendatory query parameter
+ $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned()
+ }};
+ (@@parse_param $query:expr, opt_parse, $param:ident) => {{
+ // extract and parse optional query parameter
+ // missing parameter is file, however parse error is reported as an error
+ $query.$param
+ .take()
+ .map(|param| param.parse())
+ .transpose()
+ .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
+ }};
+ (@@parse_param $query:expr, parse, $param:ident) => {{
+ // extract and parse mandatory query parameter
+ // both missing and un-parseable parameters are reported as errors
+ $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?
+ .parse()
+ .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
+ }};
+ (@func
+ $(#[$doc:meta])*
+ pub enum Endpoint {
+ $(
+ $(#[$outer:meta])*
+ $variant:ident $({
+ $($name:ident: $ty:ty,)*
+ })?,
+ )*
+ }) => {
+ $(#[$doc])*
+ pub enum Endpoint {
+ $(
+ $(#[$outer])*
+ $variant $({
+ $($name: $ty, )*
+ })?,
+ )*
+ }
+ impl Endpoint {
+ pub fn name(&self) -> &'static str {
+ match self {
+ $(Endpoint::$variant $({ $($name: _,)* .. })? => stringify!($variant),)*
+ }
+ }
+ }
+ };
+ (@if ($($cond:tt)+) then ($($then:tt)*) else ($($else:tt)*)) => {
+ $($then)*
+ };
+ (@if () then ($($then:tt)*) else ($($else:tt)*)) => {
+ $($else)*
+ };
+}
+
+/// This macro is used to generate part of the code in this module. It must be called only one, and
+/// is useless outside of this module.
+macro_rules! generateQueryParameters {
+ ( $($rest:expr => $name:ident),* ) => {
+ /// Struct containing all query parameters used in endpoints. Think of it as an HashMap,
+ /// but with keys statically known.
+ #[derive(Debug, Default)]
+ struct QueryParameters<'a> {
+ keyword: Option<Cow<'a, str>>,
+ $(
+ $name: Option<Cow<'a, str>>,
+ )*
+ }
+
+ impl<'a> QueryParameters<'a> {
+ /// Build this struct from the query part of an URI.
+ fn from_query(query: &'a str) -> Result<Self, Error> {
+ let mut res: Self = Default::default();
+ for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
+ let repeated = match k.as_ref() {
+ $(
+ $rest => if !v.is_empty() {
+ res.$name.replace(v).is_some()
+ } else {
+ false
+ },
+ )*
+ _ => {
+ if k.starts_with("response-") || k.starts_with("X-Amz-") {
+ false
+ } else if v.as_ref().is_empty() {
+ if res.keyword.replace(k).is_some() {
+ return Err(Error::BadRequest("Multiple keywords".to_owned()));
+ }
+ continue;
+ } else {
+ debug!("Received an unknown query parameter: '{}'", k);
+ false
+ }
+ }
+ };
+ if repeated {
+ return Err(Error::BadRequest(format!(
+ "Query parameter repeated: '{}'",
+ k
+ )));
+ }
+ }
+ Ok(res)
+ }
+
+ /// Get an error message in case not all parameters where used when extracting them to
+ /// build an Enpoint variant
+ fn nonempty_message(&self) -> Option<&str> {
+ if self.keyword.is_some() {
+ Some("Keyword not used")
+ } $(
+ else if self.$name.is_some() {
+ Some(concat!("'", $rest, "'"))
+ }
+ )* else {
+ None
+ }
+ }
+ }
+ }
+}
+
+pub(crate) use generateQueryParameters;
+pub(crate) use router_match;
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
new file mode 100644
index 00000000..78a69d53
--- /dev/null
+++ b/src/api/s3/api_server.rs
@@ -0,0 +1,401 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use hyper::header;
+use hyper::{Body, Method, Request, Response};
+
+use opentelemetry::{trace::SpanRef, KeyValue};
+
+use garage_table::util::*;
+use garage_util::error::Error as GarageError;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+
+use crate::error::*;
+use crate::generic_server::*;
+
+use crate::signature::payload::check_payload_signature;
+use crate::signature::streaming::*;
+
+use crate::helpers::*;
+use crate::s3::bucket::*;
+use crate::s3::copy::*;
+use crate::s3::cors::*;
+use crate::s3::delete::*;
+use crate::s3::get::*;
+use crate::s3::list::*;
+use crate::s3::post_object::handle_post_object;
+use crate::s3::put::*;
+use crate::s3::router::Endpoint;
+use crate::s3::website::*;
+
+pub struct S3ApiServer {
+ garage: Arc<Garage>,
+}
+
+pub(crate) struct S3ApiEndpoint {
+ bucket_name: Option<String>,
+ endpoint: Endpoint,
+}
+
+impl S3ApiServer {
+ pub async fn run(
+ garage: Arc<Garage>,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ let addr = garage.config.s3_api.api_bind_addr;
+
+ ApiServer::new(
+ garage.config.s3_api.s3_region.clone(),
+ S3ApiServer { garage },
+ )
+ .run_server(addr, shutdown_signal)
+ .await
+ }
+
+ async fn handle_request_without_bucket(
+ &self,
+ _req: Request<Body>,
+ api_key: Key,
+ endpoint: Endpoint,
+ ) -> Result<Response<Body>, Error> {
+ match endpoint {
+ Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await,
+ endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ }
+ }
+}
+
+#[async_trait]
+impl ApiHandler for S3ApiServer {
+ const API_NAME: &'static str = "s3";
+ const API_NAME_DISPLAY: &'static str = "S3";
+
+ type Endpoint = S3ApiEndpoint;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> {
+ let authority = req
+ .headers()
+ .get(header::HOST)
+ .ok_or_bad_request("Host header required")?
+ .to_str()?;
+
+ let host = authority_to_host(authority)?;
+
+ let bucket_name = self
+ .garage
+ .config
+ .s3_api
+ .root_domain
+ .as_ref()
+ .and_then(|root_domain| host_to_bucket(&host, root_domain));
+
+ let (endpoint, bucket_name) =
+ Endpoint::from_request(req, bucket_name.map(ToOwned::to_owned))?;
+
+ Ok(S3ApiEndpoint {
+ bucket_name,
+ endpoint,
+ })
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: S3ApiEndpoint,
+ ) -> Result<Response<Body>, Error> {
+ let S3ApiEndpoint {
+ bucket_name,
+ endpoint,
+ } = endpoint;
+ let garage = self.garage.clone();
+
+ // Some endpoints are processed early, before we even check for an API key
+ if let Endpoint::PostObject = endpoint {
+ return handle_post_object(garage, req, bucket_name.unwrap()).await;
+ }
+ if let Endpoint::Options = endpoint {
+ return handle_options_s3api(garage, &req, bucket_name).await;
+ }
+
+ let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
+ let api_key = api_key.ok_or_else(|| {
+ Error::Forbidden("Garage does not support anonymous access yet".to_string())
+ })?;
+
+ let req = parse_streaming_body(
+ &api_key,
+ req,
+ &mut content_sha256,
+ &garage.config.s3_api.s3_region,
+ "s3",
+ )?;
+
+ let bucket_name = match bucket_name {
+ None => {
+ return self
+ .handle_request_without_bucket(req, api_key, endpoint)
+ .await
+ }
+ Some(bucket) => bucket.to_string(),
+ };
+
+ // Special code path for CreateBucket API endpoint
+ if let Endpoint::CreateBucket {} = endpoint {
+ return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await;
+ }
+
+ let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
+ let bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .filter(|b| !b.state.is_deleted())
+ .ok_or(Error::NoSuchBucket)?;
+
+ let allowed = match endpoint.authorization_type() {
+ Authorization::Read => api_key.allow_read(&bucket_id),
+ Authorization::Write => api_key.allow_write(&bucket_id),
+ Authorization::Owner => api_key.allow_owner(&bucket_id),
+ _ => unreachable!(),
+ };
+
+ if !allowed {
+ return Err(Error::Forbidden(
+ "Operation is not allowed for this key.".to_string(),
+ ));
+ }
+
+ // Look up what CORS rule might apply to response.
+ // Requests for methods different than GET, HEAD or POST
+ // are always preflighted, i.e. the browser should make
+ // an OPTIONS call before to check it is allowed
+ let matching_cors_rule = match *req.method() {
+ Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
+ _ => None,
+ };
+
+ let resp = match endpoint {
+ Endpoint::HeadObject {
+ key, part_number, ..
+ } => handle_head(garage, &req, bucket_id, &key, part_number).await,
+ Endpoint::GetObject {
+ key, part_number, ..
+ } => handle_get(garage, &req, bucket_id, &key, part_number).await,
+ Endpoint::UploadPart {
+ key,
+ part_number,
+ upload_id,
+ } => {
+ handle_put_part(
+ garage,
+ req,
+ bucket_id,
+ &key,
+ part_number,
+ &upload_id,
+ content_sha256,
+ )
+ .await
+ }
+ Endpoint::CopyObject { key } => {
+ handle_copy(garage, &api_key, &req, bucket_id, &key).await
+ }
+ Endpoint::UploadPartCopy {
+ key,
+ part_number,
+ upload_id,
+ } => {
+ handle_upload_part_copy(
+ garage,
+ &api_key,
+ &req,
+ bucket_id,
+ &key,
+ part_number,
+ &upload_id,
+ )
+ .await
+ }
+ Endpoint::PutObject { key } => {
+ handle_put(garage, req, bucket_id, &key, content_sha256).await
+ }
+ Endpoint::AbortMultipartUpload { key, upload_id } => {
+ handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
+ }
+ Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
+ Endpoint::CreateMultipartUpload { key } => {
+ handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
+ }
+ Endpoint::CompleteMultipartUpload { key, upload_id } => {
+ handle_complete_multipart_upload(
+ garage,
+ req,
+ &bucket_name,
+ bucket_id,
+ &key,
+ &upload_id,
+ content_sha256,
+ )
+ .await
+ }
+ Endpoint::CreateBucket {} => unreachable!(),
+ Endpoint::HeadBucket {} => {
+ let empty_body: Body = Body::from(vec![]);
+ let response = Response::builder().body(empty_body).unwrap();
+ Ok(response)
+ }
+ Endpoint::DeleteBucket {} => {
+ handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await
+ }
+ Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
+ Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
+ Endpoint::ListObjects {
+ delimiter,
+ encoding_type,
+ marker,
+ max_keys,
+ prefix,
+ } => {
+ handle_list(
+ garage,
+ &ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
+ is_v2: false,
+ marker,
+ continuation_token: None,
+ start_after: None,
+ },
+ )
+ .await
+ }
+ Endpoint::ListObjectsV2 {
+ delimiter,
+ encoding_type,
+ max_keys,
+ prefix,
+ continuation_token,
+ start_after,
+ list_type,
+ ..
+ } => {
+ if list_type == "2" {
+ handle_list(
+ garage,
+ &ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ prefix: prefix.unwrap_or_default(),
+ },
+ is_v2: true,
+ marker: None,
+ continuation_token,
+ start_after,
+ },
+ )
+ .await
+ } else {
+ Err(Error::BadRequest(format!(
+ "Invalid endpoint: list-type={}",
+ list_type
+ )))
+ }
+ }
+ Endpoint::ListMultipartUploads {
+ delimiter,
+ encoding_type,
+ key_marker,
+ max_uploads,
+ prefix,
+ upload_id_marker,
+ } => {
+ handle_list_multipart_upload(
+ garage,
+ &ListMultipartUploadsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
+ key_marker,
+ upload_id_marker,
+ },
+ )
+ .await
+ }
+ Endpoint::ListParts {
+ key,
+ max_parts,
+ part_number_marker,
+ upload_id,
+ } => {
+ handle_list_parts(
+ garage,
+ &ListPartsQuery {
+ bucket_name,
+ bucket_id,
+ key,
+ upload_id,
+ part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
+ max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ },
+ )
+ .await
+ }
+ Endpoint::DeleteObjects {} => {
+ handle_delete_objects(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
+ Endpoint::PutBucketWebsite {} => {
+ handle_put_website(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await,
+ Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
+ Endpoint::PutBucketCors {} => {
+ handle_put_cors(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await,
+ endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ };
+
+ // If request was a success and we have a CORS rule that applies to it,
+ // add the corresponding CORS headers to the response
+ let mut resp_ok = resp?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp_ok, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
+ Ok(resp_ok)
+ }
+}
+
+impl ApiEndpoint for S3ApiEndpoint {
+ fn name(&self) -> &'static str {
+ self.endpoint.name()
+ }
+
+ fn add_span_attributes(&self, span: SpanRef<'_>) {
+ span.set_attribute(KeyValue::new(
+ "bucket",
+ self.bucket_name.clone().unwrap_or_default(),
+ ));
+ }
+}
diff --git a/src/api/s3_bucket.rs b/src/api/s3/bucket.rs
index 8a5407d3..93048a8c 100644
--- a/src/api/s3_bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -7,15 +7,15 @@ use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
-use garage_model::object_table::ObjectFilter;
use garage_model::permission::BucketKeyPerm;
+use garage_model::s3::object_table::ObjectFilter;
use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::time::*;
use crate::error::*;
-use crate::s3_xml;
+use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> {
@@ -230,7 +230,13 @@ pub async fn handle_delete_bucket(
// Check bucket is empty
let objects = garage
.object_table
- .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
+ .get_range(
+ &bucket_id,
+ None,
+ Some(ObjectFilter::IsData),
+ 10,
+ EnumerationOrder::Forward,
+ )
.await?;
if !objects.is_empty() {
return Err(Error::BucketNotEmpty);
diff --git a/src/api/s3_copy.rs b/src/api/s3/copy.rs
index fc4707e2..4e94d887 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3/copy.rs
@@ -12,16 +12,16 @@ use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
-use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
-use garage_model::object_table::*;
-use garage_model::version_table::*;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
-use crate::api_server::{parse_bucket_key, resolve_bucket};
use crate::error::*;
-use crate::s3_put::{decode_upload_id, get_headers};
-use crate::s3_xml::{self, xmlns_tag};
+use crate::helpers::{parse_bucket_key, resolve_bucket};
+use crate::s3::put::{decode_upload_id, get_headers};
+use crate::s3::xml::{self as s3_xml, xmlns_tag};
pub async fn handle_copy(
garage: Arc<Garage>,
@@ -619,7 +619,7 @@ pub struct CopyPartResult {
#[cfg(test)]
mod tests {
use super::*;
- use crate::s3_xml::to_xml_with_header;
+ use crate::s3::xml::to_xml_with_header;
#[test]
fn copy_object_result() -> Result<(), Error> {
diff --git a/src/api/s3_cors.rs b/src/api/s3/cors.rs
index ab77e23a..37ea2e43 100644
--- a/src/api/s3_cors.rs
+++ b/src/api/s3/cors.rs
@@ -10,7 +10,7 @@ use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use crate::error::*;
-use crate::s3_xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
diff --git a/src/api/s3_delete.rs b/src/api/s3/delete.rs
index b243d982..1e3f1249 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3/delete.rs
@@ -6,10 +6,10 @@ use garage_util::data::*;
use garage_util::time::*;
use garage_model::garage::Garage;
-use garage_model::object_table::*;
+use garage_model::s3::object_table::*;
use crate::error::*;
-use crate::s3_xml;
+use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
async fn handle_delete_internal(
diff --git a/src/api/s3_get.rs b/src/api/s3/get.rs
index 7f647e15..3edf22a6 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3/get.rs
@@ -14,8 +14,8 @@ use garage_table::EmptyKey;
use garage_util::data::*;
use garage_model::garage::Garage;
-use garage_model::object_table::*;
-use garage_model::version_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
use crate::error::*;
diff --git a/src/api/s3_list.rs b/src/api/s3/list.rs
index 5852fc1b..e2848c57 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3/list.rs
@@ -10,15 +10,16 @@ use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::garage::Garage;
-use garage_model::object_table::*;
-use garage_model::version_table::Version;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::Version;
-use garage_table::EmptyKey;
+use garage_table::{EmptyKey, EnumerationOrder};
use crate::encoding::*;
use crate::error::*;
-use crate::s3_put;
-use crate::s3_xml;
+use crate::helpers::key_after_prefix;
+use crate::s3::put as s3_put;
+use crate::s3::xml as s3_xml;
const DUMMY_NAME: &str = "Dummy Key";
const DUMMY_KEY: &str = "GKDummyKey";
@@ -66,8 +67,14 @@ pub async fn handle_list(
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
- t.get_range(&bucket, key, Some(ObjectFilter::IsData), count)
- .await
+ t.get_range(
+ &bucket,
+ key,
+ Some(ObjectFilter::IsData),
+ count,
+ EnumerationOrder::Forward,
+ )
+ .await
}
};
@@ -165,8 +172,14 @@ pub async fn handle_list_multipart_upload(
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
- t.get_range(&bucket, key, Some(ObjectFilter::IsUploading), count)
- .await
+ t.get_range(
+ &bucket,
+ key,
+ Some(ObjectFilter::IsUploading),
+ count,
+ EnumerationOrder::Forward,
+ )
+ .await
}
};
@@ -923,39 +936,13 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
}
}
-const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}';
-
-/// Compute the key after the prefix
-fn key_after_prefix(pfx: &str) -> Option<String> {
- let mut next = pfx.to_string();
- while !next.is_empty() {
- let tail = next.pop().unwrap();
- if tail >= char::MAX {
- continue;
- }
-
- // Circumvent a limitation of RangeFrom that overflow earlier than needed
- // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html
- let new_tail = if tail == UTF8_BEFORE_LAST_CHAR {
- char::MAX
- } else {
- (tail..).nth(1).unwrap()
- };
-
- next.push(new_tail);
- return Some(next);
- }
-
- None
-}
-
/*
* Unit tests of this module
*/
#[cfg(test)]
mod tests {
use super::*;
- use garage_model::version_table::*;
+ use garage_model::s3::version_table::*;
use garage_util::*;
use std::iter::FromIterator;
@@ -1003,39 +990,6 @@ mod tests {
}
#[test]
- fn test_key_after_prefix() {
- assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1);
- assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0");
- assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭");
- assert_eq!(
- key_after_prefix("􏿽").unwrap().as_str(),
- String::from(char::from_u32(0x10FFFE).unwrap())
- );
-
- // When the last character is the biggest UTF8 char
- let a = String::from_iter(['a', char::MAX].iter());
- assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b");
-
- // When all characters are the biggest UTF8 char
- let b = String::from_iter([char::MAX; 3].iter());
- assert!(key_after_prefix(b.as_str()).is_none());
-
- // Check utf8 surrogates
- let c = String::from('\u{D7FF}');
- assert_eq!(
- key_after_prefix(c.as_str()).unwrap().as_str(),
- String::from('\u{E000}')
- );
-
- // Check the character before the biggest one
- let d = String::from('\u{10FFFE}');
- assert_eq!(
- key_after_prefix(d.as_str()).unwrap().as_str(),
- String::from(char::MAX)
- );
- }
-
- #[test]
fn test_common_prefixes() {
let mut query = query();
let objs = objs();
diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs
new file mode 100644
index 00000000..3f5c1915
--- /dev/null
+++ b/src/api/s3/mod.rs
@@ -0,0 +1,14 @@
+pub mod api_server;
+
+mod bucket;
+mod copy;
+pub mod cors;
+mod delete;
+pub mod get;
+mod list;
+mod post_object;
+mod put;
+mod website;
+
+mod router;
+pub mod xml;
diff --git a/src/api/s3_post_object.rs b/src/api/s3/post_object.rs
index 585e0304..86fa7880 100644
--- a/src/api/s3_post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -14,10 +14,10 @@ use serde::Deserialize;
use garage_model::garage::Garage;
-use crate::api_server::resolve_bucket;
use crate::error::*;
-use crate::s3_put::{get_headers, save_stream};
-use crate::s3_xml;
+use crate::helpers::resolve_bucket;
+use crate::s3::put::{get_headers, save_stream};
+use crate::s3::xml as s3_xml;
use crate::signature::payload::{parse_date, verify_v4};
pub async fn handle_post_object(
@@ -119,7 +119,15 @@ pub async fn handle_post_object(
};
let date = parse_date(date)?;
- let api_key = verify_v4(&garage, credential, &date, signature, policy.as_bytes()).await?;
+ let api_key = verify_v4(
+ &garage,
+ "s3",
+ credential,
+ &date,
+ signature,
+ policy.as_bytes(),
+ )
+ .await?;
let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?;
diff --git a/src/api/s3_put.rs b/src/api/s3/put.rs
index ed0bf00b..89aa8d84 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3/put.rs
@@ -14,13 +14,13 @@ use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_block::manager::INLINE_THRESHOLD;
-use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
-use garage_model::object_table::*;
-use garage_model::version_table::*;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
use crate::error::*;
-use crate::s3_xml;
+use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
pub async fn handle_put(
diff --git a/src/api/s3_router.rs b/src/api/s3/router.rs
index 95a7eceb..0525c649 100644
--- a/src/api/s3_router.rs
+++ b/src/api/s3/router.rs
@@ -5,127 +5,10 @@ use std::borrow::Cow;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, Method, Request};
-/// This macro is used to generate very repetitive match {} blocks in this module
-/// It is _not_ made to be used anywhere else
-macro_rules! s3_match {
- (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{
- // usage: s3_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] }
- // returns true if the variant was one of the listed variants, false otherwise.
- use Endpoint::*;
- match $enum {
- $(
- $endpoint { .. } => true,
- )*
- _ => false
- }
- }};
- (@extract $enum:expr , $param:ident, [ $($endpoint:ident,)* ]) => {{
- // usage: s3_match {@extract my_enum, field_name, [ VariantWithField1, VariantWithField2 ..] }
- // returns Some(field_value), or None if the variant was not one of the listed variants.
- use Endpoint::*;
- match $enum {
- $(
- $endpoint {$param, ..} => Some($param),
- )*
- _ => None
- }
- }};
- (@gen_parser ($keyword:expr, $key:expr, $query:expr, $header:expr),
- key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*],
- no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{
- // usage: s3_match {@gen_parser (keyword, key, query, header),
- // key: [
- // SOME_KEYWORD => VariantWithKey,
- // ...
- // ],
- // no_key: [
- // SOME_KEYWORD => VariantWithoutKey,
- // ...
- // ]
- // }
- // See in from_{method} for more detailed usage.
- use Endpoint::*;
- use keywords::*;
- match ($keyword, !$key.is_empty()){
- $(
- ($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k {
- key: $key,
- $($(
- $param_k: s3_match!(@@parse_param $query, $conv_k, $param_k),
- )*)?
- }),
- )*
- $(
- ($kw_nk, false) $(if $query.$required_nk.is_some())? $(if $header.contains($header_nk))? => Ok($api_nk {
- $($(
- $param_nk: s3_match!(@@parse_param $query, $conv_nk, $param_nk),
- )*)?
- }),
- )*
- (kw, _) => Err(Error::BadRequest(format!("Invalid endpoint: {}", kw)))
- }
- }};
-
- (@@parse_param $query:expr, query_opt, $param:ident) => {{
- // extract optional query parameter
- $query.$param.take().map(|param| param.into_owned())
- }};
- (@@parse_param $query:expr, query, $param:ident) => {{
- // extract mendatory query parameter
- $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned()
- }};
- (@@parse_param $query:expr, opt_parse, $param:ident) => {{
- // extract and parse optional query parameter
- // missing parameter is file, however parse error is reported as an error
- $query.$param
- .take()
- .map(|param| param.parse())
- .transpose()
- .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
- }};
- (@@parse_param $query:expr, parse, $param:ident) => {{
- // extract and parse mandatory query parameter
- // both missing and un-parseable parameters are reported as errors
- $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?
- .parse()
- .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
- }};
- (@func
- $(#[$doc:meta])*
- pub enum Endpoint {
- $(
- $(#[$outer:meta])*
- $variant:ident $({
- $($name:ident: $ty:ty,)*
- })?,
- )*
- }) => {
- $(#[$doc])*
- pub enum Endpoint {
- $(
- $(#[$outer])*
- $variant $({
- $($name: $ty, )*
- })?,
- )*
- }
- impl Endpoint {
- pub fn name(&self) -> &'static str {
- match self {
- $(Endpoint::$variant $({ $($name: _,)* .. })? => stringify!($variant),)*
- }
- }
- }
- };
- (@if ($($cond:tt)+) then ($($then:tt)*) else ($($else:tt)*)) => {
- $($then)*
- };
- (@if () then ($($then:tt)*) else ($($else:tt)*)) => {
- $($else)*
- };
-}
+use crate::helpers::Authorization;
+use crate::router_macros::{generateQueryParameters, router_match};
-s3_match! {@func
+router_match! {@func
/// List of all S3 API endpoints.
///
@@ -471,7 +354,7 @@ impl Endpoint {
/// Determine which endpoint a request is for, knowing it is a GET.
fn from_get(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
key: [
@@ -528,7 +411,7 @@ impl Endpoint {
/// Determine which endpoint a request is for, knowing it is a HEAD.
fn from_head(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
key: [
@@ -542,7 +425,7 @@ impl Endpoint {
/// Determine which endpoint a request is for, knowing it is a POST.
fn from_post(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
key: [
@@ -564,7 +447,7 @@ impl Endpoint {
query: &mut QueryParameters<'_>,
headers: &HeaderMap<HeaderValue>,
) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, headers),
key: [
@@ -606,7 +489,7 @@ impl Endpoint {
/// Determine which endpoint a request is for, knowing it is a DELETE.
fn from_delete(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
- s3_match! {
+ router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
key: [
@@ -636,7 +519,7 @@ impl Endpoint {
/// Get the key the request target. Returns None for requests which don't use a key.
#[allow(dead_code)]
pub fn get_key(&self) -> Option<&str> {
- s3_match! {
+ router_match! {
@extract
self,
key,
@@ -673,7 +556,7 @@ impl Endpoint {
if let Endpoint::ListBuckets = self {
return Authorization::None;
};
- let readonly = s3_match! {
+ let readonly = router_match! {
@match
self,
[
@@ -717,7 +600,7 @@ impl Endpoint {
SelectObjectContent,
]
};
- let owner = s3_match! {
+ let owner = router_match! {
@match
self,
[
@@ -740,87 +623,6 @@ impl Endpoint {
}
}
-/// What kind of authorization is required to perform a given action
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum Authorization {
- /// No authorization is required
- None,
- /// Having Read permission on bucket
- Read,
- /// Having Write permission on bucket
- Write,
- /// Having Owner permission on bucket
- Owner,
-}
-
-/// This macro is used to generate part of the code in this module. It must be called only one, and
-/// is useless outside of this module.
-macro_rules! generateQueryParameters {
- ( $($rest:expr => $name:ident),* ) => {
- /// Struct containing all query parameters used in endpoints. Think of it as an HashMap,
- /// but with keys statically known.
- #[derive(Debug, Default)]
- struct QueryParameters<'a> {
- keyword: Option<Cow<'a, str>>,
- $(
- $name: Option<Cow<'a, str>>,
- )*
- }
-
- impl<'a> QueryParameters<'a> {
- /// Build this struct from the query part of an URI.
- fn from_query(query: &'a str) -> Result<Self, Error> {
- let mut res: Self = Default::default();
- for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
- let repeated = match k.as_ref() {
- $(
- $rest => if !v.is_empty() {
- res.$name.replace(v).is_some()
- } else {
- false
- },
- )*
- _ => {
- if k.starts_with("response-") || k.starts_with("X-Amz-") {
- false
- } else if v.as_ref().is_empty() {
- if res.keyword.replace(k).is_some() {
- return Err(Error::BadRequest("Multiple keywords".to_owned()));
- }
- continue;
- } else {
- debug!("Received an unknown query parameter: '{}'", k);
- false
- }
- }
- };
- if repeated {
- return Err(Error::BadRequest(format!(
- "Query parameter repeated: '{}'",
- k
- )));
- }
- }
- Ok(res)
- }
-
- /// Get an error message in case not all parameters where used when extracting them to
- /// build an Enpoint variant
- fn nonempty_message(&self) -> Option<&str> {
- if self.keyword.is_some() {
- Some("Keyword not used")
- } $(
- else if self.$name.is_some() {
- Some(concat!("'", $rest, "'"))
- }
- )* else {
- None
- }
- }
- }
- }
-}
-
// parameter name => struct field
generateQueryParameters! {
"continuation-token" => continuation_token,
diff --git a/src/api/s3_website.rs b/src/api/s3/website.rs
index b464dd45..561130dc 100644
--- a/src/api/s3_website.rs
+++ b/src/api/s3/website.rs
@@ -5,7 +5,7 @@ use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use crate::error::*;
-use crate::s3_xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
use garage_model::bucket_table::*;
diff --git a/src/api/s3_xml.rs b/src/api/s3/xml.rs
index 75ec4559..75ec4559 100644
--- a/src/api/s3_xml.rs
+++ b/src/api/s3/xml.rs
diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs
index ebdee6da..5646f4fa 100644
--- a/src/api/signature/mod.rs
+++ b/src/api/signature/mod.rs
@@ -42,6 +42,11 @@ pub fn signing_hmac(
Ok(hmac)
}
-pub fn compute_scope(datetime: &DateTime<Utc>, region: &str) -> String {
- format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,)
+pub fn compute_scope(datetime: &DateTime<Utc>, region: &str, service: &str) -> String {
+ format!(
+ "{}/{}/{}/aws4_request",
+ datetime.format(SHORT_DATE),
+ region,
+ service
+ )
}
diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs
index 2a41b307..9137dd2d 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/signature/payload.rs
@@ -11,14 +11,15 @@ use garage_util::data::Hash;
use garage_model::garage::Garage;
use garage_model::key_table::*;
-use super::signing_hmac;
-use super::{LONG_DATETIME, SHORT_DATE};
+use super::LONG_DATETIME;
+use super::{compute_scope, signing_hmac};
use crate::encoding::uri_encode;
use crate::error::*;
pub async fn check_payload_signature(
garage: &Garage,
+ service: &str,
request: &Request<Body>,
) -> Result<(Option<Key>, Option<Hash>), Error> {
let mut headers = HashMap::new();
@@ -64,6 +65,7 @@ pub async fn check_payload_signature(
let key = verify_v4(
garage,
+ service,
&authorization.credential,
&authorization.date,
&authorization.signature,
@@ -281,6 +283,7 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
pub async fn verify_v4(
garage: &Garage,
+ service: &str,
credential: &str,
date: &DateTime<Utc>,
signature: &str,
@@ -288,11 +291,7 @@ pub async fn verify_v4(
) -> Result<Key, Error> {
let (key_id, scope) = parse_credential(credential)?;
- let scope_expected = format!(
- "{}/{}/s3/aws4_request",
- date.format(SHORT_DATE),
- garage.config.s3_api.s3_region
- );
+ let scope_expected = compute_scope(date, &garage.config.s3_api.s3_region, service);
if scope != scope_expected {
return Err(Error::AuthorizationHeaderMalformed(scope.to_string()));
}
@@ -309,7 +308,7 @@ pub async fn verify_v4(
date,
&key_p.secret_key,
&garage.config.s3_api.s3_region,
- "s3",
+ service,
)
.ok_or_internal_error("Unable to build signing HMAC")?;
hmac.update(payload);
diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs
index 969a45d6..ded9d993 100644
--- a/src/api/signature/streaming.rs
+++ b/src/api/signature/streaming.rs
@@ -1,19 +1,68 @@
use std::pin::Pin;
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, NaiveDateTime, Utc};
use futures::prelude::*;
use futures::task;
+use garage_model::key_table::Key;
+use hmac::Mac;
use hyper::body::Bytes;
+use hyper::{Body, Request};
use garage_util::data::Hash;
-use hmac::Mac;
-use super::sha256sum;
-use super::HmacSha256;
-use super::LONG_DATETIME;
+use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME};
use crate::error::*;
+pub fn parse_streaming_body(
+ api_key: &Key,
+ req: Request<Body>,
+ content_sha256: &mut Option<Hash>,
+ region: &str,
+ service: &str,
+) -> Result<Request<Body>, Error> {
+ match req.headers().get("x-amz-content-sha256") {
+ Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
+ let signature = content_sha256
+ .take()
+ .ok_or_bad_request("No signature provided")?;
+
+ let secret_key = &api_key
+ .state
+ .as_option()
+ .ok_or_internal_error("Deleted key state")?
+ .secret_key;
+
+ let date = req
+ .headers()
+ .get("x-amz-date")
+ .ok_or_bad_request("Missing X-Amz-Date field")?
+ .to_str()?;
+ let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
+ .ok_or_bad_request("Invalid date")?;
+ let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
+
+ let scope = compute_scope(&date, region, service);
+ let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service)
+ .ok_or_internal_error("Unable to build signing HMAC")?;
+
+ Ok(req.map(move |body| {
+ Body::wrap_stream(
+ SignedPayloadStream::new(
+ body.map_err(Error::from),
+ signing_hmac,
+ date,
+ &scope,
+ signature,
+ )
+ .map_err(Error::from),
+ )
+ }))
+ }
+ _ => Ok(req),
+ }
+}
+
/// Result of `sha256("")`
const EMPTY_STRING_HEX_DIGEST: &str =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
@@ -295,7 +344,7 @@ mod tests {
.with_timezone(&Utc);
let secret_key = "test";
let region = "test";
- let scope = crate::signature::compute_scope(&datetime, region);
+ let scope = crate::signature::compute_scope(&datetime, region, "s3");
let signing_hmac =
crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap();