From e02e9e035e269cd6b660c92706db424a2edb5306 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 21:15:24 +0100 Subject: Begin improve model to use better CRDTs --- src/api/signature.rs | 2 +- src/garage/admin_rpc.rs | 42 +++++------- src/model/key_table.rs | 121 ++++++++++++++------------------- src/table/crdt.rs | 175 ++++++++++++++++++++++++++++++++++++++++++++++++ src/table/lib.rs | 1 + 5 files changed, 245 insertions(+), 96 deletions(-) create mode 100644 src/table/crdt.rs (limited to 'src') diff --git a/src/api/signature.rs b/src/api/signature.rs index 402b1881..0ee47961 100644 --- a/src/api/signature.rs +++ b/src/api/signature.rs @@ -68,7 +68,7 @@ pub async fn check_signature( .key_table .get(&EmptyKey, &authorization.key_id) .await? - .filter(|k| !k.deleted) + .filter(|k| !k.deleted.get()) .ok_or(Error::Forbidden(format!( "No such key: {}", authorization.key_id diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index 778e4a1d..bd9fca49 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -6,6 +6,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_table::*; +use garage_table::crdt::CRDT; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -114,7 +115,7 @@ impl AdminRpcHandler { // --- done checking, now commit --- for ak in bucket.authorized_keys() { if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { - if !key.deleted { + if !key.deleted.get() { self.update_key_bucket(key, &bucket.name, false, false) .await?; } @@ -173,7 +174,7 @@ impl AdminRpcHandler { .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) .await? .iter() - .map(|k| (k.key_id.to_string(), k.name.to_string())) + .map(|k| (k.key_id.to_string(), k.name.get().clone())) .collect::>(); Ok(AdminRPC::KeyList(key_ids)) } @@ -182,14 +183,13 @@ impl AdminRpcHandler { Ok(AdminRPC::KeyInfo(key)) } KeyOperation::New(query) => { - let key = Key::new(query.name, vec![]); + let key = Key::new(query.name); self.garage.key_table.insert(&key).await?; Ok(AdminRPC::KeyInfo(key)) } KeyOperation::Rename(query) => { let mut key = self.get_existing_key(&query.key_id).await?; - key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec()); - key.name = query.new_name; + key.name.update(query.new_name); self.garage.key_table.insert(&key).await?; Ok(AdminRPC::KeyInfo(key)) } @@ -201,16 +201,16 @@ impl AdminRpcHandler { ))); } // --- done checking, now commit --- - for ab in key.authorized_buckets().iter() { + for (ab_name, _, _) in key.authorized_buckets.items().iter() { if let Some(bucket) = - self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await? + self.garage.bucket_table.get(&EmptyKey, ab_name).await? { if !bucket.deleted { self.update_bucket_key(bucket, &key.key_id, false, false) .await?; } } else { - return Err(Error::Message(format!("Bucket not found: {}", ab.bucket))); + return Err(Error::Message(format!("Bucket not found: {}", ab_name))); } } let del_key = Key::delete(key.key_id); @@ -241,7 +241,7 @@ impl AdminRpcHandler { .key_table .get(&EmptyKey, id) .await? - .filter(|k| !k.deleted) + .filter(|k| !k.deleted.get()) .map(Ok) .unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id)))) } @@ -281,22 +281,14 @@ impl AdminRpcHandler { allow_read: bool, allow_write: bool, ) -> Result<(), Error> { - let timestamp = match key - .authorized_buckets() - .iter() - .find(|x| x.bucket == *bucket) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - key.clear_buckets(); - key.add_bucket(AllowedBucket { - bucket: bucket.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); + let old_map = key.authorized_buckets.take_and_clear(); + key.authorized_buckets.merge( + &old_map.update_mutator( + bucket.clone(), + PermissionSet{ + allow_read, allow_write + } + )); self.garage.key_table.insert(&key).await?; Ok(()) } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 05b938ce..2b825aa3 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -2,9 +2,12 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use garage_table::*; -use garage_util::data::*; +use garage_table::crdt::CRDT; + use garage_util::error::Error; +use model010::key_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { // Primary key @@ -14,83 +17,56 @@ pub struct Key { pub secret_key: String, // Name - pub name: String, - pub name_timestamp: u64, + pub name: crdt::LWW, // Deletion - pub deleted: bool, + pub deleted: crdt::Bool, // Authorized keys - authorized_buckets: Vec, + pub authorized_buckets: crdt::LWWMap, + + // CRDT interaction: deleted implies authorized_buckets is empty } impl Key { - pub fn new(name: String, buckets: Vec) -> Self { + pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); - let mut ret = Self { + let ret = Self { key_id, secret_key, - name, - name_timestamp: now_msec(), - deleted: false, - authorized_buckets: vec![], + name: crdt::LWW::new(name), + deleted: crdt::Bool::new(false), + authorized_buckets: crdt::LWWMap::new(), }; - for b in buckets { - ret.add_bucket(b) - .expect("Duplicate AllowedBucket in Key constructor"); - } ret } pub fn delete(key_id: String) -> Self { Self { key_id, secret_key: "".into(), - name: "".into(), - name_timestamp: now_msec(), - deleted: true, - authorized_buckets: vec![], + name: crdt::LWW::new("".to_string()), + deleted: crdt::Bool::new(true), + authorized_buckets: crdt::LWWMap::new(), } } /// Add an authorized bucket, only if it wasn't there before - pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> { - match self - .authorized_buckets - .binary_search_by(|b| b.bucket.cmp(&new.bucket)) - { - Err(i) => { - self.authorized_buckets.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn authorized_buckets(&self) -> &[AllowedBucket] { - &self.authorized_buckets[..] - } - pub fn clear_buckets(&mut self) { - self.authorized_buckets.clear(); - } pub fn allow_read(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_read) .unwrap_or(false) } pub fn allow_write(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_write) .unwrap_or(false) } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedBucket { - pub bucket: String, - pub timestamp: u64, +#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct PermissionSet { pub allow_read: bool, pub allow_write: bool, } @@ -104,35 +80,15 @@ impl Entry for Key { } fn merge(&mut self, other: &Self) { - if other.name_timestamp > self.name_timestamp { - self.name_timestamp = other.name_timestamp; - self.name = other.name.clone(); - } + self.name.merge(&other.name); + self.deleted.merge(&other.deleted); - if other.deleted { - self.deleted = true; - } - if self.deleted { + if self.deleted.get() { self.authorized_buckets.clear(); return; } - for ab in other.authorized_buckets.iter() { - match self - .authorized_buckets - .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket)) - { - Ok(i) => { - let our_ab = &mut self.authorized_buckets[i]; - if ab.timestamp > our_ab.timestamp { - *our_ab = ab.clone(); - } - } - Err(i) => { - self.authorized_buckets.insert(i, ab.clone()); - } - } - } + self.authorized_buckets.merge(&other.authorized_buckets); } } @@ -150,6 +106,31 @@ impl TableSchema for KeyTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + let mut new = Self::E { + key_id: old.key_id.clone(), + secret_key: old.secret_key.clone(), + name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()), + deleted: crdt::Bool::new(old.deleted), + authorized_buckets: crdt::LWWMap::new(), + }; + for ab in old.authorized_buckets() { + let it = crdt::LWWMap::migrate_from_raw_item( + ab.bucket.clone(), + ab.timestamp, + PermissionSet{ + allow_read: ab.allow_read, + allow_write: ab.allow_write, + }); + new.authorized_buckets.merge(&it); + } + Some(new) } } diff --git a/src/table/crdt.rs b/src/table/crdt.rs new file mode 100644 index 00000000..8f5e4d71 --- /dev/null +++ b/src/table/crdt.rs @@ -0,0 +1,175 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +pub trait CRDT { + fn merge(&mut self, other: &Self); +} + +impl CRDT for T +where T: Ord + Clone { + fn merge(&mut self, other: &Self) { + if other > self { + *self = other.clone(); + } + } +} + +// ---- LWW Register ---- + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWW +{ + ts: u64, + v: T, +} + +impl LWW +where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord +{ + pub fn new(value: T) -> Self { + Self { + ts: now_msec(), + v: value, + } + } + pub fn migrate_from_raw(ts: u64, value: T) -> Self { + Self { + ts, + v: value, + } + } + pub fn update(&mut self, new_value: T) { + self.ts = std::cmp::max(self.ts + 1, now_msec()); + self.v = new_value; + } + pub fn get(&self) -> &T { + &self.v + } +} + +impl CRDT for LWW +where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT +{ + fn merge(&mut self, other: &Self) { + if other.ts > self.ts { + self.ts = other.ts; + self.v = other.v.clone(); + } else if other.ts == self.ts { + self.v.merge(&other.v); + } + } +} + + +// ---- Boolean (true as absorbing state) ---- + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +pub struct Bool(bool); + +impl Bool { + pub fn new(b: bool) -> Self { + Self(b) + } + pub fn set(&mut self) { + self.0 = true; + } + pub fn get(&self) -> bool { + self.0 + } +} + +impl CRDT for Bool { + fn merge(&mut self, other: &Self) { + self.0 = self.0 || other.0; + } +} + + +// ---- LWW Map ---- + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWWMap +{ + vals: Vec<(K, u64, V)>, +} + +impl LWWMap +where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, +{ + pub fn new() -> Self { + Self{ + vals: vec![], + } + } + pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { + Self{ + vals: vec![(k, ts, v)], + } + } + pub fn take_and_clear(&mut self) -> Self { + let vals = std::mem::replace(&mut self.vals, vec![]); + Self{vals} + } + pub fn clear(&mut self) { + self.vals.clear(); + } + pub fn update_mutator(&self, k: K, new_v: V) -> Self { + let new_vals = match self + .vals + .binary_search_by(|(k2, _, _)| k2.cmp(&k)) + { + Ok(i) => { + let (_, old_ts, _) = self.vals[i]; + let new_ts = std::cmp::max(old_ts+1, now_msec()); + vec![(k, new_ts, new_v)] + } + Err(_) => { + vec![(k, now_msec(), new_v)] + } + }; + Self{ + vals: new_vals, + } + } + pub fn get(&self, k: &K) -> Option<&V> { + match self + .vals + .binary_search_by(|(k2, _, _)| k2.cmp(&k)) + { + Ok(i) => Some(&self.vals[i].2), + Err(_) => None + } + } + pub fn items(&self) -> &[(K, u64, V)] { + &self.vals[..] + } +} + +impl CRDT for LWWMap +where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, +{ + fn merge(&mut self, other: &Self) { + for (k, ts2, v2) in other.vals.iter() { + match self + .vals + .binary_search_by(|(k2, _, _)| k2.cmp(&k)) + { + Ok(i) => { + let (_, ts1, v1) = &self.vals[i]; + if ts2 > ts1 { + self.vals[i].1 = *ts2; + self.vals[i].2 = v2.clone(); + } else if ts1 == ts2 { + self.vals[i].2.merge(&v2); + } + } + Err(i) => { + self.vals.insert(i, (k.clone(), *ts2, v2.clone())); + } + } + } + } +} diff --git a/src/table/lib.rs b/src/table/lib.rs index 7684fe9d..e2bf1f46 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -5,6 +5,7 @@ extern crate log; pub mod schema; pub mod util; +pub mod crdt; pub mod table; pub mod table_fullcopy; -- cgit v1.2.3 From f8a04852a2cda84d4fd12ec19e90bbd8224bf4e9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:01:12 +0100 Subject: Convert bucket table to better CRDT representation --- src/api/s3_put.rs | 2 +- src/garage/admin_rpc.rs | 105 ++++++++++++++++------------------- src/model/block.rs | 2 +- src/model/bucket_table.rs | 137 ++++++++++++++++++++++------------------------ src/model/key_table.rs | 8 +-- src/table/crdt.rs | 73 ++++++++++-------------- src/table/lib.rs | 4 +- src/table/schema.rs | 4 -- 8 files changed, 150 insertions(+), 185 deletions(-) (limited to 'src') diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 72613323..a1681d77 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -322,7 +322,7 @@ pub async fn handle_put_part( let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?; // Check object is valid and multipart block can be accepted - let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; + let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; if !object diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index bd9fca49..c2b2f22c 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -2,11 +2,10 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use garage_util::data::*; use garage_util::error::Error; -use garage_table::*; use garage_table::crdt::CRDT; +use garage_table::*; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -80,25 +79,26 @@ impl AdminRpcHandler { Ok(AdminRPC::BucketInfo(bucket)) } BucketOperation::Create(query) => { - let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; - if bucket.as_ref().filter(|b| !b.deleted).is_some() { - return Err(Error::BadRPC(format!( - "Bucket {} already exists", - query.name - ))); - } - let new_time = match bucket { - Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), - None => now_msec(), + let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? { + Some(mut bucket) => { + if !bucket.is_deleted() { + return Err(Error::BadRPC(format!( + "Bucket {} already exists", + query.name + ))); + } + bucket + .state + .update(BucketState::Present(crdt::LWWMap::new())); + bucket + } + None => Bucket::new(query.name.clone()), }; - self.garage - .bucket_table - .insert(&Bucket::new(query.name.clone(), new_time, false, vec![])) - .await?; + self.garage.bucket_table.insert(&bucket).await?; Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) } BucketOperation::Delete(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; + let mut bucket = self.get_existing_bucket(&query.name).await?; let objects = self .garage .object_table @@ -113,25 +113,18 @@ impl AdminRpcHandler { ))); } // --- done checking, now commit --- - for ak in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { + for (key_id, _, _) in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { if !key.deleted.get() { self.update_key_bucket(key, &bucket.name, false, false) .await?; } } else { - return Err(Error::Message(format!("Key not found: {}", ak.key_id))); + return Err(Error::Message(format!("Key not found: {}", key_id))); } } - self.garage - .bucket_table - .insert(&Bucket::new( - query.name.clone(), - std::cmp::max(bucket.timestamp + 1, now_msec()), - true, - vec![], - )) - .await?; + bucket.state.update(BucketState::Deleted); + self.garage.bucket_table.insert(&bucket).await?; Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) } BucketOperation::Allow(query) => { @@ -202,10 +195,8 @@ impl AdminRpcHandler { } // --- done checking, now commit --- for (ab_name, _, _) in key.authorized_buckets.items().iter() { - if let Some(bucket) = - self.garage.bucket_table.get(&EmptyKey, ab_name).await? - { - if !bucket.deleted { + if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? { + if !bucket.is_deleted() { self.update_bucket_key(bucket, &key.key_id, false, false) .await?; } @@ -228,7 +219,7 @@ impl AdminRpcHandler { .bucket_table .get(&EmptyKey, bucket) .await? - .filter(|b| !b.deleted) + .filter(|b| !b.is_deleted()) .map(Ok) .unwrap_or(Err(Error::BadRPC(format!( "Bucket {} does not exist", @@ -253,24 +244,20 @@ impl AdminRpcHandler { allow_read: bool, allow_write: bool, ) -> Result<(), Error> { - let timestamp = match bucket - .authorized_keys() - .iter() - .find(|x| x.key_id == *key_id) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - bucket.clear_keys(); - bucket - .add_key(AllowedKey { - key_id: key_id.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); - self.garage.bucket_table.insert(&bucket).await?; + if let BucketState::Present(ak) = bucket.state.get_mut() { + let old_ak = ak.take_and_clear(); + ak.merge(&old_ak.update_mutator( + key_id.to_string(), + PermissionSet { + allow_read, + allow_write, + }, + )); + } else { + return Err(Error::Message(format!( + "Bucket is deleted in update_bucket_key" + ))); + } Ok(()) } @@ -282,13 +269,13 @@ impl AdminRpcHandler { allow_write: bool, ) -> Result<(), Error> { let old_map = key.authorized_buckets.take_and_clear(); - key.authorized_buckets.merge( - &old_map.update_mutator( - bucket.clone(), - PermissionSet{ - allow_read, allow_write - } - )); + key.authorized_buckets.merge(&old_map.update_mutator( + bucket.clone(), + PermissionSet { + allow_read, + allow_write, + }, + )); self.garage.key_table.insert(&key).await?; Ok(()) } diff --git a/src/model/block.rs b/src/model/block.rs index 6a5d9c5b..8a513a3c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use garage_table::table_sharded::TableShardedReplication; -use garage_table::{TableReplication, DeletedFilter}; +use garage_table::{DeletedFilter, TableReplication}; use crate::block_ref_table::*; diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 35c0cc27..93421acb 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,71 +1,61 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_table::crdt::CRDT; use garage_table::*; + use garage_util::error::Error; +use crate::key_table::PermissionSet; + +use model010::bucket_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { // Primary key pub name: String, - // Timestamp and deletion - // Upon version increment, all info is replaced - pub timestamp: u64, - pub deleted: bool, + pub state: crdt::LWW, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum BucketState { + Deleted, + Present(crdt::LWWMap), +} - // Authorized keys - authorized_keys: Vec, +impl CRDT for BucketState { + fn merge(&mut self, o: &Self) { + match o { + BucketState::Deleted => *self = BucketState::Deleted, + BucketState::Present(other_ak) => { + if let BucketState::Present(ak) = self { + ak.merge(other_ak); + } + } + } + } } impl Bucket { - pub fn new( - name: String, - timestamp: u64, - deleted: bool, - authorized_keys: Vec, - ) -> Self { - let mut ret = Bucket { + pub fn new(name: String) -> Self { + let ret = Bucket { name, - timestamp, - deleted, - authorized_keys: vec![], + state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())), }; - for key in authorized_keys { - ret.add_key(key) - .expect("Duplicate AllowedKey in Bucket constructor"); - } ret } - /// Add a key only if it is not already present - pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> { - match self - .authorized_keys - .binary_search_by(|k| k.key_id.cmp(&key.key_id)) - { - Err(i) => { - self.authorized_keys.insert(i, key); - Ok(()) - } - Ok(_) => Err(()), - } + pub fn is_deleted(&self) -> bool { + *self.state.get() == BucketState::Deleted } - pub fn authorized_keys(&self) -> &[AllowedKey] { - &self.authorized_keys[..] - } - pub fn clear_keys(&mut self) { - self.authorized_keys.clear(); + pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { + match self.state.get() { + BucketState::Deleted => &[], + BucketState::Present(ak) => ak.items(), + } } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedKey { - pub key_id: String, - pub timestamp: u64, - pub allow_read: bool, - pub allow_write: bool, -} - impl Entry for Bucket { fn partition_key(&self) -> &EmptyKey { &EmptyKey @@ -75,36 +65,12 @@ impl Entry for Bucket { } fn merge(&mut self, other: &Self) { - if other.timestamp > self.timestamp { - *self = other.clone(); - return; - } - if self.timestamp > other.timestamp || self.deleted { - return; - } - - for ak in other.authorized_keys.iter() { - match self - .authorized_keys - .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id)) - { - Ok(i) => { - let our_ak = &mut self.authorized_keys[i]; - if ak.timestamp > our_ak.timestamp { - *our_ak = ak.clone(); - } - } - Err(i) => { - self.authorized_keys.insert(i, ak.clone()); - } - } - } + self.state.merge(&other.state); } } pub struct BucketTable; - #[async_trait] impl TableSchema for BucketTable { type P = EmptyKey; @@ -117,6 +83,35 @@ impl TableSchema for BucketTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.is_deleted()) + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + if old.deleted { + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted), + }) + } else { + let mut keys = crdt::LWWMap::new(); + for ak in old.authorized_keys() { + keys.merge(&crdt::LWWMap::migrate_from_raw_item( + ak.key_id.clone(), + ak.timestamp, + PermissionSet { + allow_read: ak.allow_read, + allow_write: ak.allow_write, + }, + )); + } + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)), + }) + } } } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 2b825aa3..ff9d7b79 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use garage_table::*; use garage_table::crdt::CRDT; +use garage_table::*; use garage_util::error::Error; @@ -24,7 +24,6 @@ pub struct Key { // Authorized keys pub authorized_buckets: crdt::LWWMap, - // CRDT interaction: deleted implies authorized_buckets is empty } @@ -125,10 +124,11 @@ impl TableSchema for KeyTable { let it = crdt::LWWMap::migrate_from_raw_item( ab.bucket.clone(), ab.timestamp, - PermissionSet{ + PermissionSet { allow_read: ab.allow_read, allow_write: ab.allow_write, - }); + }, + ); new.authorized_buckets.merge(&it); } Some(new) diff --git a/src/table/crdt.rs b/src/table/crdt.rs index 8f5e4d71..7c888e3a 100644 --- a/src/table/crdt.rs +++ b/src/table/crdt.rs @@ -7,7 +7,9 @@ pub trait CRDT { } impl CRDT for T -where T: Ord + Clone { +where + T: Ord + Clone, +{ fn merge(&mut self, other: &Self) { if other > self { *self = other.clone(); @@ -18,14 +20,14 @@ where T: Ord + Clone { // ---- LWW Register ---- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWW -{ +pub struct LWW { ts: u64, v: T, } impl LWW -where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord +where + T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { pub fn new(value: T) -> Self { Self { @@ -34,10 +36,7 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } } pub fn migrate_from_raw(ts: u64, value: T) -> Self { - Self { - ts, - v: value, - } + Self { ts, v: value } } pub fn update(&mut self, new_value: T) { self.ts = std::cmp::max(self.ts + 1, now_msec()); @@ -46,10 +45,14 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part pub fn get(&self) -> &T { &self.v } + pub fn get_mut(&mut self) -> &mut T { + &mut self.v + } } impl CRDT for LWW -where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT +where + T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { fn merge(&mut self, other: &Self) { if other.ts > self.ts { @@ -61,7 +64,6 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } } - // ---- Boolean (true as absorbing state) ---- #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] @@ -85,61 +87,48 @@ impl CRDT for Bool { } } - // ---- LWW Map ---- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWWMap -{ +pub struct LWWMap { vals: Vec<(K, u64, V)>, } impl LWWMap -where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, +where + K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, { pub fn new() -> Self { - Self{ - vals: vec![], - } + Self { vals: vec![] } } pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { - Self{ + Self { vals: vec![(k, ts, v)], } } pub fn take_and_clear(&mut self) -> Self { let vals = std::mem::replace(&mut self.vals, vec![]); - Self{vals} + Self { vals } } pub fn clear(&mut self) { self.vals.clear(); } pub fn update_mutator(&self, k: K, new_v: V) -> Self { - let new_vals = match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { let (_, old_ts, _) = self.vals[i]; - let new_ts = std::cmp::max(old_ts+1, now_msec()); + let new_ts = std::cmp::max(old_ts + 1, now_msec()); vec![(k, new_ts, new_v)] } - Err(_) => { - vec![(k, now_msec(), new_v)] - } + Err(_) => vec![(k, now_msec(), new_v)], }; - Self{ - vals: new_vals, - } + Self { vals: new_vals } } pub fn get(&self, k: &K) -> Option<&V> { - match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => Some(&self.vals[i].2), - Err(_) => None + Err(_) => None, } } pub fn items(&self) -> &[(K, u64, V)] { @@ -148,17 +137,15 @@ where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } impl CRDT for LWWMap -where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, +where + K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, { fn merge(&mut self, other: &Self) { for (k, ts2, v2) in other.vals.iter() { - match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { - let (_, ts1, v1) = &self.vals[i]; + let (_, ts1, _v1) = &self.vals[i]; if ts2 > ts1 { self.vals[i].1 = *ts2; self.vals[i].2 = v2.clone(); diff --git a/src/table/lib.rs b/src/table/lib.rs index e2bf1f46..704f8f1e 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -3,9 +3,9 @@ #[macro_use] extern crate log; +pub mod crdt; pub mod schema; pub mod util; -pub mod crdt; pub mod table; pub mod table_fullcopy; @@ -13,5 +13,5 @@ pub mod table_sharded; pub mod table_sync; pub use schema::*; -pub use util::*; pub use table::*; +pub use util::*; diff --git a/src/table/schema.rs b/src/table/schema.rs index 49cede0a..d2ec9450 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -20,7 +20,6 @@ impl PartitionKey for Hash { } } - pub trait SortKey { fn sort_key(&self) -> &[u8]; } @@ -37,7 +36,6 @@ impl SortKey for Hash { } } - pub trait Entry: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { @@ -47,7 +45,6 @@ pub trait Entry: fn merge(&mut self, other: &Self); } - #[async_trait] pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; @@ -66,4 +63,3 @@ pub trait TableSchema: Send + Sync { true } } - -- cgit v1.2.3 From 173f0dbac98f7962c75663cf7ee37c700596b40d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:09:32 +0100 Subject: oops --- src/garage/admin_rpc.rs | 1 + src/table/crdt.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index c2b2f22c..a23d3e95 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -258,6 +258,7 @@ impl AdminRpcHandler { "Bucket is deleted in update_bucket_key" ))); } + self.garage.bucket_table.insert(&bucket).await?; Ok(()) } diff --git a/src/table/crdt.rs b/src/table/crdt.rs index 7c888e3a..708d47f3 100644 --- a/src/table/crdt.rs +++ b/src/table/crdt.rs @@ -97,7 +97,7 @@ pub struct LWWMap { impl LWWMap where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { pub fn new() -> Self { Self { vals: vec![] } -- cgit v1.2.3 From 68be5072e5b6cdf0192a3fbc92713047ccf0d372 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:20:20 +0100 Subject: simplify --- src/model/bucket_table.rs | 5 ++--- src/model/key_table.rs | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 93421acb..b7f24d71 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -39,11 +39,10 @@ impl CRDT for BucketState { impl Bucket { pub fn new(name: String) -> Self { - let ret = Bucket { + Bucket { name, state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())), - }; - ret + } } pub fn is_deleted(&self) -> bool { *self.state.get() == BucketState::Deleted diff --git a/src/model/key_table.rs b/src/model/key_table.rs index ff9d7b79..20da3cc6 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -31,14 +31,13 @@ impl Key { pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); - let ret = Self { + Self { key_id, secret_key, name: crdt::LWW::new(name), deleted: crdt::Bool::new(false), authorized_buckets: crdt::LWWMap::new(), - }; - ret + } } pub fn delete(key_id: String) -> Self { Self { -- cgit v1.2.3 From 4f7f1d1cb3023af30b0741b04c8b9bcd900f5cc7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:23:55 +0100 Subject: less type bounds --- src/table/crdt.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/table/crdt.rs b/src/table/crdt.rs index 708d47f3..2b903cf0 100644 --- a/src/table/crdt.rs +++ b/src/table/crdt.rs @@ -27,7 +27,7 @@ pub struct LWW { impl LWW where - T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, + T: CRDT, { pub fn new(value: T) -> Self { Self { @@ -52,7 +52,7 @@ where impl CRDT for LWW where - T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, + T: Clone + CRDT, { fn merge(&mut self, other: &Self) { if other.ts > self.ts { @@ -96,8 +96,8 @@ pub struct LWWMap { impl LWWMap where - K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, + K: Ord, + V: CRDT, { pub fn new() -> Self { Self { vals: vec![] } @@ -138,8 +138,8 @@ where impl CRDT for LWWMap where - K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, + K: Clone + Ord, + V: Clone + CRDT, { fn merge(&mut self, other: &Self) { for (k, ts2, v2) in other.vals.iter() { -- cgit v1.2.3 From f9be964c3f3efd10f1bd9cf752d839f8133edcc1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:33:30 +0100 Subject: Warning when cannot decode entry (data format incompatibilities) --- src/table/table.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/table/table.rs b/src/table/table.rs index 2beac3f4..54a42d34 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,6 +2,8 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; +use log::warn; + use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; @@ -185,7 +187,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = Self::decode_entry(v_bytes.as_slice())?; + let v = self.decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -241,7 +243,7 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = Self::decode_entry(entry_bytes.as_slice())?; + let entry = self.decode_entry(entry_bytes.as_slice())?; let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { @@ -363,7 +365,7 @@ where let keep = match filter { None => true, Some(f) => { - let entry = Self::decode_entry(value.as_ref())?; + let entry = self.decode_entry(value.as_ref())?; F::matches_filter(&entry, f) } }; @@ -382,14 +384,14 @@ where let mut epidemic_propagate = vec![]; for update_bytes in entries.iter() { - let update = Self::decode_entry(update_bytes.as_slice())?; + let update = self.decode_entry(update_bytes.as_slice())?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = match db.get(&tree_key)? { Some(prev_bytes) => { - let old_entry = Self::decode_entry(&prev_bytes) + let old_entry = self.decode_entry(&prev_bytes) .map_err(sled::ConflictableTransactionError::Abort)?; let mut new_entry = old_entry.clone(); new_entry.merge(&update); @@ -437,7 +439,7 @@ where break; } if let Some(old_val) = self.store.remove(&key)? { - let old_entry = Self::decode_entry(&old_val)?; + let old_entry = self.decode_entry(&old_val)?; self.instance.updated(Some(old_entry), None).await?; self.system .background @@ -455,12 +457,15 @@ where ret } - fn decode_entry(bytes: &[u8]) -> Result { + fn decode_entry(&self, bytes: &[u8]) -> Result { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { Ok(x) => Ok(x), Err(e) => match F::try_migrate(bytes) { Some(x) => Ok(x), - None => Err(e.into()), + None => { + warn!("Unable to decode entry of {}: {}", self.name, e); + Err(e.into()) + } }, } } -- cgit v1.2.3 From a8b3c8fd5898400d64310f61bb1a41dd5fefe5ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:37:34 +0100 Subject: data hexdump in warning --- src/table/Cargo.toml | 1 + src/table/table.rs | 3 +++ 2 files changed, 4 insertions(+) (limited to 'src') diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 1963f3da..945763fa 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -21,6 +21,7 @@ rand = "0.7" hex = "0.3" arc-swap = "0.4" log = "0.4" +hexdump = "0.1" sled = "0.31" diff --git a/src/table/table.rs b/src/table/table.rs index 54a42d34..5dfee3c8 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -464,6 +464,9 @@ where Some(x) => Ok(x), None => { warn!("Unable to decode entry of {}: {}", self.name, e); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } Err(e.into()) } }, -- cgit v1.2.3