author     Alex Auvolat <alex@adnab.me>   2024-03-04 15:56:10 +0100
committer  Alex Auvolat <alex@adnab.me>   2024-03-04 15:56:10 +0100
commit     bbde9bc91225ac41ea6e8def61c5b7044bb186a0 (patch)
tree       6e2bb951b1efb104c61d6e56aae84d7a6b036342 /src/api/s3
parent     d0d95fd53f3d4a6fd5adcfbb4cbb031826fd64a4 (diff)
parent     3168bb34a0082480660e945f7390a5ecab26c665 (diff)
download   garage-bbde9bc91225ac41ea6e8def61c5b7044bb186a0.tar.gz
           garage-bbde9bc91225ac41ea6e8def61c5b7044bb186a0.zip
Merge branch 'main' into next-0.10
Diffstat (limited to 'src/api/s3')
-rw-r--r--  src/api/s3/api_server.rs   227
-rw-r--r--  src/api/s3/bucket.rs        48
-rw-r--r--  src/api/s3/copy.rs          35
-rw-r--r--  src/api/s3/cors.rs          65
-rw-r--r--  src/api/s3/delete.rs        29
-rw-r--r--  src/api/s3/get.rs           21
-rw-r--r--  src/api/s3/lifecycle.rs     53
-rw-r--r--  src/api/s3/list.rs          14
-rw-r--r--  src/api/s3/multipart.rs     68
-rw-r--r--  src/api/s3/post_object.rs   54
-rw-r--r--  src/api/s3/put.rs           68
-rw-r--r--  src/api/s3/website.rs       54
12 files changed, 350 insertions, 386 deletions
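
A note for readers of the diff below: every S3 handler now takes a single ReqCtx argument in place of the separate garage, bucket_id, bucket_name, bucket and api_key parameters. The struct itself is defined outside src/api/s3 and is not part of this diff; a minimal sketch, inferred only from how api_server.rs and post_object.rs construct it (field names and types are assumptions based on those call sites):

use std::sync::Arc;

use garage_model::bucket_table::BucketParams;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_util::data::Uuid;

/// Per-request context threaded through all S3 handlers
/// (sketch; the real definition lives outside this diff).
pub struct ReqCtx {
	pub garage: Arc<Garage>,
	pub bucket_id: Uuid,
	pub bucket_name: String,
	pub bucket_params: BucketParams,
	pub api_key: Key,
}

Handlers then destructure only what they need, e.g. let ReqCtx { garage, bucket_id, .. } = &ctx;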
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 08405923..1ed30996 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -17,8 +17,7 @@ use garage_model::key_table::Key;
 
 use crate::generic_server::*;
 use crate::s3::error::*;
-use crate::signature::payload::check_payload_signature;
-use crate::signature::streaming::*;
+use crate::signature::verify_request;
 
 use crate::helpers::*;
 use crate::s3::bucket::*;
@@ -125,17 +124,7 @@ impl ApiHandler for S3ApiServer {
 			return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
 		}
 
-		let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
-		let api_key = api_key
-			.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
-
-		let req = parse_streaming_body(
-			&api_key,
-			req,
-			&mut content_sha256,
-			&garage.config.s3_api.s3_region,
-			"s3",
-		)?;
+		let (req, api_key, content_sha256) = verify_request(&garage, req, "s3").await?;
 
 		let bucket_name = match bucket_name {
 			None => {
@@ -166,6 +155,7 @@ impl ApiHandler for S3ApiServer {
 			.bucket_helper()
 			.get_existing_bucket(bucket_id)
 			.await?;
+		let bucket_params = bucket.state.into_option().unwrap();
 
 		let allowed = match endpoint.authorization_type() {
 			Authorization::Read => api_key.allow_read(&bucket_id),
@@ -178,12 +168,20 @@ impl ApiHandler for S3ApiServer {
 			return Err(Error::forbidden("Operation is not allowed for this key."));
 		}
 
-		let matching_cors_rule = find_matching_cors_rule(&bucket, &req)?;
+		let matching_cors_rule = find_matching_cors_rule(&bucket_params, &req)?.cloned();
+
+		let ctx = ReqCtx {
+			garage,
+			bucket_id,
+			bucket_name,
+			bucket_params,
+			api_key,
+		};
 
 		let resp = match endpoint {
 			Endpoint::HeadObject {
 				key, part_number, ..
-			} => handle_head(garage, &req, bucket_id, &key, part_number).await,
+			} => handle_head(ctx, &req, &key, part_number).await,
 			Endpoint::GetObject {
 				key,
 				part_number,
@@ -203,74 +201,37 @@ impl ApiHandler for S3ApiServer {
 					response_content_type,
 					response_expires,
 				};
-				handle_get(garage, &req, bucket_id, &key, part_number, overrides).await
+				handle_get(ctx, &req, &key, part_number, overrides).await
 			}
 			Endpoint::UploadPart {
 				key,
 				part_number,
 				upload_id,
-			} => {
-				handle_put_part(
-					garage,
-					req,
-					bucket_id,
-					&key,
-					part_number,
-					&upload_id,
-					content_sha256,
-				)
-				.await
-			}
-			Endpoint::CopyObject { key } => {
-				handle_copy(garage, &api_key, &req, bucket_id, &key).await
-			}
+			} => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await,
+			Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await,
 			Endpoint::UploadPartCopy {
 				key,
 				part_number,
 				upload_id,
-			} => {
-				handle_upload_part_copy(
-					garage,
-					&api_key,
-					&req,
-					bucket_id,
-					&key,
-					part_number,
-					&upload_id,
-				)
-				.await
-			}
-			Endpoint::PutObject { key } => {
-				handle_put(garage, req, &bucket, &key, content_sha256).await
-			}
+			} => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await,
+			Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await,
 			Endpoint::AbortMultipartUpload { key, upload_id } => {
-				handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
+				handle_abort_multipart_upload(ctx, &key, &upload_id).await
 			}
-			Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
+			Endpoint::DeleteObject { key, .. } => handle_delete(ctx, &key).await,
 			Endpoint::CreateMultipartUpload { key } => {
-				handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
+				handle_create_multipart_upload(ctx, &req, &key).await
 			}
 			Endpoint::CompleteMultipartUpload { key, upload_id } => {
-				handle_complete_multipart_upload(
-					garage,
-					req,
-					&bucket_name,
-					&bucket,
-					&key,
-					&upload_id,
-					content_sha256,
-				)
-				.await
+				handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await
 			}
 			Endpoint::CreateBucket {} => unreachable!(),
 			Endpoint::HeadBucket {} => {
 				let response = Response::builder().body(empty_body()).unwrap();
 				Ok(response)
 			}
-			Endpoint::DeleteBucket {} => {
-				handle_delete_bucket(&garage, bucket_id, bucket_name, &api_key.key_id).await
-			}
-			Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
+			Endpoint::DeleteBucket {} => handle_delete_bucket(ctx).await,
+			Endpoint::GetBucketLocation {} => handle_get_bucket_location(ctx),
 			Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
 			Endpoint::ListObjects {
 				delimiter,
@@ -279,24 +240,21 @@ impl ApiHandler for S3ApiServer {
 				max_keys,
 				prefix,
 			} => {
-				handle_list(
-					garage,
-					&ListObjectsQuery {
-						common: ListQueryCommon {
-							bucket_name,
-							bucket_id,
-							delimiter,
-							page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
-							prefix: prefix.unwrap_or_default(),
-							urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
-						},
-						is_v2: false,
-						marker,
-						continuation_token: None,
-						start_after: None,
+				let query = ListObjectsQuery {
+					common: ListQueryCommon {
+						bucket_name: ctx.bucket_name.clone(),
+						bucket_id,
+						delimiter,
+						page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
+						prefix: prefix.unwrap_or_default(),
+						urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
 					},
-				)
-				.await
+					is_v2: false,
+					marker,
+					continuation_token: None,
+					start_after: None,
+				};
+				handle_list(ctx, &query).await
 			}
 			Endpoint::ListObjectsV2 {
 				delimiter,
@@ -309,24 +267,21 @@ impl ApiHandler for S3ApiServer {
 				..
 			} => {
 				if list_type == "2" {
-					handle_list(
-						garage,
-						&ListObjectsQuery {
-							common: ListQueryCommon {
-								bucket_name,
-								bucket_id,
-								delimiter,
-								page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
-								urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
-								prefix: prefix.unwrap_or_default(),
-							},
-							is_v2: true,
-							marker: None,
-							continuation_token,
-							start_after,
+					let query = ListObjectsQuery {
+						common: ListQueryCommon {
+							bucket_name: ctx.bucket_name.clone(),
+							bucket_id,
+							delimiter,
+							page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
+							urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+							prefix: prefix.unwrap_or_default(),
 						},
-					)
-					.await
+						is_v2: true,
+						marker: None,
+						continuation_token,
+						start_after,
+					};
+					handle_list(ctx, &query).await
 				} else {
 					Err(Error::bad_request(format!(
 						"Invalid endpoint: list-type={}",
@@ -342,22 +297,19 @@ impl ApiHandler for S3ApiServer {
 				prefix,
 				upload_id_marker,
 			} => {
-				handle_list_multipart_upload(
-					garage,
-					&ListMultipartUploadsQuery {
-						common: ListQueryCommon {
-							bucket_name,
-							bucket_id,
-							delimiter,
-							page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
-							prefix: prefix.unwrap_or_default(),
-							urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
-						},
-						key_marker,
-						upload_id_marker,
+				let query = ListMultipartUploadsQuery {
+					common: ListQueryCommon {
+						bucket_name: ctx.bucket_name.clone(),
+						bucket_id,
+						delimiter,
+						page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
+						prefix: prefix.unwrap_or_default(),
+						urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
 					},
-				)
-				.await
+					key_marker,
+					upload_id_marker,
+				};
+				handle_list_multipart_upload(ctx, &query).await
 			}
 			Endpoint::ListParts {
 				key,
@@ -365,39 +317,28 @@ impl ApiHandler for S3ApiServer {
 				part_number_marker,
 				upload_id,
 			} => {
-				handle_list_parts(
-					garage,
-					&ListPartsQuery {
-						bucket_name,
-						bucket_id,
-						key,
-						upload_id,
-						part_number_marker: part_number_marker.map(|p| p.min(10000)),
-						max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
-					},
-				)
-				.await
-			}
-			Endpoint::DeleteObjects {} => {
-				handle_delete_objects(garage, bucket_id, req, content_sha256).await
-			}
-			Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
-			Endpoint::PutBucketWebsite {} => {
-				handle_put_website(garage, bucket.clone(), req, content_sha256).await
-			}
-			Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket.clone()).await,
-			Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
-			Endpoint::PutBucketCors {} => {
-				handle_put_cors(garage, bucket.clone(), req, content_sha256).await
+				let query = ListPartsQuery {
+					bucket_name: ctx.bucket_name.clone(),
+					bucket_id,
+					key,
+					upload_id,
+					part_number_marker: part_number_marker.map(|p| p.min(10000)),
+					max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
+				};
+				handle_list_parts(ctx, &query).await
 			}
-			Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket.clone()).await,
-			Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await,
+			Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
+			Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
+			Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await,
+			Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await,
+			Endpoint::GetBucketCors {} => handle_get_cors(ctx).await,
+			Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await,
+			Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await,
+			Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await,
 			Endpoint::PutBucketLifecycleConfiguration {} => {
-				handle_put_lifecycle(garage, bucket.clone(), req, content_sha256).await
-			}
-			Endpoint::DeleteBucketLifecycle {} => {
-				handle_delete_lifecycle(garage, bucket.clone()).await
+				handle_put_lifecycle(ctx, req, content_sha256).await
 			}
+			Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await,
 			endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
 		};
@@ -405,7 +346,7 @@ impl ApiHandler for S3ApiServer {
 		// add the corresponding CORS headers to the response
 		let mut resp_ok = resp?;
 		if let Some(rule) = matching_cors_rule {
-			add_cors_headers(&mut resp_ok, rule)
+			add_cors_headers(&mut resp_ok, &rule)
 				.ok_or_internal_error("Invalid bucket CORS configuration")?;
 		}
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index fa337566..6a12aa9c 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::sync::Arc;
 
 use http_body_util::BodyExt;
 use hyper::{Request, Response, StatusCode};
@@ -21,7 +20,8 @@ use crate::s3::error::*;
 use crate::s3::xml as s3_xml;
 use crate::signature::verify_signed_content;
 
-pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> {
+pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+	let ReqCtx { garage, .. } = ctx;
 	let loc = s3_xml::LocationConstraint {
 		xmlns: (),
 		region: garage.config.s3_api.s3_region.to_string(),
@@ -204,21 +204,20 @@ pub async fn handle_create_bucket(
 		.unwrap())
 }
 
-pub async fn handle_delete_bucket(
-	garage: &Garage,
-	bucket_id: Uuid,
-	bucket_name: String,
-	api_key_id: &String,
-) -> Result<Response<ResBody>, Error> {
+pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage,
+		bucket_id,
+		bucket_name,
+		bucket_params: bucket_state,
+		api_key,
+		..
+	} = &ctx;
 	let helper = garage.locked_helper().await;
 
-	let api_key = helper.key().get_existing_key(api_key_id).await?;
 	let key_params = api_key.params().unwrap();
 
-	let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_)));
-
-	let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
-	let bucket_state = bucket.state.as_option().unwrap();
+	let is_local_alias = matches!(key_params.local_aliases.get(bucket_name), Some(Some(_)));
 
 	// If the bucket has no other aliases, this is a true deletion.
 	// Otherwise, it is just an alias removal.
@@ -228,20 +227,20 @@ pub async fn handle_delete_bucket(
 		.items()
 		.iter()
 		.filter(|(_, _, active)| *active)
-		.any(|(n, _, _)| is_local_alias || (*n != bucket_name));
+		.any(|(n, _, _)| is_local_alias || (*n != *bucket_name));
 
 	let has_other_local_aliases = bucket_state
 		.local_aliases
 		.items()
 		.iter()
 		.filter(|(_, _, active)| *active)
-		.any(|((k, n), _, _)| !is_local_alias || *n != bucket_name || *k != api_key.key_id);
+		.any(|((k, n), _, _)| !is_local_alias || *n != *bucket_name || *k != api_key.key_id);
 
 	if !has_other_global_aliases && !has_other_local_aliases {
 		// Delete bucket
 
 		// Check bucket is empty
-		if !helper.bucket().is_bucket_empty(bucket_id).await? {
+		if !helper.bucket().is_bucket_empty(*bucket_id).await? {
 			return Err(CommonError::BucketNotEmpty.into());
 		}
 
@@ -249,33 +248,36 @@ pub async fn handle_delete_bucket(
 		// 1. delete bucket alias
 		if is_local_alias {
 			helper
-				.unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+				.unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
 				.await?;
 		} else {
 			helper
-				.unset_global_bucket_alias(bucket_id, &bucket_name)
+				.unset_global_bucket_alias(*bucket_id, bucket_name)
 				.await?;
 		}
 
 		// 2. delete authorization from keys that had access
-		for (key_id, _) in bucket.authorized_keys() {
+		for (key_id, _) in bucket_state.authorized_keys.items() {
 			helper
-				.set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+				.set_bucket_key_permissions(*bucket_id, key_id, BucketKeyPerm::NO_PERMISSIONS)
 				.await?;
 		}
 
+		let bucket = Bucket {
+			id: *bucket_id,
+			state: Deletable::delete(),
+		};
 		// 3. delete bucket
-		bucket.state = Deletable::delete();
 		garage.bucket_table.insert(&bucket).await?;
 	} else if is_local_alias {
 		// Just unalias
 		helper
-			.unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+			.unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
 			.await?;
 	} else {
 		// Just unalias (but from global namespace)
 		helper
-			.unset_global_bucket_alias(bucket_id, &bucket_name)
+			.unset_global_bucket_alias(*bucket_id, bucket_name)
 			.await?;
 	}
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 880ce5f4..3c2bd483 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -1,5 +1,4 @@
 use std::pin::Pin;
-use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use futures::{stream, stream::Stream, StreamExt};
@@ -15,8 +14,6 @@ use garage_table::*;
 use garage_util::data::*;
 use garage_util::time::*;
 
-use garage_model::garage::Garage;
-use garage_model::key_table::Key;
 use garage_model::s3::block_ref_table::*;
 use garage_model::s3::mpu_table::*;
 use garage_model::s3::object_table::*;
@@ -30,15 +27,19 @@ use crate::s3::put::get_headers;
 use crate::s3::xml::{self as s3_xml, xmlns_tag};
 
 pub async fn handle_copy(
-	garage: Arc<Garage>,
-	api_key: &Key,
+	ctx: ReqCtx,
 	req: &Request<ReqBody>,
-	dest_bucket_id: Uuid,
 	dest_key: &str,
 ) -> Result<Response<ResBody>, Error> {
 	let copy_precondition = CopyPreconditionHeaders::parse(req)?;
 
-	let source_object = get_copy_source(&garage, api_key, req).await?;
+	let source_object = get_copy_source(&ctx, req).await?;
+
+	let ReqCtx {
+		garage,
+		bucket_id: dest_bucket_id,
+		..
+	} = ctx;
 
 	let (source_version, source_version_data, source_version_meta) =
 		extract_source_info(&source_object)?;
@@ -181,10 +182,8 @@ pub async fn handle_copy(
 }
 
 pub async fn handle_upload_part_copy(
-	garage: Arc<Garage>,
-	api_key: &Key,
+	ctx: ReqCtx,
 	req: &Request<ReqBody>,
-	dest_bucket_id: Uuid,
 	dest_key: &str,
 	part_number: u64,
 	upload_id: &str,
@@ -195,10 +194,12 @@ pub async fn handle_upload_part_copy(
 	let dest_key = dest_key.to_string();
 
 	let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
-		get_copy_source(&garage, api_key, req),
-		multipart::get_upload(&garage, &dest_bucket_id, &dest_key, &dest_upload_id)
+		get_copy_source(&ctx, req),
+		multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
 	)?;
 
+	let ReqCtx { garage, .. } = ctx;
+
 	let (source_object_version, source_version_data, source_version_meta) =
 		extract_source_info(&source_object)?;
 
@@ -439,11 +440,11 @@ pub async fn handle_upload_part_copy(
 		.body(string_body(resp_xml))?)
 }
 
-async fn get_copy_source(
-	garage: &Garage,
-	api_key: &Key,
-	req: &Request<ReqBody>,
-) -> Result<Object, Error> {
+async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {
+	let ReqCtx {
+		garage, api_key, ..
+	} = ctx;
+
 	let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
 	let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index e069cae4..173b7ffe 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -21,16 +21,13 @@ use crate::s3::error::*;
 use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
 use crate::signature::verify_signed_content;
 
-use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
+use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule};
 use garage_model::garage::Garage;
 use garage_util::data::*;
 
-pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
-	let param = bucket
-		.params()
-		.ok_or_internal_error("Bucket should not be deleted at this point")?;
-
-	if let Some(cors) = param.cors_config.get() {
+pub async fn handle_get_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+	let ReqCtx { bucket_params, .. } = ctx;
+	if let Some(cors) = bucket_params.cors_config.get() {
 		let wc = CorsConfiguration {
 			xmlns: (),
 			cors_rules: cors
@@ -50,16 +47,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error
 	}
 }
 
-pub async fn handle_delete_cors(
-	garage: Arc<Garage>,
-	mut bucket: Bucket,
-) -> Result<Response<ResBody>, Error> {
-	let param = bucket
-		.params_mut()
-		.ok_or_internal_error("Bucket should not be deleted at this point")?;
-
-	param.cors_config.update(None);
-	garage.bucket_table.insert(&bucket).await?;
+pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage,
+		bucket_id,
+		mut bucket_params,
+		..
+	} = ctx;
+	bucket_params.cors_config.update(None);
+	garage
+		.bucket_table
+		.insert(&Bucket::present(bucket_id, bucket_params))
+		.await?;
 
 	Ok(Response::builder()
 		.status(StatusCode::NO_CONTENT)
@@ -67,28 +66,33 @@ pub async fn handle_delete_cors(
 }
 
 pub async fn handle_put_cors(
-	garage: Arc<Garage>,
-	mut bucket: Bucket,
+	ctx: ReqCtx,
 	req: Request<ReqBody>,
 	content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage,
+		bucket_id,
+		mut bucket_params,
+		..
+	} = ctx;
+
 	let body = BodyExt::collect(req.into_body()).await?.to_bytes();
 
 	if let Some(content_sha256) = content_sha256 {
 		verify_signed_content(content_sha256, &body[..])?;
 	}
 
-	let param = bucket
-		.params_mut()
-		.ok_or_internal_error("Bucket should not be deleted at this point")?;
-
 	let conf: CorsConfiguration = from_reader(&body as &[u8])?;
 	conf.validate()?;
 
-	param
+	bucket_params
 		.cors_config
 		.update(Some(conf.into_garage_cors_config()?));
-	garage.bucket_table.insert(&bucket).await?;
+	garage
+		.bucket_table
+		.insert(&Bucket::present(bucket_id, bucket_params))
+		.await?;
 
 	Ok(Response::builder()
 		.status(StatusCode::OK)
@@ -115,7 +119,8 @@ pub async fn handle_options_api(
 		let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
 		if let Some(id) = bucket_id {
 			let bucket = garage.bucket_helper().get_existing_bucket(id).await?;
-			handle_options_for_bucket(req, &bucket)
+			let bucket_params = bucket.state.into_option().unwrap();
+			handle_options_for_bucket(req, &bucket_params)
 		} else {
 			// If there is a bucket name in the request, but that name
 			// does not correspond to a global alias for a bucket,
@@ -145,7 +150,7 @@ pub async fn handle_options_api(
 
 pub fn handle_options_for_bucket(
 	req: &Request<IncomingBody>,
-	bucket: &Bucket,
+	bucket_params: &BucketParams,
 ) -> Result<Response<EmptyBody>, CommonError> {
 	let origin = req
 		.headers()
@@ -162,7 +167,7 @@ pub fn handle_options_for_bucket(
 		None => vec![],
 	};
 
-	if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
+	if let Some(cors_config) = bucket_params.cors_config.get() {
 		let matching_rule = cors_config
 			.iter()
 			.find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
@@ -181,10 +186,10 @@ pub fn handle_options_for_bucket(
 }
 
 pub fn find_matching_cors_rule<'a>(
-	bucket: &'a Bucket,
+	bucket_params: &'a BucketParams,
 	req: &Request<impl Body>,
 ) -> Result<Option<&'a GarageCorsRule>, Error> {
-	if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
+	if let Some(cors_config) = bucket_params.cors_config.get() {
 		if let Some(origin) = req.headers().get("Origin") {
 			let origin = origin.to_str()?;
 			let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index 3fb39147..57f6f948 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -1,11 +1,8 @@
-use std::sync::Arc;
-
 use http_body_util::BodyExt;
 use hyper::{Request, Response, StatusCode};
 
 use garage_util::data::*;
 
-use garage_model::garage::Garage;
 use garage_model::s3::object_table::*;
 
 use crate::helpers::*;
@@ -15,14 +12,13 @@ use crate::s3::put::next_timestamp;
 use crate::s3::xml as s3_xml;
 use crate::signature::verify_signed_content;
 
-async fn handle_delete_internal(
-	garage: &Garage,
-	bucket_id: Uuid,
-	key: &str,
-) -> Result<(Uuid, Uuid), Error> {
+async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> {
+	let ReqCtx {
+		garage, bucket_id, ..
+	} = ctx;
 	let object = garage
 		.object_table
-		.get(&bucket_id, &key.to_string())
+		.get(bucket_id, &key.to_string())
 		.await?
 		.ok_or(Error::NoSuchKey)?; // No need to delete
@@ -44,7 +40,7 @@ async fn handle_delete_internal(
 	};
 
 	let object = Object::new(
-		bucket_id,
+		*bucket_id,
 		key.into(),
 		vec![ObjectVersion {
 			uuid: del_uuid,
@@ -58,12 +54,8 @@ async fn handle_delete_internal(
 	Ok((deleted_version, del_uuid))
 }
 
-pub async fn handle_delete(
-	garage: Arc<Garage>,
-	bucket_id: Uuid,
-	key: &str,
-) -> Result<Response<ResBody>, Error> {
-	match handle_delete_internal(&garage, bucket_id, key).await {
+pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>, Error> {
+	match handle_delete_internal(&ctx, key).await {
 		Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
 			.status(StatusCode::NO_CONTENT)
 			.body(empty_body())
@@ -73,8 +65,7 @@ pub async fn handle_delete(
 }
 
 pub async fn handle_delete_objects(
-	garage: Arc<Garage>,
-	bucket_id: Uuid,
+	ctx: ReqCtx,
 	req: Request<ReqBody>,
 	content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
@@ -91,7 +82,7 @@ pub async fn handle_delete_objects(
 	let mut ret_errors = Vec::new();
 
 	for obj in cmd.objects.iter() {
-		match handle_delete_internal(&garage, bucket_id, &obj.key).await {
+		match handle_delete_internal(&ctx, &obj.key).await {
 			Ok((deleted_version, delete_marker_version)) => {
 				if cmd.quiet {
 					continue;
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 0d18e775..ed996fb1 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -131,6 +131,16 @@ fn try_answer_cached(
 
 /// Handle HEAD request
 pub async fn handle_head(
+	ctx: ReqCtx,
+	req: &Request<impl Body>,
+	key: &str,
+	part_number: Option<u64>,
+) -> Result<Response<ResBody>, Error> {
+	handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await
+}
+
+/// Handle HEAD request for website
+pub async fn handle_head_without_ctx(
 	garage: Arc<Garage>,
 	req: &Request<impl Body>,
 	bucket_id: Uuid,
@@ -218,6 +228,17 @@ pub async fn handle_head(
 
 /// Handle GET request
 pub async fn handle_get(
+	ctx: ReqCtx,
+	req: &Request<impl Body>,
+	key: &str,
+	part_number: Option<u64>,
+	overrides: GetObjectOverrides,
+) -> Result<Response<ResBody>, Error> {
+	handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await
+}
+
+/// Handle GET request
+pub async fn handle_get_without_ctx(
 	garage: Arc<Garage>,
 	req: &Request<impl Body>,
 	bucket_id: Uuid,
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index 35757e8c..7eb1c2cb 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -1,5 +1,4 @@
 use quick_xml::de::from_reader;
-use std::sync::Arc;
 
 use http_body_util::BodyExt;
 use hyper::{Request, Response, StatusCode};
@@ -16,15 +15,12 @@ use garage_model::bucket_table::{
 	parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
 	LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
 };
-use garage_model::garage::Garage;
 use garage_util::data::*;
 
-pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
-	let param = bucket
-		.params()
-		.ok_or_internal_error("Bucket should not be deleted at this point")?;
+pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+	let ReqCtx { bucket_params, .. } = ctx;
 
-	if let Some(lifecycle) = param.lifecycle_config.get() {
+	if let Some(lifecycle) = bucket_params.lifecycle_config.get() {
 		let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle);
 		let xml = to_xml_with_header(&wc)?;
 		Ok(Response::builder()
@@ -38,16 +34,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>,
 	}
 }
 
-pub async fn handle_delete_lifecycle(
-	garage: Arc<Garage>,
-	mut bucket: Bucket,
-) -> Result<Response<ResBody>, Error> {
-	let param = bucket
-		.params_mut()
-		.ok_or_internal_error("Bucket should not be deleted at this point")?;
-
-	param.lifecycle_config.update(None);
-	garage.bucket_table.insert(&bucket).await?;
+pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage,
+		bucket_id,
+		mut bucket_params,
+		..
+	} = ctx;
+	bucket_params.lifecycle_config.update(None);
+	garage
+		.bucket_table
+		.insert(&Bucket::present(bucket_id, bucket_params))
+		.await?;
 
 	Ok(Response::builder()
 		.status(StatusCode::NO_CONTENT)
@@ -55,28 +53,33 @@ pub async fn handle_delete_lifecycle(
 }
 
 pub async fn handle_put_lifecycle(
-	garage: Arc<Garage>,
-	mut bucket: Bucket,
+	ctx: ReqCtx,
 	req: Request<ReqBody>,
 	content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage,
+		bucket_id,
+		mut bucket_params,
+		..
+	} = ctx;
+
 	let body = BodyExt::collect(req.into_body()).await?.to_bytes();
 
 	if let Some(content_sha256) = content_sha256 {
 		verify_signed_content(content_sha256, &body[..])?;
 	}
 
-	let param = bucket
-		.params_mut()
-		.ok_or_internal_error("Bucket should not be deleted at this point")?;
-
 	let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
 	let config = conf
 		.validate_into_garage_lifecycle_config()
 		.ok_or_bad_request("Invalid lifecycle configuration")?;
 
-	param.lifecycle_config.update(Some(config));
-	garage.bucket_table.insert(&bucket).await?;
+	bucket_params.lifecycle_config.update(Some(config));
+	garage
+		.bucket_table
+		.insert(&Bucket::present(bucket_id, bucket_params))
+		.await?;
 
 	Ok(Response::builder()
 		.status(StatusCode::OK)
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index b832a4f4..302c03f4 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -1,6 +1,5 @@
 use std::collections::{BTreeMap, BTreeSet};
 use std::iter::{Iterator, Peekable};
-use std::sync::Arc;
 
 use base64::prelude::*;
 use hyper::Response;
@@ -9,7 +8,6 @@ use garage_util::data::*;
 use garage_util::error::Error as GarageError;
 use garage_util::time::*;
 
-use garage_model::garage::Garage;
 use garage_model::s3::mpu_table::*;
 use garage_model::s3::object_table::*;
 
@@ -62,9 +60,10 @@ pub struct ListPartsQuery {
 }
 
 pub async fn handle_list(
-	garage: Arc<Garage>,
+	ctx: ReqCtx,
 	query: &ListObjectsQuery,
 ) -> Result<Response<ResBody>, Error> {
+	let ReqCtx { garage, .. } = &ctx;
 	let io = |bucket, key, count| {
 		let t = &garage.object_table;
 		async move {
@@ -167,9 +166,11 @@ pub async fn handle_list(
 }
 
 pub async fn handle_list_multipart_upload(
-	garage: Arc<Garage>,
+	ctx: ReqCtx,
 	query: &ListMultipartUploadsQuery,
 ) -> Result<Response<ResBody>, Error> {
+	let ReqCtx { garage, .. } = &ctx;
+
 	let io = |bucket, key, count| {
 		let t = &garage.object_table;
 		async move {
@@ -269,15 +270,14 @@ pub async fn handle_list_multipart_upload(
 }
 
 pub async fn handle_list_parts(
-	garage: Arc<Garage>,
+	ctx: ReqCtx,
 	query: &ListPartsQuery,
 ) -> Result<Response<ResBody>, Error> {
 	debug!("ListParts {:?}", query);
 
 	let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
 
-	let (_, _, mpu) =
-		s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?;
+	let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?;
 
 	let (info, next) = fetch_part_info(query, &mpu)?;
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 5959bcd6..1d5aeb26 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -8,7 +8,6 @@ use md5::{Digest as Md5Digest, Md5};
 use garage_table::*;
 use garage_util::data::*;
 
-use garage_model::bucket_table::Bucket;
 use garage_model::garage::Garage;
 use garage_model::s3::block_ref_table::*;
 use garage_model::s3::mpu_table::*;
@@ -25,12 +24,16 @@ use crate::signature::verify_signed_content;
 
 // ----
 
 pub async fn handle_create_multipart_upload(
-	garage: Arc<Garage>,
+	ctx: ReqCtx,
 	req: &Request<ReqBody>,
-	bucket_name: &str,
-	bucket_id: Uuid,
 	key: &String,
 ) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage,
+		bucket_id,
+		bucket_name,
+		..
+	} = &ctx;
 	let existing_object = garage.object_table.get(&bucket_id, &key).await?;
 
 	let upload_id = gen_uuid();
@@ -47,13 +50,13 @@ pub async fn handle_create_multipart_upload(
 			headers,
 		},
 	};
-	let object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+	let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
 	garage.object_table.insert(&object).await?;
 
 	// Create multipart upload in mpu table
 	// This multipart upload will hold references to uploaded parts
 	// (which are entries in the Version table)
-	let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false);
+	let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false);
 	garage.mpu_table.insert(&mpu).await?;
 
 	// Send success response
@@ -69,14 +72,15 @@ pub async fn handle_create_multipart_upload(
 }
 
 pub async fn handle_put_part(
-	garage: Arc<Garage>,
+	ctx: ReqCtx,
 	req: Request<ReqBody>,
-	bucket_id: Uuid,
 	key: &str,
 	part_number: u64,
 	upload_id: &str,
 	content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
+	let ReqCtx { garage, .. } = &ctx;
+
 	let upload_id = decode_upload_id(upload_id)?;
 
 	let content_md5 = match req.headers().get("content-md5") {
@@ -90,10 +94,8 @@ pub async fn handle_put_part(
 	let stream = body_stream(req.into_body());
 	let mut chunker = StreamChunker::new(stream, garage.config.block_size);
 
-	let ((_, _, mut mpu), first_block) = futures::try_join!(
-		get_upload(&garage, &bucket_id, &key, &upload_id),
-		chunker.next(),
-	)?;
+	let ((_, _, mut mpu), first_block) =
+		futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
 
 	// Check object is valid and part can be accepted
 	let first_block = first_block.ok_or_bad_request("Empty body")?;
@@ -135,7 +137,7 @@ pub async fn handle_put_part(
 
 	// Copy data to version
 	let (total_size, data_md5sum, data_sha256sum, _) =
-		read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?;
+		read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;
 
 	// Verify that checksums map
 	ensure_checksum_matches(
@@ -200,14 +202,19 @@ impl Drop for InterruptedCleanup {
 }
 
 pub async fn handle_complete_multipart_upload(
-	garage: Arc<Garage>,
+	ctx: ReqCtx,
 	req: Request<ReqBody>,
-	bucket_name: &str,
-	bucket: &Bucket,
 	key: &str,
 	upload_id: &str,
 	content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage,
+		bucket_id,
+		bucket_name,
+		..
+	} = &ctx;
+
 	let body = http_body_util::BodyExt::collect(req.into_body())
 		.await?
 		.to_bytes();
@@ -228,8 +235,7 @@ pub async fn handle_complete_multipart_upload(
 
 	// Get object and multipart upload
 	let key = key.to_string();
-	let (object, mut object_version, mpu) =
-		get_upload(&garage, &bucket.id, &key, &upload_id).await?;
+	let (object, mut object_version, mpu) = get_upload(&ctx, &key, &upload_id).await?;
 
 	if mpu.parts.is_empty() {
 		return Err(Error::bad_request("No data was uploaded"));
@@ -283,7 +289,7 @@ pub async fn handle_complete_multipart_upload(
 	let mut final_version = Version::new(
 		upload_id,
 		VersionBacklink::Object {
-			bucket_id: bucket.id,
+			bucket_id: *bucket_id,
 			key: key.to_string(),
 		},
 		false,
@@ -327,9 +333,9 @@ pub async fn handle_complete_multipart_upload(
 	// Calculate total size of final object
 	let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
 
-	if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
+	if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await {
 		object_version.state = ObjectVersionState::Aborted;
-		let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+		let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
 		garage.object_table.insert(&final_object).await?;
 
 		return Err(e);
@@ -345,7 +351,7 @@ pub async fn handle_complete_multipart_upload(
 		final_version.blocks.items()[0].1.hash,
 	));
 
-	let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+	let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
 	garage.object_table.insert(&final_object).await?;
 
 	// Send response saying ok we're done
@@ -362,18 +368,20 @@ pub async fn handle_complete_multipart_upload(
 }
 
 pub async fn handle_abort_multipart_upload(
-	garage: Arc<Garage>,
-	bucket_id: Uuid,
+	ctx: ReqCtx,
 	key: &str,
 	upload_id: &str,
 ) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage, bucket_id, ..
+	} = &ctx;
+
 	let upload_id = decode_upload_id(upload_id)?;
 
-	let (_, mut object_version, _) =
-		get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?;
+	let (_, mut object_version, _) = get_upload(&ctx, &key.to_string(), &upload_id).await?;
 
 	object_version.state = ObjectVersionState::Aborted;
-	let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+	let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
 	garage.object_table.insert(&final_object).await?;
 
 	Ok(Response::new(empty_body()))
@@ -383,11 +391,13 @@ pub async fn handle_abort_multipart_upload(
 
 #[allow(clippy::ptr_arg)]
 pub(crate) async fn get_upload(
-	garage: &Garage,
-	bucket_id: &Uuid,
+	ctx: &ReqCtx,
 	key: &String,
 	upload_id: &Uuid,
 ) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
+	let ReqCtx {
+		garage, bucket_id, ..
+	} = ctx;
 	let (object, mpu) = futures::try_join!(
 		garage.object_table.get(bucket_id, key).map_err(Error::from),
 		garage
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index bca8d6c6..66f8174c 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -21,7 +21,7 @@ use crate::s3::cors::*;
 use crate::s3::error::*;
 use crate::s3::put::{get_headers, save_stream};
 use crate::s3::xml as s3_xml;
-use crate::signature::payload::{parse_date, verify_v4};
+use crate::signature::payload::{verify_v4, Authorization};
 
 pub async fn handle_post_object(
 	garage: Arc<Garage>,
@@ -88,22 +88,11 @@ pub async fn handle_post_object(
 		.get("key")
 		.ok_or_bad_request("No key was provided")?
 		.to_str()?;
-	let credential = params
-		.get("x-amz-credential")
-		.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?
-		.to_str()?;
 	let policy = params
 		.get("policy")
 		.ok_or_bad_request("No policy was provided")?
 		.to_str()?;
-	let signature = params
-		.get("x-amz-signature")
-		.ok_or_bad_request("No signature was provided")?
-		.to_str()?;
-	let date = params
-		.get("x-amz-date")
-		.ok_or_bad_request("No date was provided")?
-		.to_str()?;
+	let authorization = Authorization::parse_form(&params)?;
 
 	let key = if key.contains("${filename}") {
 		// if no filename is provided, don't replace. This matches the behavior of AWS.
@@ -116,16 +105,7 @@ pub async fn handle_post_object(
 		key.to_owned()
 	};
 
-	let date = parse_date(date)?;
-	let api_key = verify_v4(
-		&garage,
-		"s3",
-		credential,
-		&date,
-		signature,
-		policy.as_bytes(),
-	)
-	.await?;
+	let api_key = verify_v4(&garage, "s3", &authorization, policy.as_bytes()).await?;
 
 	let bucket_id = garage
 		.bucket_helper()
@@ -140,6 +120,12 @@ pub async fn handle_post_object(
 		.bucket_helper()
 		.get_existing_bucket(bucket_id)
 		.await?;
+	let bucket_params = bucket.state.into_option().unwrap();
+	let matching_cors_rule = find_matching_cors_rule(
+		&bucket_params,
+		&Request::from_parts(head.clone(), empty_body::<Infallible>()),
+	)?
+	.cloned();
 
 	let decoded_policy = BASE64_STANDARD
 		.decode(policy)
@@ -233,11 +219,19 @@ pub async fn handle_post_object(
 	let headers = get_headers(&params)?;
 
 	let stream = field.map(|r| r.map_err(Into::into));
-	let (_, md5) = save_stream(
+
+	let ctx = ReqCtx {
 		garage,
+		bucket_id,
+		bucket_name,
+		bucket_params,
+		api_key,
+	};
+
+	let (_, md5) = save_stream(
+		&ctx,
 		headers,
 		StreamLimiter::new(stream, conditions.content_length),
-		&bucket,
 		&key,
 		None,
 		None,
@@ -254,7 +248,7 @@ pub async fn handle_post_object(
 	{
 		target
 			.query_pairs_mut()
-			.append_pair("bucket", &bucket_name)
+			.append_pair("bucket", &ctx.bucket_name)
 			.append_pair("key", &key)
 			.append_pair("etag", &etag);
 		let target = target.to_string();
@@ -298,7 +292,7 @@ pub async fn handle_post_object(
 	let xml = s3_xml::PostObject {
 		xmlns: (),
 		location: s3_xml::Value(location),
-		bucket: s3_xml::Value(bucket_name),
+		bucket: s3_xml::Value(ctx.bucket_name),
 		key: s3_xml::Value(key),
 		etag: s3_xml::Value(etag),
 	};
@@ -311,12 +305,8 @@ pub async fn handle_post_object(
 		}
 	};
 
-	let matching_cors_rule = find_matching_cors_rule(
-		&bucket,
-		&Request::from_parts(head, empty_body::<Infallible>()),
-	)?;
 	if let Some(rule) = matching_cors_rule {
-		add_cors_headers(&mut resp, rule)
+		add_cors_headers(&mut resp, &rule)
 			.ok_or_internal_error("Invalid bucket CORS configuration")?;
 	}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index c1af513c..f06aa7a2 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -28,7 +28,6 @@ use garage_util::error::Error as GarageError;
 use garage_util::time::*;
 
 use garage_block::manager::INLINE_THRESHOLD;
-use garage_model::bucket_table::Bucket;
 use garage_model::garage::Garage;
 use garage_model::index_counter::CountedItem;
 use garage_model::s3::block_ref_table::*;
@@ -42,9 +41,8 @@ use crate::s3::error::*;
 
 const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
 
 pub async fn handle_put(
-	garage: Arc<Garage>,
+	ctx: ReqCtx,
 	req: Request<ReqBody>,
-	bucket: &Bucket,
 	key: &String,
 	content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
@@ -59,35 +57,27 @@ pub async fn handle_put(
 
 	let stream = body_stream(req.into_body());
 
-	save_stream(
-		garage,
-		headers,
-		stream,
-		bucket,
-		key,
-		content_md5,
-		content_sha256,
-	)
-	.await
-	.map(|(uuid, md5)| put_response(uuid, md5))
+	save_stream(&ctx, headers, stream, key, content_md5, content_sha256)
+		.await
+		.map(|(uuid, md5)| put_response(uuid, md5))
 }
 
 pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
-	garage: Arc<Garage>,
+	ctx: &ReqCtx,
 	headers: ObjectVersionHeaders,
 	body: S,
-	bucket: &Bucket,
 	key: &String,
 	content_md5: Option<String>,
 	content_sha256: Option<FixedBytes32>,
 ) -> Result<(Uuid, String), Error> {
+	let ReqCtx {
+		garage, bucket_id, ..
+	} = ctx;
+
 	let mut chunker = StreamChunker::new(body, garage.config.block_size);
 	let (first_block_opt, existing_object) = try_join!(
 		chunker.next(),
-		garage
-			.object_table
-			.get(&bucket.id, key)
-			.map_err(Error::from),
+		garage.object_table.get(bucket_id, key).map_err(Error::from),
 	)?;
 
 	let first_block = first_block_opt.unwrap_or_default();
@@ -114,7 +104,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 			content_sha256,
 		)?;
 
-		check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
+		check_quotas(ctx, size, existing_object.as_ref()).await?;
 
 		let object_version = ObjectVersion {
 			uuid: version_uuid,
@@ -129,7 +119,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 			)),
 		};
 
-		let object = Object::new(bucket.id, key.into(), vec![object_version]);
+		let object = Object::new(*bucket_id, key.into(), vec![object_version]);
 		garage.object_table.insert(&object).await?;
 
 		return Ok((version_uuid, data_md5sum_hex));
@@ -140,7 +130,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 	// before everything is finished (cleanup is done using the Drop trait).
 	let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
 		garage: garage.clone(),
-		bucket_id: bucket.id,
+		bucket_id: *bucket_id,
 		key: key.into(),
 		version_uuid,
 		version_timestamp,
@@ -156,7 +146,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 			multipart: false,
 		},
 	};
-	let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
+	let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]);
 	garage.object_table.insert(&object).await?;
 
 	// Initialize corresponding entry in version table
@@ -166,7 +156,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 	let version = Version::new(
 		version_uuid,
 		VersionBacklink::Object {
-			bucket_id: bucket.id,
+			bucket_id: *bucket_id,
 			key: key.into(),
 		},
 		false,
@@ -175,7 +165,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 
 	// Transfer data and verify checksum
 	let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
-		read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?;
+		read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?;
 
 	ensure_checksum_matches(
 		data_md5sum.as_slice(),
@@ -184,7 +174,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 		content_sha256,
 	)?;
 
-	check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
+	check_quotas(ctx, total_size, existing_object.as_ref()).await?;
 
 	// Save final object state, marked as Complete
 	let md5sum_hex = hex::encode(data_md5sum);
@@ -196,7 +186,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 		},
 		first_block_hash,
 	));
-	let object = Object::new(bucket.id, key.into(), vec![object_version]);
+	let object = Object::new(*bucket_id, key.into(), vec![object_version]);
 	garage.object_table.insert(&object).await?;
 
 	// We were not interrupted, everything went fine.
@@ -235,12 +225,18 @@ pub(crate) fn ensure_checksum_matches(
 
 /// Check that inserting this object with this size doesn't exceed bucket quotas
 pub(crate) async fn check_quotas(
-	garage: &Arc<Garage>,
-	bucket: &Bucket,
+	ctx: &ReqCtx,
 	size: u64,
 	prev_object: Option<&Object>,
 ) -> Result<(), Error> {
-	let quotas = bucket.state.as_option().unwrap().quotas.get();
+	let ReqCtx {
+		garage,
+		bucket_id,
+		bucket_params,
+		..
+	} = ctx;
+
+	let quotas = bucket_params.quotas.get();
 
 	if quotas.max_objects.is_none() && quotas.max_size.is_none() {
 		return Ok(());
 	};
@@ -248,7 +244,7 @@ pub(crate) async fn check_quotas(
 	let counters = garage
 		.object_counter_table
 		.table
-		.get(&bucket.id, &EmptyKey)
+		.get(bucket_id, &EmptyKey)
 		.await?;
 
 	let counters = counters
@@ -292,7 +288,7 @@ pub(crate) async fn check_quotas(
 }
 
 pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
-	garage: &Garage,
+	ctx: &ReqCtx,
 	version: &Version,
 	part_number: u64,
 	first_block: Bytes,
@@ -417,7 +413,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
 		let offset = written_bytes;
 		written_bytes += block.len() as u64;
 		write_futs.push_back(put_block_and_meta(
-			garage,
+			ctx,
 			version,
 			part_number,
 			offset,
@@ -447,7 +443,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
 }
 
 async fn put_block_and_meta(
-	garage: &Garage,
+	ctx: &ReqCtx,
 	version: &Version,
 	part_number: u64,
 	offset: u64,
@@ -455,6 +451,8 @@ async fn put_block_and_meta(
 	block: Bytes,
 	order_tag: OrderTag,
 ) -> Result<(), GarageError> {
+	let ReqCtx { garage, .. } = ctx;
+
 	let mut version = version.clone();
 	version.blocks.put(
 		VersionBlockKey {
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index 1c1dbf20..6af55677 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -1,5 +1,4 @@
 use quick_xml::de::from_reader;
-use std::sync::Arc;
 
 use http_body_util::BodyExt;
 use hyper::{Request, Response, StatusCode};
@@ -12,15 +11,11 @@ use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
 use crate::signature::verify_signed_content;
 
 use garage_model::bucket_table::*;
-use garage_model::garage::Garage;
 use garage_util::data::*;
 
-pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
-	let param = bucket
-		.params()
-		.ok_or_internal_error("Bucket should not be deleted at this point")?;
-
-	if let Some(website) = param.website_config.get() {
+pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+	let ReqCtx { bucket_params, .. } = ctx;
+	if let Some(website) = bucket_params.website_config.get() {
 		let wc = WebsiteConfiguration {
 			xmlns: (),
 			error_document: website.error_document.as_ref().map(|v| Key {
@@ -44,16 +39,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Er
 	}
 }
 
-pub async fn handle_delete_website(
-	garage: Arc<Garage>,
-	mut bucket: Bucket,
-) -> Result<Response<ResBody>, Error> {
-	let param = bucket
-		.params_mut()
-		.ok_or_internal_error("Bucket should not be deleted at this point")?;
-
-	param.website_config.update(None);
-	garage.bucket_table.insert(&bucket).await?;
+pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage,
+		bucket_id,
+		mut bucket_params,
+		..
+	} = ctx;
+	bucket_params.website_config.update(None);
+	garage
+		.bucket_table
+		.insert(&Bucket::present(bucket_id, bucket_params))
+		.await?;
 
 	Ok(Response::builder()
 		.status(StatusCode::NO_CONTENT)
@@ -61,28 +58,33 @@ pub async fn handle_delete_website(
 }
 
 pub async fn handle_put_website(
-	garage: Arc<Garage>,
-	mut bucket: Bucket,
+	ctx: ReqCtx,
 	req: Request<ReqBody>,
 	content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
+	let ReqCtx {
+		garage,
+		bucket_id,
+		mut bucket_params,
+		..
+	} = ctx;
+
 	let body = BodyExt::collect(req.into_body()).await?.to_bytes();
 
 	if let Some(content_sha256) = content_sha256 {
 		verify_signed_content(content_sha256, &body[..])?;
 	}
 
-	let param = bucket
-		.params_mut()
-		.ok_or_internal_error("Bucket should not be deleted at this point")?;
-
 	let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
 	conf.validate()?;
 
-	param
+	bucket_params
 		.website_config
 		.update(Some(conf.into_garage_website_config()?));
-	garage.bucket_table.insert(&bucket).await?;
+	garage
+		.bucket_table
+		.insert(&Bucket::present(bucket_id, bucket_params))
+		.await?;
 
 	Ok(Response::builder()
 		.status(StatusCode::OK)
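
A note on Bucket::present, which the rewritten PUT/DELETE handlers above use to write modified bucket parameters back to the bucket table: its definition is not part of this diff (it belongs to garage_model::bucket_table). A plausible sketch consistent with the call sites, assuming the Deletable CRDT exposes a present() constructor mirroring the Deletable::delete() seen in handle_delete_bucket:

use garage_util::crdt;

impl Bucket {
	/// Build a live (non-deleted) bucket row from its id and parameters
	/// (sketch; the real constructor lives in garage_model, outside this diff).
	pub fn present(id: Uuid, params: BucketParams) -> Self {
		Bucket {
			id,
			state: crdt::Deletable::present(params),
		}
	}
}

The bucket row is thus a CRDT whose state is either present with its parameters or deleted, which is why the handlers can rebuild and re-insert a whole Bucket from the bucket_id and bucket_params carried in ReqCtx.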