diff options
-rw-r--r-- | src/api/s3/multipart.rs | 52 | ||||
-rw-r--r-- | src/api/s3/put.rs | 29 |
2 files changed, 66 insertions, 15 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 0e82f3d0..7df0dafc 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -96,18 +96,27 @@ pub async fn handle_put_part( let first_block = first_block.ok_or_bad_request("Empty body")?; // Calculate part identity: timestamp, version id - let version_id = gen_uuid(); + let version_uuid = gen_uuid(); let mpu_part_key = MpuPartKey { part_number, timestamp: mpu.next_timestamp(part_number), }; + // The following consists in many steps that can each fail. + // Keep track that some cleanup will be needed if things fail + // before everything is finished (cleanup is done using the Drop trait). + let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { + garage: garage.clone(), + upload_id, + version_uuid, + })); + // Create version and link version from MPU mpu.parts.clear(); mpu.parts.put( mpu_part_key, MpuPart { - version: version_id, + version: version_uuid, etag: None, size: None, }, @@ -115,7 +124,7 @@ pub async fn handle_put_part( garage.mpu_table.insert(&mpu).await?; let version = Version::new( - version_id, + version_uuid, VersionBacklink::MultipartUpload { upload_id }, false, ); @@ -147,13 +156,17 @@ pub async fn handle_put_part( mpu.parts.put( mpu_part_key, MpuPart { - version: version_id, + version: version_uuid, etag: Some(data_md5sum_hex.clone()), size: Some(total_size), }, ); garage.mpu_table.insert(&mpu).await?; + // We were not interrupted, everything went fine. + // We won't have to clean up on drop. + interrupted_cleanup.cancel(); + let response = Response::builder() .header("ETag", format!("\"{}\"", data_md5sum_hex)) .body(Body::empty()) @@ -161,6 +174,37 @@ pub async fn handle_put_part( Ok(response) } +struct InterruptedCleanup(Option<InterruptedCleanupInner>); +struct InterruptedCleanupInner { + garage: Arc<Garage>, + upload_id: Uuid, + version_uuid: Uuid, +} + +impl InterruptedCleanup { + fn cancel(&mut self) { + drop(self.0.take()); + } +} +impl Drop for InterruptedCleanup { + fn drop(&mut self) { + if let Some(info) = self.0.take() { + tokio::spawn(async move { + let version = Version::new( + info.version_uuid, + VersionBacklink::MultipartUpload { + upload_id: info.upload_id, + }, + true, + ); + if let Err(e) = info.garage.version_table.insert(&version).await { + warn!("Cannot cleanup after aborted UploadPart: {}", e); + } + }); + } + } +} + pub async fn handle_complete_multipart_upload( garage: Arc<Garage>, req: Request<Body>, diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 804e1087..c7ac5030 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -121,13 +121,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // The following consists in many steps that can each fail. // Keep track that some cleanup will be needed if things fail // before everything is finished (cleanup is done using the Drop trait). - let mut interrupted_cleanup = InterruptedCleanup(Some(( - garage.clone(), - bucket.id, - key.into(), + let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { + garage: garage.clone(), + bucket_id: bucket.id, + key: key.into(), version_uuid, version_timestamp, - ))); + })); // Write version identifier in object table so that we have a trace // that we are uploading something @@ -433,7 +433,14 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { .unwrap() } -struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>); +struct InterruptedCleanup(Option<InterruptedCleanupInner>); +struct InterruptedCleanupInner { + garage: Arc<Garage>, + bucket_id: Uuid, + key: String, + version_uuid: Uuid, + version_timestamp: u64, +} impl InterruptedCleanup { fn cancel(&mut self) { @@ -442,15 +449,15 @@ impl InterruptedCleanup { } impl Drop for InterruptedCleanup { fn drop(&mut self) { - if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + if let Some(info) = self.0.take() { tokio::spawn(async move { let object_version = ObjectVersion { - uuid: version_uuid, - timestamp: version_ts, + uuid: info.version_uuid, + timestamp: info.version_timestamp, state: ObjectVersionState::Aborted, }; - let object = Object::new(bucket_id, key, vec![object_version]); - if let Err(e) = garage.object_table.insert(&object).await { + let object = Object::new(info.bucket_id, info.key, vec![object_version]); + if let Err(e) = info.garage.object_table.insert(&object).await { warn!("Cannot cleanup after aborted PutObject: {}", e); } }); |