From e02e9e035e269cd6b660c92706db424a2edb5306 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 21:15:24 +0100 Subject: Begin improve model to use better CRDTs --- src/garage/admin_rpc.rs | 42 +++++++++++++++++------------------------- 1 file changed, 17 insertions(+), 25 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index 778e4a1d..bd9fca49 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -6,6 +6,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_table::*; +use garage_table::crdt::CRDT; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -114,7 +115,7 @@ impl AdminRpcHandler { // --- done checking, now commit --- for ak in bucket.authorized_keys() { if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { - if !key.deleted { + if !key.deleted.get() { self.update_key_bucket(key, &bucket.name, false, false) .await?; } @@ -173,7 +174,7 @@ impl AdminRpcHandler { .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) .await? .iter() - .map(|k| (k.key_id.to_string(), k.name.to_string())) + .map(|k| (k.key_id.to_string(), k.name.get().clone())) .collect::>(); Ok(AdminRPC::KeyList(key_ids)) } @@ -182,14 +183,13 @@ impl AdminRpcHandler { Ok(AdminRPC::KeyInfo(key)) } KeyOperation::New(query) => { - let key = Key::new(query.name, vec![]); + let key = Key::new(query.name); self.garage.key_table.insert(&key).await?; Ok(AdminRPC::KeyInfo(key)) } KeyOperation::Rename(query) => { let mut key = self.get_existing_key(&query.key_id).await?; - key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec()); - key.name = query.new_name; + key.name.update(query.new_name); self.garage.key_table.insert(&key).await?; Ok(AdminRPC::KeyInfo(key)) } @@ -201,16 +201,16 @@ impl AdminRpcHandler { ))); } // --- done checking, now commit --- - for ab in key.authorized_buckets().iter() { + for (ab_name, _, _) in key.authorized_buckets.items().iter() { if let Some(bucket) = - self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await? + self.garage.bucket_table.get(&EmptyKey, ab_name).await? { if !bucket.deleted { self.update_bucket_key(bucket, &key.key_id, false, false) .await?; } } else { - return Err(Error::Message(format!("Bucket not found: {}", ab.bucket))); + return Err(Error::Message(format!("Bucket not found: {}", ab_name))); } } let del_key = Key::delete(key.key_id); @@ -241,7 +241,7 @@ impl AdminRpcHandler { .key_table .get(&EmptyKey, id) .await? - .filter(|k| !k.deleted) + .filter(|k| !k.deleted.get()) .map(Ok) .unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id)))) } @@ -281,22 +281,14 @@ impl AdminRpcHandler { allow_read: bool, allow_write: bool, ) -> Result<(), Error> { - let timestamp = match key - .authorized_buckets() - .iter() - .find(|x| x.bucket == *bucket) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - key.clear_buckets(); - key.add_bucket(AllowedBucket { - bucket: bucket.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); + let old_map = key.authorized_buckets.take_and_clear(); + key.authorized_buckets.merge( + &old_map.update_mutator( + bucket.clone(), + PermissionSet{ + allow_read, allow_write + } + )); self.garage.key_table.insert(&key).await?; Ok(()) } -- cgit v1.2.3 From f8a04852a2cda84d4fd12ec19e90bbd8224bf4e9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:01:12 +0100 Subject: Convert bucket table to better CRDT representation --- src/garage/admin_rpc.rs | 105 +++++++++++++++++++++--------------------------- 1 file changed, 46 insertions(+), 59 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index bd9fca49..c2b2f22c 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -2,11 +2,10 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use garage_util::data::*; use garage_util::error::Error; -use garage_table::*; use garage_table::crdt::CRDT; +use garage_table::*; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -80,25 +79,26 @@ impl AdminRpcHandler { Ok(AdminRPC::BucketInfo(bucket)) } BucketOperation::Create(query) => { - let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; - if bucket.as_ref().filter(|b| !b.deleted).is_some() { - return Err(Error::BadRPC(format!( - "Bucket {} already exists", - query.name - ))); - } - let new_time = match bucket { - Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), - None => now_msec(), + let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? { + Some(mut bucket) => { + if !bucket.is_deleted() { + return Err(Error::BadRPC(format!( + "Bucket {} already exists", + query.name + ))); + } + bucket + .state + .update(BucketState::Present(crdt::LWWMap::new())); + bucket + } + None => Bucket::new(query.name.clone()), }; - self.garage - .bucket_table - .insert(&Bucket::new(query.name.clone(), new_time, false, vec![])) - .await?; + self.garage.bucket_table.insert(&bucket).await?; Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) } BucketOperation::Delete(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; + let mut bucket = self.get_existing_bucket(&query.name).await?; let objects = self .garage .object_table @@ -113,25 +113,18 @@ impl AdminRpcHandler { ))); } // --- done checking, now commit --- - for ak in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { + for (key_id, _, _) in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { if !key.deleted.get() { self.update_key_bucket(key, &bucket.name, false, false) .await?; } } else { - return Err(Error::Message(format!("Key not found: {}", ak.key_id))); + return Err(Error::Message(format!("Key not found: {}", key_id))); } } - self.garage - .bucket_table - .insert(&Bucket::new( - query.name.clone(), - std::cmp::max(bucket.timestamp + 1, now_msec()), - true, - vec![], - )) - .await?; + bucket.state.update(BucketState::Deleted); + self.garage.bucket_table.insert(&bucket).await?; Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) } BucketOperation::Allow(query) => { @@ -202,10 +195,8 @@ impl AdminRpcHandler { } // --- done checking, now commit --- for (ab_name, _, _) in key.authorized_buckets.items().iter() { - if let Some(bucket) = - self.garage.bucket_table.get(&EmptyKey, ab_name).await? - { - if !bucket.deleted { + if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? { + if !bucket.is_deleted() { self.update_bucket_key(bucket, &key.key_id, false, false) .await?; } @@ -228,7 +219,7 @@ impl AdminRpcHandler { .bucket_table .get(&EmptyKey, bucket) .await? - .filter(|b| !b.deleted) + .filter(|b| !b.is_deleted()) .map(Ok) .unwrap_or(Err(Error::BadRPC(format!( "Bucket {} does not exist", @@ -253,24 +244,20 @@ impl AdminRpcHandler { allow_read: bool, allow_write: bool, ) -> Result<(), Error> { - let timestamp = match bucket - .authorized_keys() - .iter() - .find(|x| x.key_id == *key_id) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - bucket.clear_keys(); - bucket - .add_key(AllowedKey { - key_id: key_id.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); - self.garage.bucket_table.insert(&bucket).await?; + if let BucketState::Present(ak) = bucket.state.get_mut() { + let old_ak = ak.take_and_clear(); + ak.merge(&old_ak.update_mutator( + key_id.to_string(), + PermissionSet { + allow_read, + allow_write, + }, + )); + } else { + return Err(Error::Message(format!( + "Bucket is deleted in update_bucket_key" + ))); + } Ok(()) } @@ -282,13 +269,13 @@ impl AdminRpcHandler { allow_write: bool, ) -> Result<(), Error> { let old_map = key.authorized_buckets.take_and_clear(); - key.authorized_buckets.merge( - &old_map.update_mutator( - bucket.clone(), - PermissionSet{ - allow_read, allow_write - } - )); + key.authorized_buckets.merge(&old_map.update_mutator( + bucket.clone(), + PermissionSet { + allow_read, + allow_write, + }, + )); self.garage.key_table.insert(&key).await?; Ok(()) } -- cgit v1.2.3 From 173f0dbac98f7962c75663cf7ee37c700596b40d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:09:32 +0100 Subject: oops --- src/garage/admin_rpc.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/garage') diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index c2b2f22c..a23d3e95 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -258,6 +258,7 @@ impl AdminRpcHandler { "Bucket is deleted in update_bucket_key" ))); } + self.garage.bucket_table.insert(&bucket).await?; Ok(()) } -- cgit v1.2.3