diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/block.rs | 2 | ||||
-rw-r--r-- | src/model/block_ref_table.rs | 17 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 2 | ||||
-rw-r--r-- | src/model/garage.rs | 1 | ||||
-rw-r--r-- | src/model/key_table.rs | 13 | ||||
-rw-r--r-- | src/model/object_table.rs | 39 | ||||
-rw-r--r-- | src/model/version_table.rs | 96 |
7 files changed, 77 insertions, 93 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 56c85c6a..d3957403 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -420,7 +420,7 @@ impl BlockManager { if Some(&block_ref.block) == last_hash.as_ref() { continue; } - if !block_ref.deleted { + if !block_ref.deleted.get() { last_hash = Some(block_ref.block); self.put_to_resync(&block_ref.block, 0)?; } diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index 9ab67737..07fa5144 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -1,9 +1,9 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; -use garage_util::background::*; use garage_util::data::*; +use garage_table::crdt::CRDT; use garage_table::*; use crate::block::*; @@ -17,7 +17,7 @@ pub struct BlockRef { pub version: UUID, // Keep track of deleted status - pub deleted: bool, + pub deleted: crdt::Bool, } impl Entry<Hash, UUID> for BlockRef { @@ -27,16 +27,15 @@ impl Entry<Hash, UUID> for BlockRef { fn sort_key(&self) -> &UUID { &self.version } +} +impl CRDT for BlockRef { fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; - } + self.deleted.merge(&other.deleted); } } pub struct BlockRefTable { - pub background: Arc<BackgroundRunner>, pub block_manager: Arc<BlockManager>, } @@ -48,8 +47,8 @@ impl TableSchema for BlockRefTable { fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { let block = &old.as_ref().or(new.as_ref()).unwrap().block; - let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); - let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); + let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); + let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { if let Err(e) = self.block_manager.block_incref(block) { warn!("block_incref failed for block {:?}: {}", block, e); @@ -63,6 +62,6 @@ impl TableSchema for BlockRefTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) } } diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 2878aa38..5bc8b7f9 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -89,7 +89,9 @@ impl Entry<EmptyKey, String> for Bucket { fn sort_key(&self) -> &String { &self.name } +} +impl CRDT for Bucket { fn merge(&mut self, other: &Self) { self.state.merge(&other.state); } diff --git a/src/model/garage.rs b/src/model/garage.rs index 467d0aec..d109fdaa 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -79,7 +79,6 @@ impl Garage { info!("Initialize block_ref_table..."); let block_ref_table = Table::new( BlockRefTable { - background: background.clone(), block_manager: block_manager.clone(), }, data_rep_param.clone(), diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 5942df75..6d8cc6c0 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::CRDT; +use garage_table::crdt::*; use garage_table::*; use model010::key_table as prev; @@ -66,6 +66,10 @@ pub struct PermissionSet { pub allow_write: bool, } +impl AutoCRDT for PermissionSet { + const WARN_IF_DIFFERENT: bool = true; +} + impl Entry<EmptyKey, String> for Key { fn partition_key(&self) -> &EmptyKey { &EmptyKey @@ -73,17 +77,18 @@ impl Entry<EmptyKey, String> for Key { fn sort_key(&self) -> &String { &self.key_id } +} +impl CRDT for Key { fn merge(&mut self, other: &Self) { self.name.merge(&other.name); self.deleted.merge(&other.deleted); if self.deleted.get() { self.authorized_buckets.clear(); - return; + } else { + self.authorized_buckets.merge(&other.authorized_buckets); } - - self.authorized_buckets.merge(&other.authorized_buckets); } } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 16cce72c..75c37f6d 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; +use garage_table::crdt::*; use garage_table::table_sharded::*; use garage_table::*; @@ -70,7 +71,7 @@ pub enum ObjectVersionState { Aborted, } -impl ObjectVersionState { +impl CRDT for ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; match other { @@ -91,37 +92,30 @@ impl ObjectVersionState { } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { DeleteMarker, Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), FirstBlock(ObjectVersionMeta, Hash), } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +impl AutoCRDT for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionMeta { pub headers: ObjectVersionHeaders, pub size: u64, pub etag: String, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionHeaders { pub content_type: String, pub other: BTreeMap<String, String>, } -impl ObjectVersionData { - fn merge(&mut self, b: &Self) { - if *self != *b { - warn!( - "Inconsistent object version data: {:?} (local) vs {:?} (remote)", - self, b - ); - } - } -} - impl ObjectVersion { fn cmp_key(&self) -> (u64, UUID) { (self.timestamp, self.uuid) @@ -154,8 +148,11 @@ impl Entry<String, String> for Object { fn sort_key(&self) -> &String { &self.key } +} +impl CRDT for Object { fn merge(&mut self, other: &Self) { + // Merge versions from other into here for other_v in other.versions.iter() { match self .versions @@ -169,6 +166,9 @@ impl Entry<String, String> for Object { } } } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). let last_complete = self .versions .iter() @@ -212,13 +212,8 @@ impl TableSchema for ObjectTable { } }; if newly_deleted { - let deleted_version = Version::new( - v.uuid, - old_v.bucket.clone(), - old_v.key.clone(), - true, - vec![], - ); + let deleted_version = + Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true); version_table.insert(&deleted_version).await?; } } diff --git a/src/model/version_table.rs b/src/model/version_table.rs index cf9fbe98..26abb64e 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; +use garage_table::crdt::*; use garage_table::table_sharded::*; use garage_table::*; @@ -15,8 +16,8 @@ pub struct Version { pub uuid: UUID, // Actual data: the blocks for this version - pub deleted: bool, - blocks: Vec<VersionBlock>, + pub deleted: crdt::Bool, + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, // Back link to bucket+key so that we can figure if // this was deleted later on @@ -25,56 +26,45 @@ pub struct Version { } impl Version { - pub fn new( - uuid: UUID, - bucket: String, - key: String, - deleted: bool, - blocks: Vec<VersionBlock>, - ) -> Self { - let mut ret = Self { + pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self { + Self { uuid, - deleted, - blocks: vec![], + deleted: deleted.into(), + blocks: crdt::Map::new(), bucket, key, - }; - for b in blocks { - ret.add_block(b) - .expect("Twice the same VersionBlock in Version constructor"); } - ret } - /// Adds a block if it wasn't already present - pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> { - match self - .blocks - .binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key())) - { - Err(i) => { - self.blocks.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlockKey { + pub part_number: u64, + pub offset: u64, +} + +impl Ord for VersionBlockKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.offset.cmp(&other.offset)) } - pub fn blocks(&self) -> &[VersionBlock] { - &self.blocks[..] +} + +impl PartialOrd for VersionBlockKey { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlock { - pub part_number: u64, - pub offset: u64, pub hash: Hash, pub size: u64, } -impl VersionBlock { - fn cmp_key(&self) -> (u64, u64) { - (self.part_number, self.offset) - } +impl AutoCRDT for VersionBlock { + const WARN_IF_DIFFERENT: bool = true; } impl Entry<Hash, EmptyKey> for Version { @@ -84,23 +74,16 @@ impl Entry<Hash, EmptyKey> for Version { fn sort_key(&self) -> &EmptyKey { &EmptyKey } +} +impl CRDT for Version { fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; + self.deleted.merge(&other.deleted); + + if self.deleted.get() { self.blocks.clear(); - } else if !self.deleted { - for bi in other.blocks.iter() { - match self - .blocks - .binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key())) - { - Ok(_) => (), - Err(pos) => { - self.blocks.insert(pos, bi.clone()); - } - } - } + } else { + self.blocks.merge(&other.blocks); } } } @@ -121,14 +104,15 @@ impl TableSchema for VersionTable { self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks - if new_v.deleted && !old_v.deleted { + if new_v.deleted.get() && !old_v.deleted.get() { let deleted_block_refs = old_v .blocks + .items() .iter() - .map(|vb| BlockRef { + .map(|(_k, vb)| BlockRef { block: vb.hash, version: old_v.uuid, - deleted: true, + deleted: true.into(), }) .collect::<Vec<_>>(); block_ref_table.insert_many(&deleted_block_refs[..]).await?; @@ -139,6 +123,6 @@ impl TableSchema for VersionTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) } } |