aboutsummaryrefslogtreecommitdiff
path: root/src/model/object_table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/object_table.rs')
-rw-r--r--src/model/object_table.rs97
1 files changed, 21 insertions, 76 deletions
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 16cce72c..62606df4 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -5,13 +5,12 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
-use garage_table::table_sharded::*;
+use garage_table::crdt::*;
+use garage_table::replication::sharded::*;
use garage_table::*;
use crate::version_table::*;
-use model010::object_table as prev;
-
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
// Primary key
@@ -70,7 +69,7 @@ pub enum ObjectVersionState {
Aborted,
}
-impl ObjectVersionState {
+impl CRDT for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
match other {
@@ -91,37 +90,30 @@ impl ObjectVersionState {
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
DeleteMarker,
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
FirstBlock(ObjectVersionMeta, Hash),
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+impl AutoCRDT for ObjectVersionData {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
pub headers: ObjectVersionHeaders,
pub size: u64,
pub etag: String,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
pub content_type: String,
pub other: BTreeMap<String, String>,
}
-impl ObjectVersionData {
- fn merge(&mut self, b: &Self) {
- if *self != *b {
- warn!(
- "Inconsistent object version data: {:?} (local) vs {:?} (remote)",
- self, b
- );
- }
- }
-}
-
impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid)
@@ -154,8 +146,14 @@ impl Entry<String, String> for Object {
fn sort_key(&self) -> &String {
&self.key
}
+ fn is_tombstone(&self) -> bool {
+ self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
+ }
+}
+impl CRDT for Object {
fn merge(&mut self, other: &Self) {
+ // Merge versions from other into here
for other_v in other.versions.iter() {
match self
.versions
@@ -169,6 +167,9 @@ impl Entry<String, String> for Object {
}
}
}
+
+ // Remove versions which are obsolete, i.e. those that come
+ // before the last version which .is_complete().
let last_complete = self
.versions
.iter()
@@ -212,13 +213,8 @@ impl TableSchema for ObjectTable {
}
};
if newly_deleted {
- let deleted_version = Version::new(
- v.uuid,
- old_v.bucket.clone(),
- old_v.key.clone(),
- true,
- vec![],
- );
+ let deleted_version =
+ Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true);
version_table.insert(&deleted_version).await?;
}
}
@@ -231,55 +227,4 @@ impl TableSchema for ObjectTable {
let deleted = !entry.versions.iter().any(|v| v.is_data());
filter.apply(deleted)
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old = match rmp_serde::decode::from_read_ref::<_, prev::Object>(bytes) {
- Ok(x) => x,
- Err(_) => return None,
- };
- let new_v = old
- .versions()
- .iter()
- .map(migrate_version)
- .collect::<Vec<_>>();
- let new = Object::new(old.bucket.clone(), old.key.clone(), new_v);
- Some(new)
- }
-}
-
-fn migrate_version(old: &prev::ObjectVersion) -> ObjectVersion {
- let headers = ObjectVersionHeaders {
- content_type: old.mime_type.clone(),
- other: BTreeMap::new(),
- };
- let meta = ObjectVersionMeta {
- headers: headers.clone(),
- size: old.size,
- etag: "".to_string(),
- };
- let state = match old.state {
- prev::ObjectVersionState::Uploading => ObjectVersionState::Uploading(headers),
- prev::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
- prev::ObjectVersionState::Complete => match &old.data {
- prev::ObjectVersionData::Uploading => ObjectVersionState::Uploading(headers),
- prev::ObjectVersionData::DeleteMarker => {
- ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
- }
- prev::ObjectVersionData::Inline(x) => {
- ObjectVersionState::Complete(ObjectVersionData::Inline(meta, x.clone()))
- }
- prev::ObjectVersionData::FirstBlock(h) => {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(h.as_ref());
- ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, Hash::from(hash)))
- }
- },
- };
- let mut uuid = [0u8; 32];
- uuid.copy_from_slice(old.uuid.as_ref());
- ObjectVersion {
- uuid: UUID::from(uuid),
- timestamp: old.timestamp,
- state,
- }
}