From 38d6ac429506f9f488ac522581b12fa530442a59 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 27 Apr 2023 17:57:54 +0200 Subject: New multipart upload table layout --- src/model/s3/mod.rs | 1 + src/model/s3/mpu_table.rs | 231 ++++++++++++++++++++++++++++++++++++++++++ src/model/s3/object_table.rs | 143 +++++++++++++++++++++++--- src/model/s3/version_table.rs | 85 ++++++++++++---- 4 files changed, 429 insertions(+), 31 deletions(-) create mode 100644 src/model/s3/mpu_table.rs (limited to 'src/model/s3') diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs index 4e94337d..36d67093 100644 --- a/src/model/s3/mod.rs +++ b/src/model/s3/mod.rs @@ -1,3 +1,4 @@ pub mod block_ref_table; +pub mod mpu_table; pub mod object_table; pub mod version_table; diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs new file mode 100644 index 00000000..dc5b5a82 --- /dev/null +++ b/src/model/s3/mpu_table.rs @@ -0,0 +1,231 @@ +use std::sync::Arc; + +use garage_db as db; + +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::index_counter::*; +use crate::s3::version_table::*; + +pub const UPLOADS: &str = "uploads"; +pub const PARTS: &str = "parts"; +pub const BYTES: &str = "bytes"; + +mod v09 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + pub use crate::s3::version_table::v09::VersionBlock; + + /// A part of a multipart upload + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct MultipartUpload { + /// Partition key = Upload id = UUID of the object version + pub upload_id: Uuid, + + /// Is this multipart upload deleted + pub deleted: crdt::Bool, + /// List of uploaded parts, key = (part number, timestamp) + /// In case of retries, all versions for each part are kept + /// Everything is cleaned up only once the multipart upload is completed or + /// aborted + pub parts: crdt::Map, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket_id: Uuid, + /// Key in which the related object is stored + pub key: String, + } + + #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct MpuPartKey { + /// Number of the part + pub part_number: u64, + /// Timestamp of part upload + pub timestamp: u64, + } + + /// The version of an uploaded part + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct MpuPart { + /// Links to a Version in VersionTable + pub version: Uuid, + /// ETag of the content of this part (known only once done uploading) + pub etag: Option, + /// Size of this part (known only once done uploading) + pub size: Option, + } + + impl garage_util::migrate::InitialFormat for MultipartUpload { + const VERSION_MARKER: &'static [u8] = b"G09s3mpu"; + } +} + +pub use v09::*; + +impl Ord for MpuPartKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.timestamp.cmp(&other.timestamp)) + } +} + +impl PartialOrd for MpuPartKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl MultipartUpload { + pub fn new(upload_id: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + Self { + upload_id, + deleted: crdt::Bool::new(deleted), + parts: crdt::Map::new(), + bucket_id, + key, + } + } +} + +impl Entry for MultipartUpload { + fn partition_key(&self) -> &Uuid { + &self.upload_id + } + fn sort_key(&self) -> &EmptyKey { + &EmptyKey + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +impl Crdt for MultipartUpload { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.parts.clear(); + } else { + self.parts.merge(&other.parts); + } + } +} + +impl Crdt for MpuPart { + fn merge(&mut self, other: &Self) { + self.etag = match (self.etag.take(), &other.etag) { + (None, Some(_)) => other.etag.clone(), + (Some(x), Some(y)) if x < *y => other.etag.clone(), + (x, _) => x, + }; + self.size = match (self.size, other.size) { + (None, Some(_)) => other.size, + (Some(x), Some(y)) if x < y => other.size, + (x, _) => x, + }; + } +} + +pub struct MultipartUploadTable { + pub version_table: Arc>, + pub mpu_counter_table: Arc>, +} + +impl TableSchema for MultipartUploadTable { + const TABLE_NAME: &'static str = "multipart_upload"; + + type P = Uuid; + type S = EmptyKey; + type E = MultipartUpload; + type Filter = DeletedFilter; + + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.mpu_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update multipart object part counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Propagate deletions to version table + if let (Some(old_mpu), Some(new_mpu)) = (old, new) { + if new_mpu.deleted.get() && !old_mpu.deleted.get() { + let deleted_versions = old_mpu.parts.items().iter().map(|(_k, p)| { + Version::new( + p.version, + VersionBacklink::MultipartUpload { + upload_id: old_mpu.upload_id, + }, + true, + ) + }); + for version in deleted_versions { + let res = self.version_table.queue_insert(tx, &version); + if let Err(e) = db::unabort(res)? { + error!("Unable to enqueue version deletion propagation: {}. A repair will be needed.", e); + } + } + } + } + + Ok(()) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.is_tombstone()) + } +} + +impl CountedItem for MultipartUpload { + const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_part_counter"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = nothing + type CS = EmptyKey; + + fn counter_partition_key(&self) -> &Uuid { + &self.bucket_id + } + fn counter_sort_key(&self) -> &EmptyKey { + &EmptyKey + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let uploads = if self.deleted.get() { 0 } else { 1 }; + let mut parts = self + .parts + .items() + .iter() + .map(|(k, _)| k.part_number) + .collect::>(); + parts.dedup(); + let bytes = self + .parts + .items() + .iter() + .map(|(_, p)| p.size.unwrap_or(0)) + .sum::(); + vec![ + (UPLOADS, uploads), + (PARTS, parts.len() as i64), + (BYTES, bytes as i64), + ] + } +} diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 518acc95..a69069b2 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -10,6 +10,7 @@ use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::index_counter::*; +use crate::s3::mpu_table::*; use crate::s3::version_table::*; pub const OBJECTS: &str = "objects"; @@ -130,7 +131,86 @@ mod v08 { } } -pub use v08::*; +mod v09 { + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v08; + + pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta}; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket_id: Uuid, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec, + } + + /// Informations about a version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, + } + + /// State of an object version + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionState { + /// The version is being received + Uploading { + /// Indicates whether this is a multipart upload + multipart: bool, + /// Headers to be included in the final object + headers: ObjectVersionHeaders, + }, + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, + } + + impl garage_util::migrate::Migrate for Object { + const VERSION_MARKER: &'static [u8] = b"G09s3o"; + + type Previous = v08::Object; + + fn migrate(old: v08::Object) -> Object { + let versions = old + .versions + .into_iter() + .map(|x| ObjectVersion { + uuid: x.uuid, + timestamp: x.timestamp, + state: match x.state { + v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading { + multipart: false, + headers: h, + }, + v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d), + v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + }, + }) + .collect(); + Object { + bucket_id: old.bucket_id, + key: old.key, + versions, + } + } + } +} + +pub use v09::*; impl Object { /// Initialize an Object struct from parts @@ -180,11 +260,11 @@ impl Crdt for ObjectVersionState { Complete(a) => { a.merge(b); } - Uploading(_) => { + Uploading { .. } => { *self = Complete(b.clone()); } }, - Uploading(_) => {} + Uploading { .. } => {} } } } @@ -199,8 +279,17 @@ impl ObjectVersion { } /// Is the object version currently being uploaded - pub fn is_uploading(&self) -> bool { - matches!(self.state, ObjectVersionState::Uploading(_)) + /// + /// matches only multipart uploads if check_multipart is Some(true) + /// matches only non-multipart uploads if check_multipart is Some(false) + /// matches both if check_multipart is None + pub fn is_uploading(&self, check_multipart: Option) -> bool { + match &self.state { + ObjectVersionState::Uploading { multipart, .. } => { + check_multipart.map(|x| x == *multipart).unwrap_or(true) + } + _ => false, + } } /// Is the object version completely received @@ -267,13 +356,20 @@ impl Crdt for Object { pub struct ObjectTable { pub version_table: Arc>, + pub mpu_table: Arc>, pub object_counter_table: Arc>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub enum ObjectFilter { + /// Is the object version available (received and not a tombstone) IsData, - IsUploading, + /// Is the object version currently being uploaded + /// + /// matches only multipart uploads if check_multipart is Some(true) + /// matches only non-multipart uploads if check_multipart is Some(false) + /// matches both if check_multipart is None + IsUploading { check_multipart: Option }, } impl TableSchema for ObjectTable { @@ -314,8 +410,29 @@ impl TableSchema for ObjectTable { } }; if newly_deleted { - let deleted_version = - Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + if let ObjectVersionState::Uploading { + multipart: true, .. + } = &v.state + { + let deleted_mpu = + MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + let res = self.mpu_table.queue_insert(tx, &deleted_mpu); + if let Err(e) = db::unabort(res)? { + error!( + "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.", + e + ); + } + } + + let deleted_version = Version::new( + v.uuid, + VersionBacklink::Object { + bucket_id: old_v.bucket_id, + key: old_v.key.clone(), + }, + true, + ); let res = self.version_table.queue_insert(tx, &deleted_version); if let Err(e) = db::unabort(res)? { error!( @@ -333,7 +450,10 @@ impl TableSchema for ObjectTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { match filter { ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()), - ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), + ObjectFilter::IsUploading { check_multipart } => entry + .versions + .iter() + .any(|v| v.is_uploading(*check_multipart)), } } } @@ -360,10 +480,7 @@ impl CountedItem for Object { } else { 0 }; - let n_unfinished_uploads = versions - .iter() - .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) - .count(); + let n_unfinished_uploads = versions.iter().filter(|v| v.is_uploading(None)).count(); let n_bytes = versions .iter() .map(|v| match &v.state { diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 6edc83f4..6cf1cc75 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -66,6 +66,8 @@ mod v08 { use super::v05; + pub use v05::{VersionBlock, VersionBlockKey}; + /// A version of an object #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { @@ -90,8 +92,6 @@ mod v08 { pub key: String, } - pub use v05::{VersionBlock, VersionBlockKey}; - impl garage_util::migrate::Migrate for Version { type Previous = v05::Version; @@ -110,32 +110,83 @@ mod v08 { } } -pub use v08::*; +pub(crate) mod v09 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v08; + + pub use v08::{VersionBlock, VersionBlockKey}; + + /// A version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + pub backlink: VersionBacklink, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum VersionBacklink { + Object { + /// Bucket in which the related object is stored + bucket_id: Uuid, + /// Key in which the related object is stored + key: String, + }, + MultipartUpload { + upload_id: Uuid, + }, + } + + impl garage_util::migrate::Migrate for Version { + const VERSION_MARKER: &'static [u8] = b"G09s3v"; + + type Previous = v08::Version; + + fn migrate(old: v08::Version) -> Version { + Version { + uuid: old.uuid, + deleted: old.deleted, + blocks: old.blocks, + backlink: VersionBacklink::Object { + bucket_id: old.bucket_id, + key: old.key, + }, + } + } + } +} + +pub use v09::*; impl Version { - pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + pub fn new(uuid: Uuid, backlink: VersionBacklink, deleted: bool) -> Self { Self { uuid, deleted: deleted.into(), blocks: crdt::Map::new(), - parts_etags: crdt::Map::new(), - bucket_id, - key, + backlink, } } pub fn has_part_number(&self, part_number: u64) -> bool { - let case1 = self - .parts_etags - .items() - .binary_search_by(|(k, _)| k.cmp(&part_number)) - .is_ok(); - let case2 = self - .blocks + self.blocks .items() .binary_search_by(|(k, _)| k.part_number.cmp(&part_number)) - .is_ok(); - case1 || case2 + .is_ok() } } @@ -175,10 +226,8 @@ impl Crdt for Version { if self.deleted.get() { self.blocks.clear(); - self.parts_etags.clear(); } else { self.blocks.merge(&other.blocks); - self.parts_etags.merge(&other.parts_etags); } } } -- cgit v1.2.3 From 82e75c0e296c74c374f3d40feeb1aadcb58398f0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 12:02:59 +0200 Subject: Adapt S3 API code to use new multipart upload models - Create and PutPart - completemultipartupload - upload part copy - list_parts --- src/model/s3/mpu_table.rs | 15 +++++++++++++++ src/model/s3/version_table.rs | 11 +++++++++++ 2 files changed, 26 insertions(+) (limited to 'src/model/s3') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index dc5b5a82..7148be51 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use garage_db as db; use garage_util::data::*; +use garage_util::time::*; use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; @@ -94,6 +95,20 @@ impl MultipartUpload { key, } } + + pub fn next_timestamp(&self, part_number: u64) -> u64 { + std::cmp::max( + now_msec(), + 1 + self + .parts + .items() + .iter() + .filter(|(x, _)| x.part_number == part_number) + .map(|(x, _)| x.timestamp) + .max() + .unwrap_or(0), + ) + } } impl Entry for MultipartUpload { diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 6cf1cc75..dcf4110a 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use garage_db as db; use garage_util::data::*; +use garage_util::error::*; use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; @@ -188,6 +189,16 @@ impl Version { .binary_search_by(|(k, _)| k.part_number.cmp(&part_number)) .is_ok() } + + pub fn n_parts(&self) -> Result { + Ok(self + .blocks + .items() + .last() + .ok_or_message("version has no parts")? + .0 + .part_number) + } } impl Ord for VersionBlockKey { -- cgit v1.2.3 From 511e07ecd489fa72040171fe908323873a57ac19 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 11:49:23 +0200 Subject: fix mpu counter (add missing workers) and report info at appropriate places --- src/model/s3/mpu_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model/s3') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index 7148be51..4764e8da 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -208,7 +208,7 @@ impl TableSchema for MultipartUploadTable { } impl CountedItem for MultipartUpload { - const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_part_counter"; + const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_counter"; // Partition key = bucket id type CP = Uuid; -- cgit v1.2.3 From 412ab77b0815f165539fe41713c0155a9878672f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 19:44:01 +0200 Subject: comments and clippy lint fixes --- src/model/s3/mpu_table.rs | 13 ++++++------- src/model/s3/version_table.rs | 5 +++-- 2 files changed, 9 insertions(+), 9 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index 4764e8da..63a4f1af 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use garage_db as db; +use garage_util::crdt::Crdt; use garage_util::data::*; use garage_util::time::*; -use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; @@ -21,8 +21,6 @@ mod v09 { use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; - pub use crate::s3::version_table::v09::VersionBlock; - /// A part of a multipart upload #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct MultipartUpload { @@ -30,15 +28,16 @@ mod v09 { pub upload_id: Uuid, /// Is this multipart upload deleted + /// The MultipartUpload is marked as deleted as soon as the + /// multipart upload is either completed or aborted pub deleted: crdt::Bool, /// List of uploaded parts, key = (part number, timestamp) /// In case of retries, all versions for each part are kept - /// Everything is cleaned up only once the multipart upload is completed or - /// aborted + /// Everything is cleaned up only once the MultipartUpload is marked deleted pub parts: crdt::Map, - // Back link to bucket+key so that we can figure if - // this was deleted later on + // Back link to bucket+key so that we can find the object this mpu + // belongs to and check whether it is still valid /// Bucket in which the related object is stored pub bucket_id: Uuid, /// Key in which the related object is stored diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index dcf4110a..5c032f9f 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -134,8 +134,9 @@ pub(crate) mod v09 { /// list of blocks of data composing the version pub blocks: crdt::Map, - // Back link to bucket+key so that we can figure if - // this was deleted later on + // Back link to owner of this version (either an object or a multipart + // upload), used to find whether it has been deleted and this version + // should in turn be deleted (see versions repair procedure) pub backlink: VersionBacklink, } -- cgit v1.2.3 From 3d477906d4ff418de10973db7bd3e940f2e47b2d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 9 Jun 2023 17:13:27 +0200 Subject: properly delete multipart uploads after completion --- src/model/s3/object_table.rs | 54 ++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 20 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index a69069b2..db5ccf96 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -397,34 +397,20 @@ impl TableSchema for ObjectTable { // 2. Enqueue propagation deletions to version table if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions for v in old_v.versions.iter() { - let newly_deleted = match new_v + let new_v_id = new_v .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - { + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())); + + // Propagate deletion of old versions to the Version table + let delete_version = match new_v_id { Err(_) => true, Ok(i) => { new_v.versions[i].state == ObjectVersionState::Aborted && v.state != ObjectVersionState::Aborted } }; - if newly_deleted { - if let ObjectVersionState::Uploading { - multipart: true, .. - } = &v.state - { - let deleted_mpu = - MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); - let res = self.mpu_table.queue_insert(tx, &deleted_mpu); - if let Err(e) = db::unabort(res)? { - error!( - "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.", - e - ); - } - } - + if delete_version { let deleted_version = Version::new( v.uuid, VersionBacklink::Object { @@ -441,6 +427,34 @@ impl TableSchema for ObjectTable { ); } } + + // After abortion or completion of multipart uploads, delete MPU table entry + if matches!( + v.state, + ObjectVersionState::Uploading { + multipart: true, + .. + } + ) { + let delete_mpu = match new_v_id { + Err(_) => true, + Ok(i) => !matches!( + new_v.versions[i].state, + ObjectVersionState::Uploading { .. } + ), + }; + if delete_mpu { + let deleted_mpu = + MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + let res = self.mpu_table.queue_insert(tx, &deleted_mpu); + if let Err(e) = db::unabort(res)? { + error!( + "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.", + e + ); + } + } + } } } -- cgit v1.2.3