aboutsummaryrefslogtreecommitdiff
path: root/src/model/object_table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/object_table.rs')
-rw-r--r--src/model/object_table.rs39
1 files changed, 17 insertions, 22 deletions
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 16cce72c..75c37f6d 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
+use garage_table::crdt::*;
use garage_table::table_sharded::*;
use garage_table::*;
@@ -70,7 +71,7 @@ pub enum ObjectVersionState {
Aborted,
}
-impl ObjectVersionState {
+impl CRDT for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
match other {
@@ -91,37 +92,30 @@ impl ObjectVersionState {
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
DeleteMarker,
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
FirstBlock(ObjectVersionMeta, Hash),
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+impl AutoCRDT for ObjectVersionData {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
pub headers: ObjectVersionHeaders,
pub size: u64,
pub etag: String,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
pub content_type: String,
pub other: BTreeMap<String, String>,
}
-impl ObjectVersionData {
- fn merge(&mut self, b: &Self) {
- if *self != *b {
- warn!(
- "Inconsistent object version data: {:?} (local) vs {:?} (remote)",
- self, b
- );
- }
- }
-}
-
impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid)
@@ -154,8 +148,11 @@ impl Entry<String, String> for Object {
fn sort_key(&self) -> &String {
&self.key
}
+}
+impl CRDT for Object {
fn merge(&mut self, other: &Self) {
+ // Merge versions from other into here
for other_v in other.versions.iter() {
match self
.versions
@@ -169,6 +166,9 @@ impl Entry<String, String> for Object {
}
}
}
+
+ // Remove versions which are obsolete, i.e. those that come
+ // before the last version which .is_complete().
let last_complete = self
.versions
.iter()
@@ -212,13 +212,8 @@ impl TableSchema for ObjectTable {
}
};
if newly_deleted {
- let deleted_version = Version::new(
- v.uuid,
- old_v.bucket.clone(),
- old_v.key.clone(),
- true,
- vec![],
- );
+ let deleted_version =
+ Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true);
version_table.insert(&deleted_version).await?;
}
}