diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/block.rs | 2 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 138 | ||||
-rw-r--r-- | src/model/key_table.rs | 122 |
3 files changed, 118 insertions, 144 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 6a5d9c5b..8a513a3c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use garage_table::table_sharded::TableShardedReplication; -use garage_table::{TableReplication, DeletedFilter}; +use garage_table::{DeletedFilter, TableReplication}; use crate::block_ref_table::*; diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 35c0cc27..b7f24d71 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,71 +1,60 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_table::crdt::CRDT; use garage_table::*; + use garage_util::error::Error; +use crate::key_table::PermissionSet; + +use model010::bucket_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { // Primary key pub name: String, - // Timestamp and deletion - // Upon version increment, all info is replaced - pub timestamp: u64, - pub deleted: bool, + pub state: crdt::LWW<BucketState>, +} - // Authorized keys - authorized_keys: Vec<AllowedKey>, +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum BucketState { + Deleted, + Present(crdt::LWWMap<String, PermissionSet>), } -impl Bucket { - pub fn new( - name: String, - timestamp: u64, - deleted: bool, - authorized_keys: Vec<AllowedKey>, - ) -> Self { - let mut ret = Bucket { - name, - timestamp, - deleted, - authorized_keys: vec![], - }; - for key in authorized_keys { - ret.add_key(key) - .expect("Duplicate AllowedKey in Bucket constructor"); +impl CRDT for BucketState { + fn merge(&mut self, o: &Self) { + match o { + BucketState::Deleted => *self = BucketState::Deleted, + BucketState::Present(other_ak) => { + if let BucketState::Present(ak) = self { + ak.merge(other_ak); + } + } } - ret } - /// Add a key only if it is not already present - pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> { - match self - .authorized_keys - .binary_search_by(|k| k.key_id.cmp(&key.key_id)) - { - Err(i) => { - self.authorized_keys.insert(i, key); - Ok(()) - } - Ok(_) => Err(()), +} + +impl Bucket { + pub fn new(name: String) -> Self { + Bucket { + name, + state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())), } } - pub fn authorized_keys(&self) -> &[AllowedKey] { - &self.authorized_keys[..] + pub fn is_deleted(&self) -> bool { + *self.state.get() == BucketState::Deleted } - pub fn clear_keys(&mut self) { - self.authorized_keys.clear(); + pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { + match self.state.get() { + BucketState::Deleted => &[], + BucketState::Present(ak) => ak.items(), + } } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedKey { - pub key_id: String, - pub timestamp: u64, - pub allow_read: bool, - pub allow_write: bool, -} - impl Entry<EmptyKey, String> for Bucket { fn partition_key(&self) -> &EmptyKey { &EmptyKey @@ -75,36 +64,12 @@ impl Entry<EmptyKey, String> for Bucket { } fn merge(&mut self, other: &Self) { - if other.timestamp > self.timestamp { - *self = other.clone(); - return; - } - if self.timestamp > other.timestamp || self.deleted { - return; - } - - for ak in other.authorized_keys.iter() { - match self - .authorized_keys - .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id)) - { - Ok(i) => { - let our_ak = &mut self.authorized_keys[i]; - if ak.timestamp > our_ak.timestamp { - *our_ak = ak.clone(); - } - } - Err(i) => { - self.authorized_keys.insert(i, ak.clone()); - } - } - } + self.state.merge(&other.state); } } pub struct BucketTable; - #[async_trait] impl TableSchema for BucketTable { type P = EmptyKey; @@ -117,6 +82,35 @@ impl TableSchema for BucketTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.is_deleted()) + } + + fn try_migrate(bytes: &[u8]) -> Option<Self::E> { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + if old.deleted { + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted), + }) + } else { + let mut keys = crdt::LWWMap::new(); + for ak in old.authorized_keys() { + keys.merge(&crdt::LWWMap::migrate_from_raw_item( + ak.key_id.clone(), + ak.timestamp, + PermissionSet { + allow_read: ak.allow_read, + allow_write: ak.allow_write, + }, + )); + } + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)), + }) + } } } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 05b938ce..20da3cc6 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,10 +1,13 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_table::crdt::CRDT; use garage_table::*; -use garage_util::data::*; + use garage_util::error::Error; +use model010::key_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { // Primary key @@ -14,83 +17,54 @@ pub struct Key { pub secret_key: String, // Name - pub name: String, - pub name_timestamp: u64, + pub name: crdt::LWW<String>, // Deletion - pub deleted: bool, + pub deleted: crdt::Bool, // Authorized keys - authorized_buckets: Vec<AllowedBucket>, + pub authorized_buckets: crdt::LWWMap<String, PermissionSet>, + // CRDT interaction: deleted implies authorized_buckets is empty } impl Key { - pub fn new(name: String, buckets: Vec<AllowedBucket>) -> Self { + pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); - let mut ret = Self { + Self { key_id, secret_key, - name, - name_timestamp: now_msec(), - deleted: false, - authorized_buckets: vec![], - }; - for b in buckets { - ret.add_bucket(b) - .expect("Duplicate AllowedBucket in Key constructor"); + name: crdt::LWW::new(name), + deleted: crdt::Bool::new(false), + authorized_buckets: crdt::LWWMap::new(), } - ret } pub fn delete(key_id: String) -> Self { Self { key_id, secret_key: "".into(), - name: "".into(), - name_timestamp: now_msec(), - deleted: true, - authorized_buckets: vec![], + name: crdt::LWW::new("".to_string()), + deleted: crdt::Bool::new(true), + authorized_buckets: crdt::LWWMap::new(), } } /// Add an authorized bucket, only if it wasn't there before - pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> { - match self - .authorized_buckets - .binary_search_by(|b| b.bucket.cmp(&new.bucket)) - { - Err(i) => { - self.authorized_buckets.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn authorized_buckets(&self) -> &[AllowedBucket] { - &self.authorized_buckets[..] - } - pub fn clear_buckets(&mut self) { - self.authorized_buckets.clear(); - } pub fn allow_read(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_read) .unwrap_or(false) } pub fn allow_write(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_write) .unwrap_or(false) } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedBucket { - pub bucket: String, - pub timestamp: u64, +#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct PermissionSet { pub allow_read: bool, pub allow_write: bool, } @@ -104,35 +78,15 @@ impl Entry<EmptyKey, String> for Key { } fn merge(&mut self, other: &Self) { - if other.name_timestamp > self.name_timestamp { - self.name_timestamp = other.name_timestamp; - self.name = other.name.clone(); - } + self.name.merge(&other.name); + self.deleted.merge(&other.deleted); - if other.deleted { - self.deleted = true; - } - if self.deleted { + if self.deleted.get() { self.authorized_buckets.clear(); return; } - for ab in other.authorized_buckets.iter() { - match self - .authorized_buckets - .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket)) - { - Ok(i) => { - let our_ab = &mut self.authorized_buckets[i]; - if ab.timestamp > our_ab.timestamp { - *our_ab = ab.clone(); - } - } - Err(i) => { - self.authorized_buckets.insert(i, ab.clone()); - } - } - } + self.authorized_buckets.merge(&other.authorized_buckets); } } @@ -150,6 +104,32 @@ impl TableSchema for KeyTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) + } + + fn try_migrate(bytes: &[u8]) -> Option<Self::E> { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + let mut new = Self::E { + key_id: old.key_id.clone(), + secret_key: old.secret_key.clone(), + name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()), + deleted: crdt::Bool::new(old.deleted), + authorized_buckets: crdt::LWWMap::new(), + }; + for ab in old.authorized_buckets() { + let it = crdt::LWWMap::migrate_from_raw_item( + ab.bucket.clone(), + ab.timestamp, + PermissionSet { + allow_read: ab.allow_read, + allow_write: ab.allow_write, + }, + ); + new.authorized_buckets.merge(&it); + } + Some(new) } } |