aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-06-06 15:18:45 +0200
committerAlex Auvolat <alex@adnab.me>2023-06-09 16:23:37 +0200
commita6cc563bdd1caab11892f9b7a2f538a2f33e375b (patch)
tree5815f89d716bf700db346c174ab90c4d28ab29c9 /src
parentc14d3735e5514c395a691a2ab4bb93aef57035e2 (diff)
downloadgarage-a6cc563bdd1caab11892f9b7a2f538a2f33e375b.tar.gz
garage-a6cc563bdd1caab11892f9b7a2f538a2f33e375b.zip
UploadPart: automatic cleanup of version (and reference blocked) when interrupted
Diffstat (limited to 'src')
-rw-r--r--src/api/s3/multipart.rs52
-rw-r--r--src/api/s3/put.rs29
2 files changed, 66 insertions, 15 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 0e82f3d0..7df0dafc 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -96,18 +96,27 @@ pub async fn handle_put_part(
let first_block = first_block.ok_or_bad_request("Empty body")?;
// Calculate part identity: timestamp, version id
- let version_id = gen_uuid();
+ let version_uuid = gen_uuid();
let mpu_part_key = MpuPartKey {
part_number,
timestamp: mpu.next_timestamp(part_number),
};
+ // The following consists in many steps that can each fail.
+ // Keep track that some cleanup will be needed if things fail
+ // before everything is finished (cleanup is done using the Drop trait).
+ let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
+ garage: garage.clone(),
+ upload_id,
+ version_uuid,
+ }));
+
// Create version and link version from MPU
mpu.parts.clear();
mpu.parts.put(
mpu_part_key,
MpuPart {
- version: version_id,
+ version: version_uuid,
etag: None,
size: None,
},
@@ -115,7 +124,7 @@ pub async fn handle_put_part(
garage.mpu_table.insert(&mpu).await?;
let version = Version::new(
- version_id,
+ version_uuid,
VersionBacklink::MultipartUpload { upload_id },
false,
);
@@ -147,13 +156,17 @@ pub async fn handle_put_part(
mpu.parts.put(
mpu_part_key,
MpuPart {
- version: version_id,
+ version: version_uuid,
etag: Some(data_md5sum_hex.clone()),
size: Some(total_size),
},
);
garage.mpu_table.insert(&mpu).await?;
+ // We were not interrupted, everything went fine.
+ // We won't have to clean up on drop.
+ interrupted_cleanup.cancel();
+
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
.body(Body::empty())
@@ -161,6 +174,37 @@ pub async fn handle_put_part(
Ok(response)
}
+struct InterruptedCleanup(Option<InterruptedCleanupInner>);
+struct InterruptedCleanupInner {
+ garage: Arc<Garage>,
+ upload_id: Uuid,
+ version_uuid: Uuid,
+}
+
+impl InterruptedCleanup {
+ fn cancel(&mut self) {
+ drop(self.0.take());
+ }
+}
+impl Drop for InterruptedCleanup {
+ fn drop(&mut self) {
+ if let Some(info) = self.0.take() {
+ tokio::spawn(async move {
+ let version = Version::new(
+ info.version_uuid,
+ VersionBacklink::MultipartUpload {
+ upload_id: info.upload_id,
+ },
+ true,
+ );
+ if let Err(e) = info.garage.version_table.insert(&version).await {
+ warn!("Cannot cleanup after aborted UploadPart: {}", e);
+ }
+ });
+ }
+ }
+}
+
pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
req: Request<Body>,
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 804e1087..c7ac5030 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -121,13 +121,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// The following consists in many steps that can each fail.
// Keep track that some cleanup will be needed if things fail
// before everything is finished (cleanup is done using the Drop trait).
- let mut interrupted_cleanup = InterruptedCleanup(Some((
- garage.clone(),
- bucket.id,
- key.into(),
+ let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
+ garage: garage.clone(),
+ bucket_id: bucket.id,
+ key: key.into(),
version_uuid,
version_timestamp,
- )));
+ }));
// Write version identifier in object table so that we have a trace
// that we are uploading something
@@ -433,7 +433,14 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
.unwrap()
}
-struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>);
+struct InterruptedCleanup(Option<InterruptedCleanupInner>);
+struct InterruptedCleanupInner {
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ key: String,
+ version_uuid: Uuid,
+ version_timestamp: u64,
+}
impl InterruptedCleanup {
fn cancel(&mut self) {
@@ -442,15 +449,15 @@ impl InterruptedCleanup {
}
impl Drop for InterruptedCleanup {
fn drop(&mut self) {
- if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() {
+ if let Some(info) = self.0.take() {
tokio::spawn(async move {
let object_version = ObjectVersion {
- uuid: version_uuid,
- timestamp: version_ts,
+ uuid: info.version_uuid,
+ timestamp: info.version_timestamp,
state: ObjectVersionState::Aborted,
};
- let object = Object::new(bucket_id, key, vec![object_version]);
- if let Err(e) = garage.object_table.insert(&object).await {
+ let object = Object::new(info.bucket_id, info.key, vec![object_version]);
+ if let Err(e) = info.garage.object_table.insert(&object).await {
warn!("Cannot cleanup after aborted PutObject: {}", e);
}
});