aboutsummaryrefslogtreecommitdiff
path: root/src/model/version_table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/version_table.rs')
-rw-r--r--src/model/version_table.rs96
1 files changed, 40 insertions, 56 deletions
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index cf9fbe98..26abb64e 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
+use garage_table::crdt::*;
use garage_table::table_sharded::*;
use garage_table::*;
@@ -15,8 +16,8 @@ pub struct Version {
pub uuid: UUID,
// Actual data: the blocks for this version
- pub deleted: bool,
- blocks: Vec<VersionBlock>,
+ pub deleted: crdt::Bool,
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
@@ -25,56 +26,45 @@ pub struct Version {
}
impl Version {
- pub fn new(
- uuid: UUID,
- bucket: String,
- key: String,
- deleted: bool,
- blocks: Vec<VersionBlock>,
- ) -> Self {
- let mut ret = Self {
+ pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self {
+ Self {
uuid,
- deleted,
- blocks: vec![],
+ deleted: deleted.into(),
+ blocks: crdt::Map::new(),
bucket,
key,
- };
- for b in blocks {
- ret.add_block(b)
- .expect("Twice the same VersionBlock in Version constructor");
}
- ret
}
- /// Adds a block if it wasn't already present
- pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> {
- match self
- .blocks
- .binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key()))
- {
- Err(i) => {
- self.blocks.insert(i, new);
- Ok(())
- }
- Ok(_) => Err(()),
- }
+}
+
+#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlockKey {
+ pub part_number: u64,
+ pub offset: u64,
+}
+
+impl Ord for VersionBlockKey {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.part_number
+ .cmp(&other.part_number)
+ .then(self.offset.cmp(&other.offset))
}
- pub fn blocks(&self) -> &[VersionBlock] {
- &self.blocks[..]
+}
+
+impl PartialOrd for VersionBlockKey {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
- pub part_number: u64,
- pub offset: u64,
pub hash: Hash,
pub size: u64,
}
-impl VersionBlock {
- fn cmp_key(&self) -> (u64, u64) {
- (self.part_number, self.offset)
- }
+impl AutoCRDT for VersionBlock {
+ const WARN_IF_DIFFERENT: bool = true;
}
impl Entry<Hash, EmptyKey> for Version {
@@ -84,23 +74,16 @@ impl Entry<Hash, EmptyKey> for Version {
fn sort_key(&self) -> &EmptyKey {
&EmptyKey
}
+}
+impl CRDT for Version {
fn merge(&mut self, other: &Self) {
- if other.deleted {
- self.deleted = true;
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
self.blocks.clear();
- } else if !self.deleted {
- for bi in other.blocks.iter() {
- match self
- .blocks
- .binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key()))
- {
- Ok(_) => (),
- Err(pos) => {
- self.blocks.insert(pos, bi.clone());
- }
- }
- }
+ } else {
+ self.blocks.merge(&other.blocks);
}
}
}
@@ -121,14 +104,15 @@ impl TableSchema for VersionTable {
self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
- if new_v.deleted && !old_v.deleted {
+ if new_v.deleted.get() && !old_v.deleted.get() {
let deleted_block_refs = old_v
.blocks
+ .items()
.iter()
- .map(|vb| BlockRef {
+ .map(|(_k, vb)| BlockRef {
block: vb.hash,
version: old_v.uuid,
- deleted: true,
+ deleted: true.into(),
})
.collect::<Vec<_>>();
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
@@ -139,6 +123,6 @@ impl TableSchema for VersionTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.deleted.get())
}
}