From 0650a43cf14e7e52121a553130a9ea6c92b7bd4a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 16:48:51 +0100 Subject: PutObject: better cleanup on Drop (incl. when request is interrupted in the middle) --- src/api/s3/put.rs | 93 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 59 insertions(+), 34 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 97b8e4e3..c08fe40a 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -119,6 +119,17 @@ pub(crate) async fn save_stream> + Unpin>( return Ok((version_uuid, data_md5sum_hex)); } + // The following consists in many steps that can each fail. + // Keep track that some cleanup will be needed if things fail + // before everything is finished (cleanup is done using the Drop trait). + let mut interrupted_cleanup = InterruptedCleanup(Some(( + garage.clone(), + bucket.id, + key.into(), + version_uuid, + version_timestamp, + ))); + // Write version identifier in object table so that we have a trace // that we are uploading something let mut object_version = ObjectVersion { @@ -139,44 +150,27 @@ pub(crate) async fn save_stream> + Unpin>( // Transfer data and verify checksum let first_block_hash = async_blake2sum(first_block.clone()).await; - let tx_result = (|| async { - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; - - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; - - check_quotas(&garage, bucket, key, total_size).await?; + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; - Ok((total_size, data_md5sum)) - })() - .await; + ensure_checksum_matches( + data_md5sum.as_slice(), + data_sha256sum, + content_md5.as_deref(), + content_sha256, + )?; - // If something went wrong, clean up - let (total_size, md5sum_arr) = match tx_result { - Ok(rv) => rv, - Err(e) => { - // Mark object as aborted, this will free the blocks further down - object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); - garage.object_table.insert(&object).await?; - return Err(e); - } - }; + check_quotas(&garage, bucket, key, total_size).await?; // Save final object state, marked as Complete - let md5sum_hex = hex::encode(md5sum_arr); + let md5sum_hex = hex::encode(data_md5sum); object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { headers, @@ -188,6 +182,10 @@ pub(crate) async fn save_stream> + Unpin>( let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; + // We were not interrupted, everything went fine. + // We won't have to clean up on drop. + interrupted_cleanup.cancel(); + Ok((version_uuid, md5sum_hex)) } @@ -426,6 +424,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response { .unwrap() } +struct InterruptedCleanup(Option<(Arc, Uuid, String, Uuid, u64)>); + +impl InterruptedCleanup { + fn cancel(&mut self) { + drop(self.0.take()); + } +} +impl Drop for InterruptedCleanup { + fn drop(&mut self) { + if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + tokio::spawn(async move { + let object_version = ObjectVersion { + uuid: version_uuid, + timestamp: version_ts, + state: ObjectVersionState::Aborted, + }; + let object = Object::new(bucket_id, key, vec![object_version]); + if let Err(e) = garage.object_table.insert(&object).await { + warn!("Cannot cleanup after aborted PutObject: {}", e); + } + }); + } + } +} + +// ---- + pub async fn handle_create_multipart_upload( garage: Arc, req: &Request, -- cgit v1.2.3 From 36944f1839b27d0c60feadbe15e1d91ad9b74538 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 23 Jan 2023 19:14:07 +0000 Subject: Cargo.toml: Updated base64 from 0.13 to 0.21. --- src/api/s3/list.rs | 11 +++++++---- src/api/s3/post_object.rs | 5 ++++- src/api/s3/put.rs | 3 ++- 3 files changed, 13 insertions(+), 6 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index e5f486c8..5cb0d65a 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -3,6 +3,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::iter::{Iterator, Peekable}; use std::sync::Arc; +use base64::prelude::*; use hyper::{Body, Response}; use garage_util::data::*; @@ -129,11 +130,11 @@ pub async fn handle_list( next_continuation_token: match (query.is_v2, &pagination) { (true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!( "]{}", - base64::encode(key.as_bytes()) + BASE64_STANDARD.encode(key.as_bytes()) ))), (true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!( "[{}", - base64::encode(key.as_bytes()) + BASE64_STANDARD.encode(key.as_bytes()) ))), _ => None, }, @@ -583,14 +584,16 @@ impl ListObjectsQuery { (Some(token), _) => match &token[..1] { "[" => Ok(RangeBegin::IncludingKey { key: String::from_utf8( - base64::decode(token[1..].as_bytes()) + BASE64_STANDARD + .decode(token[1..].as_bytes()) .ok_or_bad_request("Invalid continuation token")?, )?, fallback_key: None, }), "]" => Ok(RangeBegin::AfterKey { key: String::from_utf8( - base64::decode(token[1..].as_bytes()) + BASE64_STANDARD + .decode(token[1..].as_bytes()) .ok_or_bad_request("Invalid continuation token")?, )?, }), diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index d063faa4..da542526 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -4,6 +4,7 @@ use std::ops::RangeInclusive; use std::sync::Arc; use std::task::{Context, Poll}; +use base64::prelude::*; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; use futures::{Stream, StreamExt}; @@ -138,7 +139,9 @@ pub async fn handle_post_object( .get_existing_bucket(bucket_id) .await?; - let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?; + let decoded_policy = BASE64_STANDARD + .decode(&policy) + .ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index c08fe40a..350ab884 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; +use base64::prelude::*; use futures::prelude::*; use hyper::body::{Body, Bytes}; use hyper::header::{HeaderMap, HeaderValue}; @@ -207,7 +208,7 @@ fn ensure_checksum_matches( } } if let Some(expected_md5) = content_md5 { - if expected_md5.trim_matches('"') != base64::encode(data_md5sum) { + if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) { return Err(Error::bad_request("Unable to validate content-md5")); } else { trace!("Successfully validated content-md5"); -- cgit v1.2.3 From 8e93d6997415d60ba5c371da8b27065a57254a8c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 26 Jan 2023 17:26:32 +0100 Subject: More clippy fixes --- src/api/s3/bucket.rs | 2 +- src/api/s3/post_object.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 8471385f..733981e1 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -305,7 +305,7 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option> { let mut ret = None; for item in cbc.children() { if item.has_tag_name("LocationConstraint") { - if ret != None { + if ret.is_some() { return None; } ret = Some(item.text()?.to_string()); diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index da542526..f2098ab0 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -140,7 +140,7 @@ pub async fn handle_post_object( .await?; let decoded_policy = BASE64_STANDARD - .decode(&policy) + .decode(policy) .ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; -- cgit v1.2.3 From 1b6ec74748f1182fbfb9b4ce934351b000ccab22 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Thu, 2 Feb 2023 16:16:12 +0000 Subject: error.rs: Corrected error messages to say unexpected scope. --- src/api/s3/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api/s3') diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 67009d63..c50cff9f 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -21,7 +21,7 @@ pub enum Error { // Category: cannot process /// Authorization Header Malformed - #[error(display = "Authorization header malformed, expected scope: {}", _0)] + #[error(display = "Authorization header malformed, unexpected scope: {}", _0)] AuthorizationHeaderMalformed(String), /// The object requested don't exists -- cgit v1.2.3