diff options
Diffstat (limited to 'src/model/bucket_table.rs')
-rw-r--r-- | src/model/bucket_table.rs | 137 |
1 files changed, 66 insertions, 71 deletions
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 11f853f9..b7f24d71 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,71 +1,60 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_table::crdt::CRDT; use garage_table::*; + use garage_util::error::Error; +use crate::key_table::PermissionSet; + +use model010::bucket_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { // Primary key pub name: String, - // Timestamp and deletion - // Upon version increment, all info is replaced - pub timestamp: u64, - pub deleted: bool, + pub state: crdt::LWW<BucketState>, +} - // Authorized keys - authorized_keys: Vec<AllowedKey>, +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum BucketState { + Deleted, + Present(crdt::LWWMap<String, PermissionSet>), } -impl Bucket { - pub fn new( - name: String, - timestamp: u64, - deleted: bool, - authorized_keys: Vec<AllowedKey>, - ) -> Self { - let mut ret = Bucket { - name, - timestamp, - deleted, - authorized_keys: vec![], - }; - for key in authorized_keys { - ret.add_key(key) - .expect("Duplicate AllowedKey in Bucket constructor"); +impl CRDT for BucketState { + fn merge(&mut self, o: &Self) { + match o { + BucketState::Deleted => *self = BucketState::Deleted, + BucketState::Present(other_ak) => { + if let BucketState::Present(ak) = self { + ak.merge(other_ak); + } + } } - ret } - /// Add a key only if it is not already present - pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> { - match self - .authorized_keys - .binary_search_by(|k| k.key_id.cmp(&key.key_id)) - { - Err(i) => { - self.authorized_keys.insert(i, key); - Ok(()) - } - Ok(_) => Err(()), +} + +impl Bucket { + pub fn new(name: String) -> Self { + Bucket { + name, + state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())), } } - pub fn authorized_keys(&self) -> &[AllowedKey] { - &self.authorized_keys[..] + pub fn is_deleted(&self) -> bool { + *self.state.get() == BucketState::Deleted } - pub fn clear_keys(&mut self) { - self.authorized_keys.clear(); + pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { + match self.state.get() { + BucketState::Deleted => &[], + BucketState::Present(ak) => ak.items(), + } } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedKey { - pub key_id: String, - pub timestamp: u64, - pub allow_read: bool, - pub allow_write: bool, -} - impl Entry<EmptyKey, String> for Bucket { fn partition_key(&self) -> &EmptyKey { &EmptyKey @@ -75,30 +64,7 @@ impl Entry<EmptyKey, String> for Bucket { } fn merge(&mut self, other: &Self) { - if other.timestamp > self.timestamp { - *self = other.clone(); - return; - } - if self.timestamp > other.timestamp || self.deleted { - return; - } - - for ak in other.authorized_keys.iter() { - match self - .authorized_keys - .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id)) - { - Ok(i) => { - let our_ak = &mut self.authorized_keys[i]; - if ak.timestamp > our_ak.timestamp { - *our_ak = ak.clone(); - } - } - Err(i) => { - self.authorized_keys.insert(i, ak.clone()); - } - } - } + self.state.merge(&other.state); } } @@ -116,6 +82,35 @@ impl TableSchema for BucketTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.is_deleted()) + } + + fn try_migrate(bytes: &[u8]) -> Option<Self::E> { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + if old.deleted { + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted), + }) + } else { + let mut keys = crdt::LWWMap::new(); + for ak in old.authorized_keys() { + keys.merge(&crdt::LWWMap::migrate_from_raw_item( + ak.key_id.clone(), + ak.timestamp, + PermissionSet { + allow_read: ak.allow_read, + allow_write: ak.allow_write, + }, + )); + } + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)), + }) + } } } |