diff options
-rw-r--r-- | src/api/s3/put.rs | 93 |
1 files changed, 59 insertions, 34 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 97b8e4e3..c08fe40a 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -119,6 +119,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( return Ok((version_uuid, data_md5sum_hex)); } + // The following consists in many steps that can each fail. + // Keep track that some cleanup will be needed if things fail + // before everything is finished (cleanup is done using the Drop trait). + let mut interrupted_cleanup = InterruptedCleanup(Some(( + garage.clone(), + bucket.id, + key.into(), + version_uuid, + version_timestamp, + ))); + // Write version identifier in object table so that we have a trace // that we are uploading something let mut object_version = ObjectVersion { @@ -139,44 +150,27 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // Transfer data and verify checksum let first_block_hash = async_blake2sum(first_block.clone()).await; - let tx_result = (|| async { - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; - - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; - - check_quotas(&garage, bucket, key, total_size).await?; + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; - Ok((total_size, data_md5sum)) - })() - .await; + ensure_checksum_matches( + data_md5sum.as_slice(), + data_sha256sum, + content_md5.as_deref(), + content_sha256, + )?; - // If something went wrong, clean up - let (total_size, md5sum_arr) = match tx_result { - Ok(rv) => rv, - Err(e) => { - // Mark object as aborted, this will free the blocks further down - object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); - garage.object_table.insert(&object).await?; - return Err(e); - } - }; + check_quotas(&garage, bucket, key, total_size).await?; // Save final object state, marked as Complete - let md5sum_hex = hex::encode(md5sum_arr); + let md5sum_hex = hex::encode(data_md5sum); object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { headers, @@ -188,6 +182,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; + // We were not interrupted, everything went fine. + // We won't have to clean up on drop. + interrupted_cleanup.cancel(); + Ok((version_uuid, md5sum_hex)) } @@ -426,6 +424,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { .unwrap() } +struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>); + +impl InterruptedCleanup { + fn cancel(&mut self) { + drop(self.0.take()); + } +} +impl Drop for InterruptedCleanup { + fn drop(&mut self) { + if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + tokio::spawn(async move { + let object_version = ObjectVersion { + uuid: version_uuid, + timestamp: version_ts, + state: ObjectVersionState::Aborted, + }; + let object = Object::new(bucket_id, key, vec![object_version]); + if let Err(e) = garage.object_table.insert(&object).await { + warn!("Cannot cleanup after aborted PutObject: {}", e); + } + }); + } + } +} + +// ---- + pub async fn handle_create_multipart_upload( garage: Arc<Garage>, req: &Request<Body>, |