From 38d6ac429506f9f488ac522581b12fa530442a59 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 27 Apr 2023 17:57:54 +0200 Subject: New multipart upload table layout --- src/model/s3/mpu_table.rs | 231 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 src/model/s3/mpu_table.rs (limited to 'src/model/s3/mpu_table.rs') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs new file mode 100644 index 00000000..dc5b5a82 --- /dev/null +++ b/src/model/s3/mpu_table.rs @@ -0,0 +1,231 @@ +use std::sync::Arc; + +use garage_db as db; + +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::index_counter::*; +use crate::s3::version_table::*; + +pub const UPLOADS: &str = "uploads"; +pub const PARTS: &str = "parts"; +pub const BYTES: &str = "bytes"; + +mod v09 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + pub use crate::s3::version_table::v09::VersionBlock; + + /// A part of a multipart upload + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct MultipartUpload { + /// Partition key = Upload id = UUID of the object version + pub upload_id: Uuid, + + /// Is this multipart upload deleted + pub deleted: crdt::Bool, + /// List of uploaded parts, key = (part number, timestamp) + /// In case of retries, all versions for each part are kept + /// Everything is cleaned up only once the multipart upload is completed or + /// aborted + pub parts: crdt::Map<MpuPartKey, MpuPart>, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket_id: Uuid, + /// Key in which the related object is stored + pub key: String, + } + + #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct MpuPartKey { + /// Number of the part + pub part_number: u64, + /// Timestamp of
part upload + pub timestamp: u64, + } + + /// The version of an uploaded part + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct MpuPart { + /// Links to a Version in VersionTable + pub version: Uuid, + /// ETag of the content of this part (known only once done uploading) + pub etag: Option<String>, + /// Size of this part (known only once done uploading) + pub size: Option<u64>, + } + + impl garage_util::migrate::InitialFormat for MultipartUpload { + const VERSION_MARKER: &'static [u8] = b"G09s3mpu"; + } +} + +pub use v09::*; + +impl Ord for MpuPartKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.timestamp.cmp(&other.timestamp)) + } +} + +impl PartialOrd for MpuPartKey { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl MultipartUpload { + pub fn new(upload_id: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + Self { + upload_id, + deleted: crdt::Bool::new(deleted), + parts: crdt::Map::new(), + bucket_id, + key, + } + } +} + +impl Entry<Uuid, EmptyKey> for MultipartUpload { + fn partition_key(&self) -> &Uuid { + &self.upload_id + } + fn sort_key(&self) -> &EmptyKey { + &EmptyKey + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +impl Crdt for MultipartUpload { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.parts.clear(); + } else { + self.parts.merge(&other.parts); + } + } +} + +impl Crdt for MpuPart { + fn merge(&mut self, other: &Self) { + self.etag = match (self.etag.take(), &other.etag) { + (None, Some(_)) => other.etag.clone(), + (Some(x), Some(y)) if x < *y => other.etag.clone(), + (x, _) => x, + }; + self.size = match (self.size, other.size) { + (None, Some(_)) => other.size, + (Some(x), Some(y)) if x < y => other.size, + (x, _) => x, + }; + } +} + +pub struct MultipartUploadTable { + pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, + pub mpu_counter_table: Arc<IndexCounter<MultipartUpload>>, +} + +impl 
TableSchema for MultipartUploadTable { + const TABLE_NAME: &'static str = "multipart_upload"; + + type P = Uuid; + type S = EmptyKey; + type E = MultipartUpload; + type Filter = DeletedFilter; + + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.mpu_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update multipart object part counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Propagate deletions to version table + if let (Some(old_mpu), Some(new_mpu)) = (old, new) { + if new_mpu.deleted.get() && !old_mpu.deleted.get() { + let deleted_versions = old_mpu.parts.items().iter().map(|(_k, p)| { + Version::new( + p.version, + VersionBacklink::MultipartUpload { + upload_id: old_mpu.upload_id, + }, + true, + ) + }); + for version in deleted_versions { + let res = self.version_table.queue_insert(tx, &version); + if let Err(e) = db::unabort(res)? { + error!("Unable to enqueue version deletion propagation: {}. 
A repair will be needed.", e); + } + } + } + } + + Ok(()) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.is_tombstone()) + } +} + +impl CountedItem for MultipartUpload { + const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_part_counter"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = nothing + type CS = EmptyKey; + + fn counter_partition_key(&self) -> &Uuid { + &self.bucket_id + } + fn counter_sort_key(&self) -> &EmptyKey { + &EmptyKey + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let uploads = if self.deleted.get() { 0 } else { 1 }; + let mut parts = self + .parts + .items() + .iter() + .map(|(k, _)| k.part_number) + .collect::<Vec<_>>(); + parts.dedup(); + let bytes = self + .parts + .items() + .iter() + .map(|(_, p)| p.size.unwrap_or(0)) + .sum::<u64>(); + vec![ + (UPLOADS, uploads), + (PARTS, parts.len() as i64), + (BYTES, bytes as i64), + ] + } +} -- cgit v1.2.3 From 82e75c0e296c74c374f3d40feeb1aadcb58398f0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 12:02:59 +0200 Subject: Adapt S3 API code to use new multipart upload models - Create and PutPart - completemultipartupload - upload part copy - list_parts --- src/model/s3/mpu_table.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'src/model/s3/mpu_table.rs') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index dc5b5a82..7148be51 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use garage_db as db; use garage_util::data::*; +use garage_util::time::*; use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; @@ -94,6 +95,20 @@ impl MultipartUpload { key, } } + + pub fn next_timestamp(&self, part_number: u64) -> u64 { + std::cmp::max( + now_msec(), + 1 + self + .parts + .items() + .iter() + .filter(|(x, _)| x.part_number == part_number) + .map(|(x, _)| x.timestamp) + .max() + .unwrap_or(0), + ) + } } 
impl Entry<Uuid, EmptyKey> for MultipartUpload { -- cgit v1.2.3 From 511e07ecd489fa72040171fe908323873a57ac19 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 11:49:23 +0200 Subject: fix mpu counter (add missing workers) and report info at appropriate places --- src/model/s3/mpu_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model/s3/mpu_table.rs') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index 7148be51..4764e8da 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -208,7 +208,7 @@ impl TableSchema for MultipartUploadTable { } impl CountedItem for MultipartUpload { - const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_part_counter"; + const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_counter"; // Partition key = bucket id type CP = Uuid; -- cgit v1.2.3 From 412ab77b0815f165539fe41713c0155a9878672f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 19:44:01 +0200 Subject: comments and clippy lint fixes --- src/model/s3/mpu_table.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'src/model/s3/mpu_table.rs') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index 4764e8da..63a4f1af 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use garage_db as db; +use garage_util::crdt::Crdt; use garage_util::data::*; use garage_util::time::*; -use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; @@ -21,8 +21,6 @@ mod v09 { use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; - pub use crate::s3::version_table::v09::VersionBlock; - /// A part of a multipart upload #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct MultipartUpload { @@ -30,15 +28,16 @@ mod v09 { pub upload_id: Uuid, /// Is this multipart upload deleted + /// The MultipartUpload is marked as deleted as soon as the + /// 
multipart upload is either completed or aborted pub deleted: crdt::Bool, /// List of uploaded parts, key = (part number, timestamp) /// In case of retries, all versions for each part are kept - /// Everything is cleaned up only once the multipart upload is completed or - /// aborted + /// Everything is cleaned up only once the MultipartUpload is marked deleted pub parts: crdt::Map<MpuPartKey, MpuPart>, - // Back link to bucket+key so that we can figure if - // this was deleted later on + // Back link to bucket+key so that we can find the object this mpu + // belongs to and check whether it is still valid /// Bucket in which the related object is stored pub bucket_id: Uuid, /// Key in which the related object is stored -- cgit v1.2.3 From 942c1f1bfe138cbc4e49540cede852e4d462590e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Jun 2023 10:48:22 +0200 Subject: multipart uploads: save timestamp --- src/model/s3/mpu_table.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'src/model/s3/mpu_table.rs') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index 63a4f1af..238cbf11 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -27,6 +27,8 @@ mod v09 { /// Partition key = Upload id = UUID of the object version pub upload_id: Uuid, + /// The timestamp at which the multipart upload was created + pub timestamp: u64, /// Is this multipart upload deleted /// The MultipartUpload is marked as deleted as soon as the /// multipart upload is either completed or aborted @@ -85,9 +87,16 @@ impl PartialOrd for MpuPartKey { } impl MultipartUpload { - pub fn new(upload_id: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + pub fn new( + upload_id: Uuid, + timestamp: u64, + bucket_id: Uuid, + key: String, + deleted: bool, + ) -> Self { Self { upload_id, + timestamp, deleted: crdt::Bool::new(deleted), parts: crdt::Map::new(), bucket_id, -- cgit v1.2.3