diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/signature.rs | 2 | ||||
-rw-r--r-- | src/garage/admin_rpc.rs | 42 | ||||
-rw-r--r-- | src/model/key_table.rs | 121 | ||||
-rw-r--r-- | src/table/crdt.rs | 175 | ||||
-rw-r--r-- | src/table/lib.rs | 1 |
5 files changed, 245 insertions, 96 deletions
diff --git a/src/api/signature.rs b/src/api/signature.rs index 402b1881..0ee47961 100644 --- a/src/api/signature.rs +++ b/src/api/signature.rs @@ -68,7 +68,7 @@ pub async fn check_signature( .key_table .get(&EmptyKey, &authorization.key_id) .await? - .filter(|k| !k.deleted) + .filter(|k| !k.deleted.get()) .ok_or(Error::Forbidden(format!( "No such key: {}", authorization.key_id diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index 778e4a1d..bd9fca49 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -6,6 +6,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_table::*; +use garage_table::crdt::CRDT; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -114,7 +115,7 @@ impl AdminRpcHandler { // --- done checking, now commit --- for ak in bucket.authorized_keys() { if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { - if !key.deleted { + if !key.deleted.get() { self.update_key_bucket(key, &bucket.name, false, false) .await?; } @@ -173,7 +174,7 @@ impl AdminRpcHandler { .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) .await? .iter() - .map(|k| (k.key_id.to_string(), k.name.to_string())) + .map(|k| (k.key_id.to_string(), k.name.get().clone())) .collect::<Vec<_>>(); Ok(AdminRPC::KeyList(key_ids)) } @@ -182,14 +183,13 @@ impl AdminRpcHandler { Ok(AdminRPC::KeyInfo(key)) } KeyOperation::New(query) => { - let key = Key::new(query.name, vec![]); + let key = Key::new(query.name); self.garage.key_table.insert(&key).await?; Ok(AdminRPC::KeyInfo(key)) } KeyOperation::Rename(query) => { let mut key = self.get_existing_key(&query.key_id).await?; - key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec()); - key.name = query.new_name; + key.name.update(query.new_name); self.garage.key_table.insert(&key).await?; Ok(AdminRPC::KeyInfo(key)) } @@ -201,16 +201,16 @@ impl AdminRpcHandler { ))); } // --- done checking, now commit --- - for ab in key.authorized_buckets().iter() { + for (ab_name, _, _) in key.authorized_buckets.items().iter() { if let Some(bucket) = - self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await? + self.garage.bucket_table.get(&EmptyKey, ab_name).await? { if !bucket.deleted { self.update_bucket_key(bucket, &key.key_id, false, false) .await?; } } else { - return Err(Error::Message(format!("Bucket not found: {}", ab.bucket))); + return Err(Error::Message(format!("Bucket not found: {}", ab_name))); } } let del_key = Key::delete(key.key_id); @@ -241,7 +241,7 @@ impl AdminRpcHandler { .key_table .get(&EmptyKey, id) .await? - .filter(|k| !k.deleted) + .filter(|k| !k.deleted.get()) .map(Ok) .unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id)))) } @@ -281,22 +281,14 @@ impl AdminRpcHandler { allow_read: bool, allow_write: bool, ) -> Result<(), Error> { - let timestamp = match key - .authorized_buckets() - .iter() - .find(|x| x.bucket == *bucket) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - key.clear_buckets(); - key.add_bucket(AllowedBucket { - bucket: bucket.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); + let old_map = key.authorized_buckets.take_and_clear(); + key.authorized_buckets.merge( + &old_map.update_mutator( + bucket.clone(), + PermissionSet{ + allow_read, allow_write + } + )); self.garage.key_table.insert(&key).await?; Ok(()) } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 05b938ce..2b825aa3 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -2,9 +2,12 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use garage_table::*; -use garage_util::data::*; +use garage_table::crdt::CRDT; + use garage_util::error::Error; +use model010::key_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { // Primary key @@ -14,83 +17,56 @@ pub struct Key { pub secret_key: String, // Name - pub name: String, - pub name_timestamp: u64, + pub name: crdt::LWW<String>, // Deletion - pub deleted: bool, + pub deleted: crdt::Bool, // Authorized keys - authorized_buckets: Vec<AllowedBucket>, + pub authorized_buckets: crdt::LWWMap<String, PermissionSet>, + + // CRDT interaction: deleted implies authorized_buckets is empty } impl Key { - pub fn new(name: String, buckets: Vec<AllowedBucket>) -> Self { + pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); - let mut ret = Self { + let ret = Self { key_id, secret_key, - name, - name_timestamp: now_msec(), - deleted: false, - authorized_buckets: vec![], + name: crdt::LWW::new(name), + deleted: crdt::Bool::new(false), + authorized_buckets: crdt::LWWMap::new(), }; - for b in buckets { - ret.add_bucket(b) - .expect("Duplicate AllowedBucket in Key constructor"); - } ret } pub fn delete(key_id: String) -> Self { Self { key_id, secret_key: "".into(), - name: "".into(), - name_timestamp: now_msec(), - deleted: true, - authorized_buckets: vec![], + name: crdt::LWW::new("".to_string()), + deleted: crdt::Bool::new(true), + authorized_buckets: crdt::LWWMap::new(), } } /// Add an authorized bucket, only if it wasn't there before - pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> { - match self - .authorized_buckets - .binary_search_by(|b| b.bucket.cmp(&new.bucket)) - { - Err(i) => { - self.authorized_buckets.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn authorized_buckets(&self) -> &[AllowedBucket] { - &self.authorized_buckets[..] - } - pub fn clear_buckets(&mut self) { - self.authorized_buckets.clear(); - } pub fn allow_read(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_read) .unwrap_or(false) } pub fn allow_write(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_write) .unwrap_or(false) } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedBucket { - pub bucket: String, - pub timestamp: u64, +#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct PermissionSet { pub allow_read: bool, pub allow_write: bool, } @@ -104,35 +80,15 @@ impl Entry<EmptyKey, String> for Key { } fn merge(&mut self, other: &Self) { - if other.name_timestamp > self.name_timestamp { - self.name_timestamp = other.name_timestamp; - self.name = other.name.clone(); - } + self.name.merge(&other.name); + self.deleted.merge(&other.deleted); - if other.deleted { - self.deleted = true; - } - if self.deleted { + if self.deleted.get() { self.authorized_buckets.clear(); return; } - for ab in other.authorized_buckets.iter() { - match self - .authorized_buckets - .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket)) - { - Ok(i) => { - let our_ab = &mut self.authorized_buckets[i]; - if ab.timestamp > our_ab.timestamp { - *our_ab = ab.clone(); - } - } - Err(i) => { - self.authorized_buckets.insert(i, ab.clone()); - } - } - } + self.authorized_buckets.merge(&other.authorized_buckets); } } @@ -150,6 +106,31 @@ impl TableSchema for KeyTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) + } + + fn try_migrate(bytes: &[u8]) -> Option<Self::E> { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + let mut new = Self::E { + key_id: old.key_id.clone(), + secret_key: old.secret_key.clone(), + name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()), + deleted: crdt::Bool::new(old.deleted), + authorized_buckets: crdt::LWWMap::new(), + }; + for ab in old.authorized_buckets() { + let it = crdt::LWWMap::migrate_from_raw_item( + ab.bucket.clone(), + ab.timestamp, + PermissionSet{ + allow_read: ab.allow_read, + allow_write: ab.allow_write, + }); + new.authorized_buckets.merge(&it); + } + Some(new) } } diff --git a/src/table/crdt.rs b/src/table/crdt.rs new file mode 100644 index 00000000..8f5e4d71 --- /dev/null +++ b/src/table/crdt.rs @@ -0,0 +1,175 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +pub trait CRDT { + fn merge(&mut self, other: &Self); +} + +impl<T> CRDT for T +where T: Ord + Clone { + fn merge(&mut self, other: &Self) { + if other > self { + *self = other.clone(); + } + } +} + +// ---- LWW Register ---- + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWW<T> +{ + ts: u64, + v: T, +} + +impl<T> LWW<T> +where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord +{ + pub fn new(value: T) -> Self { + Self { + ts: now_msec(), + v: value, + } + } + pub fn migrate_from_raw(ts: u64, value: T) -> Self { + Self { + ts, + v: value, + } + } + pub fn update(&mut self, new_value: T) { + self.ts = std::cmp::max(self.ts + 1, now_msec()); + self.v = new_value; + } + pub fn get(&self) -> &T { + &self.v + } +} + +impl<T> CRDT for LWW<T> +where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT +{ + fn merge(&mut self, other: &Self) { + if other.ts > self.ts { + self.ts = other.ts; + self.v = other.v.clone(); + } else if other.ts == self.ts { + self.v.merge(&other.v); + } + } +} + + +// ---- Boolean (true as absorbing state) ---- + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +pub struct Bool(bool); + +impl Bool { + pub fn new(b: bool) -> Self { + Self(b) + } + pub fn set(&mut self) { + self.0 = true; + } + pub fn get(&self) -> bool { + self.0 + } +} + +impl CRDT for Bool { + fn merge(&mut self, other: &Self) { + self.0 = self.0 || other.0; + } +} + + +// ---- LWW Map ---- + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWWMap<K, V> +{ + vals: Vec<(K, u64, V)>, +} + +impl<K, V> LWWMap<K, V> +where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, +{ + pub fn new() -> Self { + Self{ + vals: vec![], + } + } + pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { + Self{ + vals: vec![(k, ts, v)], + } + } + pub fn take_and_clear(&mut self) -> Self { + let vals = std::mem::replace(&mut self.vals, vec![]); + Self{vals} + } + pub fn clear(&mut self) { + self.vals.clear(); + } + pub fn update_mutator(&self, k: K, new_v: V) -> Self { + let new_vals = match self + .vals + .binary_search_by(|(k2, _, _)| k2.cmp(&k)) + { + Ok(i) => { + let (_, old_ts, _) = self.vals[i]; + let new_ts = std::cmp::max(old_ts+1, now_msec()); + vec![(k, new_ts, new_v)] + } + Err(_) => { + vec![(k, now_msec(), new_v)] + } + }; + Self{ + vals: new_vals, + } + } + pub fn get(&self, k: &K) -> Option<&V> { + match self + .vals + .binary_search_by(|(k2, _, _)| k2.cmp(&k)) + { + Ok(i) => Some(&self.vals[i].2), + Err(_) => None + } + } + pub fn items(&self) -> &[(K, u64, V)] { + &self.vals[..] + } +} + +impl<K, V> CRDT for LWWMap<K, V> +where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, +{ + fn merge(&mut self, other: &Self) { + for (k, ts2, v2) in other.vals.iter() { + match self + .vals + .binary_search_by(|(k2, _, _)| k2.cmp(&k)) + { + Ok(i) => { + let (_, ts1, v1) = &self.vals[i]; + if ts2 > ts1 { + self.vals[i].1 = *ts2; + self.vals[i].2 = v2.clone(); + } else if ts1 == ts2 { + self.vals[i].2.merge(&v2); + } + } + Err(i) => { + self.vals.insert(i, (k.clone(), *ts2, v2.clone())); + } + } + } + } +} diff --git a/src/table/lib.rs b/src/table/lib.rs index 7684fe9d..e2bf1f46 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -5,6 +5,7 @@ extern crate log; pub mod schema; pub mod util; +pub mod crdt; pub mod table; pub mod table_fullcopy; |