diff options
author | Alex Auvolat <alex@adnab.me> | 2023-04-25 12:34:26 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-04-25 12:34:26 +0200 |
commit | fa78d806e3ae40031e80eebb86e4eb1756d7baea (patch) | |
tree | 144662fb430c484093f6f9a585a2441c2ff26494 /src/api/s3/put.rs | |
parent | 654999e254e6c1f46bb5d668bc1230f226575716 (diff) | |
parent | a16eb7e4b8344d2f58c09a249b7b1bd17d339a35 (diff) | |
download | garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.tar.gz garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.zip |
Merge branch 'main' into next
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r-- | src/api/s3/put.rs | 96 |
1 files changed, 61 insertions, 35 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 97b8e4e3..350ab884 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; +use base64::prelude::*; use futures::prelude::*; use hyper::body::{Body, Bytes}; use hyper::header::{HeaderMap, HeaderValue}; @@ -119,6 +120,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( return Ok((version_uuid, data_md5sum_hex)); } + // The following consists in many steps that can each fail. + // Keep track that some cleanup will be needed if things fail + // before everything is finished (cleanup is done using the Drop trait). + let mut interrupted_cleanup = InterruptedCleanup(Some(( + garage.clone(), + bucket.id, + key.into(), + version_uuid, + version_timestamp, + ))); + // Write version identifier in object table so that we have a trace // that we are uploading something let mut object_version = ObjectVersion { @@ -139,44 +151,27 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // Transfer data and verify checksum let first_block_hash = async_blake2sum(first_block.clone()).await; - let tx_result = (|| async { - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; - - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; - - check_quotas(&garage, bucket, key, total_size).await?; + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; - Ok((total_size, data_md5sum)) - })() - .await; + ensure_checksum_matches( + data_md5sum.as_slice(), + data_sha256sum, + content_md5.as_deref(), + content_sha256, + )?; - // If something went wrong, clean up - let (total_size, md5sum_arr) = match tx_result { - Ok(rv) => rv, - Err(e) => { - // Mark object as aborted, this will free the blocks further down - object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); - garage.object_table.insert(&object).await?; - return Err(e); - } - }; + check_quotas(&garage, bucket, key, total_size).await?; // Save final object state, marked as Complete - let md5sum_hex = hex::encode(md5sum_arr); + let md5sum_hex = hex::encode(data_md5sum); object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { headers, @@ -188,6 +183,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; + // We were not interrupted, everything went fine. + // We won't have to clean up on drop. + interrupted_cleanup.cancel(); + Ok((version_uuid, md5sum_hex)) } @@ -209,7 +208,7 @@ fn ensure_checksum_matches( } } if let Some(expected_md5) = content_md5 { - if expected_md5.trim_matches('"') != base64::encode(data_md5sum) { + if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) { return Err(Error::bad_request("Unable to validate content-md5")); } else { trace!("Successfully validated content-md5"); @@ -426,6 +425,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { .unwrap() } +struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>); + +impl InterruptedCleanup { + fn cancel(&mut self) { + drop(self.0.take()); + } +} +impl Drop for InterruptedCleanup { + fn drop(&mut self) { + if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + tokio::spawn(async move { + let object_version = ObjectVersion { + uuid: version_uuid, + timestamp: version_ts, + state: ObjectVersionState::Aborted, + }; + let object = Object::new(bucket_id, key, vec![object_version]); + if let Err(e) = garage.object_table.insert(&object).await { + warn!("Cannot cleanup after aborted PutObject: {}", e); + } + }); + } + } +} + +// ---- + pub async fn handle_create_multipart_upload( garage: Arc<Garage>, req: &Request<Body>, |