aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/put.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-06-06 15:18:45 +0200
committerAlex Auvolat <alex@adnab.me>2023-06-09 16:23:37 +0200
commita6cc563bdd1caab11892f9b7a2f538a2f33e375b (patch)
tree5815f89d716bf700db346c174ab90c4d28ab29c9 /src/api/s3/put.rs
parentc14d3735e5514c395a691a2ab4bb93aef57035e2 (diff)
downloadgarage-a6cc563bdd1caab11892f9b7a2f538a2f33e375b.tar.gz
garage-a6cc563bdd1caab11892f9b7a2f538a2f33e375b.zip
UploadPart: automatic cleanup of version (and reference blocked) when interrupted
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r--src/api/s3/put.rs29
1 files changed, 18 insertions, 11 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 804e1087..c7ac5030 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -121,13 +121,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// The following consists in many steps that can each fail.
// Keep track that some cleanup will be needed if things fail
// before everything is finished (cleanup is done using the Drop trait).
- let mut interrupted_cleanup = InterruptedCleanup(Some((
- garage.clone(),
- bucket.id,
- key.into(),
+ let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
+ garage: garage.clone(),
+ bucket_id: bucket.id,
+ key: key.into(),
version_uuid,
version_timestamp,
- )));
+ }));
// Write version identifier in object table so that we have a trace
// that we are uploading something
@@ -433,7 +433,14 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
.unwrap()
}
-struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>);
+struct InterruptedCleanup(Option<InterruptedCleanupInner>);
+struct InterruptedCleanupInner {
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ key: String,
+ version_uuid: Uuid,
+ version_timestamp: u64,
+}
impl InterruptedCleanup {
fn cancel(&mut self) {
@@ -442,15 +449,15 @@ impl InterruptedCleanup {
}
impl Drop for InterruptedCleanup {
fn drop(&mut self) {
- if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() {
+ if let Some(info) = self.0.take() {
tokio::spawn(async move {
let object_version = ObjectVersion {
- uuid: version_uuid,
- timestamp: version_ts,
+ uuid: info.version_uuid,
+ timestamp: info.version_timestamp,
state: ObjectVersionState::Aborted,
};
- let object = Object::new(bucket_id, key, vec![object_version]);
- if let Err(e) = garage.object_table.insert(&object).await {
+ let object = Object::new(info.bucket_id, info.key, vec![object_version]);
+ if let Err(e) = info.garage.object_table.insert(&object).await {
warn!("Cannot cleanup after aborted PutObject: {}", e);
}
});