aboutsummaryrefslogtreecommitdiff
path: root/src/model/s3/object_table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/s3/object_table.rs')
-rw-r--r--src/model/s3/object_table.rs167
1 files changed, 149 insertions, 18 deletions
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 518acc95..db5ccf96 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -10,6 +10,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::index_counter::*;
+use crate::s3::mpu_table::*;
use crate::s3::version_table::*;
pub const OBJECTS: &str = "objects";
@@ -130,7 +131,86 @@ mod v08 {
}
}
-pub use v08::*;
+mod v09 {
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v08;
+
+ pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta};
+
+ /// An object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket_id: Uuid,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ pub(super) versions: Vec<ObjectVersion>,
+ }
+
+ /// Informations about a version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersion {
+ /// Id of the version
+ pub uuid: Uuid,
+ /// Timestamp of when the object was created
+ pub timestamp: u64,
+ /// State of the version
+ pub state: ObjectVersionState,
+ }
+
+ /// State of an object version
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionState {
+ /// The version is being received
+ Uploading {
+ /// Indicates whether this is a multipart upload
+ multipart: bool,
+ /// Headers to be included in the final object
+ headers: ObjectVersionHeaders,
+ },
+ /// The version is fully received
+ Complete(ObjectVersionData),
+ /// The version uploaded containded errors or the upload was explicitly aborted
+ Aborted,
+ }
+
+ impl garage_util::migrate::Migrate for Object {
+ const VERSION_MARKER: &'static [u8] = b"G09s3o";
+
+ type Previous = v08::Object;
+
+ fn migrate(old: v08::Object) -> Object {
+ let versions = old
+ .versions
+ .into_iter()
+ .map(|x| ObjectVersion {
+ uuid: x.uuid,
+ timestamp: x.timestamp,
+ state: match x.state {
+ v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading {
+ multipart: false,
+ headers: h,
+ },
+ v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d),
+ v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
+ },
+ })
+ .collect();
+ Object {
+ bucket_id: old.bucket_id,
+ key: old.key,
+ versions,
+ }
+ }
+ }
+}
+
+pub use v09::*;
impl Object {
/// Initialize an Object struct from parts
@@ -180,11 +260,11 @@ impl Crdt for ObjectVersionState {
Complete(a) => {
a.merge(b);
}
- Uploading(_) => {
+ Uploading { .. } => {
*self = Complete(b.clone());
}
},
- Uploading(_) => {}
+ Uploading { .. } => {}
}
}
}
@@ -199,8 +279,17 @@ impl ObjectVersion {
}
/// Is the object version currently being uploaded
- pub fn is_uploading(&self) -> bool {
- matches!(self.state, ObjectVersionState::Uploading(_))
+ ///
+ /// matches only multipart uploads if check_multipart is Some(true)
+ /// matches only non-multipart uploads if check_multipart is Some(false)
+ /// matches both if check_multipart is None
+ pub fn is_uploading(&self, check_multipart: Option<bool>) -> bool {
+ match &self.state {
+ ObjectVersionState::Uploading { multipart, .. } => {
+ check_multipart.map(|x| x == *multipart).unwrap_or(true)
+ }
+ _ => false,
+ }
}
/// Is the object version completely received
@@ -267,13 +356,20 @@ impl Crdt for Object {
pub struct ObjectTable {
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ pub mpu_table: Arc<Table<MultipartUploadTable, TableShardedReplication>>,
pub object_counter_table: Arc<IndexCounter<Object>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ObjectFilter {
+ /// Is the object version available (received and not a tombstone)
IsData,
- IsUploading,
+ /// Is the object version currently being uploaded
+ ///
+ /// matches only multipart uploads if check_multipart is Some(true)
+ /// matches only non-multipart uploads if check_multipart is Some(false)
+ /// matches both if check_multipart is None
+ IsUploading { check_multipart: Option<bool> },
}
impl TableSchema for ObjectTable {
@@ -301,21 +397,28 @@ impl TableSchema for ObjectTable {
// 2. Enqueue propagation deletions to version table
if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of old versions
for v in old_v.versions.iter() {
- let newly_deleted = match new_v
+ let new_v_id = new_v
.versions
- .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
- {
+ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()));
+
+ // Propagate deletion of old versions to the Version table
+ let delete_version = match new_v_id {
Err(_) => true,
Ok(i) => {
new_v.versions[i].state == ObjectVersionState::Aborted
&& v.state != ObjectVersionState::Aborted
}
};
- if newly_deleted {
- let deleted_version =
- Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
+ if delete_version {
+ let deleted_version = Version::new(
+ v.uuid,
+ VersionBacklink::Object {
+ bucket_id: old_v.bucket_id,
+ key: old_v.key.clone(),
+ },
+ true,
+ );
let res = self.version_table.queue_insert(tx, &deleted_version);
if let Err(e) = db::unabort(res)? {
error!(
@@ -324,6 +427,34 @@ impl TableSchema for ObjectTable {
);
}
}
+
+ // After abortion or completion of multipart uploads, delete MPU table entry
+ if matches!(
+ v.state,
+ ObjectVersionState::Uploading {
+ multipart: true,
+ ..
+ }
+ ) {
+ let delete_mpu = match new_v_id {
+ Err(_) => true,
+ Ok(i) => !matches!(
+ new_v.versions[i].state,
+ ObjectVersionState::Uploading { .. }
+ ),
+ };
+ if delete_mpu {
+ let deleted_mpu =
+ MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
+ let res = self.mpu_table.queue_insert(tx, &deleted_mpu);
+ if let Err(e) = db::unabort(res)? {
+ error!(
+ "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.",
+ e
+ );
+ }
+ }
+ }
}
}
@@ -333,7 +464,10 @@ impl TableSchema for ObjectTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
match filter {
ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()),
- ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
+ ObjectFilter::IsUploading { check_multipart } => entry
+ .versions
+ .iter()
+ .any(|v| v.is_uploading(*check_multipart)),
}
}
}
@@ -360,10 +494,7 @@ impl CountedItem for Object {
} else {
0
};
- let n_unfinished_uploads = versions
- .iter()
- .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
- .count();
+ let n_unfinished_uploads = versions.iter().filter(|v| v.is_uploading(None)).count();
let n_bytes = versions
.iter()
.map(|v| match &v.state {