From a6cc563bdd1caab11892f9b7a2f538a2f33e375b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Jun 2023 15:18:45 +0200 Subject: UploadPart: automatic cleanup of version (and reference blocked) when interrupted --- src/api/s3/put.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) (limited to 'src/api/s3/put.rs') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 804e1087..c7ac5030 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -121,13 +121,13 @@ pub(crate) async fn save_stream> + Unpin>( // The following consists in many steps that can each fail. // Keep track that some cleanup will be needed if things fail // before everything is finished (cleanup is done using the Drop trait). - let mut interrupted_cleanup = InterruptedCleanup(Some(( - garage.clone(), - bucket.id, - key.into(), + let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { + garage: garage.clone(), + bucket_id: bucket.id, + key: key.into(), version_uuid, version_timestamp, - ))); + })); // Write version identifier in object table so that we have a trace // that we are uploading something @@ -433,7 +433,14 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response { .unwrap() } -struct InterruptedCleanup(Option<(Arc, Uuid, String, Uuid, u64)>); +struct InterruptedCleanup(Option); +struct InterruptedCleanupInner { + garage: Arc, + bucket_id: Uuid, + key: String, + version_uuid: Uuid, + version_timestamp: u64, +} impl InterruptedCleanup { fn cancel(&mut self) { @@ -442,15 +449,15 @@ impl InterruptedCleanup { } impl Drop for InterruptedCleanup { fn drop(&mut self) { - if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + if let Some(info) = self.0.take() { tokio::spawn(async move { let object_version = ObjectVersion { - uuid: version_uuid, - timestamp: version_ts, + uuid: info.version_uuid, + timestamp: info.version_timestamp, state: ObjectVersionState::Aborted, }; - let object = Object::new(bucket_id, key, vec![object_version]); - if let Err(e) = garage.object_table.insert(&object).await { + let object = Object::new(info.bucket_id, info.key, vec![object_version]); + if let Err(e) = info.garage.object_table.insert(&object).await { warn!("Cannot cleanup after aborted PutObject: {}", e); } }); -- cgit v1.2.3