diff options
Diffstat (limited to 'src/model/object_table.rs')
-rw-r--r-- | src/model/object_table.rs | 97 |
1 files changed, 21 insertions, 76 deletions
diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 16cce72c..62606df4 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -5,13 +5,12 @@ use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; -use garage_table::table_sharded::*; +use garage_table::crdt::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::version_table::*; -use model010::object_table as prev; - #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { // Primary key @@ -70,7 +69,7 @@ pub enum ObjectVersionState { Aborted, } -impl ObjectVersionState { +impl CRDT for ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; match other { @@ -91,37 +90,30 @@ impl ObjectVersionState { } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { DeleteMarker, Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), FirstBlock(ObjectVersionMeta, Hash), } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +impl AutoCRDT for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionMeta { pub headers: ObjectVersionHeaders, pub size: u64, pub etag: String, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionHeaders { pub content_type: String, pub other: BTreeMap<String, String>, } -impl ObjectVersionData { - fn merge(&mut self, b: &Self) { - if *self != *b { - warn!( - "Inconsistent object version data: {:?} (local) vs {:?} (remote)", - self, b - ); - } - } -} - impl ObjectVersion { fn cmp_key(&self) -> (u64, UUID) { (self.timestamp, self.uuid) @@ -154,8 +146,14 @@ impl Entry<String, String> for Object { fn sort_key(&self) -> &String { &self.key } + fn is_tombstone(&self) -> bool { + self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) + } +} +impl CRDT for Object { fn merge(&mut self, other: &Self) { + // Merge versions from other into here for other_v in other.versions.iter() { match self .versions @@ -169,6 +167,9 @@ impl Entry<String, String> for Object { } } } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). let last_complete = self .versions .iter() @@ -212,13 +213,8 @@ impl TableSchema for ObjectTable { } }; if newly_deleted { - let deleted_version = Version::new( - v.uuid, - old_v.bucket.clone(), - old_v.key.clone(), - true, - vec![], - ); + let deleted_version = + Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true); version_table.insert(&deleted_version).await?; } } @@ -231,55 +227,4 @@ impl TableSchema for ObjectTable { let deleted = !entry.versions.iter().any(|v| v.is_data()); filter.apply(deleted) } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old = match rmp_serde::decode::from_read_ref::<_, prev::Object>(bytes) { - Ok(x) => x, - Err(_) => return None, - }; - let new_v = old - .versions() - .iter() - .map(migrate_version) - .collect::<Vec<_>>(); - let new = Object::new(old.bucket.clone(), old.key.clone(), new_v); - Some(new) - } -} - -fn migrate_version(old: &prev::ObjectVersion) -> ObjectVersion { - let headers = ObjectVersionHeaders { - content_type: old.mime_type.clone(), - other: BTreeMap::new(), - }; - let meta = ObjectVersionMeta { - headers: headers.clone(), - size: old.size, - etag: "".to_string(), - }; - let state = match old.state { - prev::ObjectVersionState::Uploading => ObjectVersionState::Uploading(headers), - prev::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - prev::ObjectVersionState::Complete => match &old.data { - prev::ObjectVersionData::Uploading => ObjectVersionState::Uploading(headers), - prev::ObjectVersionData::DeleteMarker => { - ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) - } - prev::ObjectVersionData::Inline(x) => { - ObjectVersionState::Complete(ObjectVersionData::Inline(meta, x.clone())) - } - prev::ObjectVersionData::FirstBlock(h) => { - let mut hash = [0u8; 32]; - hash.copy_from_slice(h.as_ref()); - ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, Hash::from(hash))) - } - }, - }; - let mut uuid = [0u8; 32]; - uuid.copy_from_slice(old.uuid.as_ref()); - ObjectVersion { - uuid: UUID::from(uuid), - timestamp: old.timestamp, - state, - } } |