author    Alex <alex@adnab.me>    2022-05-10 13:16:57 +0200
committer Alex <alex@adnab.me>    2022-05-10 13:16:57 +0200
commit    5768bf362262f78376af14517c4921941986192e
tree      b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/s3
parent    def78c5e6f5da37a0d17b5652c525fbeccbc2e86
First implementation of K2V (#293)
**Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)

- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)

**Implementation:**

- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch

**Testing:**

- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
  - [x] Adapt testing framework
  - [x] Simple test with InsertItem + ReadItem
  - [x] Test with several Insert/Read/DeleteItem + ReadIndex
  - [x] Test all combinations of return formats for ReadItem
  - [x] Test with ReadBatch, InsertBatch, DeleteBatch
  - [x] Test with PollItem
  - [x] Test error codes
- [ ] Fix most broken stuff
  - [x] test PollItem broken randomly
  - [x] when invalid causality tokens are given, errors should be 4xx not 5xx

**Improvements:**

- [x] Descending range queries
  - [x] Specify
  - [x] Implement
  - [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag

Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
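(For orientation only: a minimal sketch of the data model the checklist above refers to, i.e. a triple addressed by partition key and sort key plus an index that merely counts values per partition key. The names below are invented for illustration and are not the actual types introduced by this patch.)

```rust
// Illustrative sketch only: invented names, not the actual K2V types.
pub struct K2vTriple {
    pub partition_key: String,
    pub sort_key: String,
    pub causal_context: Vec<u8>, // opaque DVVS causality token used for conflict detection
    pub value: Vec<u8>,
}

// The K2V index is just a counter of the number of values per partition key.
pub struct K2vIndexEntry {
    pub partition_key: String,
    pub n_values: u64,
}
```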
Diffstat (limited to 'src/api/s3')
-rw-r--r--  src/api/s3/api_server.rs    401
-rw-r--r--  src/api/s3/bucket.rs        358
-rw-r--r--  src/api/s3/copy.rs          660
-rw-r--r--  src/api/s3/cors.rs          442
-rw-r--r--  src/api/s3/delete.rs        170
-rw-r--r--  src/api/s3/get.rs           461
-rw-r--r--  src/api/s3/list.rs         1337
-rw-r--r--  src/api/s3/mod.rs            14
-rw-r--r--  src/api/s3/post_object.rs   507
-rw-r--r--  src/api/s3/put.rs           753
-rw-r--r--  src/api/s3/router.rs       1080
-rw-r--r--  src/api/s3/website.rs       369
-rw-r--r--  src/api/s3/xml.rs           844
13 files changed, 7396 insertions, 0 deletions
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
new file mode 100644
index 00000000..78a69d53
--- /dev/null
+++ b/src/api/s3/api_server.rs
@@ -0,0 +1,401 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use hyper::header;
+use hyper::{Body, Method, Request, Response};
+
+use opentelemetry::{trace::SpanRef, KeyValue};
+
+use garage_table::util::*;
+use garage_util::error::Error as GarageError;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+
+use crate::error::*;
+use crate::generic_server::*;
+
+use crate::signature::payload::check_payload_signature;
+use crate::signature::streaming::*;
+
+use crate::helpers::*;
+use crate::s3::bucket::*;
+use crate::s3::copy::*;
+use crate::s3::cors::*;
+use crate::s3::delete::*;
+use crate::s3::get::*;
+use crate::s3::list::*;
+use crate::s3::post_object::handle_post_object;
+use crate::s3::put::*;
+use crate::s3::router::Endpoint;
+use crate::s3::website::*;
+
+pub struct S3ApiServer {
+ garage: Arc<Garage>,
+}
+
+pub(crate) struct S3ApiEndpoint {
+ bucket_name: Option<String>,
+ endpoint: Endpoint,
+}
+
+impl S3ApiServer {
+ pub async fn run(
+ garage: Arc<Garage>,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ let addr = garage.config.s3_api.api_bind_addr;
+
+ ApiServer::new(
+ garage.config.s3_api.s3_region.clone(),
+ S3ApiServer { garage },
+ )
+ .run_server(addr, shutdown_signal)
+ .await
+ }
+
+ async fn handle_request_without_bucket(
+ &self,
+ _req: Request<Body>,
+ api_key: Key,
+ endpoint: Endpoint,
+ ) -> Result<Response<Body>, Error> {
+ match endpoint {
+ Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await,
+ endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ }
+ }
+}
+
+#[async_trait]
+impl ApiHandler for S3ApiServer {
+ const API_NAME: &'static str = "s3";
+ const API_NAME_DISPLAY: &'static str = "S3";
+
+ type Endpoint = S3ApiEndpoint;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> {
+ let authority = req
+ .headers()
+ .get(header::HOST)
+ .ok_or_bad_request("Host header required")?
+ .to_str()?;
+
+ let host = authority_to_host(authority)?;
+
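+ // Vhost-style requests: if the Host header is a subdomain of the configured
+ // root domain, the remaining prefix is used as the bucket name. Otherwise
+ // bucket_name stays None and Endpoint::from_request falls back to path-style routing.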
+ let bucket_name = self
+ .garage
+ .config
+ .s3_api
+ .root_domain
+ .as_ref()
+ .and_then(|root_domain| host_to_bucket(&host, root_domain));
+
+ let (endpoint, bucket_name) =
+ Endpoint::from_request(req, bucket_name.map(ToOwned::to_owned))?;
+
+ Ok(S3ApiEndpoint {
+ bucket_name,
+ endpoint,
+ })
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: S3ApiEndpoint,
+ ) -> Result<Response<Body>, Error> {
+ let S3ApiEndpoint {
+ bucket_name,
+ endpoint,
+ } = endpoint;
+ let garage = self.garage.clone();
+
+ // Some endpoints are processed early, before we even check for an API key
+ if let Endpoint::PostObject = endpoint {
+ return handle_post_object(garage, req, bucket_name.unwrap()).await;
+ }
+ if let Endpoint::Options = endpoint {
+ return handle_options_s3api(garage, &req, bucket_name).await;
+ }
+
+ let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
+ let api_key = api_key.ok_or_else(|| {
+ Error::Forbidden("Garage does not support anonymous access yet".to_string())
+ })?;
+
+ let req = parse_streaming_body(
+ &api_key,
+ req,
+ &mut content_sha256,
+ &garage.config.s3_api.s3_region,
+ "s3",
+ )?;
+
+ let bucket_name = match bucket_name {
+ None => {
+ return self
+ .handle_request_without_bucket(req, api_key, endpoint)
+ .await
+ }
+ Some(bucket) => bucket.to_string(),
+ };
+
+ // Special code path for CreateBucket API endpoint
+ if let Endpoint::CreateBucket {} = endpoint {
+ return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await;
+ }
+
+ let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
+ let bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .filter(|b| !b.state.is_deleted())
+ .ok_or(Error::NoSuchBucket)?;
+
+ let allowed = match endpoint.authorization_type() {
+ Authorization::Read => api_key.allow_read(&bucket_id),
+ Authorization::Write => api_key.allow_write(&bucket_id),
+ Authorization::Owner => api_key.allow_owner(&bucket_id),
+ _ => unreachable!(),
+ };
+
+ if !allowed {
+ return Err(Error::Forbidden(
+ "Operation is not allowed for this key.".to_string(),
+ ));
+ }
+
+ // Look up what CORS rule might apply to response.
+ // Requests with methods other than GET, HEAD or POST
+ // are always preflighted, i.e. the browser makes an
+ // OPTIONS call beforehand to check that the request is allowed
+ let matching_cors_rule = match *req.method() {
+ Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
+ _ => None,
+ };
+
+ let resp = match endpoint {
+ Endpoint::HeadObject {
+ key, part_number, ..
+ } => handle_head(garage, &req, bucket_id, &key, part_number).await,
+ Endpoint::GetObject {
+ key, part_number, ..
+ } => handle_get(garage, &req, bucket_id, &key, part_number).await,
+ Endpoint::UploadPart {
+ key,
+ part_number,
+ upload_id,
+ } => {
+ handle_put_part(
+ garage,
+ req,
+ bucket_id,
+ &key,
+ part_number,
+ &upload_id,
+ content_sha256,
+ )
+ .await
+ }
+ Endpoint::CopyObject { key } => {
+ handle_copy(garage, &api_key, &req, bucket_id, &key).await
+ }
+ Endpoint::UploadPartCopy {
+ key,
+ part_number,
+ upload_id,
+ } => {
+ handle_upload_part_copy(
+ garage,
+ &api_key,
+ &req,
+ bucket_id,
+ &key,
+ part_number,
+ &upload_id,
+ )
+ .await
+ }
+ Endpoint::PutObject { key } => {
+ handle_put(garage, req, bucket_id, &key, content_sha256).await
+ }
+ Endpoint::AbortMultipartUpload { key, upload_id } => {
+ handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
+ }
+ Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
+ Endpoint::CreateMultipartUpload { key } => {
+ handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
+ }
+ Endpoint::CompleteMultipartUpload { key, upload_id } => {
+ handle_complete_multipart_upload(
+ garage,
+ req,
+ &bucket_name,
+ bucket_id,
+ &key,
+ &upload_id,
+ content_sha256,
+ )
+ .await
+ }
+ Endpoint::CreateBucket {} => unreachable!(),
+ Endpoint::HeadBucket {} => {
+ let empty_body: Body = Body::from(vec![]);
+ let response = Response::builder().body(empty_body).unwrap();
+ Ok(response)
+ }
+ Endpoint::DeleteBucket {} => {
+ handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await
+ }
+ Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
+ Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
+ Endpoint::ListObjects {
+ delimiter,
+ encoding_type,
+ marker,
+ max_keys,
+ prefix,
+ } => {
+ handle_list(
+ garage,
+ &ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
+ is_v2: false,
+ marker,
+ continuation_token: None,
+ start_after: None,
+ },
+ )
+ .await
+ }
+ Endpoint::ListObjectsV2 {
+ delimiter,
+ encoding_type,
+ max_keys,
+ prefix,
+ continuation_token,
+ start_after,
+ list_type,
+ ..
+ } => {
+ if list_type == "2" {
+ handle_list(
+ garage,
+ &ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ prefix: prefix.unwrap_or_default(),
+ },
+ is_v2: true,
+ marker: None,
+ continuation_token,
+ start_after,
+ },
+ )
+ .await
+ } else {
+ Err(Error::BadRequest(format!(
+ "Invalid endpoint: list-type={}",
+ list_type
+ )))
+ }
+ }
+ Endpoint::ListMultipartUploads {
+ delimiter,
+ encoding_type,
+ key_marker,
+ max_uploads,
+ prefix,
+ upload_id_marker,
+ } => {
+ handle_list_multipart_upload(
+ garage,
+ &ListMultipartUploadsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
+ key_marker,
+ upload_id_marker,
+ },
+ )
+ .await
+ }
+ Endpoint::ListParts {
+ key,
+ max_parts,
+ part_number_marker,
+ upload_id,
+ } => {
+ handle_list_parts(
+ garage,
+ &ListPartsQuery {
+ bucket_name,
+ bucket_id,
+ key,
+ upload_id,
+ part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
+ max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ },
+ )
+ .await
+ }
+ Endpoint::DeleteObjects {} => {
+ handle_delete_objects(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
+ Endpoint::PutBucketWebsite {} => {
+ handle_put_website(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await,
+ Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
+ Endpoint::PutBucketCors {} => {
+ handle_put_cors(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await,
+ endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ };
+
+ // If request was a success and we have a CORS rule that applies to it,
+ // add the corresponding CORS headers to the response
+ let mut resp_ok = resp?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp_ok, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
+ Ok(resp_ok)
+ }
+}
+
+impl ApiEndpoint for S3ApiEndpoint {
+ fn name(&self) -> &'static str {
+ self.endpoint.name()
+ }
+
+ fn add_span_attributes(&self, span: SpanRef<'_>) {
+ span.set_attribute(KeyValue::new(
+ "bucket",
+ self.bucket_name.clone().unwrap_or_default(),
+ ));
+ }
+}
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
new file mode 100644
index 00000000..93048a8c
--- /dev/null
+++ b/src/api/s3/bucket.rs
@@ -0,0 +1,358 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::Bucket;
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+use garage_model::permission::BucketKeyPerm;
+use garage_model::s3::object_table::ObjectFilter;
+use garage_table::util::*;
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::time::*;
+
+use crate::error::*;
+use crate::s3::xml as s3_xml;
+use crate::signature::verify_signed_content;
+
+pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> {
+ let loc = s3_xml::LocationConstraint {
+ xmlns: (),
+ region: garage.config.s3_api.s3_region.to_string(),
+ };
+ let xml = s3_xml::to_xml_with_header(&loc)?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
+pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
+ let versioning = s3_xml::VersioningConfiguration {
+ xmlns: (),
+ status: None,
+ };
+
+ let xml = s3_xml::to_xml_with_header(&versioning)?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
+pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> {
+ let key_p = api_key.params().ok_or_internal_error(
+ "Key should not be in deleted state at this point (in handle_list_buckets)",
+ )?;
+
+ // Collect buckets user has access to
+ let ids = api_key
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_buckets
+ .items()
+ .iter()
+ .filter(|(_, perms)| perms.is_any())
+ .map(|(id, _)| *id)
+ .collect::<Vec<_>>();
+
+ let mut buckets_by_id = HashMap::new();
+ let mut aliases = HashMap::new();
+
+ for bucket_id in ids.iter() {
+ let bucket = garage.bucket_table.get(&EmptyKey, bucket_id).await?;
+ if let Some(bucket) = bucket {
+ for (alias, _, _active) in bucket.aliases().iter().filter(|(_, _, active)| *active) {
+ let alias_opt = garage.bucket_alias_table.get(&EmptyKey, alias).await?;
+ if let Some(alias_ent) = alias_opt {
+ if *alias_ent.state.get() == Some(*bucket_id) {
+ aliases.insert(alias_ent.name().to_string(), *bucket_id);
+ }
+ }
+ }
+ if let Deletable::Present(param) = bucket.state {
+ buckets_by_id.insert(bucket_id, param);
+ }
+ }
+ }
+
+ for (alias, _, id_opt) in key_p.local_aliases.items() {
+ if let Some(id) = id_opt {
+ aliases.insert(alias.clone(), *id);
+ }
+ }
+
+ // Generate response
+ let list_buckets = s3_xml::ListAllMyBucketsResult {
+ owner: s3_xml::Owner {
+ display_name: s3_xml::Value(key_p.name.get().to_string()),
+ id: s3_xml::Value(api_key.key_id.to_string()),
+ },
+ buckets: s3_xml::BucketList {
+ entries: aliases
+ .iter()
+ .filter_map(|(name, id)| buckets_by_id.get(id).map(|p| (name, id, p)))
+ .map(|(name, _id, param)| s3_xml::Bucket {
+ creation_date: s3_xml::Value(msec_to_rfc3339(param.creation_date)),
+ name: s3_xml::Value(name.to_string()),
+ })
+ .collect(),
+ },
+ };
+
+ let xml = s3_xml::to_xml_with_header(&list_buckets)?;
+ trace!("xml: {}", xml);
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml))?)
+}
+
+pub async fn handle_create_bucket(
+ garage: &Garage,
+ req: Request<Body>,
+ content_sha256: Option<Hash>,
+ api_key: Key,
+ bucket_name: String,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
+
+ let cmd =
+ parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;
+
+ if let Some(location_constraint) = cmd {
+ if location_constraint != garage.config.s3_api.s3_region {
+ return Err(Error::BadRequest(format!(
+ "Cannot satisfy location constraint `{}`: buckets can only be created in region `{}`",
+ location_constraint,
+ garage.config.s3_api.s3_region
+ )));
+ }
+ }
+
+ let key_params = api_key
+ .params()
+ .ok_or_internal_error("Key should not be deleted at this point")?;
+
+ let existing_bucket = if let Some(Some(bucket_id)) = key_params.local_aliases.get(&bucket_name)
+ {
+ Some(*bucket_id)
+ } else {
+ garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&bucket_name)
+ .await?
+ };
+
+ if let Some(bucket_id) = existing_bucket {
+ // Check whether we have write or owner permission on the bucket:
+ // if so, the bucket already exists and we can return 200 OK;
+ // otherwise return a BucketAlreadyExists error.
+ let kp = api_key.bucket_permissions(&bucket_id);
+ if !(kp.allow_write || kp.allow_owner) {
+ return Err(Error::BucketAlreadyExists);
+ }
+ } else {
+ // Create the bucket!
+ if !is_valid_bucket_name(&bucket_name) {
+ return Err(Error::BadRequest(format!(
+ "{}: {}",
+ bucket_name, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ let bucket = Bucket::new();
+ garage.bucket_table.insert(&bucket).await?;
+
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(bucket.id, &api_key.key_id, BucketKeyPerm::ALL_PERMISSIONS)
+ .await?;
+
+ garage
+ .bucket_helper()
+ .set_local_bucket_alias(bucket.id, &api_key.key_id, &bucket_name)
+ .await?;
+ }
+
+ Ok(Response::builder()
+ .header("Location", format!("/{}", bucket_name))
+ .body(Body::empty())
+ .unwrap())
+}
+
+pub async fn handle_delete_bucket(
+ garage: &Garage,
+ bucket_id: Uuid,
+ bucket_name: String,
+ api_key: Key,
+) -> Result<Response<Body>, Error> {
+ let key_params = api_key
+ .params()
+ .ok_or_internal_error("Key should not be deleted at this point")?;
+
+ let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_)));
+
+ let mut bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option().unwrap();
+
+ // If the bucket has no other aliases, this is a true deletion.
+ // Otherwise, it is just an alias removal.
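+ // An alias counts as "other" unless it is exactly the one being removed:
+ // when removing a local alias, any active global alias (and any local alias
+ // of a different key or name) counts; when removing a global alias, any
+ // active local alias (and any global alias with a different name) counts.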
+
+ let has_other_global_aliases = bucket_state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|(n, _, _)| is_local_alias || (*n != bucket_name));
+
+ let has_other_local_aliases = bucket_state
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|((k, n), _, _)| !is_local_alias || *n != bucket_name || *k != api_key.key_id);
+
+ if !has_other_global_aliases && !has_other_local_aliases {
+ // Delete bucket
+
+ // Check bucket is empty
+ let objects = garage
+ .object_table
+ .get_range(
+ &bucket_id,
+ None,
+ Some(ObjectFilter::IsData),
+ 10,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ if !objects.is_empty() {
+ return Err(Error::BucketNotEmpty);
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete bucket alias
+ if is_local_alias {
+ garage
+ .bucket_helper()
+ .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+ .await?;
+ } else {
+ garage
+ .bucket_helper()
+ .unset_global_bucket_alias(bucket_id, &bucket_name)
+ .await?;
+ }
+
+ // 2. delete authorization from keys that had access
+ for (key_id, _) in bucket.authorized_keys() {
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+
+ // 3. delete bucket
+ bucket.state = Deletable::delete();
+ garage.bucket_table.insert(&bucket).await?;
+ } else if is_local_alias {
+ // Just unalias
+ garage
+ .bucket_helper()
+ .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+ .await?;
+ } else {
+ // Just unalias (but from global namespace)
+ garage
+ .bucket_helper()
+ .unset_global_bucket_alias(bucket_id, &bucket_name)
+ .await?;
+ }
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
+ // Returns None if invalid data
+ // Returns Some(None) if no location constraint is given
+ // Returns Some(Some("xxxx")) where xxxx is the given location constraint
+
+ let xml_str = std::str::from_utf8(xml_bytes).ok()?;
+ if xml_str.trim_matches(char::is_whitespace).is_empty() {
+ return Some(None);
+ }
+
+ let xml = roxmltree::Document::parse(xml_str).ok()?;
+
+ let cbc = xml.root().first_child()?;
+ if !cbc.has_tag_name("CreateBucketConfiguration") {
+ return None;
+ }
+
+ let mut ret = None;
+ for item in cbc.children() {
+ if item.has_tag_name("LocationConstraint") {
+ if ret.is_some() {
+ return None;
+ }
+ ret = Some(item.text()?.to_string());
+ } else if !item.is_text() {
+ return None;
+ }
+ }
+
+ Some(ret)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn create_bucket() {
+ assert_eq!(parse_create_bucket_xml(br#""#), Some(None));
+ assert_eq!(
+ parse_create_bucket_xml(
+ br#"
+ <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ </CreateBucketConfiguration >
+ "#
+ ),
+ Some(None)
+ );
+ assert_eq!(
+ parse_create_bucket_xml(
+ br#"
+ <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <LocationConstraint>Europe</LocationConstraint>
+ </CreateBucketConfiguration >
+ "#
+ ),
+ Some(Some("Europe".into()))
+ );
+ assert_eq!(
+ parse_create_bucket_xml(
+ br#"
+ <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ </Crea >
+ "#
+ ),
+ None
+ );
+ }
+}
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
new file mode 100644
index 00000000..4e94d887
--- /dev/null
+++ b/src/api/s3/copy.rs
@@ -0,0 +1,660 @@
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
+use md5::{Digest as Md5Digest, Md5};
+
+use hyper::{Body, Request, Response};
+use serde::Serialize;
+
+use garage_table::*;
+use garage_util::data::*;
+use garage_util::time::*;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use crate::error::*;
+use crate::helpers::{parse_bucket_key, resolve_bucket};
+use crate::s3::put::{decode_upload_id, get_headers};
+use crate::s3::xml::{self as s3_xml, xmlns_tag};
+
+pub async fn handle_copy(
+ garage: Arc<Garage>,
+ api_key: &Key,
+ req: &Request<Body>,
+ dest_bucket_id: Uuid,
+ dest_key: &str,
+) -> Result<Response<Body>, Error> {
+ let copy_precondition = CopyPreconditionHeaders::parse(req)?;
+
+ let source_object = get_copy_source(&garage, api_key, req).await?;
+
+ let (source_version, source_version_data, source_version_meta) =
+ extract_source_info(&source_object)?;
+
+ // Check precondition, e.g. x-amz-copy-source-if-match
+ copy_precondition.check(source_version, &source_version_meta.etag)?;
+
+ // Generate parameters for copied object
+ let new_uuid = gen_uuid();
+ let new_timestamp = now_msec();
+
+ // Implement x-amz-metadata-directive: REPLACE
+ let new_meta = match req.headers().get("x-amz-metadata-directive") {
+ Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
+ headers: get_headers(req.headers())?,
+ size: source_version_meta.size,
+ etag: source_version_meta.etag.clone(),
+ },
+ _ => source_version_meta.clone(),
+ };
+
+ let etag = new_meta.etag.to_string();
+
+ // Save object copy
+ match source_version_data {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_meta, bytes) => {
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::Inline(
+ new_meta,
+ bytes.clone(),
+ )),
+ };
+ let dest_object = Object::new(
+ dest_bucket_id,
+ dest_key.to_string(),
+ vec![dest_object_version],
+ );
+ garage.object_table.insert(&dest_object).await?;
+ }
+ ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
+ // Get block list from source version
+ let source_version = garage
+ .version_table
+ .get(&source_version.uuid, &EmptyKey)
+ .await?;
+ let source_version = source_version.ok_or(Error::NoSuchKey)?;
+
+ // Write an "uploading" marker in Object table
+ // This holds a reference to the object in the Version table
+ // so that it won't be deleted, e.g. by repair_versions.
+ let tmp_dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Uploading(new_meta.headers.clone()),
+ };
+ let tmp_dest_object = Object::new(
+ dest_bucket_id,
+ dest_key.to_string(),
+ vec![tmp_dest_object_version],
+ );
+ garage.object_table.insert(&tmp_dest_object).await?;
+
+ // Write version in the version table. Even with empty block list,
+ // this means that the BlockRef entries linked to this version cannot be
+ // marked as deleted (they are marked as deleted only if the Version
+ // doesn't exist or is marked as deleted).
+ let mut dest_version =
+ Version::new(new_uuid, dest_bucket_id, dest_key.to_string(), false);
+ garage.version_table.insert(&dest_version).await?;
+
+ // Fill in block list for version and insert block refs
+ for (bk, bv) in source_version.blocks.items().iter() {
+ dest_version.blocks.put(*bk, *bv);
+ }
+ let dest_block_refs = dest_version
+ .blocks
+ .items()
+ .iter()
+ .map(|b| BlockRef {
+ block: b.1.hash,
+ version: new_uuid,
+ deleted: false.into(),
+ })
+ .collect::<Vec<_>>();
+ futures::try_join!(
+ garage.version_table.insert(&dest_version),
+ garage.block_ref_table.insert_many(&dest_block_refs[..]),
+ )?;
+
+ // Insert final object
+ // We do this last because otherwise there is a race condition in the case where
+ // the copy call has the same source and destination (this happens: rclone does
+ // it to update the modification timestamp, for instance). If we did this concurrently
+ // with the steps above, the blocks' reference counts could be decremented before
+ // they are incremented again for the new version, leading to data being deleted.
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
+ new_meta,
+ *first_block_hash,
+ )),
+ };
+ let dest_object = Object::new(
+ dest_bucket_id,
+ dest_key.to_string(),
+ vec![dest_object_version],
+ );
+ garage.object_table.insert(&dest_object).await?;
+ }
+ }
+
+ let last_modified = msec_to_rfc3339(new_timestamp);
+ let result = CopyObjectResult {
+ last_modified: s3_xml::Value(last_modified),
+ etag: s3_xml::Value(format!("\"{}\"", etag)),
+ };
+ let xml = s3_xml::to_xml_with_header(&result)?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .header("x-amz-version-id", hex::encode(new_uuid))
+ .header(
+ "x-amz-copy-source-version-id",
+ hex::encode(source_version.uuid),
+ )
+ .body(Body::from(xml))?)
+}
+
+pub async fn handle_upload_part_copy(
+ garage: Arc<Garage>,
+ api_key: &Key,
+ req: &Request<Body>,
+ dest_bucket_id: Uuid,
+ dest_key: &str,
+ part_number: u64,
+ upload_id: &str,
+) -> Result<Response<Body>, Error> {
+ let copy_precondition = CopyPreconditionHeaders::parse(req)?;
+
+ let dest_version_uuid = decode_upload_id(upload_id)?;
+
+ let dest_key = dest_key.to_string();
+ let (source_object, dest_object) = futures::try_join!(
+ get_copy_source(&garage, api_key, req),
+ garage
+ .object_table
+ .get(&dest_bucket_id, &dest_key)
+ .map_err(Error::from),
+ )?;
+ let dest_object = dest_object.ok_or(Error::NoSuchKey)?;
+
+ let (source_object_version, source_version_data, source_version_meta) =
+ extract_source_info(&source_object)?;
+
+ // Check precondition on source, e.g. x-amz-copy-source-if-match
+ copy_precondition.check(source_object_version, &source_version_meta.etag)?;
+
+ // Check source range is valid
+ let source_range = match req.headers().get("x-amz-copy-source-range") {
+ Some(range) => {
+ let range_str = range.to_str()?;
+ let mut ranges = http_range::HttpRange::parse(range_str, source_version_meta.size)
+ .map_err(|e| (e, source_version_meta.size))?;
+ if ranges.len() != 1 {
+ return Err(Error::BadRequest(
+ "Invalid x-amz-copy-source-range header: exactly 1 range must be given".into(),
+ ));
+ } else {
+ ranges.pop().unwrap()
+ }
+ }
+ None => http_range::HttpRange {
+ start: 0,
+ length: source_version_meta.size,
+ },
+ };
+
+ // Check destination version is indeed in uploading state
+ if !dest_object
+ .versions()
+ .iter()
+ .any(|v| v.uuid == dest_version_uuid && v.is_uploading())
+ {
+ return Err(Error::NoSuchUpload);
+ }
+
+ // Check source version is not inlined
+ match source_version_data {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_meta, _bytes) => {
+ // Inline data is only used for small objects, so we don't bother handling this case.
+ // (On AWS, UploadPartCopy only works for parts of at least 5 MB,
+ // which is never the case for an inline object.)
+ return Err(Error::BadRequest(
+ "Source object is too small (minimum part size is 5MB)".into(),
+ ));
+ }
+ ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (),
+ };
+
+ // Fetch the source version with its block list,
+ // and the destination version to check that the part has not been uploaded yet
+ let (source_version, dest_version) = futures::try_join!(
+ garage
+ .version_table
+ .get(&source_object_version.uuid, &EmptyKey),
+ garage.version_table.get(&dest_version_uuid, &EmptyKey),
+ )?;
+ let source_version = source_version.ok_or(Error::NoSuchKey)?;
+
+ // Check this part number hasn't yet been uploaded
+ if let Some(dv) = dest_version {
+ if dv.has_part_number(part_number) {
+ return Err(Error::BadRequest(format!(
+ "Part number {} has already been uploaded",
+ part_number
+ )));
+ }
+ }
+
+ // We want to reuse blocks from the source version as much as possible.
+ // However, we still need to get the data from these blocks
+ // because we need to know it to calculate the MD5sum of the part
+ // which is used as its ETag.
+
+ // First, calculate what blocks we want to keep,
+ // and the subrange of the block to take, if the bounds of the
+ // requested range are in the middle.
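+ // (For example, with 1 MiB blocks and a requested range of bytes 1.5 MiB..3.5 MiB,
+ // blocks 1, 2 and 3 are kept: the first trimmed to its last 0.5 MiB, the last to
+ // its first 0.5 MiB, and the middle one copied whole.)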
+ let (range_begin, range_end) = (source_range.start, source_range.start + source_range.length);
+
+ let mut blocks_to_copy = vec![];
+ let mut current_offset = 0;
+ for (_bk, block) in source_version.blocks.items().iter() {
+ let (block_begin, block_end) = (current_offset, current_offset + block.size);
+
+ if block_begin < range_end && block_end > range_begin {
+ let subrange_begin = if block_begin < range_begin {
+ Some(range_begin - block_begin)
+ } else {
+ None
+ };
+ let subrange_end = if block_end > range_end {
+ Some(range_end - block_begin)
+ } else {
+ None
+ };
+ let range_to_copy = match (subrange_begin, subrange_end) {
+ (Some(b), Some(e)) => Some(b as usize..e as usize),
+ (None, Some(e)) => Some(0..e as usize),
+ (Some(b), None) => Some(b as usize..block.size as usize),
+ (None, None) => None,
+ };
+
+ blocks_to_copy.push((block.hash, range_to_copy));
+ }
+
+ current_offset = block_end;
+ }
+
+ // Now, actually copy the blocks
+ let mut md5hasher = Md5::new();
+
+ // First, create a stream that is able to read the source blocks
+ // and extract the subrange if necessary.
+ // The second returned value is an Option<Hash>, that is Some
+ // if and only if the block returned is a block that already existed
+ // in the Garage data store (thus we don't need to save it again).
+ let garage2 = garage.clone();
+ let source_blocks = stream::iter(blocks_to_copy)
+ .flat_map(|(block_hash, range_to_copy)| {
+ let garage3 = garage2.clone();
+ stream::once(async move {
+ let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
+ match range_to_copy {
+ Some(r) => Ok((data[r].to_vec(), None)),
+ None => Ok((data, Some(block_hash))),
+ }
+ })
+ })
+ .peekable();
+
+ // The defragmenter is a custom stream (defined below) that concatenates
+ // consecutive block parts when they are too small.
+ // It returns a series of (Vec<u8>, Option<Hash>).
+ // When it is done, it returns an empty vec.
+ // Same as the previous iterator, the Option is Some(_) if and only if
+ // it's an existing block of the Garage data store.
+ let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
+
+ let mut current_offset = 0;
+ let mut next_block = defragmenter.next().await?;
+
+ loop {
+ let (data, existing_block_hash) = next_block;
+ if data.is_empty() {
+ break;
+ }
+
+ md5hasher.update(&data[..]);
+
+ let must_upload = existing_block_hash.is_none();
+ let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
+
+ let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
+ version.blocks.put(
+ VersionBlockKey {
+ part_number,
+ offset: current_offset,
+ },
+ VersionBlock {
+ hash: final_hash,
+ size: data.len() as u64,
+ },
+ );
+ current_offset += data.len() as u64;
+
+ let block_ref = BlockRef {
+ block: final_hash,
+ version: dest_version_uuid,
+ deleted: false.into(),
+ };
+
+ let garage2 = garage.clone();
+ let res = futures::try_join!(
+ // Thing 1: if the block is not exactly a block that existed before,
+ // we need to insert that data as a new block.
+ async move {
+ if must_upload {
+ garage2.block_manager.rpc_put_block(final_hash, data).await
+ } else {
+ Ok(())
+ }
+ },
+ // Thing 2: we need to insert the block in the version
+ garage.version_table.insert(&version),
+ // Thing 3: we need to add a block reference
+ garage.block_ref_table.insert(&block_ref),
+ // Thing 4: we need to prefetch the next block
+ defragmenter.next(),
+ )?;
+ next_block = res.3;
+ }
+
+ let data_md5sum = md5hasher.finalize();
+ let etag = hex::encode(data_md5sum);
+
+ // Put the part's ETag in the Version table
+ let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
+ version.parts_etags.put(part_number, etag.clone());
+ garage.version_table.insert(&version).await?;
+
+ // Build and return the CopyPartResult response
+ let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult {
+ xmlns: (),
+ etag: s3_xml::Value(format!("\"{}\"", etag)),
+ last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)),
+ })?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .header(
+ "x-amz-copy-source-version-id",
+ hex::encode(source_object_version.uuid),
+ )
+ .body(Body::from(resp_xml))?)
+}
+
+async fn get_copy_source(
+ garage: &Garage,
+ api_key: &Key,
+ req: &Request<Body>,
+) -> Result<Object, Error> {
+ let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
+ let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
+
+ let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
+ let source_bucket_id = resolve_bucket(garage, &source_bucket.to_string(), api_key).await?;
+
+ if !api_key.allow_read(&source_bucket_id) {
+ return Err(Error::Forbidden(format!(
+ "Reading from bucket {} not allowed for this key",
+ source_bucket
+ )));
+ }
+
+ let source_key = source_key.ok_or_bad_request("No source key specified")?;
+
+ let source_object = garage
+ .object_table
+ .get(&source_bucket_id, &source_key.to_string())
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ Ok(source_object)
+}
+
+fn extract_source_info(
+ source_object: &Object,
+) -> Result<(&ObjectVersion, &ObjectVersionData, &ObjectVersionMeta), Error> {
+ let source_version = source_object
+ .versions()
+ .iter()
+ .rev()
+ .find(|v| v.is_complete())
+ .ok_or(Error::NoSuchKey)?;
+
+ let source_version_data = match &source_version.state {
+ ObjectVersionState::Complete(x) => x,
+ _ => unreachable!(),
+ };
+
+ let source_version_meta = match source_version_data {
+ ObjectVersionData::DeleteMarker => {
+ return Err(Error::NoSuchKey);
+ }
+ ObjectVersionData::Inline(meta, _bytes) => meta,
+ ObjectVersionData::FirstBlock(meta, _fbh) => meta,
+ };
+
+ Ok((source_version, source_version_data, source_version_meta))
+}
+
+struct CopyPreconditionHeaders {
+ copy_source_if_match: Option<Vec<String>>,
+ copy_source_if_modified_since: Option<SystemTime>,
+ copy_source_if_none_match: Option<Vec<String>>,
+ copy_source_if_unmodified_since: Option<SystemTime>,
+}
+
+impl CopyPreconditionHeaders {
+ fn parse(req: &Request<Body>) -> Result<Self, Error> {
+ Ok(Self {
+ copy_source_if_match: req
+ .headers()
+ .get("x-amz-copy-source-if-match")
+ .map(|x| x.to_str())
+ .transpose()?
+ .map(|x| {
+ x.split(',')
+ .map(|m| m.trim().trim_matches('"').to_string())
+ .collect::<Vec<_>>()
+ }),
+ copy_source_if_modified_since: req
+ .headers()
+ .get("x-amz-copy-source-if-modified-since")
+ .map(|x| x.to_str())
+ .transpose()?
+ .map(httpdate::parse_http_date)
+ .transpose()
+ .ok_or_bad_request("Invalid date in x-amz-copy-source-if-modified-since")?,
+ copy_source_if_none_match: req
+ .headers()
+ .get("x-amz-copy-source-if-none-match")
+ .map(|x| x.to_str())
+ .transpose()?
+ .map(|x| {
+ x.split(',')
+ .map(|m| m.trim().trim_matches('"').to_string())
+ .collect::<Vec<_>>()
+ }),
+ copy_source_if_unmodified_since: req
+ .headers()
+ .get("x-amz-copy-source-if-unmodified-since")
+ .map(|x| x.to_str())
+ .transpose()?
+ .map(httpdate::parse_http_date)
+ .transpose()
+ .ok_or_bad_request("Invalid date in x-amz-copy-source-if-unmodified-since")?,
+ })
+ }
+
+ fn check(&self, v: &ObjectVersion, etag: &str) -> Result<(), Error> {
+ let v_date = UNIX_EPOCH + Duration::from_millis(v.timestamp);
+
+ let ok = match (
+ &self.copy_source_if_match,
+ &self.copy_source_if_unmodified_since,
+ &self.copy_source_if_none_match,
+ &self.copy_source_if_modified_since,
+ ) {
+ // TODO I'm not sure all of the conditions are evaluated correctly here
+
+ // If we have both if-match and if-unmodified-since,
+ // basically we don't care about if-unmodified-since,
+ // because in the spec it says that if if-match evaluates to
+ // true but if-unmodified-since evaluates to false,
+ // the copy is still done.
+ (Some(im), _, None, None) => im.iter().any(|x| x == etag || x == "*"),
+ (None, Some(ius), None, None) => v_date <= *ius,
+
+ // If we have both if-none-match and if-modified-since,
+ // then both of the two conditions must evaluate to true
+ (None, None, Some(inm), Some(ims)) => {
+ !inm.iter().any(|x| x == etag || x == "*") && v_date > *ims
+ }
+ (None, None, Some(inm), None) => !inm.iter().any(|x| x == etag || x == "*"),
+ (None, None, None, Some(ims)) => v_date > *ims,
+ (None, None, None, None) => true,
+ _ => {
+ return Err(Error::BadRequest(
+ "Invalid combination of x-amz-copy-source-if-xxxxx headers".into(),
+ ))
+ }
+ };
+
+ if ok {
+ Ok(())
+ } else {
+ Err(Error::PreconditionFailed)
+ }
+ }
+}
+
+type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
+
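+/// Stream adapter that accumulates consecutive blocks from the underlying stream
+/// into a single buffer until adding the next one would exceed `block_size`.
+/// The hash is kept only when the buffer holds exactly one unmodified pre-existing
+/// block, so that block does not have to be written to the data store again.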
+struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
+ block_size: usize,
+ block_stream: Pin<Box<stream::Peekable<S>>>,
+ buffer: Vec<u8>,
+ hash: Option<Hash>,
+}
+
+impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
+ fn new(block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>) -> Self {
+ Self {
+ block_size,
+ block_stream,
+ buffer: vec![],
+ hash: None,
+ }
+ }
+
+ async fn next(&mut self) -> BlockStreamItem {
+ // Fill buffer while we can
+ while let Some(res) = self.block_stream.as_mut().peek().await {
+ let (peeked_next_block, _) = match res {
+ Ok(t) => t,
+ Err(_) => {
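+ // The peeked item is an error: consume it so that `?` propagates it.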
+ self.block_stream.next().await.unwrap()?;
+ unreachable!()
+ }
+ };
+
+ if self.buffer.is_empty() {
+ let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
+ self.buffer = next_block;
+ self.hash = next_block_hash;
+ } else if self.buffer.len() + peeked_next_block.len() > self.block_size {
+ break;
+ } else {
+ let (next_block, _) = self.block_stream.next().await.unwrap()?;
+ self.buffer.extend(next_block);
+ self.hash = None;
+ }
+ }
+
+ Ok((std::mem::take(&mut self.buffer), self.hash.take()))
+ }
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct CopyObjectResult {
+ #[serde(rename = "LastModified")]
+ pub last_modified: s3_xml::Value,
+ #[serde(rename = "ETag")]
+ pub etag: s3_xml::Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct CopyPartResult {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "LastModified")]
+ pub last_modified: s3_xml::Value,
+ #[serde(rename = "ETag")]
+ pub etag: s3_xml::Value,
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::s3::xml::to_xml_with_header;
+
+ #[test]
+ fn copy_object_result() -> Result<(), Error> {
+ let copy_result = CopyObjectResult {
+ last_modified: s3_xml::Value(msec_to_rfc3339(0)),
+ etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".to_string()),
+ };
+ assert_eq!(
+ to_xml_with_header(&copy_result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<CopyObjectResult>\
+ <LastModified>1970-01-01T00:00:00.000Z</LastModified>\
+ <ETag>&quot;9b2cf535f27731c974343645a3985328&quot;</ETag>\
+</CopyObjectResult>\
+ "
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn serialize_copy_part_result() -> Result<(), Error> {
+ let expected_retval = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<CopyPartResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <LastModified>2011-04-11T20:34:56.000Z</LastModified>\
+ <ETag>&quot;9b2cf535f27731c974343645a3985328&quot;</ETag>\
+</CopyPartResult>";
+ let v = CopyPartResult {
+ xmlns: (),
+ last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()),
+ etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".into()),
+ };
+ println!("{}", to_xml_with_header(&v)?);
+
+ assert_eq!(to_xml_with_header(&v)?, expected_retval);
+
+ Ok(())
+ }
+}
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
new file mode 100644
index 00000000..37ea2e43
--- /dev/null
+++ b/src/api/s3/cors.rs
@@ -0,0 +1,442 @@
+use quick_xml::de::from_reader;
+use std::sync::Arc;
+
+use http::header::{
+ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
+ ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD,
+};
+use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode};
+
+use serde::{Deserialize, Serialize};
+
+use crate::error::*;
+use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+use crate::signature::verify_signed_content;
+
+use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
+use garage_model::garage::Garage;
+use garage_table::*;
+use garage_util::data::*;
+
+pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
+ let param = bucket
+ .params()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
+
+ if let Some(cors) = param.cors_config.get() {
+ let wc = CorsConfiguration {
+ xmlns: (),
+ cors_rules: cors
+ .iter()
+ .map(CorsRule::from_garage_cors_rule)
+ .collect::<Vec<_>>(),
+ };
+ let xml = to_xml_with_header(&wc)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, "application/xml")
+ .body(Body::from(xml))?)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+ }
+}
+
+pub async fn handle_delete_cors(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+) -> Result<Response<Body>, Error> {
+ let mut bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .ok_or(Error::NoSuchBucket)?;
+
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
+
+ param.cors_config.update(None);
+ garage.bucket_table.insert(&bucket).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_put_cors(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+ content_sha256: Option<Hash>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
+
+ let mut bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .ok_or(Error::NoSuchBucket)?;
+
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
+
+ let conf: CorsConfiguration = from_reader(&body as &[u8])?;
+ conf.validate()?;
+
+ param
+ .cors_config
+ .update(Some(conf.into_garage_cors_config()?));
+ garage.bucket_table.insert(&bucket).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_options_s3api(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_name: Option<String>,
+) -> Result<Response<Body>, Error> {
+ // FIXME: CORS rules of buckets with local aliases are
+ // not taken into account.
+
+ // If the bucket name is a global bucket name,
+ // we try to apply the CORS rules of that bucket.
+ // If a user has a local bucket alias with the same
+ // name, its CORS rules won't be applied: they are
+ // shadowed by the rules of the globally existing
+ // bucket (but this is inevitable because OPTIONS
+ // calls are not authenticated).
+ if let Some(bn) = bucket_name {
+ let helper = garage.bucket_helper();
+ let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
+ if let Some(id) = bucket_id {
+ let bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &id)
+ .await?
+ .filter(|b| !b.state.is_deleted())
+ .ok_or(Error::NoSuchBucket)?;
+ handle_options_for_bucket(req, &bucket)
+ } else {
+ // If there is a bucket name in the request, but that name
+ // does not correspond to a global alias for a bucket,
+ // then it's either a non-existing bucket or a local bucket.
+ // We have no way of knowing, because the request is not
+ // authenticated and thus we can't resolve local aliases.
+ // We take the permissive approach of allowing everything,
+ // because we don't want to prevent web apps that use
+ // local bucket names from making API calls.
+ Ok(Response::builder()
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "*")
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+ }
+ } else {
+ // If there is no bucket name in the request,
+ // we are doing a ListBuckets call, which we want to allow
+ // for all origins.
+ Ok(Response::builder()
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+ }
+}
+
+pub fn handle_options_for_bucket(
+ req: &Request<Body>,
+ bucket: &Bucket,
+) -> Result<Response<Body>, Error> {
+ let origin = req
+ .headers()
+ .get("Origin")
+ .ok_or_bad_request("Missing Origin header")?
+ .to_str()?;
+ let request_method = req
+ .headers()
+ .get(ACCESS_CONTROL_REQUEST_METHOD)
+ .ok_or_bad_request("Missing Access-Control-Request-Method header")?
+ .to_str()?;
+ let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
+ Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
+ None => vec![],
+ };
+
+ if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
+ let matching_rule = cors_config
+ .iter()
+ .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
+ if let Some(rule) = matching_rule {
+ let mut resp = Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?;
+ add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
+ return Ok(resp);
+ }
+ }
+
+ Err(Error::Forbidden("This CORS request is not allowed.".into()))
+}
+
+pub fn find_matching_cors_rule<'a>(
+ bucket: &'a Bucket,
+ req: &Request<Body>,
+) -> Result<Option<&'a GarageCorsRule>, Error> {
+ if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
+ if let Some(origin) = req.headers().get("Origin") {
+ let origin = origin.to_str()?;
+ let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
+ Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
+ None => vec![],
+ };
+ return Ok(cors_config.iter().find(|rule| {
+ cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter())
+ }));
+ }
+ }
+ Ok(None)
+}
+
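+/// Returns true if the rule allows the given origin and method, and if every
+/// requested header is covered by the rule's allowed headers.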
+fn cors_rule_matches<'a, HI, S>(
+ rule: &GarageCorsRule,
+ origin: &'a str,
+ method: &'a str,
+ mut request_headers: HI,
+) -> bool
+where
+ HI: Iterator<Item = S>,
+ S: AsRef<str>,
+{
+ rule.allow_origins.iter().any(|x| x == "*" || x == origin)
+ && rule.allow_methods.iter().any(|x| x == "*" || x == method)
+ && request_headers.all(|h| {
+ rule.allow_headers
+ .iter()
+ .any(|x| x == "*" || x == h.as_ref())
+ })
+}
+
+pub fn add_cors_headers(
+ resp: &mut Response<Body>,
+ rule: &GarageCorsRule,
+) -> Result<(), http::header::InvalidHeaderValue> {
+ let h = resp.headers_mut();
+ h.insert(
+ ACCESS_CONTROL_ALLOW_ORIGIN,
+ rule.allow_origins.join(", ").parse()?,
+ );
+ h.insert(
+ ACCESS_CONTROL_ALLOW_METHODS,
+ rule.allow_methods.join(", ").parse()?,
+ );
+ h.insert(
+ ACCESS_CONTROL_ALLOW_HEADERS,
+ rule.allow_headers.join(", ").parse()?,
+ );
+ h.insert(
+ ACCESS_CONTROL_EXPOSE_HEADERS,
+ rule.expose_headers.join(", ").parse()?,
+ );
+ Ok(())
+}
+
+// ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ----
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+#[serde(rename = "CORSConfiguration")]
+pub struct CorsConfiguration {
+ #[serde(serialize_with = "xmlns_tag", skip_deserializing)]
+ pub xmlns: (),
+ #[serde(rename = "CORSRule")]
+ pub cors_rules: Vec<CorsRule>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct CorsRule {
+ #[serde(rename = "ID")]
+ pub id: Option<Value>,
+ #[serde(rename = "MaxAgeSeconds")]
+ pub max_age_seconds: Option<IntValue>,
+ #[serde(rename = "AllowedOrigin")]
+ pub allowed_origins: Vec<Value>,
+ #[serde(rename = "AllowedMethod")]
+ pub allowed_methods: Vec<Value>,
+ #[serde(rename = "AllowedHeader", default)]
+ pub allowed_headers: Vec<Value>,
+ #[serde(rename = "ExposeHeader", default)]
+ pub expose_headers: Vec<Value>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct AllowedMethod {
+ #[serde(rename = "AllowedMethod")]
+ pub allowed_method: Value,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct AllowedHeader {
+ #[serde(rename = "AllowedHeader")]
+ pub allowed_header: Value,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct ExposeHeader {
+ #[serde(rename = "ExposeHeader")]
+ pub expose_header: Value,
+}
+
+impl CorsConfiguration {
+ pub fn validate(&self) -> Result<(), Error> {
+ for r in self.cors_rules.iter() {
+ r.validate()?;
+ }
+ Ok(())
+ }
+
+ pub fn into_garage_cors_config(self) -> Result<Vec<GarageCorsRule>, Error> {
+ Ok(self
+ .cors_rules
+ .iter()
+ .map(CorsRule::to_garage_cors_rule)
+ .collect())
+ }
+}
+
+impl CorsRule {
+ pub fn validate(&self) -> Result<(), Error> {
+ for method in self.allowed_methods.iter() {
+ method
+ .0
+ .parse::<Method>()
+ .ok_or_bad_request("Invalid CORSRule method")?;
+ }
+ for header in self
+ .allowed_headers
+ .iter()
+ .chain(self.expose_headers.iter())
+ {
+ header
+ .0
+ .parse::<HeaderName>()
+ .ok_or_bad_request("Invalid HTTP header name")?;
+ }
+ Ok(())
+ }
+
+ pub fn to_garage_cors_rule(&self) -> GarageCorsRule {
+ let convert_vec =
+ |vval: &[Value]| vval.iter().map(|x| x.0.to_owned()).collect::<Vec<String>>();
+ GarageCorsRule {
+ id: self.id.as_ref().map(|x| x.0.to_owned()),
+ max_age_seconds: self.max_age_seconds.as_ref().map(|x| x.0 as u64),
+ allow_origins: convert_vec(&self.allowed_origins),
+ allow_methods: convert_vec(&self.allowed_methods),
+ allow_headers: convert_vec(&self.allowed_headers),
+ expose_headers: convert_vec(&self.expose_headers),
+ }
+ }
+
+ pub fn from_garage_cors_rule(rule: &GarageCorsRule) -> Self {
+ let convert_vec = |vval: &[String]| {
+ vval.iter()
+ .map(|x| Value(x.clone()))
+ .collect::<Vec<Value>>()
+ };
+ Self {
+ id: rule.id.as_ref().map(|x| Value(x.clone())),
+ max_age_seconds: rule.max_age_seconds.map(|x| IntValue(x as i64)),
+ allowed_origins: convert_vec(&rule.allow_origins),
+ allowed_methods: convert_vec(&rule.allow_methods),
+ allowed_headers: convert_vec(&rule.allow_headers),
+ expose_headers: convert_vec(&rule.expose_headers),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use quick_xml::de::from_str;
+
+ #[test]
+ fn test_deserialize() -> Result<(), Error> {
+ let message = r#"<?xml version="1.0" encoding="UTF-8"?>
+<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <CORSRule>
+ <AllowedOrigin>http://www.example.com</AllowedOrigin>
+
+ <AllowedMethod>PUT</AllowedMethod>
+ <AllowedMethod>POST</AllowedMethod>
+ <AllowedMethod>DELETE</AllowedMethod>
+
+ <AllowedHeader>*</AllowedHeader>
+ </CORSRule>
+ <CORSRule>
+ <AllowedOrigin>*</AllowedOrigin>
+ <AllowedMethod>GET</AllowedMethod>
+ </CORSRule>
+ <CORSRule>
+ <ID>qsdfjklm</ID>
+ <MaxAgeSeconds>12345</MaxAgeSeconds>
+ <AllowedOrigin>https://perdu.com</AllowedOrigin>
+
+ <AllowedMethod>GET</AllowedMethod>
+ <AllowedMethod>DELETE</AllowedMethod>
+ <AllowedHeader>*</AllowedHeader>
+ <ExposeHeader>*</ExposeHeader>
+ </CORSRule>
+</CORSConfiguration>"#;
+ let conf: CorsConfiguration = from_str(message).unwrap();
+ let ref_value = CorsConfiguration {
+ xmlns: (),
+ cors_rules: vec![
+ CorsRule {
+ id: None,
+ max_age_seconds: None,
+ allowed_origins: vec!["http://www.example.com".into()],
+ allowed_methods: vec!["PUT".into(), "POST".into(), "DELETE".into()],
+ allowed_headers: vec!["*".into()],
+ expose_headers: vec![],
+ },
+ CorsRule {
+ id: None,
+ max_age_seconds: None,
+ allowed_origins: vec!["*".into()],
+ allowed_methods: vec!["GET".into()],
+ allowed_headers: vec![],
+ expose_headers: vec![],
+ },
+ CorsRule {
+ id: Some("qsdfjklm".into()),
+ max_age_seconds: Some(IntValue(12345)),
+ allowed_origins: vec!["https://perdu.com".into()],
+ allowed_methods: vec!["GET".into(), "DELETE".into()],
+ allowed_headers: vec!["*".into()],
+ expose_headers: vec!["*".into()],
+ },
+ ],
+ };
+ assert_eq! {
+ ref_value,
+ conf
+ };
+
+ let message2 = to_xml_with_header(&ref_value)?;
+
+ let cleanup = |c: &str| c.replace(char::is_whitespace, "");
+ assert_eq!(cleanup(message), cleanup(&message2));
+
+ Ok(())
+ }
+}
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
new file mode 100644
index 00000000..1e3f1249
--- /dev/null
+++ b/src/api/s3/delete.rs
@@ -0,0 +1,170 @@
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_util::data::*;
+use garage_util::time::*;
+
+use garage_model::garage::Garage;
+use garage_model::s3::object_table::*;
+
+use crate::error::*;
+use crate::s3::xml as s3_xml;
+use crate::signature::verify_signed_content;
+
+async fn handle_delete_internal(
+ garage: &Garage,
+ bucket_id: Uuid,
+ key: &str,
+) -> Result<(Uuid, Uuid), Error> {
+ let object = garage
+ .object_table
+ .get(&bucket_id, &key.to_string())
+ .await?
+ .ok_or(Error::NoSuchKey)?; // No need to delete
+
+ let interesting_versions = object.versions().iter().filter(|v| {
+ !matches!(
+ v.state,
+ ObjectVersionState::Aborted
+ | ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
+ )
+ });
+
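+ // Choose which existing version to report as deleted, and compute a timestamp
+ // for the new delete marker that is strictly greater than the timestamps of
+ // all existing versions (and not earlier than the current time).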
+ let mut version_to_delete = None;
+ let mut timestamp = now_msec();
+ for v in interesting_versions {
+ if v.timestamp + 1 > timestamp || version_to_delete.is_none() {
+ version_to_delete = Some(v.uuid);
+ }
+ timestamp = std::cmp::max(timestamp, v.timestamp + 1);
+ }
+
+ let deleted_version = version_to_delete.ok_or(Error::NoSuchKey)?;
+
+ let version_uuid = gen_uuid();
+
+ let object = Object::new(
+ bucket_id,
+ key.into(),
+ vec![ObjectVersion {
+ uuid: version_uuid,
+ timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
+ }],
+ );
+
+ garage.object_table.insert(&object).await?;
+
+ Ok((deleted_version, version_uuid))
+}
+
+pub async fn handle_delete(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ key: &str,
+) -> Result<Response<Body>, Error> {
+ let (_deleted_version, delete_marker_version) =
+ handle_delete_internal(&garage, bucket_id, key).await?;
+
+ Ok(Response::builder()
+ .header("x-amz-version-id", hex::encode(delete_marker_version))
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::from(vec![]))
+ .unwrap())
+}
+
+pub async fn handle_delete_objects(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+ content_sha256: Option<Hash>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
+
+ let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
+ let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
+
+ let mut ret_deleted = Vec::new();
+ let mut ret_errors = Vec::new();
+
+ for obj in cmd.objects.iter() {
+ match handle_delete_internal(&garage, bucket_id, &obj.key).await {
+ Ok((deleted_version, delete_marker_version)) => {
+ if cmd.quiet {
+ continue;
+ }
+ ret_deleted.push(s3_xml::Deleted {
+ key: s3_xml::Value(obj.key.clone()),
+ version_id: s3_xml::Value(hex::encode(deleted_version)),
+ delete_marker_version_id: s3_xml::Value(hex::encode(delete_marker_version)),
+ });
+ }
+ Err(e) => {
+ ret_errors.push(s3_xml::DeleteError {
+ code: s3_xml::Value(e.aws_code().to_string()),
+ key: Some(s3_xml::Value(obj.key.clone())),
+ message: s3_xml::Value(format!("{}", e)),
+ version_id: None,
+ });
+ }
+ }
+ }
+
+ let xml = s3_xml::to_xml_with_header(&s3_xml::DeleteResult {
+ xmlns: (),
+ deleted: ret_deleted,
+ errors: ret_errors,
+ })?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml))?)
+}
+
+struct DeleteRequest {
+ quiet: bool,
+ objects: Vec<DeleteObject>,
+}
+
+struct DeleteObject {
+ key: String,
+}
+
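+/// Parse the XML body of a DeleteObjects request: a <Delete> element containing
+/// <Object><Key>...</Key></Object> entries and an optional <Quiet> flag.
+/// Returns None if the document does not have this shape.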
+fn parse_delete_objects_xml(xml: &roxmltree::Document) -> Option<DeleteRequest> {
+ let mut ret = DeleteRequest {
+ quiet: false,
+ objects: vec![],
+ };
+
+ let root = xml.root();
+ let delete = root.first_child()?;
+
+ if !delete.has_tag_name("Delete") {
+ return None;
+ }
+
+ for item in delete.children() {
+ if item.has_tag_name("Object") {
+ let key = item.children().find(|e| e.has_tag_name("Key"))?;
+ let key_str = key.text()?;
+ ret.objects.push(DeleteObject {
+ key: key_str.to_string(),
+ });
+ } else if item.has_tag_name("Quiet") {
+ if item.text()? == "true" {
+ ret.quiet = true;
+ } else {
+ ret.quiet = false;
+ }
+ } else {
+ return None;
+ }
+ }
+
+ Some(ret)
+}
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
new file mode 100644
index 00000000..3edf22a6
--- /dev/null
+++ b/src/api/s3/get.rs
@@ -0,0 +1,461 @@
+//! Functions related to GET and HEAD requests
+use std::sync::Arc;
+use std::time::{Duration, UNIX_EPOCH};
+
+use futures::stream::*;
+use http::header::{
+ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
+ IF_NONE_MATCH, LAST_MODIFIED, RANGE,
+};
+use hyper::body::Bytes;
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_table::EmptyKey;
+use garage_util::data::*;
+
+use garage_model::garage::Garage;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use crate::error::*;
+
+const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
+
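+/// Build a response builder pre-filled with the object's common headers:
+/// Content-Type, Last-Modified, Accept-Ranges, ETag (when available) and the
+/// user-defined metadata headers stored with the version.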
+fn object_headers(
+ version: &ObjectVersion,
+ version_meta: &ObjectVersionMeta,
+) -> http::response::Builder {
+ debug!("Version meta: {:?}", version_meta);
+
+ let date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
+ let date_str = httpdate::fmt_http_date(date);
+
+ let mut resp = Response::builder()
+ .header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
+ .header(LAST_MODIFIED, date_str)
+ .header(ACCEPT_RANGES, "bytes".to_string());
+
+ if !version_meta.etag.is_empty() {
+ resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
+ }
+
+ for (k, v) in version_meta.headers.other.iter() {
+ resp = resp.header(k, v.to_string());
+ }
+
+ resp
+}
+
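+/// Check the conditional request headers (If-None-Match, If-Modified-Since) and
+/// return a 304 Not Modified response if the client's cached copy is still valid,
+/// or None if the object must be sent.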
+fn try_answer_cached(
+ version: &ObjectVersion,
+ version_meta: &ObjectVersionMeta,
+ req: &Request<Body>,
+) -> Option<Response<Body>> {
+ // <trinity> It is possible, and is even usually the case, [that both If-None-Match and
+ // If-Modified-Since] are present in a request. In this situation If-None-Match takes
+ // precedence and If-Modified-Since is ignored (as per 6. Precedence from RFC 7232). The
+ // rationale is that etag-based matching is more accurate: it has no issue with sub-second
+ // precision, for instance (in case of very fast updates).
+ let cached = if let Some(none_match) = req.headers().get(IF_NONE_MATCH) {
+ let none_match = none_match.to_str().ok()?;
+ let expected = format!("\"{}\"", version_meta.etag);
+ let found = none_match
+ .split(',')
+ .map(str::trim)
+ .any(|etag| etag == expected || etag == "\"*\"");
+ found
+ } else if let Some(modified_since) = req.headers().get(IF_MODIFIED_SINCE) {
+ let modified_since = modified_since.to_str().ok()?;
+ let client_date = httpdate::parse_http_date(modified_since).ok()?;
+ let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
+ client_date >= server_date
+ } else {
+ false
+ };
+
+ if cached {
+ Some(
+ Response::builder()
+ .status(StatusCode::NOT_MODIFIED)
+ .body(Body::empty())
+ .unwrap(),
+ )
+ } else {
+ None
+ }
+}
+
+/// Handle HEAD request
+pub async fn handle_head(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ key: &str,
+ part_number: Option<u64>,
+) -> Result<Response<Body>, Error> {
+ let object = garage
+ .object_table
+ .get(&bucket_id, &key.to_string())
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ let object_version = object
+ .versions()
+ .iter()
+ .rev()
+ .find(|v| v.is_data())
+ .ok_or(Error::NoSuchKey)?;
+
+ let version_data = match &object_version.state {
+ ObjectVersionState::Complete(c) => c,
+ _ => unreachable!(),
+ };
+
+ let version_meta = match version_data {
+ ObjectVersionData::Inline(meta, _) => meta,
+ ObjectVersionData::FirstBlock(meta, _) => meta,
+ _ => unreachable!(),
+ };
+
+ if let Some(cached) = try_answer_cached(object_version, version_meta, req) {
+ return Ok(cached);
+ }
+
+ if let Some(pn) = part_number {
+ match version_data {
+ ObjectVersionData::Inline(_, bytes) => {
+ if pn != 1 {
+ return Err(Error::InvalidPart);
+ }
+ Ok(object_headers(object_version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", bytes.len()))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, "1")
+ .status(StatusCode::PARTIAL_CONTENT)
+ .body(Body::empty())?)
+ }
+ ObjectVersionData::FirstBlock(_, _) => {
+ let version = garage
+ .version_table
+ .get(&object_version.uuid, &EmptyKey)
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ let (part_offset, part_end) =
+ calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
+ let n_parts = version.parts_etags.items().len();
+
+ Ok(object_headers(object_version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
+ .header(
+ CONTENT_RANGE,
+ format!(
+ "bytes {}-{}/{}",
+ part_offset,
+ part_end - 1,
+ version_meta.size
+ ),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts))
+ .status(StatusCode::PARTIAL_CONTENT)
+ .body(Body::empty())?)
+ }
+ _ => unreachable!(),
+ }
+ } else {
+ Ok(object_headers(object_version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", version_meta.size))
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+ }
+}
+
+/// Handle GET request
+pub async fn handle_get(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ key: &str,
+ part_number: Option<u64>,
+) -> Result<Response<Body>, Error> {
+ let object = garage
+ .object_table
+ .get(&bucket_id, &key.to_string())
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ let last_v = object
+ .versions()
+ .iter()
+ .rev()
+ .find(|v| v.is_complete())
+ .ok_or(Error::NoSuchKey)?;
+
+ let last_v_data = match &last_v.state {
+ ObjectVersionState::Complete(x) => x,
+ _ => unreachable!(),
+ };
+ let last_v_meta = match last_v_data {
+ ObjectVersionData::DeleteMarker => return Err(Error::NoSuchKey),
+ ObjectVersionData::Inline(meta, _) => meta,
+ ObjectVersionData::FirstBlock(meta, _) => meta,
+ };
+
+ if let Some(cached) = try_answer_cached(last_v, last_v_meta, req) {
+ return Ok(cached);
+ }
+
+ match (part_number, parse_range_header(req, last_v_meta.size)?) {
+ (Some(_), Some(_)) => {
+ return Err(Error::BadRequest(
+ "Cannot specify both partNumber and Range header".into(),
+ ));
+ }
+ (Some(pn), None) => {
+ return handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await;
+ }
+ (None, Some(range)) => {
+ return handle_get_range(
+ garage,
+ last_v,
+ last_v_data,
+ last_v_meta,
+ range.start,
+ range.start + range.length,
+ )
+ .await;
+ }
+ (None, None) => (),
+ }
+
+ let resp_builder = object_headers(last_v, last_v_meta)
+ .header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
+ .status(StatusCode::OK);
+
+ match &last_v_data {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_, bytes) => {
+ let body: Body = Body::from(bytes.to_vec());
+ Ok(resp_builder.body(body)?)
+ }
+ ObjectVersionData::FirstBlock(_, first_block_hash) => {
+ let read_first_block = garage.block_manager.rpc_get_block(first_block_hash);
+ let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
+
+ let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
+ let version = version.ok_or(Error::NoSuchKey)?;
+
+ let mut blocks = version
+ .blocks
+ .items()
+ .iter()
+ .map(|(_, vb)| (vb.hash, None))
+ .collect::<Vec<_>>();
+ blocks[0].1 = Some(first_block);
+
+ let body_stream = futures::stream::iter(blocks)
+ .map(move |(hash, data_opt)| {
+ let garage = garage.clone();
+ async move {
+ if let Some(data) = data_opt {
+ Ok(Bytes::from(data))
+ } else {
+ garage
+ .block_manager
+ .rpc_get_block(&hash)
+ .await
+ .map(Bytes::from)
+ }
+ }
+ })
+ .buffered(2);
+
+ let body = hyper::body::Body::wrap_stream(body_stream);
+ Ok(resp_builder.body(body)?)
+ }
+ }
+}
+
+async fn handle_get_range(
+ garage: Arc<Garage>,
+ version: &ObjectVersion,
+ version_data: &ObjectVersionData,
+ version_meta: &ObjectVersionMeta,
+ begin: u64,
+ end: u64,
+) -> Result<Response<Body>, Error> {
+ let resp_builder = object_headers(version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", end - begin))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
+ )
+ .status(StatusCode::PARTIAL_CONTENT);
+
+ match &version_data {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_meta, bytes) => {
+ if end as usize <= bytes.len() {
+ let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec());
+ Ok(resp_builder.body(body)?)
+ } else {
+ None.ok_or_internal_error(
+ "Requested range not present in inline bytes when it should have been",
+ )
+ }
+ }
+ ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
+ let version = garage
+ .version_table
+ .get(&version.uuid, &EmptyKey)
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+ Ok(resp_builder.body(body)?)
+ }
+ }
+}
+
+async fn handle_get_part(
+ garage: Arc<Garage>,
+ object_version: &ObjectVersion,
+ version_data: &ObjectVersionData,
+ version_meta: &ObjectVersionMeta,
+ part_number: u64,
+) -> Result<Response<Body>, Error> {
+ let resp_builder =
+ object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
+
+ match version_data {
+ ObjectVersionData::Inline(_, bytes) => {
+ if part_number != 1 {
+ return Err(Error::InvalidPart);
+ }
+ Ok(resp_builder
+ .header(CONTENT_LENGTH, format!("{}", bytes.len()))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, "1")
+ .body(Body::from(bytes.to_vec()))?)
+ }
+ ObjectVersionData::FirstBlock(_, _) => {
+ let version = garage
+ .version_table
+ .get(&object_version.uuid, &EmptyKey)
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ let (begin, end) =
+ calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
+ let n_parts = version.parts_etags.items().len();
+
+ let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+
+ Ok(resp_builder
+ .header(CONTENT_LENGTH, format!("{}", end - begin))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts))
+ .body(body)?)
+ }
+ _ => unreachable!(),
+ }
+}
+
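+/// Parse the Range header of the request, if any. At most one range is returned:
+/// multi-range requests are answered with the whole object, as if no Range header
+/// had been sent.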
+fn parse_range_header(
+ req: &Request<Body>,
+ total_size: u64,
+) -> Result<Option<http_range::HttpRange>, Error> {
+ let range = match req.headers().get(RANGE) {
+ Some(range) => {
+ let range_str = range.to_str()?;
+ let mut ranges =
+ http_range::HttpRange::parse(range_str, total_size).map_err(|e| (e, total_size))?;
+ if ranges.len() > 1 {
+ // garage does not support multi-range requests yet, so we respond with the entire
+ // object when multiple ranges are requested
+ None
+ } else {
+ ranges.pop()
+ }
+ }
+ None => None,
+ };
+ Ok(range)
+}
+
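+/// Compute the byte interval [begin, end) covered by the given part number within
+/// the complete object, or None if no block belongs to that part.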
+fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
+ let mut offset = 0;
+ for (i, (bk, bv)) in v.blocks.items().iter().enumerate() {
+ if bk.part_number == part_number {
+ let size: u64 = v.blocks.items()[i..]
+ .iter()
+ .take_while(|(k, _)| k.part_number == part_number)
+ .map(|(_, v)| v.size)
+ .sum();
+ return Some((offset, offset + size));
+ }
+ offset += bv.size;
+ }
+ None
+}
+
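+/// Build a streaming response body for the byte range [begin, end) of the object,
+/// fetching only the blocks that intersect the range and slicing the first and
+/// last blocks as needed.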
+fn body_from_blocks_range(
+ garage: Arc<Garage>,
+ all_blocks: &[(VersionBlockKey, VersionBlock)],
+ begin: u64,
+ end: u64,
+) -> Body {
+ // We will store here the list of blocks that have an intersection with the requested
+ // range, as well as their "true offset", which is their actual offset in the complete
+ // file (whereas block.offset designates the offset of the block WITHIN THE PART
+ // block.part_number, which is not the same in the case of a multipart upload)
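+ // For example, with two 5 MB parts, the blocks of part 2 have offsets relative to
+ // part 2, but true offsets starting at 5 MB within the complete object.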
+ let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min(
+ all_blocks.len(),
+ 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
+ ));
+ let mut true_offset = 0;
+ for (_, b) in all_blocks.iter() {
+ if true_offset >= end {
+ break;
+ }
+ // Keep only blocks that have an intersection with the requested range
+ if true_offset < end && true_offset + b.size > begin {
+ blocks.push((*b, true_offset));
+ }
+ true_offset += b.size;
+ }
+
+ let body_stream = futures::stream::iter(blocks)
+ .map(move |(block, true_offset)| {
+ let garage = garage.clone();
+ async move {
+ let data = garage.block_manager.rpc_get_block(&block.hash).await?;
+ let data = Bytes::from(data);
+ let start_in_block = if true_offset > begin {
+ 0
+ } else {
+ begin - true_offset
+ };
+ let end_in_block = if true_offset + block.size < end {
+ block.size
+ } else {
+ end - true_offset
+ };
+ Result::<Bytes, Error>::Ok(
+ data.slice(start_in_block as usize..end_in_block as usize),
+ )
+ }
+ })
+ .buffered(2);
+
+ hyper::body::Body::wrap_stream(body_stream)
+}
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
new file mode 100644
index 00000000..e2848c57
--- /dev/null
+++ b/src/api/s3/list.rs
@@ -0,0 +1,1337 @@
+use std::cmp::Ordering;
+use std::collections::{BTreeMap, BTreeSet};
+use std::iter::{Iterator, Peekable};
+use std::sync::Arc;
+
+use hyper::{Body, Response};
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+use garage_util::time::*;
+
+use garage_model::garage::Garage;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::Version;
+
+use garage_table::{EmptyKey, EnumerationOrder};
+
+use crate::encoding::*;
+use crate::error::*;
+use crate::helpers::key_after_prefix;
+use crate::s3::put as s3_put;
+use crate::s3::xml as s3_xml;
+
+const DUMMY_NAME: &str = "Dummy Key";
+const DUMMY_KEY: &str = "GKDummyKey";
+
+#[derive(Debug)]
+pub struct ListQueryCommon {
+ pub bucket_name: String,
+ pub bucket_id: Uuid,
+ pub delimiter: Option<String>,
+ pub page_size: usize,
+ pub prefix: String,
+ pub urlencode_resp: bool,
+}
+
+#[derive(Debug)]
+pub struct ListObjectsQuery {
+ pub is_v2: bool,
+ pub marker: Option<String>,
+ pub continuation_token: Option<String>,
+ pub start_after: Option<String>,
+ pub common: ListQueryCommon,
+}
+
+#[derive(Debug)]
+pub struct ListMultipartUploadsQuery {
+ pub key_marker: Option<String>,
+ pub upload_id_marker: Option<String>,
+ pub common: ListQueryCommon,
+}
+
+#[derive(Debug)]
+pub struct ListPartsQuery {
+ pub bucket_name: String,
+ pub bucket_id: Uuid,
+ pub key: String,
+ pub upload_id: String,
+ pub part_number_marker: Option<u64>,
+ pub max_parts: u64,
+}
+
+pub async fn handle_list(
+ garage: Arc<Garage>,
+ query: &ListObjectsQuery,
+) -> Result<Response<Body>, Error> {
+ let io = |bucket, key, count| {
+ let t = &garage.object_table;
+ async move {
+ t.get_range(
+ &bucket,
+ key,
+ Some(ObjectFilter::IsData),
+ count,
+ EnumerationOrder::Forward,
+ )
+ .await
+ }
+ };
+
+ debug!("ListObjects {:?}", query);
+ let mut acc = query.build_accumulator();
+ let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
+
+ let result = s3_xml::ListBucketResult {
+ xmlns: (),
+ // Sending back request information
+ name: s3_xml::Value(query.common.bucket_name.to_string()),
+ prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp),
+ max_keys: s3_xml::IntValue(query.common.page_size as i64),
+ delimiter: query
+ .common
+ .delimiter
+ .as_ref()
+ .map(|x| uriencode_maybe(x, query.common.urlencode_resp)),
+ encoding_type: match query.common.urlencode_resp {
+ true => Some(s3_xml::Value("url".to_string())),
+ false => None,
+ },
+ marker: match (!query.is_v2, &query.marker) {
+ (true, Some(k)) => Some(uriencode_maybe(k, query.common.urlencode_resp)),
+ _ => None,
+ },
+ start_after: match (query.is_v2, &query.start_after) {
+ (true, Some(sa)) => Some(uriencode_maybe(sa, query.common.urlencode_resp)),
+ _ => None,
+ },
+ continuation_token: match (query.is_v2, &query.continuation_token) {
+ (true, Some(ct)) => Some(s3_xml::Value(ct.to_string())),
+ _ => None,
+ },
+
+ // Pagination
+ is_truncated: s3_xml::Value(format!("{}", pagination.is_some())),
+ key_count: Some(s3_xml::IntValue(
+ acc.keys.len() as i64 + acc.common_prefixes.len() as i64,
+ )),
+ next_marker: match (!query.is_v2, &pagination) {
+ (true, Some(RangeBegin::AfterKey { key: k }))
+ | (
+ true,
+ Some(RangeBegin::IncludingKey {
+ fallback_key: Some(k),
+ ..
+ }),
+ ) => Some(uriencode_maybe(k, query.common.urlencode_resp)),
+ _ => None,
+ },
+ next_continuation_token: match (query.is_v2, &pagination) {
+ (true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!(
+ "]{}",
+ base64::encode(key.as_bytes())
+ ))),
+ (true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!(
+ "[{}",
+ base64::encode(key.as_bytes())
+ ))),
+ _ => None,
+ },
+
+ // Body
+ contents: acc
+ .keys
+ .iter()
+ .map(|(key, info)| s3_xml::ListBucketItem {
+ key: uriencode_maybe(key, query.common.urlencode_resp),
+ last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)),
+ size: s3_xml::IntValue(info.size as i64),
+ etag: s3_xml::Value(format!("\"{}\"", info.etag)),
+ storage_class: s3_xml::Value("STANDARD".to_string()),
+ })
+ .collect(),
+ common_prefixes: acc
+ .common_prefixes
+ .iter()
+ .map(|pfx| s3_xml::CommonPrefix {
+ prefix: uriencode_maybe(pfx, query.common.urlencode_resp),
+ })
+ .collect(),
+ };
+
+ let xml = s3_xml::to_xml_with_header(&result)?;
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
+pub async fn handle_list_multipart_upload(
+ garage: Arc<Garage>,
+ query: &ListMultipartUploadsQuery,
+) -> Result<Response<Body>, Error> {
+ let io = |bucket, key, count| {
+ let t = &garage.object_table;
+ async move {
+ t.get_range(
+ &bucket,
+ key,
+ Some(ObjectFilter::IsUploading),
+ count,
+ EnumerationOrder::Forward,
+ )
+ .await
+ }
+ };
+
+ debug!("ListMultipartUploads {:?}", query);
+ let mut acc = query.build_accumulator();
+ let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
+
+ let result = s3_xml::ListMultipartUploadsResult {
+ xmlns: (),
+
+ // Sending back some information about the request
+ bucket: s3_xml::Value(query.common.bucket_name.to_string()),
+ prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp),
+ delimiter: query
+ .common
+ .delimiter
+ .as_ref()
+ .map(|d| uriencode_maybe(d, query.common.urlencode_resp)),
+ max_uploads: s3_xml::IntValue(query.common.page_size as i64),
+ key_marker: query
+ .key_marker
+ .as_ref()
+ .map(|m| uriencode_maybe(m, query.common.urlencode_resp)),
+ upload_id_marker: query
+ .upload_id_marker
+ .as_ref()
+ .map(|m| s3_xml::Value(m.to_string())),
+ encoding_type: match query.common.urlencode_resp {
+ true => Some(s3_xml::Value("url".to_string())),
+ false => None,
+ },
+
+ // Handling pagination
+ is_truncated: s3_xml::Value(format!("{}", pagination.is_some())),
+ next_key_marker: match &pagination {
+ None => None,
+ Some(RangeBegin::AfterKey { key })
+ | Some(RangeBegin::AfterUpload { key, .. })
+ | Some(RangeBegin::IncludingKey { key, .. }) => {
+ Some(uriencode_maybe(key, query.common.urlencode_resp))
+ }
+ },
+ next_upload_id_marker: match pagination {
+ Some(RangeBegin::AfterUpload { upload, .. }) => {
+ Some(s3_xml::Value(hex::encode(upload)))
+ }
+ Some(RangeBegin::IncludingKey { .. }) => Some(s3_xml::Value("include".to_string())),
+ _ => None,
+ },
+
+ // Result body
+ upload: acc
+ .keys
+ .iter()
+ .map(|(uuid, info)| s3_xml::ListMultipartItem {
+ initiated: s3_xml::Value(msec_to_rfc3339(info.timestamp)),
+ key: uriencode_maybe(&info.key, query.common.urlencode_resp),
+ upload_id: s3_xml::Value(hex::encode(uuid)),
+ storage_class: s3_xml::Value("STANDARD".to_string()),
+ initiator: s3_xml::Initiator {
+ display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+ id: s3_xml::Value(DUMMY_KEY.to_string()),
+ },
+ owner: s3_xml::Owner {
+ display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+ id: s3_xml::Value(DUMMY_KEY.to_string()),
+ },
+ })
+ .collect(),
+ common_prefixes: acc
+ .common_prefixes
+ .iter()
+ .map(|c| s3_xml::CommonPrefix {
+ prefix: s3_xml::Value(c.to_string()),
+ })
+ .collect(),
+ };
+
+ let xml = s3_xml::to_xml_with_header(&result)?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
+pub async fn handle_list_parts(
+ garage: Arc<Garage>,
+ query: &ListPartsQuery,
+) -> Result<Response<Body>, Error> {
+ debug!("ListParts {:?}", query);
+
+ let upload_id = s3_put::decode_upload_id(&query.upload_id)?;
+
+ let (object, version) = futures::try_join!(
+ garage.object_table.get(&query.bucket_id, &query.key),
+ garage.version_table.get(&upload_id, &EmptyKey),
+ )?;
+
+ let (info, next) = fetch_part_info(query, object, version, upload_id)?;
+
+ let result = s3_xml::ListPartsResult {
+ xmlns: (),
+ bucket: s3_xml::Value(query.bucket_name.to_string()),
+ key: s3_xml::Value(query.key.to_string()),
+ upload_id: s3_xml::Value(query.upload_id.to_string()),
+ part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)),
+ next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
+ max_parts: s3_xml::IntValue(query.max_parts as i64),
+ is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()),
+ parts: info
+ .iter()
+ .map(|part| s3_xml::PartItem {
+ etag: s3_xml::Value(format!("\"{}\"", part.etag)),
+ last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
+ part_number: s3_xml::IntValue(part.part_number as i64),
+ size: s3_xml::IntValue(part.size as i64),
+ })
+ .collect(),
+ initiator: s3_xml::Initiator {
+ display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+ id: s3_xml::Value(DUMMY_KEY.to_string()),
+ },
+ owner: s3_xml::Owner {
+ display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+ id: s3_xml::Value(DUMMY_KEY.to_string()),
+ },
+ storage_class: s3_xml::Value("STANDARD".to_string()),
+ };
+
+ let xml = s3_xml::to_xml_with_header(&result)?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
+/*
+ * Private enums and structs
+ */
+
+#[derive(Debug)]
+struct ObjectInfo {
+ last_modified: u64,
+ size: u64,
+ etag: String,
+}
+
+#[derive(Debug, PartialEq)]
+struct UploadInfo {
+ key: String,
+ timestamp: u64,
+}
+
+#[derive(Debug, PartialEq)]
+struct PartInfo {
+ etag: String,
+ timestamp: u64,
+ part_number: u64,
+ size: u64,
+}
+
+enum ExtractionResult {
+ NoMore,
+ Filled,
+ FilledAtUpload {
+ key: String,
+ upload: Uuid,
+ },
+ Extracted {
+ key: String,
+ },
+ // The fallback key is used for legacy APIs that only support
+ // exclusive pagination (not inclusive pagination).
+ SkipTo {
+ key: String,
+ fallback_key: Option<String>,
+ },
+}
+
+#[derive(PartialEq, Clone, Debug)]
+enum RangeBegin {
+ // The fallback key is used for legacy APIs that only support
+ // exclusive pagination (not inclusive pagination).
+ IncludingKey {
+ key: String,
+ fallback_key: Option<String>,
+ },
+ AfterKey {
+ key: String,
+ },
+ AfterUpload {
+ key: String,
+ upload: Uuid,
+ },
+}
+type Pagination = Option<RangeBegin>;
+
+/*
+ * Fetch list entries
+ */
+
+async fn fetch_list_entries<R, F>(
+ query: &ListQueryCommon,
+ begin: RangeBegin,
+ acc: &mut impl ExtractAccumulator,
+ mut io: F,
+) -> Result<Pagination, Error>
+where
+ R: futures::Future<Output = Result<Vec<Object>, GarageError>>,
+ F: FnMut(Uuid, Option<String>, usize) -> R,
+{
+ let mut cursor = begin;
+ // +1 is needed as we may need to skip the 1st key
+ // (range is inclusive while most S3 requests are exclusive)
+ let count = query.page_size + 1;
+
+ loop {
+ let start_key = match cursor {
+ RangeBegin::AfterKey { ref key }
+ | RangeBegin::AfterUpload { ref key, .. }
+ | RangeBegin::IncludingKey { ref key, .. } => Some(key.clone()),
+ };
+
+ // Fetch objects
+ let objects = io(query.bucket_id, start_key.clone(), count).await?;
+
+ debug!(
+ "List: get range {:?} (max {}), results: {}",
+ start_key,
+ count,
+ objects.len()
+ );
+ let server_more = objects.len() >= count;
+
+ let prev_req_cursor = cursor.clone();
+ let mut iter = objects.iter().peekable();
+
+ // Drop the first key if needed.
+ // Only AfterKey requires it, according to the S3 spec and our implementation.
+ match (&cursor, iter.peek()) {
+ (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(),
+ (_, _) => None,
+ };
+
+ while let Some(object) = iter.peek() {
+ if !object.key.starts_with(&query.prefix) {
+ // If the key is not in the requested prefix, we're done
+ return Ok(None);
+ }
+
+ cursor = match acc.extract(query, &cursor, &mut iter) {
+ ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key },
+ ExtractionResult::SkipTo { key, fallback_key } => {
+ RangeBegin::IncludingKey { key, fallback_key }
+ }
+ ExtractionResult::FilledAtUpload { key, upload } => {
+ return Ok(Some(RangeBegin::AfterUpload { key, upload }))
+ }
+ ExtractionResult::Filled => return Ok(Some(cursor)),
+ ExtractionResult::NoMore => return Ok(None),
+ };
+ }
+
+ if !server_more {
+ // We did not fully fill the accumulator despite exhausting all the data we have,
+ // so we're done
+ return Ok(None);
+ }
+
+ if prev_req_cursor == cursor {
+ unreachable!("No progress has been done in the loop. This is a bug, please report it.");
+ }
+ }
+}
+
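+/// Build the list of parts (number, size, etag) of a multipart upload, starting
+/// after the optional part number marker and truncated to query.max_parts entries.
+/// Also returns the part number to use as the marker for the next page, if any.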
+fn fetch_part_info(
+ query: &ListPartsQuery,
+ object: Option<Object>,
+ version: Option<Version>,
+ upload_id: Uuid,
+) -> Result<(Vec<PartInfo>, Option<u64>), Error> {
+ // Check results
+ let object = object.ok_or(Error::NoSuchKey)?;
+
+ let obj_version = object
+ .versions()
+ .iter()
+ .find(|v| v.uuid == upload_id && v.is_uploading())
+ .ok_or(Error::NoSuchUpload)?;
+
+ let version = version.ok_or(Error::NoSuchKey)?;
+
+ // Cut the beginning of our 2 vectors if required
+ let (etags, blocks) = match &query.part_number_marker {
+ Some(marker) => {
+ let next = marker + 1;
+
+ let part_idx = into_ok_or_err(
+ version
+ .parts_etags
+ .items()
+ .binary_search_by(|(part_num, _)| part_num.cmp(&next)),
+ );
+ let parts = &version.parts_etags.items()[part_idx..];
+
+ let block_idx = into_ok_or_err(
+ version
+ .blocks
+ .items()
+ .binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)),
+ );
+ let blocks = &version.blocks.items()[block_idx..];
+
+ (parts, blocks)
+ }
+ None => (version.parts_etags.items(), version.blocks.items()),
+ };
+
+ // Use the block vector to compute a (part_number, size) vector
+ let mut size = Vec::<(u64, u64)>::new();
+ blocks.iter().for_each(|(key, val)| {
+ let mut new_size = val.size;
+ match size.pop() {
+ Some((part_number, size)) if part_number == key.part_number => new_size += size,
+ Some(v) => size.push(v),
+ None => (),
+ }
+ size.push((key.part_number, new_size))
+ });
+
+ // Merge the etag vector and size vector to build a PartInfo vector
+ let max_parts = query.max_parts as usize;
+ let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable());
+
+ let mut info = Vec::<PartInfo>::with_capacity(max_parts);
+
+ while info.len() < max_parts {
+ match (etag_iter.peek(), size_iter.peek()) {
+ (Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) {
+ Ordering::Less => {
+ debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query);
+ etag_iter.next();
+ }
+ Ordering::Equal => {
+ info.push(PartInfo {
+ etag: etag.to_string(),
+ timestamp: obj_version.timestamp,
+ part_number: *ep,
+ size: *size,
+ });
+ etag_iter.next();
+ size_iter.next();
+ }
+ Ordering::Greater => {
+ debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query);
+ size_iter.next();
+ }
+ },
+ (None, None) => return Ok((info, None)),
+ _ => {
+ debug!(
+ "Additional block or ETag information ignored. Query: {:?}",
+ query
+ );
+ return Ok((info, None));
+ }
+ }
+ }
+
+ match info.last() {
+ Some(part_info) => {
+ let pagination = Some(part_info.part_number);
+ Ok((info, pagination))
+ }
+ None => Ok((info, None)),
+ }
+}
+
+/*
+ * ListQuery logic
+ */
+
+/// Determine the key from which we want to start fetching objects from the database
+///
+/// We choose whether the object at this key must
+/// be included or excluded from the response.
+/// This key can be the prefix in the base case, or intermediate
+/// points in the dataset if we are continuing a previous listing.
+impl ListObjectsQuery {
+ fn build_accumulator(&self) -> Accumulator<String, ObjectInfo> {
+ Accumulator::<String, ObjectInfo>::new(self.common.page_size)
+ }
+
+ fn begin(&self) -> Result<RangeBegin, Error> {
+ if self.is_v2 {
+ match (&self.continuation_token, &self.start_after) {
+ // In V2 mode, the continuation token is defined as an opaque
+ // string in the spec, so we can do whatever we want with it.
+ // In our case, it is defined as either [ or ] (for include
+ // or exclude), followed by the base64-encoded key to start with.
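+ // For example, "]" followed by base64("foo/bar") resumes the listing strictly
+ // after "foo/bar", while "[" followed by base64("foo/bar") resumes at "foo/bar" itself.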
+ (Some(token), _) => match &token[..1] {
+ "[" => Ok(RangeBegin::IncludingKey {
+ key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
+ fallback_key: None,
+ }),
+ "]" => Ok(RangeBegin::AfterKey {
+ key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
+ }),
+ _ => Err(Error::BadRequest("Invalid continuation token".to_string())),
+ },
+
+ // StartAfter has defined semantics in the spec:
+ // start listing at the first key immediately after.
+ (_, Some(key)) => Ok(RangeBegin::AfterKey {
+ key: key.to_string(),
+ }),
+
+ // In the case where neither is specified, we start
+ // listing at the specified prefix. If an object has this
+ // exact same key, we include it. (@TODO is this correct?)
+ _ => Ok(RangeBegin::IncludingKey {
+ key: self.common.prefix.to_string(),
+ fallback_key: None,
+ }),
+ }
+ } else {
+ match &self.marker {
+ // In V1 mode, the spec defines the Marker value to mean
+ // the same thing as the StartAfter value in V2 mode.
+ Some(key) => Ok(RangeBegin::AfterKey {
+ key: key.to_string(),
+ }),
+ _ => Ok(RangeBegin::IncludingKey {
+ key: self.common.prefix.to_string(),
+ fallback_key: None,
+ }),
+ }
+ }
+ }
+}
+
+impl ListMultipartUploadsQuery {
+ fn build_accumulator(&self) -> Accumulator<Uuid, UploadInfo> {
+ Accumulator::<Uuid, UploadInfo>::new(self.common.page_size)
+ }
+
+ fn begin(&self) -> Result<RangeBegin, Error> {
+ match (&self.upload_id_marker, &self.key_marker) {
+ // If both the upload id marker and the key marker are set,
+ // the spec specifies that we must start listing uploads INCLUDING the given key,
+ // AFTER the specified upload id (sorted in lexicographic order).
+ // To enable some optimisations, we emulate "IncludingKey" by extending the upload id
+ // semantics. We base our reasoning on the hypothesis that S3's upload ids are opaque,
+ // while Garage's are 32-byte hex-encoded strings, which enables us to extend this query
+ // with a specific "include" upload id.
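+ // For example, upload-id-marker=include together with key-marker=foo restarts the
+ // listing at key "foo" itself, while a regular hex upload id restarts after that
+ // upload within key "foo".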
+ (Some(up_marker), Some(key_marker)) => match &up_marker[..] {
+ "include" => Ok(RangeBegin::IncludingKey {
+ key: key_marker.to_string(),
+ fallback_key: None,
+ }),
+ uuid => Ok(RangeBegin::AfterUpload {
+ key: key_marker.to_string(),
+ upload: s3_put::decode_upload_id(uuid)?,
+ }),
+ },
+
+ // If only the key marker is specified, the spec says that we must start listing
+ // uploads AFTER the specified key.
+ (None, Some(key_marker)) => Ok(RangeBegin::AfterKey {
+ key: key_marker.to_string(),
+ }),
+ _ => Ok(RangeBegin::IncludingKey {
+ key: self.common.prefix.to_string(),
+ fallback_key: None,
+ }),
+ }
+ }
+}
+
+/*
+ * Accumulator logic
+ */
+
+trait ExtractAccumulator {
+ fn extract<'a>(
+ &mut self,
+ query: &ListQueryCommon,
+ cursor: &RangeBegin,
+ iter: &mut Peekable<impl Iterator<Item = &'a Object>>,
+ ) -> ExtractionResult;
+}
+
+struct Accumulator<K, V> {
+ common_prefixes: BTreeSet<String>,
+ keys: BTreeMap<K, V>,
+ max_capacity: usize,
+}
+
+type ObjectAccumulator = Accumulator<String, ObjectInfo>;
+type UploadAccumulator = Accumulator<Uuid, UploadInfo>;
+
+impl<K: std::cmp::Ord, V> Accumulator<K, V> {
+ fn new(page_size: usize) -> Accumulator<K, V> {
+ Accumulator {
+ common_prefixes: BTreeSet::<String>::new(),
+ keys: BTreeMap::<K, V>::new(),
+ max_capacity: page_size,
+ }
+ }
+
+ /// Observe the Object iterator and try to extract a single common prefix
+ ///
+ /// This function can consume an arbitrary number of items as long as they share the same
+ /// common prefix.
+ fn extract_common_prefix<'a>(
+ &mut self,
+ objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
+ query: &ListQueryCommon,
+ ) -> Option<ExtractionResult> {
+ // Get the next object from the iterator
+ let object = objects.peek().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
+
+ // Check if this is a common prefix (requires a passed delimiter and its value in the key)
+ let pfx = match common_prefix(object, query) {
+ Some(p) => p,
+ None => return None,
+ };
+
+ // Try to register this prefix
+ // If not possible, we can return early
+ if !self.try_insert_common_prefix(pfx.to_string()) {
+ return Some(ExtractionResult::Filled);
+ }
+
+ // We consume the whole common prefix from the iterator
+ let mut last_pfx_key = &object.key;
+ loop {
+ last_pfx_key = match objects.peek() {
+ Some(o) if o.key.starts_with(pfx) => &o.key,
+ Some(_) => {
+ return Some(ExtractionResult::Extracted {
+ key: last_pfx_key.to_owned(),
+ })
+ }
+ None => {
+ return match key_after_prefix(pfx) {
+ Some(next) => Some(ExtractionResult::SkipTo {
+ key: next,
+ fallback_key: Some(last_pfx_key.to_owned()),
+ }),
+ None => Some(ExtractionResult::NoMore),
+ }
+ }
+ };
+
+ objects.next();
+ }
+ }
+
+ fn is_full(&mut self) -> bool {
+ self.keys.len() + self.common_prefixes.len() >= self.max_capacity
+ }
+
+ fn try_insert_common_prefix(&mut self, key: String) -> bool {
+ // If we already have an entry, we can continue
+ if self.common_prefixes.contains(&key) {
+ return true;
+ }
+
+ // Otherwise, we need to check if we can add it
+ match self.is_full() {
+ true => false,
+ false => {
+ self.common_prefixes.insert(key);
+ true
+ }
+ }
+ }
+
+ fn try_insert_entry(&mut self, key: K, value: V) -> bool {
+ // It is impossible to add the same key twice; this is an error
+ assert!(!self.keys.contains_key(&key));
+
+ match self.is_full() {
+ true => false,
+ false => {
+ self.keys.insert(key, value);
+ true
+ }
+ }
+ }
+}
+
+impl ExtractAccumulator for ObjectAccumulator {
+ fn extract<'a>(
+ &mut self,
+ query: &ListQueryCommon,
+ _cursor: &RangeBegin,
+ objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
+ ) -> ExtractionResult {
+ if let Some(e) = self.extract_common_prefix(objects, query) {
+ return e;
+ }
+
+ let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
+
+ let version = match object.versions().iter().find(|x| x.is_data()) {
+ Some(v) => v,
+ None => unreachable!(
+ "Expect to have objects having data due to earlier filtering. This is a logic bug."
+ ),
+ };
+
+ let meta = match &version.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
+ ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
+ _ => unreachable!(),
+ };
+ let info = ObjectInfo {
+ last_modified: version.timestamp,
+ size: meta.size,
+ etag: meta.etag.to_string(),
+ };
+
+ match self.try_insert_entry(object.key.clone(), info) {
+ true => ExtractionResult::Extracted {
+ key: object.key.clone(),
+ },
+ false => ExtractionResult::Filled,
+ }
+ }
+}
+
+impl ExtractAccumulator for UploadAccumulator {
+ /// Observe the iterator, process a single key, and try to extract one or more upload entries
+ ///
+ /// This function processes a single object from the iterator that can contain an arbitrary
+ /// number of versions, and thus "uploads".
+ fn extract<'a>(
+ &mut self,
+ query: &ListQueryCommon,
+ cursor: &RangeBegin,
+ objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
+ ) -> ExtractionResult {
+ if let Some(e) = self.extract_common_prefix(objects, query) {
+ return e;
+ }
+
+ // Get the next object from the iterator
+ let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
+
+ let mut uploads_for_key = object
+ .versions()
+ .iter()
+ .filter(|x| x.is_uploading())
+ .collect::<Vec<&ObjectVersion>>();
+
+ // S3 logic requires lexicographically sorted upload ids.
+ uploads_for_key.sort_unstable_by_key(|e| e.uuid);
+
+ // Skip results if an upload marker is provided
+ if let RangeBegin::AfterUpload { upload, .. } = cursor {
+ // Because our data are sorted, we can use a binary search to find the UUID
+ // or to find where it should have been added. Once this position is found,
+ // we use it to discard the first part of the array.
+ let idx = match uploads_for_key.binary_search_by(|e| e.uuid.cmp(upload)) {
+ // We start after the found uuid, so we need to discard the pointed value.
+ // In the worst case, the UUID is the last element, which leads us to an empty array,
+ // but we are never out of bounds.
+ Ok(i) => i + 1,
+ // If the UUID is not found, the upload may have been discarded between the two requests;
+ // binary_search_by then returns where it could have been inserted, so
+ // the pointed value is greater than our marker and we need to keep it.
+ Err(i) => i,
+ };
+ uploads_for_key = uploads_for_key[idx..].to_vec();
+ }
+
+ let mut iter = uploads_for_key.iter();
+
+ // The first entry is a specific case
+ // as it changes our result enum type
+ let first_upload = match iter.next() {
+ Some(u) => u,
+ None => {
+ return ExtractionResult::Extracted {
+ key: object.key.clone(),
+ }
+ }
+ };
+ let first_up_info = UploadInfo {
+ key: object.key.to_string(),
+ timestamp: first_upload.timestamp,
+ };
+ if !self.try_insert_entry(first_upload.uuid, first_up_info) {
+ return ExtractionResult::Filled;
+ }
+
+ // We can then collect the remaining uploads in a loop
+ let mut prev_uuid = first_upload.uuid;
+ for upload in iter {
+ let up_info = UploadInfo {
+ key: object.key.to_string(),
+ timestamp: upload.timestamp,
+ };
+
+ // Insert data in our accumulator
+ // If it is full, return information to paginate.
+ if !self.try_insert_entry(upload.uuid, up_info) {
+ return ExtractionResult::FilledAtUpload {
+ key: object.key.clone(),
+ upload: prev_uuid,
+ };
+ }
+ // Update our last added UUID
+ prev_uuid = upload.uuid;
+ }
+
+ // We successfully collected all the uploads
+ ExtractionResult::Extracted {
+ key: object.key.clone(),
+ }
+ }
+}
+
+/*
+ * Utility functions
+ */
+
+/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable
+fn into_ok_or_err<T>(r: Result<T, T>) -> T {
+ match r {
+ Ok(r) => r,
+ Err(r) => r,
+ }
+}
+
+/// Returns the common prefix of the object given the query prefix and delimiter
+fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
+ match &query.delimiter {
+ Some(delimiter) => object.key[query.prefix.len()..]
+ .find(delimiter)
+ .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]),
+ None => None,
+ }
+}
+
+/// URIencode a value if needed
+fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
+ if yes {
+ s3_xml::Value(uri_encode(s, true))
+ } else {
+ s3_xml::Value(s.to_string())
+ }
+}
+
+/*
+ * Unit tests of this module
+ */
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use garage_model::s3::version_table::*;
+ use garage_util::*;
+ use std::iter::FromIterator;
+
+ const TS: u64 = 1641394898314;
+
+ fn bucket() -> Uuid {
+ Uuid::from([0x42; 32])
+ }
+
+ fn query() -> ListMultipartUploadsQuery {
+ ListMultipartUploadsQuery {
+ common: ListQueryCommon {
+ prefix: "".to_string(),
+ delimiter: Some("/".to_string()),
+ page_size: 1000,
+ urlencode_resp: false,
+ bucket_name: "a".to_string(),
+ bucket_id: Uuid::from([0x00; 32]),
+ },
+ key_marker: None,
+ upload_id_marker: None,
+ }
+ }
+
+ fn objs() -> Vec<Object> {
+ vec![
+ Object::new(
+ bucket(),
+ "a/b/c".to_string(),
+ vec![objup_version([0x01; 32])],
+ ),
+ Object::new(bucket(), "d".to_string(), vec![objup_version([0x01; 32])]),
+ ]
+ }
+
+ fn objup_version(uuid: [u8; 32]) -> ObjectVersion {
+ ObjectVersion {
+ uuid: Uuid::from(uuid),
+ timestamp: TS,
+ state: ObjectVersionState::Uploading(ObjectVersionHeaders {
+ content_type: "text/plain".to_string(),
+ other: BTreeMap::<String, String>::new(),
+ }),
+ }
+ }
+
+ #[test]
+ fn test_common_prefixes() {
+ let mut query = query();
+ let objs = objs();
+
+ query.common.prefix = "a/".to_string();
+ assert_eq!(
+ common_prefix(objs.get(0).unwrap(), &query.common),
+ Some("a/b/")
+ );
+
+ query.common.prefix = "a/b/".to_string();
+ assert_eq!(common_prefix(objs.get(0).unwrap(), &query.common), None);
+ }
+
+ #[test]
+ fn test_extract_common_prefix() {
+ let mut query = query();
+ query.common.prefix = "a/".to_string();
+ let objs = objs();
+ let mut acc = UploadAccumulator::new(query.common.page_size);
+
+ let mut iter = objs.iter().peekable();
+ match acc.extract_common_prefix(&mut iter, &query.common) {
+ Some(ExtractionResult::Extracted { key }) => assert_eq!(key, "a/b/c".to_string()),
+ _ => panic!("wrong result"),
+ }
+ assert_eq!(acc.common_prefixes.len(), 1);
+ assert_eq!(acc.common_prefixes.iter().next().unwrap(), "a/b/");
+ }
+
+ #[test]
+ fn test_extract_upload() {
+ let objs = vec![
+ Object::new(
+ bucket(),
+ "b".to_string(),
+ vec![
+ objup_version([0x01; 32]),
+ objup_version([0x80; 32]),
+ objup_version([0x8f; 32]),
+ objup_version([0xdd; 32]),
+ ],
+ ),
+ Object::new(bucket(), "c".to_string(), vec![]),
+ ];
+
+ let mut acc = UploadAccumulator::new(2);
+ let mut start = RangeBegin::AfterUpload {
+ key: "b".to_string(),
+ upload: Uuid::from([0x01; 32]),
+ };
+
+ let mut iter = objs.iter().peekable();
+
+ // Check the case where we skip some uploads
+ match acc.extract(&(query().common), &start, &mut iter) {
+ ExtractionResult::FilledAtUpload { key, upload } => {
+ assert_eq!(key, "b");
+ assert_eq!(upload, Uuid::from([0x8f; 32]));
+ }
+ _ => panic!("wrong result"),
+ };
+
+ assert_eq!(acc.keys.len(), 2);
+ assert_eq!(
+ acc.keys.get(&Uuid::from([0x80; 32])).unwrap(),
+ &UploadInfo {
+ timestamp: TS,
+ key: "b".to_string()
+ }
+ );
+ assert_eq!(
+ acc.keys.get(&Uuid::from([0x8f; 32])).unwrap(),
+ &UploadInfo {
+ timestamp: TS,
+ key: "b".to_string()
+ }
+ );
+
+ acc = UploadAccumulator::new(2);
+ start = RangeBegin::AfterUpload {
+ key: "b".to_string(),
+ upload: Uuid::from([0xff; 32]),
+ };
+ iter = objs.iter().peekable();
+
+ // Check the case where we skip all the uploads
+ match acc.extract(&(query().common), &start, &mut iter) {
+ ExtractionResult::Extracted { key } if key.as_str() == "b" => (),
+ _ => panic!("wrong result"),
+ };
+ }
+
+ #[tokio::test]
+ async fn test_fetch_uploads_no_result() -> Result<(), Error> {
+ let query = query();
+ let mut acc = query.build_accumulator();
+ let page = fetch_list_entries(
+ &query.common,
+ query.begin()?,
+ &mut acc,
+ |_, _, _| async move { Ok(vec![]) },
+ )
+ .await?;
+ assert_eq!(page, None);
+ assert_eq!(acc.common_prefixes.len(), 0);
+ assert_eq!(acc.keys.len(), 0);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_fetch_uploads_basic() -> Result<(), Error> {
+ let query = query();
+ let mut acc = query.build_accumulator();
+ let mut fake_io = |_, _, _| async move { Ok(objs()) };
+ let page =
+ fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?;
+ assert_eq!(page, None);
+ assert_eq!(acc.common_prefixes.len(), 1);
+ assert_eq!(acc.keys.len(), 1);
+ assert!(acc.common_prefixes.contains("a/"));
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_fetch_uploads_advanced() -> Result<(), Error> {
+ let mut query = query();
+ query.common.page_size = 2;
+
+ let mut fake_io = |_, k: Option<String>, _| async move {
+ Ok(match k.as_deref() {
+ Some("") => vec![
+ Object::new(bucket(), "b/a".to_string(), vec![objup_version([0x01; 32])]),
+ Object::new(bucket(), "b/b".to_string(), vec![objup_version([0x01; 32])]),
+ Object::new(bucket(), "b/c".to_string(), vec![objup_version([0x01; 32])]),
+ ],
+ Some("b0") => vec![
+ Object::new(bucket(), "c/a".to_string(), vec![objup_version([0x01; 32])]),
+ Object::new(bucket(), "c/b".to_string(), vec![objup_version([0x01; 32])]),
+ Object::new(bucket(), "c/c".to_string(), vec![objup_version([0x02; 32])]),
+ ],
+ Some("c0") => vec![Object::new(
+ bucket(),
+ "d".to_string(),
+ vec![objup_version([0x01; 32])],
+ )],
+ _ => panic!("wrong value {:?}", k),
+ })
+ };
+
+ let mut acc = query.build_accumulator();
+ let page =
+ fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?;
+ assert_eq!(
+ page,
+ Some(RangeBegin::IncludingKey {
+ key: "c0".to_string(),
+ fallback_key: Some("c/c".to_string())
+ })
+ );
+ assert_eq!(acc.common_prefixes.len(), 2);
+ assert_eq!(acc.keys.len(), 0);
+ assert!(acc.common_prefixes.contains("b/"));
+ assert!(acc.common_prefixes.contains("c/"));
+
+ Ok(())
+ }
+
+ fn version() -> Version {
+ let uuid = Uuid::from([0x08; 32]);
+
+ let blocks = vec![
+ (
+ VersionBlockKey {
+ part_number: 1,
+ offset: 1,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 3,
+ },
+ ),
+ (
+ VersionBlockKey {
+ part_number: 1,
+ offset: 2,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 2,
+ },
+ ),
+ (
+ VersionBlockKey {
+ part_number: 2,
+ offset: 1,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 8,
+ },
+ ),
+ (
+ VersionBlockKey {
+ part_number: 5,
+ offset: 1,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 7,
+ },
+ ),
+ (
+ VersionBlockKey {
+ part_number: 8,
+ offset: 1,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 5,
+ },
+ ),
+ ];
+ let etags = vec![
+ (1, "etag1".to_string()),
+ (3, "etag2".to_string()),
+ (5, "etag3".to_string()),
+ (8, "etag4".to_string()),
+ (9, "etag5".to_string()),
+ ];
+
+ Version {
+ bucket_id: uuid,
+ key: "a".to_string(),
+ uuid,
+ deleted: false.into(),
+ blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
+ parts_etags: crdt::Map::<u64, String>::from_iter(etags),
+ }
+ }
+
+ fn obj() -> Object {
+ Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])])
+ }
+
+ #[test]
+ fn test_fetch_part_info() -> Result<(), Error> {
+ let uuid = Uuid::from([0x08; 32]);
+ let mut query = ListPartsQuery {
+ bucket_name: "a".to_string(),
+ bucket_id: uuid,
+ key: "a".to_string(),
+ upload_id: "xx".to_string(),
+ part_number_marker: None,
+ max_parts: 2,
+ };
+
+ assert!(
+ fetch_part_info(&query, None, None, uuid).is_err(),
+ "No object and version should fail"
+ );
+ assert!(
+ fetch_part_info(&query, Some(obj()), None, uuid).is_err(),
+ "No version should faild"
+ );
+ assert!(
+ fetch_part_info(&query, None, Some(version()), uuid).is_err(),
+ "No object should fail"
+ );
+
+ // Start from the beginning but with limited size to trigger pagination
+ let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ assert_eq!(pagination.unwrap(), 5);
+ assert_eq!(
+ info,
+ vec![
+ PartInfo {
+ etag: "etag1".to_string(),
+ timestamp: TS,
+ part_number: 1,
+ size: 5
+ },
+ PartInfo {
+ etag: "etag3".to_string(),
+ timestamp: TS,
+ part_number: 5,
+ size: 7
+ },
+ ]
+ );
+
+ // Use previous pagination to make a new request
+ query.part_number_marker = Some(pagination.unwrap());
+ let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ assert!(pagination.is_none());
+ assert_eq!(
+ info,
+ vec![PartInfo {
+ etag: "etag4".to_string(),
+ timestamp: TS,
+ part_number: 8,
+ size: 5
+ },]
+ );
+
+ // Trying to access a part that is way larger than registered ones
+ query.part_number_marker = Some(9999);
+ let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ assert!(pagination.is_none());
+ assert_eq!(info, vec![]);
+
+ // Try without any limitation
+ query.max_parts = 1000;
+ query.part_number_marker = None;
+ let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ assert!(pagination.is_none());
+ assert_eq!(
+ info,
+ vec![
+ PartInfo {
+ etag: "etag1".to_string(),
+ timestamp: TS,
+ part_number: 1,
+ size: 5
+ },
+ PartInfo {
+ etag: "etag3".to_string(),
+ timestamp: TS,
+ part_number: 5,
+ size: 7
+ },
+ PartInfo {
+ etag: "etag4".to_string(),
+ timestamp: TS,
+ part_number: 8,
+ size: 5
+ },
+ ]
+ );
+
+ Ok(())
+ }
+}
diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs
new file mode 100644
index 00000000..3f5c1915
--- /dev/null
+++ b/src/api/s3/mod.rs
@@ -0,0 +1,14 @@
+pub mod api_server;
+
+mod bucket;
+mod copy;
+pub mod cors;
+mod delete;
+pub mod get;
+mod list;
+mod post_object;
+mod put;
+mod website;
+
+mod router;
+pub mod xml;
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
new file mode 100644
index 00000000..86fa7880
--- /dev/null
+++ b/src/api/s3/post_object.rs
@@ -0,0 +1,507 @@
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::ops::RangeInclusive;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use bytes::Bytes;
+use chrono::{DateTime, Duration, Utc};
+use futures::{Stream, StreamExt};
+use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
+use hyper::{Body, Request, Response, StatusCode};
+use multer::{Constraints, Multipart, SizeLimit};
+use serde::Deserialize;
+
+use garage_model::garage::Garage;
+
+use crate::error::*;
+use crate::helpers::resolve_bucket;
+use crate::s3::put::{get_headers, save_stream};
+use crate::s3::xml as s3_xml;
+use crate::signature::payload::{parse_date, verify_v4};
+
+pub async fn handle_post_object(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket: String,
+) -> Result<Response<Body>, Error> {
+ let boundary = req
+ .headers()
+ .get(header::CONTENT_TYPE)
+ .and_then(|ct| ct.to_str().ok())
+ .and_then(|ct| multer::parse_boundary(ct).ok())
+ .ok_or_bad_request("Counld not get multipart boundary")?;
+
+ // 16k seems plenty for a header. 5G is the max size of a single part, so it seems reasonable
+ // for a PostObject
+ let constraints = Constraints::new().size_limit(
+ SizeLimit::new()
+ .per_field(16 * 1024)
+ .for_field("file", 5 * 1024 * 1024 * 1024),
+ );
+
+ let (head, body) = req.into_parts();
+ let mut multipart = Multipart::with_constraints(body, boundary, constraints);
+
+ let mut params = HeaderMap::new();
+ let field = loop {
+ let field = if let Some(field) = multipart.next_field().await? {
+ field
+ } else {
+ return Err(Error::BadRequest(
+ "Request did not contain a file".to_owned(),
+ ));
+ };
+ let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) {
+ name
+ } else {
+ continue;
+ };
+ if name == "file" {
+ break field;
+ }
+
+ if let Ok(content) = HeaderValue::from_str(&field.text().await?) {
+ match name.as_str() {
+ "tag" => (/* tag need to be reencoded, but we don't support them yet anyway */),
+ "acl" => {
+ if params.insert("x-amz-acl", content).is_some() {
+ return Err(Error::BadRequest(
+ "Field 'acl' provided more than one time".to_string(),
+ ));
+ }
+ }
+ _ => {
+ if params.insert(&name, content).is_some() {
+ return Err(Error::BadRequest(format!(
+ "Field '{}' provided more than one time",
+ name
+ )));
+ }
+ }
+ }
+ }
+ };
+
+ // Current part is the file. Do some checks before handing it over to the PutObject code
+ let key = params
+ .get("key")
+ .ok_or_bad_request("No key was provided")?
+ .to_str()?;
+ let credential = params
+ .get("x-amz-credential")
+ .ok_or_else(|| {
+ Error::Forbidden("Garage does not support anonymous access yet".to_string())
+ })?
+ .to_str()?;
+ let policy = params
+ .get("policy")
+ .ok_or_bad_request("No policy was provided")?
+ .to_str()?;
+ let signature = params
+ .get("x-amz-signature")
+ .ok_or_bad_request("No signature was provided")?
+ .to_str()?;
+ let date = params
+ .get("x-amz-date")
+ .ok_or_bad_request("No date was provided")?
+ .to_str()?;
+
+ let key = if key.contains("${filename}") {
+ // if no filename is provided, don't replace. This matches the behavior of AWS.
+ if let Some(filename) = field.file_name() {
+ key.replace("${filename}", filename)
+ } else {
+ key.to_owned()
+ }
+ } else {
+ key.to_owned()
+ };
+
+ let date = parse_date(date)?;
+ let api_key = verify_v4(
+ &garage,
+ "s3",
+ credential,
+ &date,
+ signature,
+ policy.as_bytes(),
+ )
+ .await?;
+
+ let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?;
+
+ if !api_key.allow_write(&bucket_id) {
+ return Err(Error::Forbidden(
+ "Operation is not allowed for this key.".to_string(),
+ ));
+ }
+
+ let decoded_policy = base64::decode(&policy)?;
+ let decoded_policy: Policy =
+ serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
+
+ let expiration: DateTime<Utc> = DateTime::parse_from_rfc3339(&decoded_policy.expiration)
+ .ok_or_bad_request("Invalid expiration date")?
+ .into();
+ if Utc::now() - expiration > Duration::zero() {
+ return Err(Error::BadRequest(
+ "Expiration date is in the paste".to_string(),
+ ));
+ }
+
+ let mut conditions = decoded_policy.into_conditions()?;
+
+ for (param_key, value) in params.iter() {
+ let mut param_key = param_key.to_string();
+ param_key.make_ascii_lowercase();
+ match param_key.as_str() {
+ "policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields
+ "content-type" => {
+ let conds = conditions.params.remove("content-type").ok_or_else(|| {
+ Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
+ })?;
+ for cond in conds {
+ let ok = match cond {
+ Operation::Equal(s) => s.as_str() == value,
+ Operation::StartsWith(s) => {
+ value.to_str()?.split(',').all(|v| v.starts_with(&s))
+ }
+ };
+ if !ok {
+ return Err(Error::BadRequest(format!(
+ "Key '{}' has value not allowed in policy",
+ param_key
+ )));
+ }
+ }
+ }
+ "key" => {
+ let conds = conditions.params.remove("key").ok_or_else(|| {
+ Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
+ })?;
+ for cond in conds {
+ let ok = match cond {
+ Operation::Equal(s) => s == key,
+ Operation::StartsWith(s) => key.starts_with(&s),
+ };
+ if !ok {
+ return Err(Error::BadRequest(format!(
+ "Key '{}' has value not allowed in policy",
+ param_key
+ )));
+ }
+ }
+ }
+ _ => {
+ if param_key.starts_with("x-ignore-") {
+ // If an x-ignore- field is provided in the policy, it's not removed here, so it will be
+ // rejected as being provided in the policy but not in the request. As odd as it is, it's
+ // how AWS seems to behave.
+ continue;
+ }
+ let conds = conditions.params.remove(&param_key).ok_or_else(|| {
+ Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
+ })?;
+ for cond in conds {
+ let ok = match cond {
+ Operation::Equal(s) => s.as_str() == value,
+ Operation::StartsWith(s) => value.to_str()?.starts_with(s.as_str()),
+ };
+ if !ok {
+ return Err(Error::BadRequest(format!(
+ "Key '{}' has value not allowed in policy",
+ param_key
+ )));
+ }
+ }
+ }
+ }
+ }
+
+ if let Some((param_key, _)) = conditions.params.iter().next() {
+ return Err(Error::BadRequest(format!(
+ "Key '{}' is required in policy, but no value was provided",
+ param_key
+ )));
+ }
+
+ let headers = get_headers(&params)?;
+
+ let stream = field.map(|r| r.map_err(Into::into));
+ let (_, md5) = save_stream(
+ garage,
+ headers,
+ StreamLimiter::new(stream, conditions.content_length),
+ bucket_id,
+ &key,
+ None,
+ None,
+ )
+ .await?;
+
+ let etag = format!("\"{}\"", md5);
+
+ let resp = if let Some(mut target) = params
+ .get("success_action_redirect")
+ .and_then(|h| h.to_str().ok())
+ .and_then(|u| url::Url::parse(u).ok())
+ .filter(|u| u.scheme() == "https" || u.scheme() == "http")
+ {
+ target
+ .query_pairs_mut()
+ .append_pair("bucket", &bucket)
+ .append_pair("key", &key)
+ .append_pair("etag", &etag);
+ let target = target.to_string();
+ Response::builder()
+ .status(StatusCode::SEE_OTHER)
+ .header(header::LOCATION, target.clone())
+ .header(header::ETAG, etag)
+ .body(target.into())?
+ } else {
+ let path = head
+ .uri
+ .into_parts()
+ .path_and_query
+ .map(|paq| paq.path().to_string())
+ .unwrap_or_else(|| "/".to_string());
+ let authority = head
+ .headers
+ .get(header::HOST)
+ .and_then(|h| h.to_str().ok())
+ .unwrap_or_default();
+ let proto = if !authority.is_empty() {
+ "https://"
+ } else {
+ ""
+ };
+
+ let url_key: String = form_urlencoded::byte_serialize(key.as_bytes())
+ .flat_map(str::chars)
+ .collect();
+ let location = format!("{}{}{}{}", proto, authority, path, url_key);
+
+ let action = params
+ .get("success_action_status")
+ .and_then(|h| h.to_str().ok())
+ .unwrap_or("204");
+ let builder = Response::builder()
+ .header(header::LOCATION, location.clone())
+ .header(header::ETAG, etag.clone());
+ match action {
+ "200" => builder.status(StatusCode::OK).body(Body::empty())?,
+ "201" => {
+ let xml = s3_xml::PostObject {
+ xmlns: (),
+ location: s3_xml::Value(location),
+ bucket: s3_xml::Value(bucket),
+ key: s3_xml::Value(key),
+ etag: s3_xml::Value(etag),
+ };
+ let body = s3_xml::to_xml_with_header(&xml)?;
+ builder
+ .status(StatusCode::CREATED)
+ .body(Body::from(body.into_bytes()))?
+ }
+ _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?,
+ }
+ };
+
+ Ok(resp)
+}
+
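+/// POST policy document, as decoded from the base64-encoded `policy` form field.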
+#[derive(Deserialize)]
+struct Policy {
+ expiration: String,
+ conditions: Vec<PolicyCondition>,
+}
+
+impl Policy {
+ fn into_conditions(self) -> Result<Conditions, Error> {
+ let mut params = HashMap::<_, Vec<_>>::new();
+
+ let mut length = (0, u64::MAX);
+ for condition in self.conditions {
+ match condition {
+ PolicyCondition::Equal(map) => {
+ if map.len() != 1 {
+ return Err(Error::BadRequest("Invalid policy item".to_owned()));
+ }
+ let (mut k, v) = map.into_iter().next().expect("size was verified");
+ k.make_ascii_lowercase();
+ params.entry(k).or_default().push(Operation::Equal(v));
+ }
+ PolicyCondition::OtherOp([cond, mut key, value]) => {
+ if key.remove(0) != '$' {
+ return Err(Error::BadRequest("Invalid policy item".to_owned()));
+ }
+ key.make_ascii_lowercase();
+ match cond.as_str() {
+ "eq" => {
+ params.entry(key).or_default().push(Operation::Equal(value));
+ }
+ "starts-with" => {
+ params
+ .entry(key)
+ .or_default()
+ .push(Operation::StartsWith(value));
+ }
+ _ => return Err(Error::BadRequest("Invalid policy item".to_owned())),
+ }
+ }
+ PolicyCondition::SizeRange(key, min, max) => {
+ if key == "content-length-range" {
+ length.0 = length.0.max(min);
+ length.1 = length.1.min(max);
+ } else {
+ return Err(Error::BadRequest("Invalid policy item".to_owned()));
+ }
+ }
+ }
+ }
+ Ok(Conditions {
+ params,
+ content_length: RangeInclusive::new(length.0, length.1),
+ })
+ }
+}
+
+/// A single condition from a policy
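+/// It can take three JSON shapes: `{"field": "value"}` for an exact match,
+/// `["eq"|"starts-with", "$field", "value"]`, or `["content-length-range", min, max]`.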
+#[derive(Debug, Deserialize)]
+#[serde(untagged)]
+enum PolicyCondition {
+ // will contain a single key-value pair
+ Equal(HashMap<String, String>),
+ OtherOp([String; 3]),
+ SizeRange(String, u64, u64),
+}
+
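+/// Conditions extracted from a POST policy: per-field constraints, plus the allowed
+/// range for the content length.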
+#[derive(Debug)]
+struct Conditions {
+ params: HashMap<String, Vec<Operation>>,
+ content_length: RangeInclusive<u64>,
+}
+
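+/// A single constraint on a form field: either an exact value or a required prefix.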
+#[derive(Debug, PartialEq, Eq)]
+enum Operation {
+ Equal(String),
+ StartsWith(String),
+}
+
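+/// Stream wrapper that counts the bytes it forwards and errors out if the total exceeds
+/// the maximum allowed by the policy, or if the stream ends below the minimum.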
+struct StreamLimiter<T> {
+ inner: T,
+ length: RangeInclusive<u64>,
+ read: u64,
+}
+
+impl<T> StreamLimiter<T> {
+ fn new(stream: T, length: RangeInclusive<u64>) -> Self {
+ StreamLimiter {
+ inner: stream,
+ length,
+ read: 0,
+ }
+ }
+}
+
+impl<T> Stream for StreamLimiter<T>
+where
+ T: Stream<Item = Result<Bytes, Error>> + Unpin,
+{
+ type Item = Result<Bytes, Error>;
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let res = std::pin::Pin::new(&mut self.inner).poll_next(ctx);
+ match &res {
+ Poll::Ready(Some(Ok(bytes))) => {
+ self.read += bytes.len() as u64;
+ // optimization to fail early as soon as we know the stream is too long, without waiting for the end
+ if self.length.end() < &self.read {
+ return Poll::Ready(Some(Err(Error::BadRequest(
+ "File size does not match policy".to_owned(),
+ ))));
+ }
+ }
+ Poll::Ready(None) => {
+ if !self.length.contains(&self.read) {
+ return Poll::Ready(Some(Err(Error::BadRequest(
+ "File size does not match policy".to_owned(),
+ ))));
+ }
+ }
+ _ => {}
+ }
+ res
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_policy_1() {
+ let policy_json = br#"
+{ "expiration": "2007-12-01T12:00:00.000Z",
+ "conditions": [
+ {"acl": "public-read" },
+ {"bucket": "johnsmith" },
+ ["starts-with", "$key", "user/eric/"]
+ ]
+}
+ "#;
+ let policy_2: Policy = serde_json::from_slice(&policy_json[..]).unwrap();
+ let mut conditions = policy_2.into_conditions().unwrap();
+
+ assert_eq!(
+ conditions.params.remove(&"acl".to_string()),
+ Some(vec![Operation::Equal("public-read".into())])
+ );
+ assert_eq!(
+ conditions.params.remove(&"bucket".to_string()),
+ Some(vec![Operation::Equal("johnsmith".into())])
+ );
+ assert_eq!(
+ conditions.params.remove(&"key".to_string()),
+ Some(vec![Operation::StartsWith("user/eric/".into())])
+ );
+ assert!(conditions.params.is_empty());
+ assert_eq!(conditions.content_length, 0..=u64::MAX);
+ }
+
+ #[test]
+ fn test_policy_2() {
+ let policy_json = br#"
+{ "expiration": "2007-12-01T12:00:00.000Z",
+ "conditions": [
+ [ "eq", "$acl", "public-read" ],
+ ["starts-with", "$Content-Type", "image/"],
+ ["starts-with", "$success_action_redirect", ""],
+ ["content-length-range", 1048576, 10485760]
+ ]
+}
+ "#;
+ let policy_2: Policy = serde_json::from_slice(&policy_json[..]).unwrap();
+ let mut conditions = policy_2.into_conditions().unwrap();
+
+ assert_eq!(
+ conditions.params.remove(&"acl".to_string()),
+ Some(vec![Operation::Equal("public-read".into())])
+ );
+ assert_eq!(
+ conditions.params.remove("content-type").unwrap(),
+ vec![Operation::StartsWith("image/".into())]
+ );
+ assert_eq!(
+ conditions
+ .params
+ .remove(&"success_action_redirect".to_string()),
+ Some(vec![Operation::StartsWith("".into())])
+ );
+ assert!(conditions.params.is_empty());
+ assert_eq!(conditions.content_length, 1048576..=10485760);
+ }
+}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
new file mode 100644
index 00000000..89aa8d84
--- /dev/null
+++ b/src/api/s3/put.rs
@@ -0,0 +1,753 @@
+use std::collections::{BTreeMap, BTreeSet, VecDeque};
+use std::sync::Arc;
+
+use futures::prelude::*;
+use hyper::body::{Body, Bytes};
+use hyper::header::{HeaderMap, HeaderValue};
+use hyper::{Request, Response};
+use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
+use sha2::Sha256;
+
+use garage_table::*;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+use garage_util::time::*;
+
+use garage_block::manager::INLINE_THRESHOLD;
+use garage_model::garage::Garage;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use crate::error::*;
+use crate::s3::xml as s3_xml;
+use crate::signature::verify_signed_content;
+
+pub async fn handle_put(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ key: &str,
+ content_sha256: Option<Hash>,
+) -> Result<Response<Body>, Error> {
+ // Retrieve interesting headers from request
+ let headers = get_headers(req.headers())?;
+ debug!("Object headers: {:?}", headers);
+
+ let content_md5 = match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ };
+
+ let (_head, body) = req.into_parts();
+ let body = body.map_err(Error::from);
+
+ save_stream(
+ garage,
+ headers,
+ body,
+ bucket_id,
+ key,
+ content_md5,
+ content_sha256,
+ )
+ .await
+ .map(|(uuid, md5)| put_response(uuid, md5))
+}
+
+pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
+ garage: Arc<Garage>,
+ headers: ObjectVersionHeaders,
+ body: S,
+ bucket_id: Uuid,
+ key: &str,
+ content_md5: Option<String>,
+ content_sha256: Option<FixedBytes32>,
+) -> Result<(Uuid, String), Error> {
+ // Generate identity of new version
+ let version_uuid = gen_uuid();
+ let version_timestamp = now_msec();
+
+ let mut chunker = StreamChunker::new(body, garage.config.block_size);
+ let first_block = chunker.next().await?.unwrap_or_default();
+
+ // If body is small enough, store it directly in the object table
+ // as "inline data". We can then return immediately.
+ if first_block.len() < INLINE_THRESHOLD {
+ let mut md5sum = Md5::new();
+ md5sum.update(&first_block[..]);
+ let data_md5sum = md5sum.finalize();
+ let data_md5sum_hex = hex::encode(data_md5sum);
+
+ let data_sha256sum = sha256sum(&first_block[..]);
+
+ ensure_checksum_matches(
+ data_md5sum.as_slice(),
+ data_sha256sum,
+ content_md5.as_deref(),
+ content_sha256,
+ )?;
+
+ let object_version = ObjectVersion {
+ uuid: version_uuid,
+ timestamp: version_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::Inline(
+ ObjectVersionMeta {
+ headers,
+ size: first_block.len() as u64,
+ etag: data_md5sum_hex.clone(),
+ },
+ first_block,
+ )),
+ };
+
+ let object = Object::new(bucket_id, key.into(), vec![object_version]);
+ garage.object_table.insert(&object).await?;
+
+ return Ok((version_uuid, data_md5sum_hex));
+ }
+
+ // Write version identifier in object table so that we have a trace
+ // that we are uploading something
+ let mut object_version = ObjectVersion {
+ uuid: version_uuid,
+ timestamp: version_timestamp,
+ state: ObjectVersionState::Uploading(headers.clone()),
+ };
+ let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
+ garage.object_table.insert(&object).await?;
+
+ // Initialize corresponding entry in version table
+ // Write this entry now, even with empty block list,
+ // to prevent block_ref entries from being deleted (they can be deleted
+ // if they reference a version that isn't found in the version table)
+ let version = Version::new(version_uuid, bucket_id, key.into(), false);
+ garage.version_table.insert(&version).await?;
+
+ // Transfer data and verify checksum
+ let first_block_hash = blake2sum(&first_block[..]);
+ let tx_result = read_and_put_blocks(
+ &garage,
+ &version,
+ 1,
+ first_block,
+ first_block_hash,
+ &mut chunker,
+ )
+ .await
+ .and_then(|(total_size, data_md5sum, data_sha256sum)| {
+ ensure_checksum_matches(
+ data_md5sum.as_slice(),
+ data_sha256sum,
+ content_md5.as_deref(),
+ content_sha256,
+ )
+ .map(|()| (total_size, data_md5sum))
+ });
+
+ // If something went wrong, clean up
+ let (total_size, md5sum_arr) = match tx_result {
+ Ok(rv) => rv,
+ Err(e) => {
+ // Mark object as aborted, this will free the blocks further down
+ object_version.state = ObjectVersionState::Aborted;
+ let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
+ garage.object_table.insert(&object).await?;
+ return Err(e);
+ }
+ };
+
+ // Save final object state, marked as Complete
+ let md5sum_hex = hex::encode(md5sum_arr);
+ object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
+ ObjectVersionMeta {
+ headers,
+ size: total_size,
+ etag: md5sum_hex.clone(),
+ },
+ first_block_hash,
+ ));
+ let object = Object::new(bucket_id, key.into(), vec![object_version]);
+ garage.object_table.insert(&object).await?;
+
+ Ok((version_uuid, md5sum_hex))
+}
+
+/// Validate MD5 sum against content-md5 header
+/// and sha256sum against signed content-sha256
+fn ensure_checksum_matches(
+ data_md5sum: &[u8],
+ data_sha256sum: garage_util::data::FixedBytes32,
+ content_md5: Option<&str>,
+ content_sha256: Option<garage_util::data::FixedBytes32>,
+) -> Result<(), Error> {
+ if let Some(expected_sha256) = content_sha256 {
+ if expected_sha256 != data_sha256sum {
+ return Err(Error::BadRequest(
+ "Unable to validate x-amz-content-sha256".to_string(),
+ ));
+ } else {
+ trace!("Successfully validated x-amz-content-sha256");
+ }
+ }
+ if let Some(expected_md5) = content_md5 {
+ if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
+ return Err(Error::BadRequest(
+ "Unable to validate content-md5".to_string(),
+ ));
+ } else {
+ trace!("Successfully validated content-md5");
+ }
+ }
+ Ok(())
+}
+
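+/// Read blocks from the chunker and write them to the block manager, overlapping the
+/// write of the current block with the read of the next one. Returns the total size of
+/// the data along with its MD5 and SHA-256 checksums.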
+async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
+ garage: &Garage,
+ version: &Version,
+ part_number: u64,
+ first_block: Vec<u8>,
+ first_block_hash: Hash,
+ chunker: &mut StreamChunker<S>,
+) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
+ let mut md5hasher = Md5::new();
+ let mut sha256hasher = Sha256::new();
+ md5hasher.update(&first_block[..]);
+ sha256hasher.update(&first_block[..]);
+
+ let mut next_offset = first_block.len();
+ let mut put_curr_version_block = put_block_meta(
+ garage,
+ version,
+ part_number,
+ 0,
+ first_block_hash,
+ first_block.len() as u64,
+ );
+ let mut put_curr_block = garage
+ .block_manager
+ .rpc_put_block(first_block_hash, first_block);
+
+ loop {
+ let (_, _, next_block) = futures::try_join!(
+ put_curr_block.map_err(Error::from),
+ put_curr_version_block.map_err(Error::from),
+ chunker.next(),
+ )?;
+ if let Some(block) = next_block {
+ md5hasher.update(&block[..]);
+ sha256hasher.update(&block[..]);
+ let block_hash = blake2sum(&block[..]);
+ let block_len = block.len();
+ put_curr_version_block = put_block_meta(
+ garage,
+ version,
+ part_number,
+ next_offset as u64,
+ block_hash,
+ block_len as u64,
+ );
+ put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
+ next_offset += block_len;
+ } else {
+ break;
+ }
+ }
+
+ let total_size = next_offset as u64;
+ let data_md5sum = md5hasher.finalize();
+
+ let data_sha256sum = sha256hasher.finalize();
+ let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
+
+ Ok((total_size, data_md5sum, data_sha256sum))
+}
+
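+/// Record one block in the version's block list and insert the matching block_ref
+/// entry; both writes are performed together.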
+async fn put_block_meta(
+ garage: &Garage,
+ version: &Version,
+ part_number: u64,
+ offset: u64,
+ hash: Hash,
+ size: u64,
+) -> Result<(), GarageError> {
+ let mut version = version.clone();
+ version.blocks.put(
+ VersionBlockKey {
+ part_number,
+ offset,
+ },
+ VersionBlock { hash, size },
+ );
+
+ let block_ref = BlockRef {
+ block: hash,
+ version: version.uuid,
+ deleted: false.into(),
+ };
+
+ futures::try_join!(
+ garage.version_table.insert(&version),
+ garage.block_ref_table.insert(&block_ref),
+ )?;
+ Ok(())
+}
+
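+/// Cuts a stream of bytes into blocks of at most `block_size` bytes, buffering incoming
+/// data until a full block (or the end of the stream) is available.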
+struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
+ stream: S,
+ read_all: bool,
+ block_size: usize,
+ buf: VecDeque<u8>,
+}
+
+impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
+ fn new(stream: S, block_size: usize) -> Self {
+ Self {
+ stream,
+ read_all: false,
+ block_size,
+ buf: VecDeque::with_capacity(2 * block_size),
+ }
+ }
+
+ async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
+ while !self.read_all && self.buf.len() < self.block_size {
+ if let Some(block) = self.stream.next().await {
+ let bytes = block?;
+ trace!("Body next: {} bytes", bytes.len());
+ self.buf.extend(bytes);
+ } else {
+ self.read_all = true;
+ }
+ }
+
+ if self.buf.is_empty() {
+ Ok(None)
+ } else if self.buf.len() <= self.block_size {
+ let block = self.buf.drain(..).collect::<Vec<u8>>();
+ Ok(Some(block))
+ } else {
+ let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
+ Ok(Some(block))
+ }
+ }
+}
+
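+/// Build the PutObject success response, echoing the version id and the ETag.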
+pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
+ Response::builder()
+ .header("x-amz-version-id", hex::encode(version_uuid))
+ .header("ETag", format!("\"{}\"", md5sum_hex))
+ .body(Body::from(vec![]))
+ .unwrap()
+}
+
+pub async fn handle_create_multipart_upload(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_name: &str,
+ bucket_id: Uuid,
+ key: &str,
+) -> Result<Response<Body>, Error> {
+ let version_uuid = gen_uuid();
+ let headers = get_headers(req.headers())?;
+
+ // Create object in object table
+ let object_version = ObjectVersion {
+ uuid: version_uuid,
+ timestamp: now_msec(),
+ state: ObjectVersionState::Uploading(headers),
+ };
+ let object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+ garage.object_table.insert(&object).await?;
+
+ // Insert an empty version so that block_ref entries refer to something
+ // (they are inserted concurrently with blocks in the version table, so
+ // there is the possibility that they are inserted before the corresponding
+ // version entry is created, in which case it is allowed to delete them, e.g. in repair_*)
+ let version = Version::new(version_uuid, bucket_id, key.into(), false);
+ garage.version_table.insert(&version).await?;
+
+ // Send success response
+ let result = s3_xml::InitiateMultipartUploadResult {
+ xmlns: (),
+ bucket: s3_xml::Value(bucket_name.to_string()),
+ key: s3_xml::Value(key.to_string()),
+ upload_id: s3_xml::Value(hex::encode(version_uuid)),
+ };
+ let xml = s3_xml::to_xml_with_header(&result)?;
+
+ Ok(Response::new(Body::from(xml.into_bytes())))
+}
+
+pub async fn handle_put_part(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ key: &str,
+ part_number: u64,
+ upload_id: &str,
+ content_sha256: Option<Hash>,
+) -> Result<Response<Body>, Error> {
+ let version_uuid = decode_upload_id(upload_id)?;
+
+ let content_md5 = match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ };
+
+ // Read the first chunk and, at the same time, try to get the object to see if it exists
+ let key = key.to_string();
+
+ let body = req.into_body().map_err(Error::from);
+ let mut chunker = StreamChunker::new(body, garage.config.block_size);
+
+ let (object, version, first_block) = futures::try_join!(
+ garage
+ .object_table
+ .get(&bucket_id, &key)
+ .map_err(Error::from),
+ garage
+ .version_table
+ .get(&version_uuid, &EmptyKey)
+ .map_err(Error::from),
+ chunker.next(),
+ )?;
+
+ // Check object is valid and multipart block can be accepted
+ let first_block = first_block.ok_or_bad_request("Empty body")?;
+ let object = object.ok_or_bad_request("Object not found")?;
+
+ if !object
+ .versions()
+ .iter()
+ .any(|v| v.uuid == version_uuid && v.is_uploading())
+ {
+ return Err(Error::NoSuchUpload);
+ }
+
+ // Check part hasn't already been uploaded
+ if let Some(v) = version {
+ if v.has_part_number(part_number) {
+ return Err(Error::BadRequest(format!(
+ "Part number {} has already been uploaded",
+ part_number
+ )));
+ }
+ }
+
+ // Copy block to store
+ let version = Version::new(version_uuid, bucket_id, key, false);
+ let first_block_hash = blake2sum(&first_block[..]);
+ let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
+ &garage,
+ &version,
+ part_number,
+ first_block,
+ first_block_hash,
+ &mut chunker,
+ )
+ .await?;
+
+ // Verify that the checksums match
+ ensure_checksum_matches(
+ data_md5sum.as_slice(),
+ data_sha256sum,
+ content_md5.as_deref(),
+ content_sha256,
+ )?;
+
+ // Store part etag in version
+ let data_md5sum_hex = hex::encode(data_md5sum);
+ let mut version = version;
+ version
+ .parts_etags
+ .put(part_number, data_md5sum_hex.clone());
+ garage.version_table.insert(&version).await?;
+
+ let response = Response::builder()
+ .header("ETag", format!("\"{}\"", data_md5sum_hex))
+ .body(Body::empty())
+ .unwrap();
+ Ok(response)
+}
+
+pub async fn handle_complete_multipart_upload(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_name: &str,
+ bucket_id: Uuid,
+ key: &str,
+ upload_id: &str,
+ content_sha256: Option<Hash>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
+
+ let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
+ let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml)
+ .ok_or_bad_request("Invalid CompleteMultipartUpload XML")?;
+ debug!(
+ "CompleteMultipartUpload list of parts: {:?}",
+ body_list_of_parts
+ );
+
+ let version_uuid = decode_upload_id(upload_id)?;
+
+ // Get object and version
+ let key = key.to_string();
+ let (object, version) = futures::try_join!(
+ garage.object_table.get(&bucket_id, &key),
+ garage.version_table.get(&version_uuid, &EmptyKey),
+ )?;
+
+ let object = object.ok_or(Error::NoSuchKey)?;
+ let mut object_version = object
+ .versions()
+ .iter()
+ .find(|v| v.uuid == version_uuid && v.is_uploading())
+ .cloned()
+ .ok_or(Error::NoSuchUpload)?;
+
+ let version = version.ok_or(Error::NoSuchKey)?;
+ if version.blocks.is_empty() {
+ return Err(Error::BadRequest("No data was uploaded".to_string()));
+ }
+
+ let headers = match object_version.state {
+ ObjectVersionState::Uploading(headers) => headers,
+ _ => unreachable!(),
+ };
+
+ // Check that part numbers form an increasing sequence.
+ // (the sequence doesn't need to start at 1 nor to be contiguous,
+ // see discussion in #192)
+ if body_list_of_parts.is_empty() {
+ return Err(Error::EntityTooSmall);
+ }
+ if !body_list_of_parts
+ .iter()
+ .zip(body_list_of_parts.iter().skip(1))
+ .all(|(p1, p2)| p1.part_number < p2.part_number)
+ {
+ return Err(Error::InvalidPartOrder);
+ }
+
+ // Garage-specific restriction, see #204: part numbers must be
+ // consecutive starting at 1
+ if body_list_of_parts[0].part_number != 1
+ || !body_list_of_parts
+ .iter()
+ .zip(body_list_of_parts.iter().skip(1))
+ .all(|(p1, p2)| p1.part_number + 1 == p2.part_number)
+ {
+ return Err(Error::NotImplemented("Garage does not support completing a Multipart upload with non-consecutive part numbers. This is a restriction of Garage's data model, which might be fixed in a future release. See issue #204 for more information on this topic.".into()));
+ }
+
+ // Check that the list of parts they gave us corresponds to the parts we have here
+ debug!("Expected parts from request: {:?}", body_list_of_parts);
+ debug!("Parts stored in version: {:?}", version.parts_etags.items());
+ let parts = version
+ .parts_etags
+ .items()
+ .iter()
+ .map(|pair| (&pair.0, &pair.1));
+ let same_parts = body_list_of_parts
+ .iter()
+ .map(|x| (&x.part_number, &x.etag))
+ .eq(parts);
+ if !same_parts {
+ return Err(Error::InvalidPart);
+ }
+
+ // Check that all blocks belong to one of the parts
+ let block_parts = version
+ .blocks
+ .items()
+ .iter()
+ .map(|(bk, _)| bk.part_number)
+ .collect::<BTreeSet<_>>();
+ let same_parts = body_list_of_parts
+ .iter()
+ .map(|x| x.part_number)
+ .eq(block_parts.into_iter());
+ if !same_parts {
+ return Err(Error::BadRequest(
+ "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(),
+ ));
+ }
+
+ // Calculate etag of final object
+ // To understand how etags are calculated, read more here:
+ // https://teppen.io/2018/06/23/aws_s3_etags/
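+ // Here, the final ETag is the MD5 of the concatenated hex-encoded part ETags,
+ // followed by "-" and the number of parts.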
+ let num_parts = body_list_of_parts.len();
+ let mut etag_md5_hasher = Md5::new();
+ for (_, etag) in version.parts_etags.items().iter() {
+ etag_md5_hasher.update(etag.as_bytes());
+ }
+ let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts);
+
+ // Calculate total size of final object
+ let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
+
+ // Write final object version
+ object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
+ ObjectVersionMeta {
+ headers,
+ size: total_size,
+ etag: etag.clone(),
+ },
+ version.blocks.items()[0].1.hash,
+ ));
+
+ let final_object = Object::new(bucket_id, key.clone(), vec![object_version]);
+ garage.object_table.insert(&final_object).await?;
+
+ // Send response saying ok we're done
+ let result = s3_xml::CompleteMultipartUploadResult {
+ xmlns: (),
+ location: None,
+ bucket: s3_xml::Value(bucket_name.to_string()),
+ key: s3_xml::Value(key),
+ etag: s3_xml::Value(format!("\"{}\"", etag)),
+ };
+ let xml = s3_xml::to_xml_with_header(&result)?;
+
+ Ok(Response::new(Body::from(xml.into_bytes())))
+}
+
+pub async fn handle_abort_multipart_upload(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ key: &str,
+ upload_id: &str,
+) -> Result<Response<Body>, Error> {
+ let version_uuid = decode_upload_id(upload_id)?;
+
+ let object = garage
+ .object_table
+ .get(&bucket_id, &key.to_string())
+ .await?;
+ let object = object.ok_or(Error::NoSuchKey)?;
+
+ let object_version = object
+ .versions()
+ .iter()
+ .find(|v| v.uuid == version_uuid && v.is_uploading());
+ let mut object_version = match object_version {
+ None => return Err(Error::NoSuchUpload),
+ Some(x) => x.clone(),
+ };
+
+ object_version.state = ObjectVersionState::Aborted;
+ let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+ garage.object_table.insert(&final_object).await?;
+
+ Ok(Response::new(Body::from(vec![])))
+}
+
+fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
+ Ok(headers
+ .get(hyper::header::CONTENT_TYPE)
+ .map(|x| x.to_str())
+ .unwrap_or(Ok("blob"))?
+ .to_string())
+}
+
+pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> {
+ let content_type = get_mime_type(headers)?;
+ let mut other = BTreeMap::new();
+
+ // Preserve standard headers
+ let standard_header = vec![
+ hyper::header::CACHE_CONTROL,
+ hyper::header::CONTENT_DISPOSITION,
+ hyper::header::CONTENT_ENCODING,
+ hyper::header::CONTENT_LANGUAGE,
+ hyper::header::EXPIRES,
+ ];
+ for h in standard_header.iter() {
+ if let Some(v) = headers.get(h) {
+ match v.to_str() {
+ Ok(v_str) => {
+ other.insert(h.to_string(), v_str.to_string());
+ }
+ Err(e) => {
+ warn!("Discarding header {}, error in .to_str(): {}", h, e);
+ }
+ }
+ }
+ }
+
+ // Preserve x-amz-meta- headers
+ for (k, v) in headers.iter() {
+ if k.as_str().starts_with("x-amz-meta-") {
+ match v.to_str() {
+ Ok(v_str) => {
+ other.insert(k.to_string(), v_str.to_string());
+ }
+ Err(e) => {
+ warn!("Discarding header {}, error in .to_str(): {}", k, e);
+ }
+ }
+ }
+ }
+
+ Ok(ObjectVersionHeaders {
+ content_type,
+ other,
+ })
+}
+
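+/// Upload IDs are the hex encoding of the 32-byte version UUID, so decoding simply
+/// checks the length and reverses the hex encoding.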
+pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
+ let id_bin = hex::decode(id).map_err(|_| Error::NoSuchUpload)?;
+ if id_bin.len() != 32 {
+ return Err(Error::NoSuchUpload);
+ }
+ let mut uuid = [0u8; 32];
+ uuid.copy_from_slice(&id_bin[..]);
+ Ok(Uuid::from(uuid))
+}
+
+#[derive(Debug)]
+struct CompleteMultipartUploadPart {
+ etag: String,
+ part_number: u64,
+}
+
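+/// Parse the XML body of a CompleteMultipartUpload request into its list of parts,
+/// returning None if the document doesn't have the expected structure.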
+fn parse_complete_multipart_upload_body(
+ xml: &roxmltree::Document,
+) -> Option<Vec<CompleteMultipartUploadPart>> {
+ let mut parts = vec![];
+
+ let root = xml.root();
+ let cmu = root.first_child()?;
+ if !cmu.has_tag_name("CompleteMultipartUpload") {
+ return None;
+ }
+
+ for item in cmu.children() {
+ // Only parse <Part> nodes
+ if !item.is_element() {
+ continue;
+ }
+
+ if item.has_tag_name("Part") {
+ let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?;
+ let part_number = item
+ .children()
+ .find(|e| e.has_tag_name("PartNumber"))?
+ .text()?;
+ parts.push(CompleteMultipartUploadPart {
+ etag: etag.trim_matches('"').to_string(),
+ part_number: part_number.parse().ok()?,
+ });
+ } else {
+ return None;
+ }
+ }
+
+ Some(parts)
+}
diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs
new file mode 100644
index 00000000..0525c649
--- /dev/null
+++ b/src/api/s3/router.rs
@@ -0,0 +1,1080 @@
+use crate::error::{Error, OkOrBadRequest};
+
+use std::borrow::Cow;
+
+use hyper::header::HeaderValue;
+use hyper::{HeaderMap, Method, Request};
+
+use crate::helpers::Authorization;
+use crate::router_macros::{generateQueryParameters, router_match};
+
+router_match! {@func
+
+/// List of all S3 API endpoints.
+///
+/// For each endpoint, it contains the parameters the endpoint receives in the URL (bucket, key
+/// and query parameters). Parameters it may receive through headers are left out; however,
+/// headers are taken into account when they are needed to distinguish between endpoints (for
+/// CopyObject and UploadObject, for instance).
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Endpoint {
+ AbortMultipartUpload {
+ key: String,
+ upload_id: String,
+ },
+ CompleteMultipartUpload {
+ key: String,
+ upload_id: String,
+ },
+ CopyObject {
+ key: String,
+ },
+ CreateBucket {
+ },
+ CreateMultipartUpload {
+ key: String,
+ },
+ DeleteBucket {
+ },
+ DeleteBucketAnalyticsConfiguration {
+ id: String,
+ },
+ DeleteBucketCors {
+ },
+ DeleteBucketEncryption {
+ },
+ DeleteBucketIntelligentTieringConfiguration {
+ id: String,
+ },
+ DeleteBucketInventoryConfiguration {
+ id: String,
+ },
+ DeleteBucketLifecycle {
+ },
+ DeleteBucketMetricsConfiguration {
+ id: String,
+ },
+ DeleteBucketOwnershipControls {
+ },
+ DeleteBucketPolicy {
+ },
+ DeleteBucketReplication {
+ },
+ DeleteBucketTagging {
+ },
+ DeleteBucketWebsite {
+ },
+ DeleteObject {
+ key: String,
+ version_id: Option<String>,
+ },
+ DeleteObjects {
+ },
+ DeleteObjectTagging {
+ key: String,
+ version_id: Option<String>,
+ },
+ DeletePublicAccessBlock {
+ },
+ GetBucketAccelerateConfiguration {
+ },
+ GetBucketAcl {
+ },
+ GetBucketAnalyticsConfiguration {
+ id: String,
+ },
+ GetBucketCors {
+ },
+ GetBucketEncryption {
+ },
+ GetBucketIntelligentTieringConfiguration {
+ id: String,
+ },
+ GetBucketInventoryConfiguration {
+ id: String,
+ },
+ GetBucketLifecycleConfiguration {
+ },
+ GetBucketLocation {
+ },
+ GetBucketLogging {
+ },
+ GetBucketMetricsConfiguration {
+ id: String,
+ },
+ GetBucketNotificationConfiguration {
+ },
+ GetBucketOwnershipControls {
+ },
+ GetBucketPolicy {
+ },
+ GetBucketPolicyStatus {
+ },
+ GetBucketReplication {
+ },
+ GetBucketRequestPayment {
+ },
+ GetBucketTagging {
+ },
+ GetBucketVersioning {
+ },
+ GetBucketWebsite {
+ },
+ /// There are actually many more query parameters, used to add headers to the response. They were
+ /// not added here as they are best handled in a dedicated route.
+ GetObject {
+ key: String,
+ part_number: Option<u64>,
+ version_id: Option<String>,
+ },
+ GetObjectAcl {
+ key: String,
+ version_id: Option<String>,
+ },
+ GetObjectLegalHold {
+ key: String,
+ version_id: Option<String>,
+ },
+ GetObjectLockConfiguration {
+ },
+ GetObjectRetention {
+ key: String,
+ version_id: Option<String>,
+ },
+ GetObjectTagging {
+ key: String,
+ version_id: Option<String>,
+ },
+ GetObjectTorrent {
+ key: String,
+ },
+ GetPublicAccessBlock {
+ },
+ HeadBucket {
+ },
+ HeadObject {
+ key: String,
+ part_number: Option<u64>,
+ version_id: Option<String>,
+ },
+ ListBucketAnalyticsConfigurations {
+ continuation_token: Option<String>,
+ },
+ ListBucketIntelligentTieringConfigurations {
+ continuation_token: Option<String>,
+ },
+ ListBucketInventoryConfigurations {
+ continuation_token: Option<String>,
+ },
+ ListBucketMetricsConfigurations {
+ continuation_token: Option<String>,
+ },
+ ListBuckets,
+ ListMultipartUploads {
+ delimiter: Option<char>,
+ encoding_type: Option<String>,
+ key_marker: Option<String>,
+ max_uploads: Option<usize>,
+ prefix: Option<String>,
+ upload_id_marker: Option<String>,
+ },
+ ListObjects {
+ delimiter: Option<char>,
+ encoding_type: Option<String>,
+ marker: Option<String>,
+ max_keys: Option<usize>,
+ prefix: Option<String>,
+ },
+ ListObjectsV2 {
+ // This value should always be 2. It is not checked when constructing the struct
+ list_type: String,
+ continuation_token: Option<String>,
+ delimiter: Option<char>,
+ encoding_type: Option<String>,
+ fetch_owner: Option<bool>,
+ max_keys: Option<usize>,
+ prefix: Option<String>,
+ start_after: Option<String>,
+ },
+ ListObjectVersions {
+ delimiter: Option<char>,
+ encoding_type: Option<String>,
+ key_marker: Option<String>,
+ max_keys: Option<u64>,
+ prefix: Option<String>,
+ version_id_marker: Option<String>,
+ },
+ ListParts {
+ key: String,
+ max_parts: Option<u64>,
+ part_number_marker: Option<u64>,
+ upload_id: String,
+ },
+ Options,
+ PutBucketAccelerateConfiguration {
+ },
+ PutBucketAcl {
+ },
+ PutBucketAnalyticsConfiguration {
+ id: String,
+ },
+ PutBucketCors {
+ },
+ PutBucketEncryption {
+ },
+ PutBucketIntelligentTieringConfiguration {
+ id: String,
+ },
+ PutBucketInventoryConfiguration {
+ id: String,
+ },
+ PutBucketLifecycleConfiguration {
+ },
+ PutBucketLogging {
+ },
+ PutBucketMetricsConfiguration {
+ id: String,
+ },
+ PutBucketNotificationConfiguration {
+ },
+ PutBucketOwnershipControls {
+ },
+ PutBucketPolicy {
+ },
+ PutBucketReplication {
+ },
+ PutBucketRequestPayment {
+ },
+ PutBucketTagging {
+ },
+ PutBucketVersioning {
+ },
+ PutBucketWebsite {
+ },
+ PutObject {
+ key: String,
+ },
+ PutObjectAcl {
+ key: String,
+ version_id: Option<String>,
+ },
+ PutObjectLegalHold {
+ key: String,
+ version_id: Option<String>,
+ },
+ PutObjectLockConfiguration {
+ },
+ PutObjectRetention {
+ key: String,
+ version_id: Option<String>,
+ },
+ PutObjectTagging {
+ key: String,
+ version_id: Option<String>,
+ },
+ PutPublicAccessBlock {
+ },
+ RestoreObject {
+ key: String,
+ version_id: Option<String>,
+ },
+ SelectObjectContent {
+ key: String,
+ // This value should always be 2. It is not checked when constructing the struct
+ select_type: String,
+ },
+ UploadPart {
+ key: String,
+ part_number: u64,
+ upload_id: String,
+ },
+ UploadPartCopy {
+ key: String,
+ part_number: u64,
+ upload_id: String,
+ },
+ // This endpoint is not documented with the others because it has a special use case:
+ // It's intended to be used with HTML forms, using a multipart/form-data body.
+ // It works a lot like presigned requests, but everything is in the form instead
+ // of being query parameters of the URL, so authenticating it is a bit different.
+ PostObject,
+}}
+
+impl Endpoint {
+ /// Determine which S3 endpoint a request is for, using the request itself and a bucket name
+ /// possibly extracted from the Host header.
+ /// Returns Self plus the bucket name, unless the endpoint is Endpoint::ListBuckets.
+ pub fn from_request<T>(
+ req: &Request<T>,
+ bucket: Option<String>,
+ ) -> Result<(Self, Option<String>), Error> {
+ let uri = req.uri();
+ let path = uri.path().trim_start_matches('/');
+ let query = uri.query();
+ if bucket.is_none() && path.is_empty() {
+ if *req.method() == Method::OPTIONS {
+ return Ok((Self::Options, None));
+ } else {
+ return Ok((Self::ListBuckets, None));
+ }
+ }
+
+ let (bucket, key) = if let Some(bucket) = bucket {
+ (bucket, path)
+ } else {
+ path.split_once('/')
+ .map(|(b, p)| (b.to_owned(), p.trim_start_matches('/')))
+ .unwrap_or((path.to_owned(), ""))
+ };
+
+ if *req.method() == Method::OPTIONS {
+ return Ok((Self::Options, Some(bucket)));
+ }
+
+ let key = percent_encoding::percent_decode_str(key)
+ .decode_utf8()?
+ .into_owned();
+
+ let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
+
+ let res = match *req.method() {
+ Method::GET => Self::from_get(key, &mut query)?,
+ Method::HEAD => Self::from_head(key, &mut query)?,
+ Method::POST => Self::from_post(key, &mut query)?,
+ Method::PUT => Self::from_put(key, &mut query, req.headers())?,
+ Method::DELETE => Self::from_delete(key, &mut query)?,
+ _ => return Err(Error::BadRequest("Unknown method".to_owned())),
+ };
+
+ if let Some(message) = query.nonempty_message() {
+ debug!("Unused query parameter: {}", message)
+ }
+ Ok((res, Some(bucket)))
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a GET.
+ fn from_get(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
+ key: [
+ EMPTY if upload_id => ListParts (query::upload_id, opt_parse::max_parts, opt_parse::part_number_marker),
+ EMPTY => GetObject (query_opt::version_id, opt_parse::part_number),
+ ACL => GetObjectAcl (query_opt::version_id),
+ LEGAL_HOLD => GetObjectLegalHold (query_opt::version_id),
+ RETENTION => GetObjectRetention (query_opt::version_id),
+ TAGGING => GetObjectTagging (query_opt::version_id),
+ TORRENT => GetObjectTorrent,
+ ],
+ no_key: [
+ EMPTY if list_type => ListObjectsV2 (query::list_type, query_opt::continuation_token,
+ opt_parse::delimiter, query_opt::encoding_type,
+ opt_parse::fetch_owner, opt_parse::max_keys,
+ query_opt::prefix, query_opt::start_after),
+ EMPTY => ListObjects (opt_parse::delimiter, query_opt::encoding_type, query_opt::marker,
+ opt_parse::max_keys, opt_parse::prefix),
+ ACCELERATE => GetBucketAccelerateConfiguration,
+ ACL => GetBucketAcl,
+ ANALYTICS if id => GetBucketAnalyticsConfiguration (query::id),
+ ANALYTICS => ListBucketAnalyticsConfigurations (query_opt::continuation_token),
+ CORS => GetBucketCors,
+ ENCRYPTION => GetBucketEncryption,
+ INTELLIGENT_TIERING if id => GetBucketIntelligentTieringConfiguration (query::id),
+ INTELLIGENT_TIERING => ListBucketIntelligentTieringConfigurations (query_opt::continuation_token),
+ INVENTORY if id => GetBucketInventoryConfiguration (query::id),
+ INVENTORY => ListBucketInventoryConfigurations (query_opt::continuation_token),
+ LIFECYCLE => GetBucketLifecycleConfiguration,
+ LOCATION => GetBucketLocation,
+ LOGGING => GetBucketLogging,
+ METRICS if id => GetBucketMetricsConfiguration (query::id),
+ METRICS => ListBucketMetricsConfigurations (query_opt::continuation_token),
+ NOTIFICATION => GetBucketNotificationConfiguration,
+ OBJECT_LOCK => GetObjectLockConfiguration,
+ OWNERSHIP_CONTROLS => GetBucketOwnershipControls,
+ POLICY => GetBucketPolicy,
+ POLICY_STATUS => GetBucketPolicyStatus,
+ PUBLIC_ACCESS_BLOCK => GetPublicAccessBlock,
+ REPLICATION => GetBucketReplication,
+ REQUEST_PAYMENT => GetBucketRequestPayment,
+ TAGGING => GetBucketTagging,
+ UPLOADS => ListMultipartUploads (opt_parse::delimiter, query_opt::encoding_type,
+ query_opt::key_marker, opt_parse::max_uploads,
+ query_opt::prefix, query_opt::upload_id_marker),
+ VERSIONING => GetBucketVersioning,
+ VERSIONS => ListObjectVersions (opt_parse::delimiter, query_opt::encoding_type,
+ query_opt::key_marker, opt_parse::max_keys,
+ query_opt::prefix, query_opt::version_id_marker),
+ WEBSITE => GetBucketWebsite,
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a HEAD.
+ fn from_head(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
+ key: [
+ EMPTY => HeadObject(opt_parse::part_number, query_opt::version_id),
+ ],
+ no_key: [
+ EMPTY => HeadBucket,
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a POST.
+ fn from_post(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
+ key: [
+ EMPTY if upload_id => CompleteMultipartUpload (query::upload_id),
+ RESTORE => RestoreObject (query_opt::version_id),
+ SELECT => SelectObjectContent (query::select_type),
+ UPLOADS => CreateMultipartUpload,
+ ],
+ no_key: [
+ EMPTY => PostObject,
+ DELETE => DeleteObjects,
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a PUT.
+ fn from_put(
+ key: String,
+ query: &mut QueryParameters<'_>,
+ headers: &HeaderMap<HeaderValue>,
+ ) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), key, query, headers),
+ key: [
+ EMPTY if part_number header "x-amz-copy-source" => UploadPartCopy (parse::part_number, query::upload_id),
+ EMPTY header "x-amz-copy-source" => CopyObject,
+ EMPTY if part_number => UploadPart (parse::part_number, query::upload_id),
+ EMPTY => PutObject,
+ ACL => PutObjectAcl (query_opt::version_id),
+ LEGAL_HOLD => PutObjectLegalHold (query_opt::version_id),
+ RETENTION => PutObjectRetention (query_opt::version_id),
+ TAGGING => PutObjectTagging (query_opt::version_id),
+
+ ],
+ no_key: [
+ EMPTY => CreateBucket,
+ ACCELERATE => PutBucketAccelerateConfiguration,
+ ACL => PutBucketAcl,
+ ANALYTICS => PutBucketAnalyticsConfiguration (query::id),
+ CORS => PutBucketCors,
+ ENCRYPTION => PutBucketEncryption,
+ INTELLIGENT_TIERING => PutBucketIntelligentTieringConfiguration(query::id),
+ INVENTORY => PutBucketInventoryConfiguration(query::id),
+ LIFECYCLE => PutBucketLifecycleConfiguration,
+ LOGGING => PutBucketLogging,
+ METRICS => PutBucketMetricsConfiguration(query::id),
+ NOTIFICATION => PutBucketNotificationConfiguration,
+ OBJECT_LOCK => PutObjectLockConfiguration,
+ OWNERSHIP_CONTROLS => PutBucketOwnershipControls,
+ POLICY => PutBucketPolicy,
+ PUBLIC_ACCESS_BLOCK => PutPublicAccessBlock,
+ REPLICATION => PutBucketReplication,
+ REQUEST_PAYMENT => PutBucketRequestPayment,
+ TAGGING => PutBucketTagging,
+ VERSIONING => PutBucketVersioning,
+ WEBSITE => PutBucketWebsite,
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a DELETE.
+ fn from_delete(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), key, query, None),
+ key: [
+ EMPTY if upload_id => AbortMultipartUpload (query::upload_id),
+ EMPTY => DeleteObject (query_opt::version_id),
+ TAGGING => DeleteObjectTagging (query_opt::version_id),
+ ],
+ no_key: [
+ EMPTY => DeleteBucket,
+ ANALYTICS => DeleteBucketAnalyticsConfiguration (query::id),
+ CORS => DeleteBucketCors,
+ ENCRYPTION => DeleteBucketEncryption,
+ INTELLIGENT_TIERING => DeleteBucketIntelligentTieringConfiguration (query::id),
+ INVENTORY => DeleteBucketInventoryConfiguration (query::id),
+ LIFECYCLE => DeleteBucketLifecycle,
+ METRICS => DeleteBucketMetricsConfiguration (query::id),
+ OWNERSHIP_CONTROLS => DeleteBucketOwnershipControls,
+ POLICY => DeleteBucketPolicy,
+ PUBLIC_ACCESS_BLOCK => DeletePublicAccessBlock,
+ REPLICATION => DeleteBucketReplication,
+ TAGGING => DeleteBucketTagging,
+ WEBSITE => DeleteBucketWebsite,
+ ]
+ }
+ }
+
+ /// Get the key the request targets. Returns None for requests which don't use a key.
+ #[allow(dead_code)]
+ pub fn get_key(&self) -> Option<&str> {
+ router_match! {
+ @extract
+ self,
+ key,
+ [
+ AbortMultipartUpload,
+ CompleteMultipartUpload,
+ CopyObject,
+ CreateMultipartUpload,
+ DeleteObject,
+ DeleteObjectTagging,
+ GetObject,
+ GetObjectAcl,
+ GetObjectLegalHold,
+ GetObjectRetention,
+ GetObjectTagging,
+ GetObjectTorrent,
+ HeadObject,
+ ListParts,
+ PutObject,
+ PutObjectAcl,
+ PutObjectLegalHold,
+ PutObjectRetention,
+ PutObjectTagging,
+ RestoreObject,
+ SelectObjectContent,
+ UploadPart,
+ UploadPartCopy,
+ ]
+ }
+ }
+
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ if let Endpoint::ListBuckets = self {
+ return Authorization::None;
+ };
+ let readonly = router_match! {
+ @match
+ self,
+ [
+ GetBucketAccelerateConfiguration,
+ GetBucketAcl,
+ GetBucketAnalyticsConfiguration,
+ GetBucketEncryption,
+ GetBucketIntelligentTieringConfiguration,
+ GetBucketInventoryConfiguration,
+ GetBucketLifecycleConfiguration,
+ GetBucketLocation,
+ GetBucketLogging,
+ GetBucketMetricsConfiguration,
+ GetBucketNotificationConfiguration,
+ GetBucketOwnershipControls,
+ GetBucketPolicy,
+ GetBucketPolicyStatus,
+ GetBucketReplication,
+ GetBucketRequestPayment,
+ GetBucketTagging,
+ GetBucketVersioning,
+ GetObject,
+ GetObjectAcl,
+ GetObjectLegalHold,
+ GetObjectLockConfiguration,
+ GetObjectRetention,
+ GetObjectTagging,
+ GetObjectTorrent,
+ GetPublicAccessBlock,
+ HeadBucket,
+ HeadObject,
+ ListBucketAnalyticsConfigurations,
+ ListBucketIntelligentTieringConfigurations,
+ ListBucketInventoryConfigurations,
+ ListBucketMetricsConfigurations,
+ ListMultipartUploads,
+ ListObjects,
+ ListObjectsV2,
+ ListObjectVersions,
+ ListParts,
+ SelectObjectContent,
+ ]
+ };
+ let owner = router_match! {
+ @match
+ self,
+ [
+ DeleteBucket,
+ GetBucketWebsite,
+ PutBucketWebsite,
+ DeleteBucketWebsite,
+ GetBucketCors,
+ PutBucketCors,
+ DeleteBucketCors,
+ ]
+ };
+ if readonly {
+ Authorization::Read
+ } else if owner {
+ Authorization::Owner
+ } else {
+ Authorization::Write
+ }
+ }
+}
+
+// parameter name => struct field
+generateQueryParameters! {
+ "continuation-token" => continuation_token,
+ "delimiter" => delimiter,
+ "encoding-type" => encoding_type,
+ "fetch-owner" => fetch_owner,
+ "id" => id,
+ "key-marker" => key_marker,
+ "list-type" => list_type,
+ "marker" => marker,
+ "max-keys" => max_keys,
+ "max-parts" => max_parts,
+ "max-uploads" => max_uploads,
+ "partNumber" => part_number,
+ "part-number-marker" => part_number_marker,
+ "prefix" => prefix,
+ "select-type" => select_type,
+ "start-after" => start_after,
+ "uploadId" => upload_id,
+ "upload-id-marker" => upload_id_marker,
+ "versionId" => version_id,
+ "version-id-marker" => version_id_marker
+}
+
+mod keywords {
+ //! This module contains all query parameters without an associated value that S3 uses to
+ //! differentiate endpoints.
+ pub const EMPTY: &str = "";
+
+ pub const ACCELERATE: &str = "accelerate";
+ pub const ACL: &str = "acl";
+ pub const ANALYTICS: &str = "analytics";
+ pub const CORS: &str = "cors";
+ pub const DELETE: &str = "delete";
+ pub const ENCRYPTION: &str = "encryption";
+ pub const INTELLIGENT_TIERING: &str = "intelligent-tiering";
+ pub const INVENTORY: &str = "inventory";
+ pub const LEGAL_HOLD: &str = "legal-hold";
+ pub const LIFECYCLE: &str = "lifecycle";
+ pub const LOCATION: &str = "location";
+ pub const LOGGING: &str = "logging";
+ pub const METRICS: &str = "metrics";
+ pub const NOTIFICATION: &str = "notification";
+ pub const OBJECT_LOCK: &str = "object-lock";
+ pub const OWNERSHIP_CONTROLS: &str = "ownershipControls";
+ pub const POLICY: &str = "policy";
+ pub const POLICY_STATUS: &str = "policyStatus";
+ pub const PUBLIC_ACCESS_BLOCK: &str = "publicAccessBlock";
+ pub const REPLICATION: &str = "replication";
+ pub const REQUEST_PAYMENT: &str = "requestPayment";
+ pub const RESTORE: &str = "restore";
+ pub const RETENTION: &str = "retention";
+ pub const SELECT: &str = "select";
+ pub const TAGGING: &str = "tagging";
+ pub const TORRENT: &str = "torrent";
+ pub const UPLOADS: &str = "uploads";
+ pub const VERSIONING: &str = "versioning";
+ pub const VERSIONS: &str = "versions";
+ pub const WEBSITE: &str = "website";
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn parse(
+ method: &str,
+ uri: &str,
+ bucket: Option<String>,
+ header: Option<(&str, &str)>,
+ ) -> (Endpoint, Option<String>) {
+ let mut req = Request::builder().method(method).uri(uri);
+ if let Some((k, v)) = header {
+ req = req.header(k, v)
+ }
+ let req = req.body(()).unwrap();
+
+ Endpoint::from_request(&req, bucket).unwrap()
+ }
+
+ macro_rules! test_cases {
+ ($($method:ident $uri:expr => $variant:ident )*) => {{
+ $(
+ assert!(
+ matches!(
+ parse(test_cases!{@actual_method $method}, $uri, Some("my_bucket".to_owned()), None).0,
+ Endpoint::$variant { .. }
+ )
+ );
+ assert!(
+ matches!(
+ parse(test_cases!{@actual_method $method}, concat!("/my_bucket", $uri), None, None).0,
+ Endpoint::$variant { .. }
+ )
+ );
+
+ test_cases!{@auth $method $uri}
+ )*
+ }};
+
+ (@actual_method HEAD) => {{ "HEAD" }};
+ (@actual_method GET) => {{ "GET" }};
+ (@actual_method OWNER_GET) => {{ "GET" }};
+ (@actual_method PUT) => {{ "PUT" }};
+ (@actual_method OWNER_PUT) => {{ "PUT" }};
+ (@actual_method POST) => {{ "POST" }};
+ (@actual_method DELETE) => {{ "DELETE" }};
+ (@actual_method OWNER_DELETE) => {{ "DELETE" }};
+
+ (@auth HEAD $uri:expr) => {{
+ assert_eq!(parse("HEAD", concat!("/my_bucket", $uri), None, None).0.authorization_type(),
+ Authorization::Read)
+ }};
+ (@auth GET $uri:expr) => {{
+ assert_eq!(parse("GET", concat!("/my_bucket", $uri), None, None).0.authorization_type(),
+ Authorization::Read)
+ }};
+ (@auth OWNER_GET $uri:expr) => {{
+ assert_eq!(parse("GET", concat!("/my_bucket", $uri), None, None).0.authorization_type(),
+ Authorization::Owner)
+ }};
+ (@auth PUT $uri:expr) => {{
+ assert_eq!(parse("PUT", concat!("/my_bucket", $uri), None, None).0.authorization_type(),
+ Authorization::Write)
+ }};
+ (@auth OWNER_PUT $uri:expr) => {{
+ assert_eq!(parse("PUT", concat!("/my_bucket", $uri), None, None).0.authorization_type(),
+ Authorization::Owner)
+ }};
+ (@auth POST $uri:expr) => {{
+ assert_eq!(parse("POST", concat!("/my_bucket", $uri), None, None).0.authorization_type(),
+ Authorization::Write)
+ }};
+ (@auth DELETE $uri:expr) => {{
+ assert_eq!(parse("DELETE", concat!("/my_bucket", $uri), None, None).0.authorization_type(),
+ Authorization::Write)
+ }};
+ (@auth OWNER_DELETE $uri:expr) => {{
+ assert_eq!(parse("DELETE", concat!("/my_bucket", $uri), None, None).0.authorization_type(),
+ Authorization::Owner)
+ }};
+ }
+
+ #[test]
+ fn test_bucket_extraction() {
+ assert_eq!(
+ parse("GET", "/my/key", Some("my_bucket".to_owned()), None).1,
+ parse("GET", "/my_bucket/my/key", None, None).1
+ );
+ assert_eq!(
+ parse("GET", "/my_bucket/my/key", None, None).1.unwrap(),
+ "my_bucket"
+ );
+ assert!(parse("GET", "/", None, None).1.is_none());
+ }
+
+ #[test]
+ fn test_key() {
+ assert_eq!(
+ parse("GET", "/my/key", Some("my_bucket".to_owned()), None)
+ .0
+ .get_key(),
+ parse("GET", "/my_bucket/my/key", None, None).0.get_key()
+ );
+ assert_eq!(
+ parse("GET", "/my_bucket/my/key", None, None)
+ .0
+ .get_key()
+ .unwrap(),
+ "my/key"
+ );
+ assert_eq!(
+ parse("GET", "/my_bucket/my/key?acl", None, None)
+ .0
+ .get_key()
+ .unwrap(),
+ "my/key"
+ );
+ assert!(parse("GET", "/my_bucket/?list-type=2", None, None)
+ .0
+ .get_key()
+ .is_none());
+
+ assert_eq!(
+ parse("GET", "/my_bucket/%26%2B%3F%25%C3%A9/something", None, None)
+ .0
+ .get_key()
+ .unwrap(),
+ "&+?%é/something"
+ );
+
+ /*
+ * this case fails. We should check how clients encode spaces in URLs
+ assert_eq!(
+ parse("GET", "/my_bucket/+", None, None).get_key().unwrap(),
+ " ");
+ */
+ }
+
+ #[test]
+ fn invalid_endpoint() {
+ let req = Request::builder()
+ .method("GET")
+ .uri("/bucket/key?website")
+ .body(())
+ .unwrap();
+
+ assert!(Endpoint::from_request(&req, None).is_err())
+ }
+
+ #[test]
+ fn test_aws_doc_examples() {
+ test_cases!(
+ DELETE "/example-object?uploadId=VXBsb2FkIElEIGZvciBlbHZpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZ" => AbortMultipartUpload
+ DELETE "/Key+?uploadId=UploadId" => AbortMultipartUpload
+ POST "/example-object?uploadId=AAAsb2FkIElEIGZvciBlbHZpbmcncyWeeS1tb3ZpZS5tMnRzIRRwbG9hZA" => CompleteMultipartUpload
+ POST "/Key+?uploadId=UploadId" => CompleteMultipartUpload
+ PUT "/" => CreateBucket
+ POST "/example-object?uploads" => CreateMultipartUpload
+ POST "/{Key+}?uploads" => CreateMultipartUpload
+ OWNER_DELETE "/" => DeleteBucket
+ DELETE "/?analytics&id=list1" => DeleteBucketAnalyticsConfiguration
+ DELETE "/?analytics&id=Id" => DeleteBucketAnalyticsConfiguration
+ OWNER_DELETE "/?cors" => DeleteBucketCors
+ DELETE "/?encryption" => DeleteBucketEncryption
+ DELETE "/?intelligent-tiering&id=Id" => DeleteBucketIntelligentTieringConfiguration
+ DELETE "/?inventory&id=list1" => DeleteBucketInventoryConfiguration
+ DELETE "/?inventory&id=Id" => DeleteBucketInventoryConfiguration
+ DELETE "/?lifecycle" => DeleteBucketLifecycle
+ DELETE "/?metrics&id=ExampleMetrics" => DeleteBucketMetricsConfiguration
+ DELETE "/?metrics&id=Id" => DeleteBucketMetricsConfiguration
+ DELETE "/?ownershipControls" => DeleteBucketOwnershipControls
+ DELETE "/?policy" => DeleteBucketPolicy
+ DELETE "/?replication" => DeleteBucketReplication
+ DELETE "/?tagging" => DeleteBucketTagging
+ OWNER_DELETE "/?website" => DeleteBucketWebsite
+ DELETE "/my-second-image.jpg" => DeleteObject
+ DELETE "/my-third-image.jpg?versionId=UIORUnfndfiufdisojhr398493jfdkjFJjkndnqUifhnw89493jJFJ" => DeleteObject
+ DELETE "/Key+?versionId=VersionId" => DeleteObject
+ POST "/?delete" => DeleteObjects
+ DELETE "/exampleobject?tagging" => DeleteObjectTagging
+ DELETE "/{Key+}?tagging&versionId=VersionId" => DeleteObjectTagging
+ DELETE "/?publicAccessBlock" => DeletePublicAccessBlock
+ GET "/?accelerate" => GetBucketAccelerateConfiguration
+ GET "/?acl" => GetBucketAcl
+ GET "/?analytics&id=Id" => GetBucketAnalyticsConfiguration
+ OWNER_GET "/?cors" => GetBucketCors
+ GET "/?encryption" => GetBucketEncryption
+ GET "/?intelligent-tiering&id=Id" => GetBucketIntelligentTieringConfiguration
+ GET "/?inventory&id=list1" => GetBucketInventoryConfiguration
+ GET "/?inventory&id=Id" => GetBucketInventoryConfiguration
+ GET "/?lifecycle" => GetBucketLifecycleConfiguration
+ GET "/?location" => GetBucketLocation
+ GET "/?logging" => GetBucketLogging
+ GET "/?metrics&id=Documents" => GetBucketMetricsConfiguration
+ GET "/?metrics&id=Id" => GetBucketMetricsConfiguration
+ GET "/?notification" => GetBucketNotificationConfiguration
+ GET "/?ownershipControls" => GetBucketOwnershipControls
+ GET "/?policy" => GetBucketPolicy
+ GET "/?policyStatus" => GetBucketPolicyStatus
+ GET "/?replication" => GetBucketReplication
+ GET "/?requestPayment" => GetBucketRequestPayment
+ GET "/?tagging" => GetBucketTagging
+ GET "/?versioning" => GetBucketVersioning
+ OWNER_GET "/?website" => GetBucketWebsite
+ GET "/my-image.jpg" => GetObject
+ GET "/myObject?versionId=3/L4kqtJlcpXroDTDmpUMLUo" => GetObject
+ GET "/Junk3.txt?response-cache-control=No-cache&response-content-disposition=attachment%3B%20filename%3Dtesting.txt&response-content-encoding=x-gzip&response-content-language=mi%2C%20en&response-expires=Thu%2C%2001%20Dec%201994%2016:00:00%20GMT" => GetObject
+ GET "/Key+?partNumber=1&response-cache-control=ResponseCacheControl&response-content-disposition=ResponseContentDisposition&response-content-encoding=ResponseContentEncoding&response-content-language=ResponseContentLanguage&response-content-type=ResponseContentType&response-expires=ResponseExpires&versionId=VersionId" => GetObject
+ GET "/my-image.jpg?acl" => GetObjectAcl
+ GET "/my-image.jpg?versionId=3/L4kqtJlcpXroDVBH40Nr8X8gdRQBpUMLUo&acl" => GetObjectAcl
+ GET "/{Key+}?acl&versionId=VersionId" => GetObjectAcl
+ GET "/{Key+}?legal-hold&versionId=VersionId" => GetObjectLegalHold
+ GET "/?object-lock" => GetObjectLockConfiguration
+ GET "/{Key+}?retention&versionId=VersionId" => GetObjectRetention
+ GET "/example-object?tagging" => GetObjectTagging
+ GET "/{Key+}?tagging&versionId=VersionId" => GetObjectTagging
+ GET "/quotes/Nelson?torrent" => GetObjectTorrent
+ GET "/{Key+}?torrent" => GetObjectTorrent
+ GET "/?publicAccessBlock" => GetPublicAccessBlock
+ HEAD "/" => HeadBucket
+ HEAD "/my-image.jpg" => HeadObject
+ HEAD "/my-image.jpg?versionId=3HL4kqCxf3vjVBH40Nrjfkd" => HeadObject
+ HEAD "/Key+?partNumber=3&versionId=VersionId" => HeadObject
+ GET "/?analytics" => ListBucketAnalyticsConfigurations
+ GET "/?analytics&continuation-token=ContinuationToken" => ListBucketAnalyticsConfigurations
+ GET "/?intelligent-tiering" => ListBucketIntelligentTieringConfigurations
+ GET "/?intelligent-tiering&continuation-token=ContinuationToken" => ListBucketIntelligentTieringConfigurations
+ GET "/?inventory" => ListBucketInventoryConfigurations
+ GET "/?inventory&continuation-token=ContinuationToken" => ListBucketInventoryConfigurations
+ GET "/?metrics" => ListBucketMetricsConfigurations
+ GET "/?metrics&continuation-token=ContinuationToken" => ListBucketMetricsConfigurations
+ GET "/?uploads&max-uploads=3" => ListMultipartUploads
+ GET "/?uploads&delimiter=/" => ListMultipartUploads
+ GET "/?uploads&delimiter=/&prefix=photos/2006/" => ListMultipartUploads
+ GET "/?uploads&delimiter=D&encoding-type=EncodingType&key-marker=KeyMarker&max-uploads=1&prefix=Prefix&upload-id-marker=UploadIdMarker" => ListMultipartUploads
+ GET "/" => ListObjects
+ GET "/?prefix=N&marker=Ned&max-keys=40" => ListObjects
+ GET "/?delimiter=/" => ListObjects
+ GET "/?prefix=photos/2006/&delimiter=/" => ListObjects
+
+ GET "/?delimiter=D&encoding-type=EncodingType&marker=Marker&max-keys=1&prefix=Prefix" => ListObjects
+ GET "/?list-type=2" => ListObjectsV2
+ GET "/?list-type=2&max-keys=3&prefix=E&start-after=ExampleGuide.pdf" => ListObjectsV2
+ GET "/?list-type=2&delimiter=/" => ListObjectsV2
+ GET "/?list-type=2&prefix=photos/2006/&delimiter=/" => ListObjectsV2
+ GET "/?list-type=2" => ListObjectsV2
+ GET "/?list-type=2&continuation-token=1ueGcxLPRx1Tr/XYExHnhbYLgveDs2J/wm36Hy4vbOwM=" => ListObjectsV2
+ GET "/?list-type=2&continuation-token=ContinuationToken&delimiter=D&encoding-type=EncodingType&fetch-owner=true&max-keys=1&prefix=Prefix&start-after=StartAfter" => ListObjectsV2
+ GET "/?versions" => ListObjectVersions
+ GET "/?versions&key-marker=key2" => ListObjectVersions
+ GET "/?versions&key-marker=key3&version-id-marker=t46ZenlYTZBnj" => ListObjectVersions
+ GET "/?versions&key-marker=key3&version-id-marker=t46Z0menlYTZBnj&max-keys=3" => ListObjectVersions
+ GET "/?versions&delimiter=/" => ListObjectVersions
+ GET "/?versions&prefix=photos/2006/&delimiter=/" => ListObjectVersions
+ GET "/?versions&delimiter=D&encoding-type=EncodingType&key-marker=KeyMarker&max-keys=2&prefix=Prefix&version-id-marker=VersionIdMarker" => ListObjectVersions
+ GET "/example-object?uploadId=XXBsb2FkIElEIGZvciBlbHZpbmcncyVcdS1tb3ZpZS5tMnRzEEEwbG9hZA&max-parts=2&part-number-marker=1" => ListParts
+ GET "/Key+?max-parts=2&part-number-marker=2&uploadId=UploadId" => ListParts
+ PUT "/?accelerate" => PutBucketAccelerateConfiguration
+ PUT "/?acl" => PutBucketAcl
+ PUT "/?analytics&id=report1" => PutBucketAnalyticsConfiguration
+ PUT "/?analytics&id=Id" => PutBucketAnalyticsConfiguration
+ OWNER_PUT "/?cors" => PutBucketCors
+ PUT "/?encryption" => PutBucketEncryption
+ PUT "/?intelligent-tiering&id=Id" => PutBucketIntelligentTieringConfiguration
+ PUT "/?inventory&id=report1" => PutBucketInventoryConfiguration
+ PUT "/?inventory&id=Id" => PutBucketInventoryConfiguration
+ PUT "/?lifecycle" => PutBucketLifecycleConfiguration
+ PUT "/?logging" => PutBucketLogging
+ PUT "/?metrics&id=EntireBucket" => PutBucketMetricsConfiguration
+ PUT "/?metrics&id=Id" => PutBucketMetricsConfiguration
+ PUT "/?notification" => PutBucketNotificationConfiguration
+ PUT "/?ownershipControls" => PutBucketOwnershipControls
+ PUT "/?policy" => PutBucketPolicy
+ PUT "/?replication" => PutBucketReplication
+ PUT "/?requestPayment" => PutBucketRequestPayment
+ PUT "/?tagging" => PutBucketTagging
+ PUT "/?versioning" => PutBucketVersioning
+ OWNER_PUT "/?website" => PutBucketWebsite
+ PUT "/my-image.jpg" => PutObject
+ PUT "/Key+" => PutObject
+ PUT "/my-image.jpg?acl" => PutObjectAcl
+ PUT "/my-image.jpg?acl&versionId=3HL4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nrjfkd" => PutObjectAcl
+ PUT "/{Key+}?acl&versionId=VersionId" => PutObjectAcl
+ PUT "/{Key+}?legal-hold&versionId=VersionId" => PutObjectLegalHold
+ PUT "/?object-lock" => PutObjectLockConfiguration
+ PUT "/{Key+}?retention&versionId=VersionId" => PutObjectRetention
+ PUT "/object-key?tagging" => PutObjectTagging
+ PUT "/{Key+}?tagging&versionId=VersionId" => PutObjectTagging
+ PUT "/?publicAccessBlock" => PutPublicAccessBlock
+ POST "/object-one.csv?restore" => RestoreObject
+ POST "/{Key+}?restore&versionId=VersionId" => RestoreObject
+ PUT "/my-movie.m2ts?partNumber=1&uploadId=VCVsb2FkIElEIGZvciBlbZZpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZR" => UploadPart
+ PUT "/Key+?partNumber=2&uploadId=UploadId" => UploadPart
+ POST "/" => PostObject
+ );
+ // no bucket, won't work with the rest of the test suite
+ assert!(matches!(
+ parse("GET", "/", None, None).0,
+ Endpoint::ListBuckets { .. }
+ ));
+ assert!(matches!(
+ parse("GET", "/", None, None).0.authorization_type(),
+ Authorization::None
+ ));
+
+ // CopyObject requires the x-amz-copy-source header
+ assert!(matches!(
+ parse(
+ "PUT",
+ "/Key+",
+ Some("my_bucket".to_owned()),
+ Some(("x-amz-copy-source", "some/key"))
+ )
+ .0,
+ Endpoint::CopyObject { .. }
+ ));
+ assert!(matches!(
+ parse(
+ "PUT",
+ "/my_bucket/Key+",
+ None,
+ Some(("x-amz-copy-source", "some/key"))
+ )
+ .0,
+ Endpoint::CopyObject { .. }
+ ));
+ assert!(matches!(
+ parse(
+ "PUT",
+ "/my_bucket/Key+",
+ None,
+ Some(("x-amz-copy-source", "some/key"))
+ )
+ .0
+ .authorization_type(),
+ Authorization::Write
+ ));
+
+ // UploadPartCopy also requires the x-amz-copy-source header
+ assert!(matches!(
+ parse(
+ "PUT",
+ "/Key+?partNumber=2&uploadId=UploadId",
+ Some("my_bucket".to_owned()),
+ Some(("x-amz-copy-source", "some/key"))
+ )
+ .0,
+ Endpoint::UploadPartCopy { .. }
+ ));
+ assert!(matches!(
+ parse(
+ "PUT",
+ "/my_bucket/Key+?partNumber=2&uploadId=UploadId",
+ None,
+ Some(("x-amz-copy-source", "some/key"))
+ )
+ .0,
+ Endpoint::UploadPartCopy { .. }
+ ));
+ assert!(matches!(
+ parse(
+ "PUT",
+ "/my_bucket/Key+?partNumber=2&uploadId=UploadId",
+ None,
+ Some(("x-amz-copy-source", "some/key"))
+ )
+ .0
+ .authorization_type(),
+ Authorization::Write
+ ));
+
+ // POST request, but with GET semantics for permission purposes
+ assert!(matches!(
+ parse(
+ "POST",
+ "/{Key+}?select&select-type=2",
+ Some("my_bucket".to_owned()),
+ None
+ )
+ .0,
+ Endpoint::SelectObjectContent { .. }
+ ));
+ assert!(matches!(
+ parse("POST", "/my_bucket/{Key+}?select&select-type=2", None, None).0,
+ Endpoint::SelectObjectContent { .. }
+ ));
+ assert!(matches!(
+ parse("POST", "/my_bucket/{Key+}?select&select-type=2", None, None)
+ .0
+ .authorization_type(),
+ Authorization::Read
+ ));
+ }
+}
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
new file mode 100644
index 00000000..561130dc
--- /dev/null
+++ b/src/api/s3/website.rs
@@ -0,0 +1,369 @@
+use quick_xml::de::from_reader;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use crate::error::*;
+use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+use crate::signature::verify_signed_content;
+
+use garage_model::bucket_table::*;
+use garage_model::garage::Garage;
+use garage_table::*;
+use garage_util::data::*;
+
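+/// Handle the GetBucketWebsite request: serialize the bucket's website
+/// configuration as XML, or return 204 No Content if none is set.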
+pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> {
+ let param = bucket
+ .params()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
+
+ if let Some(website) = param.website_config.get() {
+ let wc = WebsiteConfiguration {
+ xmlns: (),
+ error_document: website.error_document.as_ref().map(|v| Key {
+ key: Value(v.to_string()),
+ }),
+ index_document: Some(Suffix {
+ suffix: Value(website.index_document.to_string()),
+ }),
+ redirect_all_requests_to: None,
+ routing_rules: None,
+ };
+ let xml = to_xml_with_header(&wc)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, "application/xml")
+ .body(Body::from(xml))?)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+ }
+}
+
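+/// Handle the DeleteBucketWebsite request: clear the bucket's website
+/// configuration and return 204 No Content.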
+pub async fn handle_delete_website(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+) -> Result<Response<Body>, Error> {
+ let mut bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .ok_or(Error::NoSuchBucket)?;
+
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
+
+ param.website_config.update(None);
+ garage.bucket_table.insert(&bucket).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
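+/// Handle the PutBucketWebsite request: check the payload signature,
+/// deserialize and validate the XML body, and store the resulting
+/// website configuration in the bucket table.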
+pub async fn handle_put_website(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+ content_sha256: Option<Hash>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
+
+ let mut bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .ok_or(Error::NoSuchBucket)?;
+
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
+
+ let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
+ conf.validate()?;
+
+ param
+ .website_config
+ .update(Some(conf.into_garage_website_config()?));
+ garage.bucket_table.insert(&bucket).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
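+/// Representation of the WebsiteConfiguration XML document used by
+/// GetBucketWebsite and PutBucketWebsite.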
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct WebsiteConfiguration {
+ #[serde(serialize_with = "xmlns_tag", skip_deserializing)]
+ pub xmlns: (),
+ #[serde(rename = "ErrorDocument")]
+ pub error_document: Option<Key>,
+ #[serde(rename = "IndexDocument")]
+ pub index_document: Option<Suffix>,
+ #[serde(rename = "RedirectAllRequestsTo")]
+ pub redirect_all_requests_to: Option<Target>,
+ #[serde(rename = "RoutingRules")]
+ pub routing_rules: Option<Vec<RoutingRule>>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct RoutingRule {
+ #[serde(rename = "RoutingRule")]
+ pub inner: RoutingRuleInner,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct RoutingRuleInner {
+ #[serde(rename = "Condition")]
+ pub condition: Option<Condition>,
+ #[serde(rename = "Redirect")]
+ pub redirect: Redirect,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Key {
+ #[serde(rename = "Key")]
+ pub key: Value,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Suffix {
+ #[serde(rename = "Suffix")]
+ pub suffix: Value,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Target {
+ #[serde(rename = "HostName")]
+ pub hostname: Value,
+ #[serde(rename = "Protocol")]
+ pub protocol: Option<Value>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Condition {
+ #[serde(rename = "HttpErrorCodeReturnedEquals")]
+ pub http_error_code: Option<IntValue>,
+ #[serde(rename = "KeyPrefixEquals")]
+ pub prefix: Option<Value>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Redirect {
+ #[serde(rename = "HostName")]
+ pub hostname: Option<Value>,
+ #[serde(rename = "Protocol")]
+ pub protocol: Option<Value>,
+ #[serde(rename = "HttpRedirectCode")]
+ pub http_redirect_code: Option<IntValue>,
+ #[serde(rename = "ReplaceKeyPrefixWith")]
+ pub replace_prefix: Option<Value>,
+ #[serde(rename = "ReplaceKeyWith")]
+ pub replace_full: Option<Value>,
+}
+
+impl WebsiteConfiguration {
+ pub fn validate(&self) -> Result<(), Error> {
+ if self.redirect_all_requests_to.is_some()
+ && (self.error_document.is_some()
+ || self.index_document.is_some()
+ || self.routing_rules.is_some())
+ {
+ return Err(Error::BadRequest(
+ "Bad XML: can't have RedirectAllRequestsTo and other fields".to_owned(),
+ ));
+ }
+ if let Some(ref ed) = self.error_document {
+ ed.validate()?;
+ }
+ if let Some(ref id) = self.index_document {
+ id.validate()?;
+ }
+ if let Some(ref rart) = self.redirect_all_requests_to {
+ rart.validate()?;
+ }
+ if let Some(ref rrs) = self.routing_rules {
+ for rr in rrs {
+ rr.inner.validate()?;
+ }
+ }
+
+ Ok(())
+ }
+
+ pub fn into_garage_website_config(self) -> Result<WebsiteConfig, Error> {
+ if self.redirect_all_requests_to.is_some() {
+ Err(Error::NotImplemented(
+ "S3 website redirects are not currently implemented in Garage.".into(),
+ ))
+ } else if self.routing_rules.map(|x| !x.is_empty()).unwrap_or(false) {
+ Err(Error::NotImplemented(
+ "S3 routing rules are not currently implemented in Garage.".into(),
+ ))
+ } else {
+ Ok(WebsiteConfig {
+ index_document: self
+ .index_document
+ .map(|x| x.suffix.0)
+ .unwrap_or_else(|| "index.html".to_string()),
+ error_document: self.error_document.map(|x| x.key.0),
+ })
+ }
+ }
+}
+
+impl Key {
+ pub fn validate(&self) -> Result<(), Error> {
+ if self.key.0.is_empty() {
+ Err(Error::BadRequest(
+ "Bad XML: error document specified but empty".to_owned(),
+ ))
+ } else {
+ Ok(())
+ }
+ }
+}
+
+impl Suffix {
+ pub fn validate(&self) -> Result<(), Error> {
+ if self.suffix.0.is_empty() || self.suffix.0.contains('/') {
+ Err(Error::BadRequest(
+ "Bad XML: index document is empty or contains /".to_owned(),
+ ))
+ } else {
+ Ok(())
+ }
+ }
+}
+
+impl Target {
+ pub fn validate(&self) -> Result<(), Error> {
+ if let Some(ref protocol) = self.protocol {
+ if protocol.0 != "http" && protocol.0 != "https" {
+ return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned()));
+ }
+ }
+ Ok(())
+ }
+}
+
+impl RoutingRuleInner {
+ pub fn validate(&self) -> Result<(), Error> {
+ let has_prefix = self
+ .condition
+ .as_ref()
+ .and_then(|c| c.prefix.as_ref())
+ .is_some();
+ self.redirect.validate(has_prefix)
+ }
+}
+
+impl Redirect {
+ pub fn validate(&self, has_prefix: bool) -> Result<(), Error> {
+ if self.replace_prefix.is_some() {
+ if self.replace_full.is_some() {
+ return Err(Error::BadRequest(
+ "Bad XML: both ReplaceKeyPrefixWith and ReplaceKeyWith are set".to_owned(),
+ ));
+ }
+ if !has_prefix {
+ return Err(Error::BadRequest(
+ "Bad XML: ReplaceKeyPrefixWith is set, but KeyPrefixEquals isn't".to_owned(),
+ ));
+ }
+ }
+ if let Some(ref protocol) = self.protocol {
+ if protocol.0 != "http" && protocol.0 != "https" {
+ return Err(Error::BadRequest("Bad XML: invalid protocol".to_owned()));
+ }
+ }
+ // TODO: there are probably more invalid cases, but which ones?
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use quick_xml::de::from_str;
+
+ #[test]
+ fn test_deserialize() -> Result<(), Error> {
+ let message = r#"<?xml version="1.0" encoding="UTF-8"?>
+<WebsiteConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <ErrorDocument>
+ <Key>my-error-doc</Key>
+ </ErrorDocument>
+ <IndexDocument>
+ <Suffix>my-index</Suffix>
+ </IndexDocument>
+ <RedirectAllRequestsTo>
+ <HostName>garage.tld</HostName>
+ <Protocol>https</Protocol>
+ </RedirectAllRequestsTo>
+ <RoutingRules>
+ <RoutingRule>
+ <Condition>
+ <HttpErrorCodeReturnedEquals>404</HttpErrorCodeReturnedEquals>
+ <KeyPrefixEquals>prefix1</KeyPrefixEquals>
+ </Condition>
+ <Redirect>
+ <HostName>gara.ge</HostName>
+ <Protocol>http</Protocol>
+ <HttpRedirectCode>303</HttpRedirectCode>
+ <ReplaceKeyPrefixWith>prefix2</ReplaceKeyPrefixWith>
+ <ReplaceKeyWith>fullkey</ReplaceKeyWith>
+ </Redirect>
+ </RoutingRule>
+ </RoutingRules>
+</WebsiteConfiguration>"#;
+ let conf: WebsiteConfiguration = from_str(message).unwrap();
+ let ref_value = WebsiteConfiguration {
+ xmlns: (),
+ error_document: Some(Key {
+ key: Value("my-error-doc".to_owned()),
+ }),
+ index_document: Some(Suffix {
+ suffix: Value("my-index".to_owned()),
+ }),
+ redirect_all_requests_to: Some(Target {
+ hostname: Value("garage.tld".to_owned()),
+ protocol: Some(Value("https".to_owned())),
+ }),
+ routing_rules: Some(vec![RoutingRule {
+ inner: RoutingRuleInner {
+ condition: Some(Condition {
+ http_error_code: Some(IntValue(404)),
+ prefix: Some(Value("prefix1".to_owned())),
+ }),
+ redirect: Redirect {
+ hostname: Some(Value("gara.ge".to_owned())),
+ protocol: Some(Value("http".to_owned())),
+ http_redirect_code: Some(IntValue(303)),
+ replace_prefix: Some(Value("prefix2".to_owned())),
+ replace_full: Some(Value("fullkey".to_owned())),
+ },
+ },
+ }]),
+ };
+ assert_eq! {
+ ref_value,
+ conf
+ }
+
+ let message2 = to_xml_with_header(&ref_value)?;
+
+ let cleanup = |c: &str| c.replace(char::is_whitespace, "");
+ assert_eq!(cleanup(message), cleanup(&message2));
+
+ Ok(())
+ }
+}
diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs
new file mode 100644
index 00000000..75ec4559
--- /dev/null
+++ b/src/api/s3/xml.rs
@@ -0,0 +1,844 @@
+use quick_xml::se::to_string;
+use serde::{Deserialize, Serialize, Serializer};
+
+use crate::Error as ApiError;
+
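+/// Serialize a value to XML, prepending the standard XML declaration header.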
+pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> {
+ let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string();
+ xml.push_str(&to_string(x)?);
+ Ok(xml)
+}
+
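+/// Serde helper serializing a unit `xmlns` field as the S3 namespace URI.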
+pub fn xmlns_tag<S: Serializer>(_v: &(), s: S) -> Result<S::Ok, S::Error> {
+ s.serialize_str("http://s3.amazonaws.com/doc/2006-03-01/")
+}
+
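+/// Wrapper for an XML text element.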
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Value(#[serde(rename = "$value")] pub String);
+
+impl From<&str> for Value {
+ fn from(s: &str) -> Value {
+ Value(s.to_string())
+ }
+}
+
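+/// Wrapper for an XML integer element.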
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct IntValue(#[serde(rename = "$value")] pub i64);
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct Bucket {
+ #[serde(rename = "CreationDate")]
+ pub creation_date: Value,
+ #[serde(rename = "Name")]
+ pub name: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct Owner {
+ #[serde(rename = "DisplayName")]
+ pub display_name: Value,
+ #[serde(rename = "ID")]
+ pub id: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct BucketList {
+ #[serde(rename = "Bucket")]
+ pub entries: Vec<Bucket>,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct ListAllMyBucketsResult {
+ #[serde(rename = "Buckets")]
+ pub buckets: BucketList,
+ #[serde(rename = "Owner")]
+ pub owner: Owner,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct LocationConstraint {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "$value")]
+ pub region: String,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct Deleted {
+ #[serde(rename = "Key")]
+ pub key: Value,
+ #[serde(rename = "VersionId")]
+ pub version_id: Value,
+ #[serde(rename = "DeleteMarkerVersionId")]
+ pub delete_marker_version_id: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct Error {
+ #[serde(rename = "Code")]
+ pub code: Value,
+ #[serde(rename = "Message")]
+ pub message: Value,
+ #[serde(rename = "Resource")]
+ pub resource: Option<Value>,
+ #[serde(rename = "Region")]
+ pub region: Option<Value>,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct DeleteError {
+ #[serde(rename = "Code")]
+ pub code: Value,
+ #[serde(rename = "Key")]
+ pub key: Option<Value>,
+ #[serde(rename = "Message")]
+ pub message: Value,
+ #[serde(rename = "VersionId")]
+ pub version_id: Option<Value>,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct DeleteResult {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "Deleted")]
+ pub deleted: Vec<Deleted>,
+ #[serde(rename = "Error")]
+ pub errors: Vec<DeleteError>,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct InitiateMultipartUploadResult {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "Bucket")]
+ pub bucket: Value,
+ #[serde(rename = "Key")]
+ pub key: Value,
+ #[serde(rename = "UploadId")]
+ pub upload_id: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct CompleteMultipartUploadResult {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "Location")]
+ pub location: Option<Value>,
+ #[serde(rename = "Bucket")]
+ pub bucket: Value,
+ #[serde(rename = "Key")]
+ pub key: Value,
+ #[serde(rename = "ETag")]
+ pub etag: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct Initiator {
+ #[serde(rename = "DisplayName")]
+ pub display_name: Value,
+ #[serde(rename = "ID")]
+ pub id: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct ListMultipartItem {
+ #[serde(rename = "Initiated")]
+ pub initiated: Value,
+ #[serde(rename = "Initiator")]
+ pub initiator: Initiator,
+ #[serde(rename = "Key")]
+ pub key: Value,
+ #[serde(rename = "UploadId")]
+ pub upload_id: Value,
+ #[serde(rename = "Owner")]
+ pub owner: Owner,
+ #[serde(rename = "StorageClass")]
+ pub storage_class: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct ListMultipartUploadsResult {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "Bucket")]
+ pub bucket: Value,
+ #[serde(rename = "KeyMarker")]
+ pub key_marker: Option<Value>,
+ #[serde(rename = "UploadIdMarker")]
+ pub upload_id_marker: Option<Value>,
+ #[serde(rename = "NextKeyMarker")]
+ pub next_key_marker: Option<Value>,
+ #[serde(rename = "NextUploadIdMarker")]
+ pub next_upload_id_marker: Option<Value>,
+ #[serde(rename = "Prefix")]
+ pub prefix: Value,
+ #[serde(rename = "Delimiter")]
+ pub delimiter: Option<Value>,
+ #[serde(rename = "MaxUploads")]
+ pub max_uploads: IntValue,
+ #[serde(rename = "IsTruncated")]
+ pub is_truncated: Value,
+ #[serde(rename = "Upload")]
+ pub upload: Vec<ListMultipartItem>,
+ #[serde(rename = "CommonPrefixes")]
+ pub common_prefixes: Vec<CommonPrefix>,
+ #[serde(rename = "EncodingType")]
+ pub encoding_type: Option<Value>,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct PartItem {
+ #[serde(rename = "ETag")]
+ pub etag: Value,
+ #[serde(rename = "LastModified")]
+ pub last_modified: Value,
+ #[serde(rename = "PartNumber")]
+ pub part_number: IntValue,
+ #[serde(rename = "Size")]
+ pub size: IntValue,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct ListPartsResult {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "Bucket")]
+ pub bucket: Value,
+ #[serde(rename = "Key")]
+ pub key: Value,
+ #[serde(rename = "UploadId")]
+ pub upload_id: Value,
+ #[serde(rename = "PartNumberMarker")]
+ pub part_number_marker: Option<IntValue>,
+ #[serde(rename = "NextPartNumberMarker")]
+ pub next_part_number_marker: Option<IntValue>,
+ #[serde(rename = "MaxParts")]
+ pub max_parts: IntValue,
+ #[serde(rename = "IsTruncated")]
+ pub is_truncated: Value,
+ #[serde(rename = "Part", default)]
+ pub parts: Vec<PartItem>,
+ #[serde(rename = "Initiator")]
+ pub initiator: Initiator,
+ #[serde(rename = "Owner")]
+ pub owner: Owner,
+ #[serde(rename = "StorageClass")]
+ pub storage_class: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct ListBucketItem {
+ #[serde(rename = "Key")]
+ pub key: Value,
+ #[serde(rename = "LastModified")]
+ pub last_modified: Value,
+ #[serde(rename = "ETag")]
+ pub etag: Value,
+ #[serde(rename = "Size")]
+ pub size: IntValue,
+ #[serde(rename = "StorageClass")]
+ pub storage_class: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct CommonPrefix {
+ #[serde(rename = "Prefix")]
+ pub prefix: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct ListBucketResult {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "Name")]
+ pub name: Value,
+ #[serde(rename = "Prefix")]
+ pub prefix: Value,
+ #[serde(rename = "Marker")]
+ pub marker: Option<Value>,
+ #[serde(rename = "NextMarker")]
+ pub next_marker: Option<Value>,
+ #[serde(rename = "StartAfter")]
+ pub start_after: Option<Value>,
+ #[serde(rename = "ContinuationToken")]
+ pub continuation_token: Option<Value>,
+ #[serde(rename = "NextContinuationToken")]
+ pub next_continuation_token: Option<Value>,
+ #[serde(rename = "KeyCount")]
+ pub key_count: Option<IntValue>,
+ #[serde(rename = "MaxKeys")]
+ pub max_keys: IntValue,
+ #[serde(rename = "Delimiter")]
+ pub delimiter: Option<Value>,
+ #[serde(rename = "EncodingType")]
+ pub encoding_type: Option<Value>,
+ #[serde(rename = "IsTruncated")]
+ pub is_truncated: Value,
+ #[serde(rename = "Contents")]
+ pub contents: Vec<ListBucketItem>,
+ #[serde(rename = "CommonPrefixes")]
+ pub common_prefixes: Vec<CommonPrefix>,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct VersioningConfiguration {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "Status")]
+ pub status: Option<Value>,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct PostObject {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "Location")]
+ pub location: Value,
+ #[serde(rename = "Bucket")]
+ pub bucket: Value,
+ #[serde(rename = "Key")]
+ pub key: Value,
+ #[serde(rename = "ETag")]
+ pub etag: Value,
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use garage_util::time::*;
+
+ #[test]
+ fn error_message() -> Result<(), ApiError> {
+ let error = Error {
+ code: Value("TestError".to_string()),
+ message: Value("A dummy error message".to_string()),
+ resource: Some(Value("/bucket/a/plop".to_string())),
+ region: Some(Value("garage".to_string())),
+ };
+ assert_eq!(
+ to_xml_with_header(&error)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<Error>\
+ <Code>TestError</Code>\
+ <Message>A dummy error message</Message>\
+ <Resource>/bucket/a/plop</Resource>\
+ <Region>garage</Region>\
+</Error>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn list_all_my_buckets_result() -> Result<(), ApiError> {
+ let list_buckets = ListAllMyBucketsResult {
+ owner: Owner {
+ display_name: Value("owner_name".to_string()),
+ id: Value("qsdfjklm".to_string()),
+ },
+ buckets: BucketList {
+ entries: vec![
+ Bucket {
+ creation_date: Value(msec_to_rfc3339(0)),
+ name: Value("bucket_A".to_string()),
+ },
+ Bucket {
+ creation_date: Value(msec_to_rfc3339(3600 * 24 * 1000)),
+ name: Value("bucket_B".to_string()),
+ },
+ ],
+ },
+ };
+ assert_eq!(
+ to_xml_with_header(&list_buckets)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<ListAllMyBucketsResult>\
+ <Buckets>\
+ <Bucket>\
+ <CreationDate>1970-01-01T00:00:00.000Z</CreationDate>\
+ <Name>bucket_A</Name>\
+ </Bucket>\
+ <Bucket>\
+ <CreationDate>1970-01-02T00:00:00.000Z</CreationDate>\
+ <Name>bucket_B</Name>\
+ </Bucket>\
+ </Buckets>\
+ <Owner>\
+ <DisplayName>owner_name</DisplayName>\
+ <ID>qsdfjklm</ID>\
+ </Owner>\
+</ListAllMyBucketsResult>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn get_bucket_location_result() -> Result<(), ApiError> {
+ let get_bucket_location = LocationConstraint {
+ xmlns: (),
+ region: "garage".to_string(),
+ };
+ assert_eq!(
+ to_xml_with_header(&get_bucket_location)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<LocationConstraint xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">garage</LocationConstraint>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn get_bucket_versioning_result() -> Result<(), ApiError> {
+ let get_bucket_versioning = VersioningConfiguration {
+ xmlns: (),
+ status: None,
+ };
+ assert_eq!(
+ to_xml_with_header(&get_bucket_versioning)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"/>"
+ );
+ let get_bucket_versioning2 = VersioningConfiguration {
+ xmlns: (),
+ status: Some(Value("Suspended".to_string())),
+ };
+ assert_eq!(
+ to_xml_with_header(&get_bucket_versioning2)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"><Status>Suspended</Status></VersioningConfiguration>"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn delete_result() -> Result<(), ApiError> {
+ let delete_result = DeleteResult {
+ xmlns: (),
+ deleted: vec![
+ Deleted {
+ key: Value("a/plop".to_string()),
+ version_id: Value("qsdfjklm".to_string()),
+ delete_marker_version_id: Value("wxcvbn".to_string()),
+ },
+ Deleted {
+ key: Value("b/plip".to_string()),
+ version_id: Value("1234".to_string()),
+ delete_marker_version_id: Value("4321".to_string()),
+ },
+ ],
+ errors: vec![
+ DeleteError {
+ code: Value("NotFound".to_string()),
+ key: Some(Value("c/plap".to_string())),
+ message: Value("Object c/plap not found".to_string()),
+ version_id: None,
+ },
+ DeleteError {
+ code: Value("Forbidden".to_string()),
+ key: Some(Value("d/plep".to_string())),
+ message: Value("Not authorized".to_string()),
+ version_id: Some(Value("789".to_string())),
+ },
+ ],
+ };
+ assert_eq!(
+ to_xml_with_header(&delete_result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<DeleteResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Deleted>\
+ <Key>a/plop</Key>\
+ <VersionId>qsdfjklm</VersionId>\
+ <DeleteMarkerVersionId>wxcvbn</DeleteMarkerVersionId>\
+ </Deleted>\
+ <Deleted>\
+ <Key>b/plip</Key>\
+ <VersionId>1234</VersionId>\
+ <DeleteMarkerVersionId>4321</DeleteMarkerVersionId>\
+ </Deleted>\
+ <Error>\
+ <Code>NotFound</Code>\
+ <Key>c/plap</Key>\
+ <Message>Object c/plap not found</Message>\
+ </Error>\
+ <Error>\
+ <Code>Forbidden</Code>\
+ <Key>d/plep</Key>\
+ <Message>Not authorized</Message>\
+ <VersionId>789</VersionId>\
+ </Error>\
+</DeleteResult>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn initiate_multipart_upload_result() -> Result<(), ApiError> {
+ let result = InitiateMultipartUploadResult {
+ xmlns: (),
+ bucket: Value("mybucket".to_string()),
+ key: Value("a/plop".to_string()),
+ upload_id: Value("azerty".to_string()),
+ };
+ assert_eq!(
+ to_xml_with_header(&result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<InitiateMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Bucket>mybucket</Bucket>\
+ <Key>a/plop</Key>\
+ <UploadId>azerty</UploadId>\
+</InitiateMultipartUploadResult>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn complete_multipart_upload_result() -> Result<(), ApiError> {
+ let result = CompleteMultipartUploadResult {
+ xmlns: (),
+ location: Some(Value("https://garage.tld/mybucket/a/plop".to_string())),
+ bucket: Value("mybucket".to_string()),
+ key: Value("a/plop".to_string()),
+ etag: Value("\"3858f62230ac3c915f300c664312c11f-9\"".to_string()),
+ };
+ assert_eq!(
+ to_xml_with_header(&result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<CompleteMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Location>https://garage.tld/mybucket/a/plop</Location>\
+ <Bucket>mybucket</Bucket>\
+ <Key>a/plop</Key>\
+ <ETag>&quot;3858f62230ac3c915f300c664312c11f-9&quot;</ETag>\
+</CompleteMultipartUploadResult>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn list_multipart_uploads_result() -> Result<(), ApiError> {
+ let result = ListMultipartUploadsResult {
+ xmlns: (),
+ bucket: Value("example-bucket".to_string()),
+ key_marker: None,
+ next_key_marker: None,
+ upload_id_marker: None,
+ encoding_type: None,
+ next_upload_id_marker: None,
+ upload: vec![],
+ delimiter: Some(Value("/".to_string())),
+ prefix: Value("photos/2006/".to_string()),
+ max_uploads: IntValue(1000),
+ is_truncated: Value("false".to_string()),
+ common_prefixes: vec![
+ CommonPrefix {
+ prefix: Value("photos/2006/February/".to_string()),
+ },
+ CommonPrefix {
+ prefix: Value("photos/2006/January/".to_string()),
+ },
+ CommonPrefix {
+ prefix: Value("photos/2006/March/".to_string()),
+ },
+ ],
+ };
+
+ assert_eq!(
+ to_xml_with_header(&result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<ListMultipartUploadsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Bucket>example-bucket</Bucket>\
+ <Prefix>photos/2006/</Prefix>\
+ <Delimiter>/</Delimiter>\
+ <MaxUploads>1000</MaxUploads>\
+ <IsTruncated>false</IsTruncated>\
+ <CommonPrefixes>\
+ <Prefix>photos/2006/February/</Prefix>\
+ </CommonPrefixes>\
+ <CommonPrefixes>\
+ <Prefix>photos/2006/January/</Prefix>\
+ </CommonPrefixes>\
+ <CommonPrefixes>\
+ <Prefix>photos/2006/March/</Prefix>\
+ </CommonPrefixes>\
+</ListMultipartUploadsResult>"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn list_objects_v1_1() -> Result<(), ApiError> {
+ let result = ListBucketResult {
+ xmlns: (),
+ name: Value("example-bucket".to_string()),
+ prefix: Value("".to_string()),
+ marker: Some(Value("".to_string())),
+ next_marker: None,
+ start_after: None,
+ continuation_token: None,
+ next_continuation_token: None,
+ key_count: None,
+ max_keys: IntValue(1000),
+ encoding_type: None,
+ delimiter: Some(Value("/".to_string())),
+ is_truncated: Value("false".to_string()),
+ contents: vec![ListBucketItem {
+ key: Value("sample.jpg".to_string()),
+ last_modified: Value(msec_to_rfc3339(0)),
+ etag: Value("\"bf1d737a4d46a19f3bced6905cc8b902\"".to_string()),
+ size: IntValue(142863),
+ storage_class: Value("STANDARD".to_string()),
+ }],
+ common_prefixes: vec![CommonPrefix {
+ prefix: Value("photos/".to_string()),
+ }],
+ };
+ assert_eq!(
+ to_xml_with_header(&result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Name>example-bucket</Name>\
+ <Prefix></Prefix>\
+ <Marker></Marker>\
+ <MaxKeys>1000</MaxKeys>\
+ <Delimiter>/</Delimiter>\
+ <IsTruncated>false</IsTruncated>\
+ <Contents>\
+ <Key>sample.jpg</Key>\
+ <LastModified>1970-01-01T00:00:00.000Z</LastModified>\
+ <ETag>&quot;bf1d737a4d46a19f3bced6905cc8b902&quot;</ETag>\
+ <Size>142863</Size>\
+ <StorageClass>STANDARD</StorageClass>\
+ </Contents>\
+ <CommonPrefixes>\
+ <Prefix>photos/</Prefix>\
+ </CommonPrefixes>\
+</ListBucketResult>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn list_objects_v1_2() -> Result<(), ApiError> {
+ let result = ListBucketResult {
+ xmlns: (),
+ name: Value("example-bucket".to_string()),
+ prefix: Value("photos/2006/".to_string()),
+ marker: Some(Value("".to_string())),
+ next_marker: None,
+ start_after: None,
+ continuation_token: None,
+ next_continuation_token: None,
+ key_count: None,
+ max_keys: IntValue(1000),
+ delimiter: Some(Value("/".to_string())),
+ encoding_type: None,
+ is_truncated: Value("false".to_string()),
+ contents: vec![],
+ common_prefixes: vec![
+ CommonPrefix {
+ prefix: Value("photos/2006/February/".to_string()),
+ },
+ CommonPrefix {
+ prefix: Value("photos/2006/January/".to_string()),
+ },
+ ],
+ };
+ assert_eq!(
+ to_xml_with_header(&result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Name>example-bucket</Name>\
+ <Prefix>photos/2006/</Prefix>\
+ <Marker></Marker>\
+ <MaxKeys>1000</MaxKeys>\
+ <Delimiter>/</Delimiter>\
+ <IsTruncated>false</IsTruncated>\
+ <CommonPrefixes>\
+ <Prefix>photos/2006/February/</Prefix>\
+ </CommonPrefixes>\
+ <CommonPrefixes>\
+ <Prefix>photos/2006/January/</Prefix>\
+ </CommonPrefixes>\
+</ListBucketResult>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn list_objects_v2_1() -> Result<(), ApiError> {
+ let result = ListBucketResult {
+ xmlns: (),
+ name: Value("quotes".to_string()),
+ prefix: Value("E".to_string()),
+ marker: None,
+ next_marker: None,
+ start_after: Some(Value("ExampleGuide.pdf".to_string())),
+ continuation_token: None,
+ next_continuation_token: None,
+ key_count: None,
+ max_keys: IntValue(3),
+ delimiter: None,
+ encoding_type: None,
+ is_truncated: Value("false".to_string()),
+ contents: vec![ListBucketItem {
+ key: Value("ExampleObject.txt".to_string()),
+ last_modified: Value(msec_to_rfc3339(0)),
+ etag: Value("\"599bab3ed2c697f1d26842727561fd94\"".to_string()),
+ size: IntValue(857),
+ storage_class: Value("REDUCED_REDUNDANCY".to_string()),
+ }],
+ common_prefixes: vec![],
+ };
+ assert_eq!(
+ to_xml_with_header(&result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Name>quotes</Name>\
+ <Prefix>E</Prefix>\
+ <StartAfter>ExampleGuide.pdf</StartAfter>\
+ <MaxKeys>3</MaxKeys>\
+ <IsTruncated>false</IsTruncated>\
+ <Contents>\
+ <Key>ExampleObject.txt</Key>\
+ <LastModified>1970-01-01T00:00:00.000Z</LastModified>\
+ <ETag>&quot;599bab3ed2c697f1d26842727561fd94&quot;</ETag>\
+ <Size>857</Size>\
+ <StorageClass>REDUCED_REDUNDANCY</StorageClass>\
+ </Contents>\
+</ListBucketResult>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn list_objects_v2_2() -> Result<(), ApiError> {
+ let result = ListBucketResult {
+ xmlns: (),
+ name: Value("bucket".to_string()),
+ prefix: Value("".to_string()),
+ marker: None,
+ next_marker: None,
+ start_after: None,
+ continuation_token: Some(Value(
+ "1ueGcxLPRx1Tr/XYExHnhbYLgveDs2J/wm36Hy4vbOwM=".to_string(),
+ )),
+ next_continuation_token: Some(Value("qsdfjklm".to_string())),
+ key_count: Some(IntValue(112)),
+ max_keys: IntValue(1000),
+ delimiter: None,
+ encoding_type: None,
+ is_truncated: Value("false".to_string()),
+ contents: vec![ListBucketItem {
+ key: Value("happyfacex.jpg".to_string()),
+ last_modified: Value(msec_to_rfc3339(0)),
+ etag: Value("\"70ee1738b6b21e2c8a43f3a5ab0eee71\"".to_string()),
+ size: IntValue(1111),
+ storage_class: Value("STANDARD".to_string()),
+ }],
+ common_prefixes: vec![],
+ };
+ assert_eq!(
+ to_xml_with_header(&result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Name>bucket</Name>\
+ <Prefix></Prefix>\
+ <ContinuationToken>1ueGcxLPRx1Tr/XYExHnhbYLgveDs2J/wm36Hy4vbOwM=</ContinuationToken>\
+ <NextContinuationToken>qsdfjklm</NextContinuationToken>\
+ <KeyCount>112</KeyCount>\
+ <MaxKeys>1000</MaxKeys>\
+ <IsTruncated>false</IsTruncated>\
+ <Contents>\
+ <Key>happyfacex.jpg</Key>\
+ <LastModified>1970-01-01T00:00:00.000Z</LastModified>\
+ <ETag>&quot;70ee1738b6b21e2c8a43f3a5ab0eee71&quot;</ETag>\
+ <Size>1111</Size>\
+ <StorageClass>STANDARD</StorageClass>\
+ </Contents>\
+</ListBucketResult>"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn list_parts() -> Result<(), ApiError> {
+ let result = ListPartsResult {
+ xmlns: (),
+ bucket: Value("example-bucket".to_string()),
+ key: Value("example-object".to_string()),
+ upload_id: Value(
+ "XXBsb2FkIElEIGZvciBlbHZpbmcncyVcdS1tb3ZpZS5tMnRzEEEwbG9hZA".to_string(),
+ ),
+ part_number_marker: Some(IntValue(1)),
+ next_part_number_marker: Some(IntValue(3)),
+ max_parts: IntValue(2),
+ is_truncated: Value("true".to_string()),
+ parts: vec![
+ PartItem {
+ etag: Value("\"7778aef83f66abc1fa1e8477f296d394\"".to_string()),
+ last_modified: Value("2010-11-10T20:48:34.000Z".to_string()),
+ part_number: IntValue(2),
+ size: IntValue(10485760),
+ },
+ PartItem {
+ etag: Value("\"aaaa18db4cc2f85cedef654fccc4a4x8\"".to_string()),
+ last_modified: Value("2010-11-10T20:48:33.000Z".to_string()),
+ part_number: IntValue(3),
+ size: IntValue(10485760),
+ },
+ ],
+ initiator: Initiator {
+ display_name: Value("umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx".to_string()),
+ id: Value(
+ "arn:aws:iam::111122223333:user/some-user-11116a31-17b5-4fb7-9df5-b288870f11xx"
+ .to_string(),
+ ),
+ },
+ owner: Owner {
+ display_name: Value("someName".to_string()),
+ id: Value(
+ "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a".to_string(),
+ ),
+ },
+ storage_class: Value("STANDARD".to_string()),
+ };
+
+ assert_eq!(
+ to_xml_with_header(&result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<ListPartsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Bucket>example-bucket</Bucket>\
+ <Key>example-object</Key>\
+ <UploadId>XXBsb2FkIElEIGZvciBlbHZpbmcncyVcdS1tb3ZpZS5tMnRzEEEwbG9hZA</UploadId>\
+ <PartNumberMarker>1</PartNumberMarker>\
+ <NextPartNumberMarker>3</NextPartNumberMarker>\
+ <MaxParts>2</MaxParts>\
+ <IsTruncated>true</IsTruncated>\
+ <Part>\
+ <ETag>&quot;7778aef83f66abc1fa1e8477f296d394&quot;</ETag>\
+ <LastModified>2010-11-10T20:48:34.000Z</LastModified>\
+ <PartNumber>2</PartNumber>\
+ <Size>10485760</Size>\
+ </Part>\
+ <Part>\
+ <ETag>&quot;aaaa18db4cc2f85cedef654fccc4a4x8&quot;</ETag>\
+ <LastModified>2010-11-10T20:48:33.000Z</LastModified>\
+ <PartNumber>3</PartNumber>\
+ <Size>10485760</Size>\
+ </Part>\
+ <Initiator>\
+ <DisplayName>umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx</DisplayName>\
+ <ID>arn:aws:iam::111122223333:user/some-user-11116a31-17b5-4fb7-9df5-b288870f11xx</ID>\
+ </Initiator>\
+ <Owner>\
+ <DisplayName>someName</DisplayName>\
+ <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>\
+ </Owner>\
+ <StorageClass>STANDARD</StorageClass>\
+</ListPartsResult>"
+ );
+
+ Ok(())
+ }
+}