aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/s3/put.rs93
1 files changed, 59 insertions, 34 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 97b8e4e3..c08fe40a 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -119,6 +119,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
return Ok((version_uuid, data_md5sum_hex));
}
+ // The following consists in many steps that can each fail.
+ // Keep track that some cleanup will be needed if things fail
+ // before everything is finished (cleanup is done using the Drop trait).
+ let mut interrupted_cleanup = InterruptedCleanup(Some((
+ garage.clone(),
+ bucket.id,
+ key.into(),
+ version_uuid,
+ version_timestamp,
+ )));
+
// Write version identifier in object table so that we have a trace
// that we are uploading something
let mut object_version = ObjectVersion {
@@ -139,44 +150,27 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum
let first_block_hash = async_blake2sum(first_block.clone()).await;
- let tx_result = (|| async {
- let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
- &garage,
- &version,
- 1,
- first_block,
- first_block_hash,
- &mut chunker,
- )
- .await?;
-
- ensure_checksum_matches(
- data_md5sum.as_slice(),
- data_sha256sum,
- content_md5.as_deref(),
- content_sha256,
- )?;
-
- check_quotas(&garage, bucket, key, total_size).await?;
+ let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
+ &garage,
+ &version,
+ 1,
+ first_block,
+ first_block_hash,
+ &mut chunker,
+ )
+ .await?;
- Ok((total_size, data_md5sum))
- })()
- .await;
+ ensure_checksum_matches(
+ data_md5sum.as_slice(),
+ data_sha256sum,
+ content_md5.as_deref(),
+ content_sha256,
+ )?;
- // If something went wrong, clean up
- let (total_size, md5sum_arr) = match tx_result {
- Ok(rv) => rv,
- Err(e) => {
- // Mark object as aborted, this will free the blocks further down
- object_version.state = ObjectVersionState::Aborted;
- let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
- garage.object_table.insert(&object).await?;
- return Err(e);
- }
- };
+ check_quotas(&garage, bucket, key, total_size).await?;
// Save final object state, marked as Complete
- let md5sum_hex = hex::encode(md5sum_arr);
+ let md5sum_hex = hex::encode(data_md5sum);
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
@@ -188,6 +182,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
+ // We were not interrupted, everything went fine.
+ // We won't have to clean up on drop.
+ interrupted_cleanup.cancel();
+
Ok((version_uuid, md5sum_hex))
}
@@ -426,6 +424,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
.unwrap()
}
+struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>);
+
+impl InterruptedCleanup {
+ fn cancel(&mut self) {
+ drop(self.0.take());
+ }
+}
+impl Drop for InterruptedCleanup {
+ fn drop(&mut self) {
+ if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() {
+ tokio::spawn(async move {
+ let object_version = ObjectVersion {
+ uuid: version_uuid,
+ timestamp: version_ts,
+ state: ObjectVersionState::Aborted,
+ };
+ let object = Object::new(bucket_id, key, vec![object_version]);
+ if let Err(e) = garage.object_table.insert(&object).await {
+ warn!("Cannot cleanup after aborted PutObject: {}", e);
+ }
+ });
+ }
+ }
+}
+
+// ----
+
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
req: &Request<Body>,