diff options
Diffstat (limited to 'src/model/version_table.rs')
-rw-r--r-- | src/model/version_table.rs | 107 |
1 files changed, 50 insertions, 57 deletions
diff --git a/src/model/version_table.rs b/src/model/version_table.rs index cf9fbe98..841fbfea 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; -use garage_table::table_sharded::*; +use garage_table::crdt::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::block_ref_table::*; @@ -15,8 +16,11 @@ pub struct Version { pub uuid: UUID, // Actual data: the blocks for this version - pub deleted: bool, - blocks: Vec<VersionBlock>, + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + pub deleted: crdt::Bool, + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + pub parts_etags: crdt::Map<u64, String>, // Back link to bucket+key so that we can figure if // this was deleted later on @@ -25,56 +29,46 @@ pub struct Version { } impl Version { - pub fn new( - uuid: UUID, - bucket: String, - key: String, - deleted: bool, - blocks: Vec<VersionBlock>, - ) -> Self { - let mut ret = Self { + pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self { + Self { uuid, - deleted, - blocks: vec![], + deleted: deleted.into(), + blocks: crdt::Map::new(), + parts_etags: crdt::Map::new(), bucket, key, - }; - for b in blocks { - ret.add_block(b) - .expect("Twice the same VersionBlock in Version constructor"); } - ret } - /// Adds a block if it wasn't already present - pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> { - match self - .blocks - .binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key())) - { - Err(i) => { - self.blocks.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlockKey { + pub part_number: u64, + pub offset: u64, +} + +impl Ord for VersionBlockKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.offset.cmp(&other.offset)) } - pub fn blocks(&self) -> &[VersionBlock] { - &self.blocks[..] +} + +impl PartialOrd for VersionBlockKey { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlock { - pub part_number: u64, - pub offset: u64, pub hash: Hash, pub size: u64, } -impl VersionBlock { - fn cmp_key(&self) -> (u64, u64) { - (self.part_number, self.offset) - } +impl AutoCRDT for VersionBlock { + const WARN_IF_DIFFERENT: bool = true; } impl Entry<Hash, EmptyKey> for Version { @@ -84,23 +78,21 @@ impl Entry<Hash, EmptyKey> for Version { fn sort_key(&self) -> &EmptyKey { &EmptyKey } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} +impl CRDT for Version { fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; + self.deleted.merge(&other.deleted); + + if self.deleted.get() { self.blocks.clear(); - } else if !self.deleted { - for bi in other.blocks.iter() { - match self - .blocks - .binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key())) - { - Ok(_) => (), - Err(pos) => { - self.blocks.insert(pos, bi.clone()); - } - } - } + self.parts_etags.clear(); + } else { + self.blocks.merge(&other.blocks); + self.parts_etags.merge(&other.parts_etags); } } } @@ -121,14 +113,15 @@ impl TableSchema for VersionTable { self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks - if new_v.deleted && !old_v.deleted { + if new_v.deleted.get() && !old_v.deleted.get() { let deleted_block_refs = old_v .blocks + .items() .iter() - .map(|vb| BlockRef { + .map(|(_k, vb)| BlockRef { block: vb.hash, version: old_v.uuid, - deleted: true, + deleted: true.into(), }) .collect::<Vec<_>>(); block_ref_table.insert_many(&deleted_block_refs[..]).await?; @@ -139,6 +132,6 @@ impl TableSchema for VersionTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) } } |