aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/multipart.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r--src/api/s3/multipart.rs52
1 files changed, 48 insertions, 4 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 0e82f3d0..7df0dafc 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -96,18 +96,27 @@ pub async fn handle_put_part(
let first_block = first_block.ok_or_bad_request("Empty body")?;
// Calculate part identity: timestamp, version id
- let version_id = gen_uuid();
+ let version_uuid = gen_uuid();
let mpu_part_key = MpuPartKey {
part_number,
timestamp: mpu.next_timestamp(part_number),
};
+ // The following consists in many steps that can each fail.
+ // Keep track that some cleanup will be needed if things fail
+ // before everything is finished (cleanup is done using the Drop trait).
+ let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
+ garage: garage.clone(),
+ upload_id,
+ version_uuid,
+ }));
+
// Create version and link version from MPU
mpu.parts.clear();
mpu.parts.put(
mpu_part_key,
MpuPart {
- version: version_id,
+ version: version_uuid,
etag: None,
size: None,
},
@@ -115,7 +124,7 @@ pub async fn handle_put_part(
garage.mpu_table.insert(&mpu).await?;
let version = Version::new(
- version_id,
+ version_uuid,
VersionBacklink::MultipartUpload { upload_id },
false,
);
@@ -147,13 +156,17 @@ pub async fn handle_put_part(
mpu.parts.put(
mpu_part_key,
MpuPart {
- version: version_id,
+ version: version_uuid,
etag: Some(data_md5sum_hex.clone()),
size: Some(total_size),
},
);
garage.mpu_table.insert(&mpu).await?;
+ // We were not interrupted, everything went fine.
+ // We won't have to clean up on drop.
+ interrupted_cleanup.cancel();
+
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
.body(Body::empty())
@@ -161,6 +174,37 @@ pub async fn handle_put_part(
Ok(response)
}
+struct InterruptedCleanup(Option<InterruptedCleanupInner>);
+struct InterruptedCleanupInner {
+ garage: Arc<Garage>,
+ upload_id: Uuid,
+ version_uuid: Uuid,
+}
+
+impl InterruptedCleanup {
+ fn cancel(&mut self) {
+ drop(self.0.take());
+ }
+}
+impl Drop for InterruptedCleanup {
+ fn drop(&mut self) {
+ if let Some(info) = self.0.take() {
+ tokio::spawn(async move {
+ let version = Version::new(
+ info.version_uuid,
+ VersionBacklink::MultipartUpload {
+ upload_id: info.upload_id,
+ },
+ true,
+ );
+ if let Err(e) = info.garage.version_table.insert(&version).await {
+ warn!("Cannot cleanup after aborted UploadPart: {}", e);
+ }
+ });
+ }
+ }
+}
+
pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
req: Request<Body>,