From e02e9e035e269cd6b660c92706db424a2edb5306 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 21:15:24 +0100 Subject: Begin improve model to use better CRDTs --- src/model/key_table.rs | 121 +++++++++++++++++++++---------------------------- 1 file changed, 51 insertions(+), 70 deletions(-) (limited to 'src/model') diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 05b938ce..2b825aa3 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -2,9 +2,12 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use garage_table::*; -use garage_util::data::*; +use garage_table::crdt::CRDT; + use garage_util::error::Error; +use model010::key_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { // Primary key @@ -14,83 +17,56 @@ pub struct Key { pub secret_key: String, // Name - pub name: String, - pub name_timestamp: u64, + pub name: crdt::LWW, // Deletion - pub deleted: bool, + pub deleted: crdt::Bool, // Authorized keys - authorized_buckets: Vec, + pub authorized_buckets: crdt::LWWMap, + + // CRDT interaction: deleted implies authorized_buckets is empty } impl Key { - pub fn new(name: String, buckets: Vec) -> Self { + pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); - let mut ret = Self { + let ret = Self { key_id, secret_key, - name, - name_timestamp: now_msec(), - deleted: false, - authorized_buckets: vec![], + name: crdt::LWW::new(name), + deleted: crdt::Bool::new(false), + authorized_buckets: crdt::LWWMap::new(), }; - for b in buckets { - ret.add_bucket(b) - .expect("Duplicate AllowedBucket in Key constructor"); - } ret } pub fn delete(key_id: String) -> Self { Self { key_id, secret_key: "".into(), - name: "".into(), - name_timestamp: now_msec(), - deleted: true, - authorized_buckets: vec![], + name: crdt::LWW::new("".to_string()), + deleted: crdt::Bool::new(true), + authorized_buckets: crdt::LWWMap::new(), } } /// Add an authorized bucket, only if it wasn't there before - pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> { - match self - .authorized_buckets - .binary_search_by(|b| b.bucket.cmp(&new.bucket)) - { - Err(i) => { - self.authorized_buckets.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn authorized_buckets(&self) -> &[AllowedBucket] { - &self.authorized_buckets[..] - } - pub fn clear_buckets(&mut self) { - self.authorized_buckets.clear(); - } pub fn allow_read(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_read) .unwrap_or(false) } pub fn allow_write(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_write) .unwrap_or(false) } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedBucket { - pub bucket: String, - pub timestamp: u64, +#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct PermissionSet { pub allow_read: bool, pub allow_write: bool, } @@ -104,35 +80,15 @@ impl Entry for Key { } fn merge(&mut self, other: &Self) { - if other.name_timestamp > self.name_timestamp { - self.name_timestamp = other.name_timestamp; - self.name = other.name.clone(); - } + self.name.merge(&other.name); + self.deleted.merge(&other.deleted); - if other.deleted { - self.deleted = true; - } - if self.deleted { + if self.deleted.get() { self.authorized_buckets.clear(); return; } - for ab in other.authorized_buckets.iter() { - match self - .authorized_buckets - .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket)) - { - Ok(i) => { - let our_ab = &mut self.authorized_buckets[i]; - if ab.timestamp > our_ab.timestamp { - *our_ab = ab.clone(); - } - } - Err(i) => { - self.authorized_buckets.insert(i, ab.clone()); - } - } - } + self.authorized_buckets.merge(&other.authorized_buckets); } } @@ -150,6 +106,31 @@ impl TableSchema for KeyTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + let mut new = Self::E { + key_id: old.key_id.clone(), + secret_key: old.secret_key.clone(), + name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()), + deleted: crdt::Bool::new(old.deleted), + authorized_buckets: crdt::LWWMap::new(), + }; + for ab in old.authorized_buckets() { + let it = crdt::LWWMap::migrate_from_raw_item( + ab.bucket.clone(), + ab.timestamp, + PermissionSet{ + allow_read: ab.allow_read, + allow_write: ab.allow_write, + }); + new.authorized_buckets.merge(&it); + } + Some(new) } } -- cgit v1.2.3 From f8a04852a2cda84d4fd12ec19e90bbd8224bf4e9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:01:12 +0100 Subject: Convert bucket table to better CRDT representation --- src/model/block.rs | 2 +- src/model/bucket_table.rs | 137 ++++++++++++++++++++++------------------------ src/model/key_table.rs | 8 +-- 3 files changed, 71 insertions(+), 76 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 6a5d9c5b..8a513a3c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use garage_table::table_sharded::TableShardedReplication; -use garage_table::{TableReplication, DeletedFilter}; +use garage_table::{DeletedFilter, TableReplication}; use crate::block_ref_table::*; diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 35c0cc27..93421acb 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,71 +1,61 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_table::crdt::CRDT; use garage_table::*; + use garage_util::error::Error; +use crate::key_table::PermissionSet; + +use model010::bucket_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { // Primary key pub name: String, - // Timestamp and deletion - // Upon version increment, all info is replaced - pub timestamp: u64, - pub deleted: bool, + pub state: crdt::LWW, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum BucketState { + Deleted, + Present(crdt::LWWMap), +} - // Authorized keys - authorized_keys: Vec, +impl CRDT for BucketState { + fn merge(&mut self, o: &Self) { + match o { + BucketState::Deleted => *self = BucketState::Deleted, + BucketState::Present(other_ak) => { + if let BucketState::Present(ak) = self { + ak.merge(other_ak); + } + } + } + } } impl Bucket { - pub fn new( - name: String, - timestamp: u64, - deleted: bool, - authorized_keys: Vec, - ) -> Self { - let mut ret = Bucket { + pub fn new(name: String) -> Self { + let ret = Bucket { name, - timestamp, - deleted, - authorized_keys: vec![], + state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())), }; - for key in authorized_keys { - ret.add_key(key) - .expect("Duplicate AllowedKey in Bucket constructor"); - } ret } - /// Add a key only if it is not already present - pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> { - match self - .authorized_keys - .binary_search_by(|k| k.key_id.cmp(&key.key_id)) - { - Err(i) => { - self.authorized_keys.insert(i, key); - Ok(()) - } - Ok(_) => Err(()), - } + pub fn is_deleted(&self) -> bool { + *self.state.get() == BucketState::Deleted } - pub fn authorized_keys(&self) -> &[AllowedKey] { - &self.authorized_keys[..] - } - pub fn clear_keys(&mut self) { - self.authorized_keys.clear(); + pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { + match self.state.get() { + BucketState::Deleted => &[], + BucketState::Present(ak) => ak.items(), + } } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedKey { - pub key_id: String, - pub timestamp: u64, - pub allow_read: bool, - pub allow_write: bool, -} - impl Entry for Bucket { fn partition_key(&self) -> &EmptyKey { &EmptyKey @@ -75,36 +65,12 @@ impl Entry for Bucket { } fn merge(&mut self, other: &Self) { - if other.timestamp > self.timestamp { - *self = other.clone(); - return; - } - if self.timestamp > other.timestamp || self.deleted { - return; - } - - for ak in other.authorized_keys.iter() { - match self - .authorized_keys - .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id)) - { - Ok(i) => { - let our_ak = &mut self.authorized_keys[i]; - if ak.timestamp > our_ak.timestamp { - *our_ak = ak.clone(); - } - } - Err(i) => { - self.authorized_keys.insert(i, ak.clone()); - } - } - } + self.state.merge(&other.state); } } pub struct BucketTable; - #[async_trait] impl TableSchema for BucketTable { type P = EmptyKey; @@ -117,6 +83,35 @@ impl TableSchema for BucketTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.is_deleted()) + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + if old.deleted { + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted), + }) + } else { + let mut keys = crdt::LWWMap::new(); + for ak in old.authorized_keys() { + keys.merge(&crdt::LWWMap::migrate_from_raw_item( + ak.key_id.clone(), + ak.timestamp, + PermissionSet { + allow_read: ak.allow_read, + allow_write: ak.allow_write, + }, + )); + } + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)), + }) + } } } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 2b825aa3..ff9d7b79 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use garage_table::*; use garage_table::crdt::CRDT; +use garage_table::*; use garage_util::error::Error; @@ -24,7 +24,6 @@ pub struct Key { // Authorized keys pub authorized_buckets: crdt::LWWMap, - // CRDT interaction: deleted implies authorized_buckets is empty } @@ -125,10 +124,11 @@ impl TableSchema for KeyTable { let it = crdt::LWWMap::migrate_from_raw_item( ab.bucket.clone(), ab.timestamp, - PermissionSet{ + PermissionSet { allow_read: ab.allow_read, allow_write: ab.allow_write, - }); + }, + ); new.authorized_buckets.merge(&it); } Some(new) -- cgit v1.2.3 From 68be5072e5b6cdf0192a3fbc92713047ccf0d372 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:20:20 +0100 Subject: simplify --- src/model/bucket_table.rs | 5 ++--- src/model/key_table.rs | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) (limited to 'src/model') diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 93421acb..b7f24d71 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -39,11 +39,10 @@ impl CRDT for BucketState { impl Bucket { pub fn new(name: String) -> Self { - let ret = Bucket { + Bucket { name, state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())), - }; - ret + } } pub fn is_deleted(&self) -> bool { *self.state.get() == BucketState::Deleted diff --git a/src/model/key_table.rs b/src/model/key_table.rs index ff9d7b79..20da3cc6 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -31,14 +31,13 @@ impl Key { pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); - let ret = Self { + Self { key_id, secret_key, name: crdt::LWW::new(name), deleted: crdt::Bool::new(false), authorized_buckets: crdt::LWWMap::new(), - }; - ret + } } pub fn delete(key_id: String) -> Self { Self { -- cgit v1.2.3