diff options
-rw-r--r-- | src/api/s3/put.rs | 93 | ||||
-rw-r--r-- | src/block/manager.rs | 36 |
2 files changed, 91 insertions, 38 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 97b8e4e3..c08fe40a 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -119,6 +119,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( return Ok((version_uuid, data_md5sum_hex)); } + // The following consists in many steps that can each fail. + // Keep track that some cleanup will be needed if things fail + // before everything is finished (cleanup is done using the Drop trait). + let mut interrupted_cleanup = InterruptedCleanup(Some(( + garage.clone(), + bucket.id, + key.into(), + version_uuid, + version_timestamp, + ))); + // Write version identifier in object table so that we have a trace // that we are uploading something let mut object_version = ObjectVersion { @@ -139,44 +150,27 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // Transfer data and verify checksum let first_block_hash = async_blake2sum(first_block.clone()).await; - let tx_result = (|| async { - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; - - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; - - check_quotas(&garage, bucket, key, total_size).await?; + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; - Ok((total_size, data_md5sum)) - })() - .await; + ensure_checksum_matches( + data_md5sum.as_slice(), + data_sha256sum, + content_md5.as_deref(), + content_sha256, + )?; - // If something went wrong, clean up - let (total_size, md5sum_arr) = match tx_result { - Ok(rv) => rv, - Err(e) => { - // Mark object as aborted, this will free the blocks further down - object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); - garage.object_table.insert(&object).await?; - return Err(e); - } - }; + check_quotas(&garage, bucket, key, total_size).await?; // Save final object state, marked as Complete - let md5sum_hex = hex::encode(md5sum_arr); + let md5sum_hex = hex::encode(data_md5sum); object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { headers, @@ -188,6 +182,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; + // We were not interrupted, everything went fine. + // We won't have to clean up on drop. + interrupted_cleanup.cancel(); + Ok((version_uuid, md5sum_hex)) } @@ -426,6 +424,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { .unwrap() } +struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>); + +impl InterruptedCleanup { + fn cancel(&mut self) { + drop(self.0.take()); + } +} +impl Drop for InterruptedCleanup { + fn drop(&mut self) { + if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + tokio::spawn(async move { + let object_version = ObjectVersion { + uuid: version_uuid, + timestamp: version_ts, + state: ObjectVersionState::Aborted, + }; + let object = Object::new(bucket_id, key, vec![object_version]); + if let Err(e) = garage.object_table.insert(&object).await { + warn!("Cannot cleanup after aborted PutObject: {}", e); + } + }); + } + } +} + +// ---- + pub async fn handle_create_multipart_upload( garage: Arc<Garage>, req: &Request<Body>, diff --git a/src/block/manager.rs b/src/block/manager.rs index 19841d64..1655be06 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -6,6 +6,7 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use async_trait::async_trait; use bytes::Bytes; +use rand::prelude::*; use serde::{Deserialize, Serialize}; use futures::Stream; @@ -676,14 +677,21 @@ impl BlockManagerLocked { } }; - let mut path2 = path.clone(); - path2.set_extension("tmp"); - let mut f = fs::File::create(&path2).await?; + let mut path_tmp = path.clone(); + let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); + path_tmp.set_extension(tmp_extension); + + let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone())); + + let mut f = fs::File::create(&path_tmp).await?; f.write_all(data).await?; f.sync_all().await?; drop(f); - fs::rename(path2, path).await?; + fs::rename(path_tmp, path).await?; + + delete_on_drop.cancel(); + if let Some(to_delete) = to_delete { fs::remove_file(to_delete).await?; } @@ -749,3 +757,23 @@ async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> { .concat() .into()) } + +struct DeleteOnDrop(Option<PathBuf>); + +impl DeleteOnDrop { + fn cancel(&mut self) { + drop(self.0.take()); + } +} + +impl Drop for DeleteOnDrop { + fn drop(&mut self) { + if let Some(path) = self.0.take() { + tokio::spawn(async move { + if let Err(e) = fs::remove_file(&path).await { + debug!("DeleteOnDrop failed for {}: {}", path.display(), e); + } + }); + } + } +} |