diff options
author | Alex Auvolat <alex@adnab.me> | 2022-01-03 17:22:40 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-01-04 12:52:46 +0100 |
commit | e59c23a69df116737c428ccbfbe4dfeff4d956d5 (patch) | |
tree | 04e758a80943ba3bba9ea1e8b89867ed18dea242 | |
parent | 2140cd72054ac6e3a94cbe5931727159de20a97f (diff) | |
download | garage-e59c23a69df116737c428ccbfbe4dfeff4d956d5.tar.gz garage-e59c23a69df116737c428ccbfbe4dfeff4d956d5.zip |
Refactor logic for setting/unsetting aliases
-rw-r--r-- | src/garage/admin.rs | 482 | ||||
-rw-r--r-- | src/model/bucket_alias_table.rs | 9 | ||||
-rw-r--r-- | src/model/helper/bucket.rs | 386 | ||||
-rw-r--r-- | src/model/migrate.rs | 95 | ||||
-rw-r--r-- | src/model/permission.rs | 11 |
5 files changed, 570 insertions, 413 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 9328f46e..0c1e58f8 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::crdt::*; use garage_util::data::*; -use garage_util::error::{Error as GarageError, OkOrMessage}; +use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_table::replication::*; @@ -28,8 +28,6 @@ use crate::repair::Repair; pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; -macro_rules! INVALID_BUCKET_NAME_MESSAGE { () => { "Invalid bucket name: {}. See AWS documentation for constraints on S3 bucket names:\nhttps://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html" }; } - #[derive(Debug, Serialize, Deserialize)] pub enum AdminRpc { BucketOperation(BucketOperation), @@ -134,47 +132,54 @@ impl AdminRpcHandler { #[allow(clippy::ptr_arg)] async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> { - let mut bucket = Bucket::new(); - let alias = match self.garage.bucket_alias_table.get(&EmptyKey, name).await? { - Some(mut alias) => { - if !alias.state.get().is_deleted() { - return Err(Error::BadRequest(format!("Bucket {} already exists", name))); - } - alias.state.update(Deletable::Present(AliasParams { - bucket_id: bucket.id, - })); - alias + if !is_valid_bucket_name(name) { + return Err(Error::BadRequest(format!( + "{}: {}", + name, INVALID_BUCKET_NAME_MESSAGE + ))); + } + + if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? { + if !alias.state.get().is_deleted() { + return Err(Error::BadRequest(format!("Bucket {} already exists", name))); } - None => BucketAlias::new(name.clone(), bucket.id) - .ok_or_bad_request(format!(INVALID_BUCKET_NAME_MESSAGE!(), name))?, - }; - bucket.state.as_option_mut().unwrap().aliases.merge_raw( - name, - alias.state.timestamp(), - &true, - ); + } + + // ---- done checking, now commit ---- + + let bucket = Bucket::new(); self.garage.bucket_table.insert(&bucket).await?; - self.garage.bucket_alias_table.insert(&alias).await?; + + self.garage + .bucket_helper() + .set_global_bucket_alias(bucket.id, name) + .await?; + Ok(AdminRpc::Ok(format!("Bucket {} was created.", name))) } async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> { - let mut bucket_alias = self + let helper = self.garage.bucket_helper(); + + let bucket_id = helper + .resolve_global_bucket_name(&query.name) + .await? + .ok_or_bad_request("Bucket not found")?; + + // Get the alias, but keep in minde here the bucket name + // given in parameter can also be directly the bucket's ID. + // In that case bucket_alias will be None, and + // we can still delete the bucket if it has zero aliases + // (a condition which we try to prevent but that could still happen somehow). + // We just won't try to delete an alias entry because there isn't one. + let bucket_alias = self .garage .bucket_alias_table .get(&EmptyKey, &query.name) - .await? - .filter(|a| !a.is_deleted()) - .ok_or_bad_request(format!("Bucket {} does not exist", query.name))?; - - let bucket_id = bucket_alias.state.get().as_option().unwrap().bucket_id; + .await?; // Check bucket doesn't have other aliases - let mut bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; + let mut bucket = helper.get_existing_bucket(bucket_id).await?; let bucket_state = bucket.state.as_option().unwrap(); if bucket_state .aliases @@ -216,18 +221,18 @@ impl AdminRpcHandler { // --- done checking, now commit --- // 1. delete authorization from keys that had access for (key_id, _) in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { - if !key.state.is_deleted() { - self.update_key_bucket(&key, bucket.id, false, false, false) - .await?; - } - } else { - return Err(Error::BadRequest(format!("Key not found: {}", key_id))); - } + helper + .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::no_permissions()) + .await?; } + // 2. delete bucket alias - bucket_alias.state.update(Deletable::Deleted); - self.garage.bucket_alias_table.insert(&bucket_alias).await?; + if bucket_alias.is_some() { + helper + .unset_global_bucket_alias(bucket_id, &query.name) + .await?; + } + // 3. delete bucket bucket.state = Deletable::delete(); self.garage.bucket_table.insert(&bucket).await?; @@ -236,125 +241,39 @@ impl AdminRpcHandler { } async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> { - let bucket_id = self - .garage - .bucket_helper() + let helper = self.garage.bucket_helper(); + + let bucket_id = helper .resolve_global_bucket_name(&query.existing_bucket) .await? .ok_or_bad_request("Bucket not found")?; - let mut bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - if let Some(key_local) = &query.local { - let mut key = self.get_existing_key(key_local).await?; - let mut key_param = key.state.as_option_mut().unwrap(); - - if let Some(Deletable::Present(existing_alias)) = - key_param.local_aliases.get(&query.new_name) - { - if *existing_alias == bucket_id { - return Ok(AdminRpc::Ok(format!( - "Alias {} already points to bucket {:?} in namespace of key {}", - query.new_name, bucket_id, key.key_id - ))); - } else { - return Err(Error::BadRequest(format!("Alias {} already exists and points to different bucket: {:?} in namespace of key {}", query.new_name, existing_alias, key.key_id))); - } - } - if !is_valid_bucket_name(&query.new_name) { - return Err(Error::BadRequest(format!( - INVALID_BUCKET_NAME_MESSAGE!(), - query.new_name - ))); - } - - // Checks ok, add alias - let mut bucket_p = bucket.state.as_option_mut().unwrap(); - let bucket_p_local_alias_key = (key.key_id.clone(), query.new_name.clone()); - - // Calculate the timestamp to assign to this aliasing in the two local_aliases maps - // (the one from key to bucket, and the reverse one stored in the bucket iself) - // so that merges on both maps in case of a concurrent operation resolve - // to the same alias being set - let alias_ts = increment_logical_clock_2( - key_param.local_aliases.get_timestamp(&query.new_name), - bucket_p - .local_aliases - .get_timestamp(&bucket_p_local_alias_key), - ); - - key_param.local_aliases = LwwMap::raw_item( - query.new_name.clone(), - alias_ts, - Deletable::present(bucket_id), - ); - self.garage.key_table.insert(&key).await?; - - bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true); - self.garage.bucket_table.insert(&bucket).await?; + if let Some(key_pattern) = &query.local { + let key = helper.get_existing_matching_key(key_pattern).await?; + helper + .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name) + .await?; Ok(AdminRpc::Ok(format!( - "Alias {} created to bucket {:?} in namespace of key {}", + "Alias {} now points to bucket {:?} in namespace of key {}", query.new_name, bucket_id, key.key_id ))) } else { - let alias = self - .garage - .bucket_alias_table - .get(&EmptyKey, &query.new_name) + helper + .set_global_bucket_alias(bucket_id, &query.new_name) .await?; - - if let Some(existing_alias) = alias.as_ref() { - if let Some(p) = existing_alias.state.get().as_option() { - if p.bucket_id == bucket_id { - return Ok(AdminRpc::Ok(format!( - "Alias {} already points to bucket {:?}", - query.new_name, bucket_id - ))); - } else { - return Err(Error::BadRequest(format!( - "Alias {} already exists and points to different bucket: {:?}", - query.new_name, p.bucket_id - ))); - } - } - } - - // Checks ok, add alias - let mut bucket_p = bucket.state.as_option_mut().unwrap(); - - let alias_ts = increment_logical_clock_2( - bucket_p.aliases.get_timestamp(&query.new_name), - alias.as_ref().map(|a| a.state.timestamp()).unwrap_or(0), - ); - - let alias = match alias { - None => BucketAlias::new(query.new_name.clone(), bucket_id) - .ok_or_bad_request(format!(INVALID_BUCKET_NAME_MESSAGE!(), query.new_name))?, - Some(mut a) => { - a.state = Lww::raw(alias_ts, Deletable::present(AliasParams { bucket_id })); - a - } - }; - self.garage.bucket_alias_table.insert(&alias).await?; - - bucket_p.aliases = LwwMap::raw_item(query.new_name.clone(), alias_ts, true); - self.garage.bucket_table.insert(&bucket).await?; - Ok(AdminRpc::Ok(format!( - "Alias {} created to bucket {:?}", + "Alias {} now points to bucket {:?}", query.new_name, bucket_id ))) } } async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> { - if let Some(key_local) = &query.local { - let mut key = self.get_existing_key(key_local).await?; + let helper = self.garage.bucket_helper(); + + if let Some(key_pattern) = &query.local { + let key = helper.get_existing_matching_key(key_pattern).await?; let bucket_id = key .state @@ -365,122 +284,56 @@ impl AdminRpcHandler { .map(|a| a.into_option()) .flatten() .ok_or_bad_request("Bucket not found")?; - let mut bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - let mut bucket_p = bucket.state.as_option_mut().unwrap(); - - let has_other_aliases = bucket_p - .aliases - .items() - .iter() - .any(|(_, _, active)| *active) - || bucket_p - .local_aliases - .items() - .iter() - .any(|((k, n), _, active)| *k == key.key_id && *n == query.name && *active); - if !has_other_aliases { - return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", query.name))); - } - - // Checks ok, remove alias - let mut key_param = key.state.as_option_mut().unwrap(); - let bucket_p_local_alias_key = (key.key_id.clone(), query.name.clone()); - let alias_ts = increment_logical_clock_2( - key_param.local_aliases.get_timestamp(&query.name), - bucket_p - .local_aliases - .get_timestamp(&bucket_p_local_alias_key), - ); - - key_param.local_aliases = - LwwMap::raw_item(query.name.clone(), alias_ts, Deletable::delete()); - self.garage.key_table.insert(&key).await?; - - bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false); - self.garage.bucket_table.insert(&bucket).await?; + helper + .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name) + .await?; Ok(AdminRpc::Ok(format!( - "Bucket alias {} deleted from namespace of key {}", - query.name, key.key_id + "Alias {} no longer points to bucket {:?} in namespace of key {}", + &query.name, bucket_id, key.key_id ))) } else { - let bucket_id = self - .garage - .bucket_helper() + let bucket_id = helper .resolve_global_bucket_name(&query.name) .await? .ok_or_bad_request("Bucket not found")?; - let mut bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - let mut bucket_state = bucket.state.as_option_mut().unwrap(); - - let has_other_aliases = bucket_state - .aliases - .items() - .iter() - .any(|(name, _, active)| *name != query.name && *active) - || bucket_state - .local_aliases - .items() - .iter() - .any(|(_, _, active)| *active); - if !has_other_aliases { - return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", query.name))); - } - - let mut alias = self - .garage - .bucket_alias_table - .get(&EmptyKey, &query.name) - .await? - .ok_or_message("Internal error: alias not found")?; - - // Checks ok, remove alias - let alias_ts = increment_logical_clock_2( - alias.state.timestamp(), - bucket_state.aliases.get_timestamp(&query.name), - ); - - alias.state = Lww::raw(alias_ts, Deletable::delete()); - self.garage.bucket_alias_table.insert(&alias).await?; - bucket_state.aliases = LwwMap::raw_item(query.name.clone(), alias_ts, false); - self.garage.bucket_table.insert(&bucket).await?; + helper + .unset_global_bucket_alias(bucket_id, &query.name) + .await?; - Ok(AdminRpc::Ok(format!("Bucket alias {} deleted", query.name))) + Ok(AdminRpc::Ok(format!( + "Alias {} no longer points to bucket {:?}", + &query.name, bucket_id + ))) } } async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> { - let bucket_id = self - .garage - .bucket_helper() + let helper = self.garage.bucket_helper(); + + let bucket_id = helper .resolve_global_bucket_name(&query.bucket) .await? .ok_or_bad_request("Bucket not found")?; - let bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - let key = self.get_existing_key(&query.key_pattern).await?; + let key = helper.get_existing_matching_key(&query.key_pattern).await?; let allow_read = query.read || key.allow_read(&bucket_id); let allow_write = query.write || key.allow_write(&bucket_id); let allow_owner = query.owner || key.allow_owner(&bucket_id); - let new_perm = self - .update_key_bucket(&key, bucket_id, allow_read, allow_write, allow_owner) - .await?; - self.update_bucket_key(bucket, &key.key_id, new_perm) + helper + .set_bucket_key_permissions( + bucket_id, + &key.key_id, + BucketKeyPerm { + timestamp: now_msec(), + allow_read, + allow_write, + allow_owner, + }, + ) .await?; Ok(AdminRpc::Ok(format!( @@ -490,27 +343,29 @@ impl AdminRpcHandler { } async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> { - let bucket_id = self - .garage - .bucket_helper() + let helper = self.garage.bucket_helper(); + + let bucket_id = helper .resolve_global_bucket_name(&query.bucket) .await? .ok_or_bad_request("Bucket not found")?; - let bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - let key = self.get_existing_key(&query.key_pattern).await?; + let key = helper.get_existing_matching_key(&query.key_pattern).await?; let allow_read = !query.read && key.allow_read(&bucket_id); let allow_write = !query.write && key.allow_write(&bucket_id); let allow_owner = !query.owner && key.allow_owner(&bucket_id); - let new_perm = self - .update_key_bucket(&key, bucket_id, allow_read, allow_write, allow_owner) - .await?; - self.update_bucket_key(bucket, &key.key_id, new_perm) + helper + .set_bucket_key_permissions( + bucket_id, + &key.key_id, + BucketKeyPerm { + timestamp: now_msec(), + allow_read, + allow_write, + allow_owner, + }, + ) .await?; Ok(AdminRpc::Ok(format!( @@ -590,7 +445,11 @@ impl AdminRpcHandler { } async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> { - let key = self.get_existing_key(&query.key_pattern).await?; + let key = self + .garage + .bucket_helper() + .get_existing_matching_key(&query.key_pattern) + .await?; self.key_info_result(key).await } @@ -601,54 +460,43 @@ impl AdminRpcHandler { } async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> { - let mut key = self.get_existing_key(&query.key_pattern).await?; + let mut key = self + .garage + .bucket_helper() + .get_existing_matching_key(&query.key_pattern) + .await?; key.name.update(query.new_name.clone()); self.garage.key_table.insert(&key).await?; self.key_info_result(key).await } async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> { - let mut key = self.get_existing_key(&query.key_pattern).await?; + let helper = self.garage.bucket_helper(); + + let mut key = helper.get_existing_matching_key(&query.key_pattern).await?; + if !query.yes { return Err(Error::BadRequest( "Add --yes flag to really perform this operation".to_string(), )); } + let state = key.state.as_option_mut().unwrap(); // --- done checking, now commit --- // 1. Delete local aliases for (alias, _, to) in state.local_aliases.items().iter() { if let Deletable::Present(bucket_id) = to { - if let Some(mut bucket) = self.garage.bucket_table.get(bucket_id, &EmptyKey).await? - { - if let Deletable::Present(bucket_state) = &mut bucket.state { - bucket_state.local_aliases = bucket_state - .local_aliases - .update_mutator((key.key_id.to_string(), alias.to_string()), false); - self.garage.bucket_table.insert(&bucket).await?; - } - } else { - // ignore - } + helper + .unset_local_bucket_alias(*bucket_id, &key.key_id, alias) + .await?; } } // 2. Delete authorized buckets - for (ab_id, auth) in state.authorized_buckets.items().iter() { - if let Some(bucket) = self.garage.bucket_table.get(ab_id, &EmptyKey).await? { - let new_perm = BucketKeyPerm { - timestamp: increment_logical_clock(auth.timestamp), - allow_read: false, - allow_write: false, - allow_owner: false, - }; - if !bucket.is_deleted() { - self.update_bucket_key(bucket, &key.key_id, new_perm) - .await?; - } - } else { - // ignore - } + for (ab_id, _auth) in state.authorized_buckets.items().iter() { + helper + .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::no_permissions()) + .await?; } // 3. Actually delete key key.state = Deletable::delete(); @@ -671,30 +519,6 @@ impl AdminRpcHandler { self.key_info_result(imported_key).await } - async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> { - let candidates = self - .garage - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::Matches(pattern.to_string())), - 10, - ) - .await? - .into_iter() - .filter(|k| !k.state.is_deleted()) - .collect::<Vec<_>>(); - if candidates.len() != 1 { - Err(Error::BadRequest(format!( - "{} matching keys", - candidates.len() - ))) - } else { - Ok(candidates.into_iter().next().unwrap()) - } - } - async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> { let mut relevant_buckets = HashMap::new(); @@ -714,54 +538,6 @@ impl AdminRpcHandler { Ok(AdminRpc::KeyInfo(key, relevant_buckets)) } - /// Update **key table** to inform of the new linked bucket - async fn update_key_bucket( - &self, - key: &Key, - bucket_id: Uuid, - allow_read: bool, - allow_write: bool, - allow_owner: bool, - ) -> Result<BucketKeyPerm, Error> { - let mut key = key.clone(); - let mut key_state = key.state.as_option_mut().unwrap(); - - let perm = key_state - .authorized_buckets - .get(&bucket_id) - .cloned() - .map(|old_perm| BucketKeyPerm { - timestamp: increment_logical_clock(old_perm.timestamp), - allow_read, - allow_write, - allow_owner, - }) - .unwrap_or(BucketKeyPerm { - timestamp: now_msec(), - allow_read, - allow_write, - allow_owner, - }); - - key_state.authorized_buckets = Map::put_mutator(bucket_id, perm); - - self.garage.key_table.insert(&key).await?; - Ok(perm) - } - - /// Update **bucket table** to inform of the new linked key - async fn update_bucket_key( - &self, - mut bucket: Bucket, - key_id: &str, - new_perm: BucketKeyPerm, - ) -> Result<(), Error> { - bucket.state.as_option_mut().unwrap().authorized_keys = - Map::put_mutator(key_id.to_string(), new_perm); - self.garage.bucket_table.insert(&bucket).await?; - Ok(()) - } - async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> { if !opt.yes { return Err(Error::BadRequest( diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs index caae76f1..45807178 100644 --- a/src/model/bucket_alias_table.rs +++ b/src/model/bucket_alias_table.rs @@ -1,7 +1,6 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; -use garage_util::time::*; use garage_table::crdt::*; use garage_table::*; @@ -24,10 +23,7 @@ impl AutoCrdt for AliasParams { } impl BucketAlias { - pub fn new(name: String, bucket_id: Uuid) -> Option<Self> { - Self::raw(name, now_msec(), bucket_id) - } - pub fn raw(name: String, ts: u64, bucket_id: Uuid) -> Option<Self> { + pub fn new(name: String, ts: u64, bucket_id: Uuid) -> Option<Self> { if !is_valid_bucket_name(&name) { None } else { @@ -101,3 +97,6 @@ pub fn is_valid_bucket_name(n: &str) -> bool { // Bucket names must not end with "-s3alias" && !n.ends_with("-s3alias") } + +/// Error message to return for invalid bucket names +pub const INVALID_BUCKET_NAME_MESSAGE: &str = "Invalid bucket name. See AWS documentation for constraints on S3 bucket names:\nhttps://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html"; diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index e89a723d..bb4b5b24 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -1,14 +1,20 @@ use garage_table::util::EmptyKey; +use garage_util::crdt::*; use garage_util::data::*; +use garage_util::error::{Error as GarageError, OkOrMessage}; +use garage_util::time::*; -use crate::bucket_table::Bucket; +use crate::bucket_alias_table::*; +use crate::bucket_table::*; use crate::garage::Garage; use crate::helper::error::*; +use crate::key_table::{Key, KeyFilter}; +use crate::permission::BucketKeyPerm; pub struct BucketHelper<'a>(pub(crate) &'a Garage); +#[allow(clippy::ptr_arg)] impl<'a> BucketHelper<'a> { - #[allow(clippy::ptr_arg)] pub async fn resolve_global_bucket_name( &self, bucket_name: &String, @@ -45,12 +51,386 @@ impl<'a> BucketHelper<'a> { } } + /// Returns a Bucket if it is present in bucket table, + /// even if it is in deleted state. Querying a non-existing + /// bucket ID returns an internal error. + pub async fn get_internal_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> { + Ok(self + .0 + .bucket_table + .get(&bucket_id, &EmptyKey) + .await? + .ok_or_message(format!("Bucket {:?} does not exist", bucket_id))?) + } + + /// Returns a Bucket if it is present in bucket table, + /// only if it is in non-deleted state. + /// Querying a non-existing bucket ID or a deleted bucket + /// returns a bad request error. pub async fn get_existing_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> { self.0 .bucket_table .get(&bucket_id, &EmptyKey) .await? .filter(|b| !b.is_deleted()) - .ok_or_bad_request(format!("Bucket {:?} does not exist", bucket_id)) + .ok_or_bad_request(format!( + "Bucket {:?} does not exist or has been deleted", + bucket_id + )) + } + + /// Returns a Key if it is present in key table, + /// even if it is in deleted state. Querying a non-existing + /// key ID returns an internal error. + pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> { + Ok(self + .0 + .key_table + .get(&EmptyKey, key_id) + .await? + .ok_or_message(format!("Key {} does not exist", key_id))?) + } + + /// Returns a Key if it is present in key table, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> { + self.0 + .key_table + .get(&EmptyKey, key_id) + .await? + .filter(|b| !b.state.is_deleted()) + .ok_or_bad_request(format!("Key {} does not exist or has been deleted", key_id)) + } + + /// Returns a Key if it is present in key table, + /// looking it up by key ID or by a match on its name, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> { + let candidates = self + .0 + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Matches(pattern.to_string())), + 10, + ) + .await? + .into_iter() + .filter(|k| !k.state.is_deleted()) + .collect::<Vec<_>>(); + if candidates.len() != 1 { + Err(Error::BadRequest(format!( + "{} matching keys", + candidates.len() + ))) + } else { + Ok(candidates.into_iter().next().unwrap()) + } + } + + /// Sets a new alias for a bucket in global namespace. + /// This function fails if: + /// - alias name is not valid according to S3 spec + /// - bucket does not exist or is deleted + /// - alias already exists and points to another bucket + pub async fn set_global_bucket_alias( + &self, + bucket_id: Uuid, + alias_name: &String, + ) -> Result<(), Error> { + if !is_valid_bucket_name(alias_name) { + return Err(Error::BadRequest(format!( + "{}: {}", + alias_name, INVALID_BUCKET_NAME_MESSAGE + ))); + } + + let mut bucket = self.get_existing_bucket(bucket_id).await?; + + let alias = self.0.bucket_alias_table.get(&EmptyKey, alias_name).await?; + + if let Some(existing_alias) = alias.as_ref() { + if let Some(p) = existing_alias.state.get().as_option() { + if p.bucket_id != bucket_id { + return Err(Error::BadRequest(format!( + "Alias {} already exists and points to different bucket: {:?}", + alias_name, p.bucket_id + ))); + } + } + } + + // Checks ok, add alias + let mut bucket_p = bucket.state.as_option_mut().unwrap(); + + let alias_ts = increment_logical_clock_2( + bucket_p.aliases.get_timestamp(alias_name), + alias.as_ref().map(|a| a.state.timestamp()).unwrap_or(0), + ); + + // ---- timestamp-ensured causality barrier ---- + // writes are now done and all writes use timestamp alias_ts + + let alias = match alias { + None => BucketAlias::new(alias_name.clone(), alias_ts, bucket_id) + .ok_or_bad_request(format!("{}: {}", alias_name, INVALID_BUCKET_NAME_MESSAGE))?, + Some(mut a) => { + a.state = Lww::raw(alias_ts, Deletable::present(AliasParams { bucket_id })); + a + } + }; + self.0.bucket_alias_table.insert(&alias).await?; + + bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true); + self.0.bucket_table.insert(&bucket).await?; + + Ok(()) + } + + /// Unsets an alias for a bucket in global namespace. + /// This function fails if: + /// - bucket does not exist or is deleted + /// - alias does not exist or maps to another bucket (-> internal error) + /// - bucket has no other aliases (global or local) + pub async fn unset_global_bucket_alias( + &self, + bucket_id: Uuid, + alias_name: &String, + ) -> Result<(), Error> { + let mut bucket = self.get_existing_bucket(bucket_id).await?; + let mut bucket_state = bucket.state.as_option_mut().unwrap(); + + let mut alias = self + .0 + .bucket_alias_table + .get(&EmptyKey, alias_name) + .await? + .filter(|a| { + a.state + .get() + .as_option() + .map(|x| x.bucket_id == bucket_id) + .unwrap_or(false) + }) + .ok_or_message(format!( + "Internal error: alias not found or does not point to bucket {:?}", + bucket_id + ))?; + + let has_other_global_aliases = bucket_state + .aliases + .items() + .iter() + .any(|(name, _, active)| name != alias_name && *active); + let has_other_local_aliases = bucket_state + .local_aliases + .items() + .iter() + .any(|(_, _, active)| *active); + if !has_other_global_aliases && !has_other_local_aliases { + return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", alias_name))); + } + + // Checks ok, remove alias + let alias_ts = increment_logical_clock_2( + alias.state.timestamp(), + bucket_state.aliases.get_timestamp(alias_name), + ); + + // ---- timestamp-ensured causality barrier ---- + // writes are now done and all writes use timestamp alias_ts + + alias.state = Lww::raw(alias_ts, Deletable::delete()); + self.0.bucket_alias_table.insert(&alias).await?; + + bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false); + self.0.bucket_table.insert(&bucket).await?; + + Ok(()) + } + + /// Sets a new alias for a bucket in the local namespace of a key. + /// This function fails if: + /// - alias name is not valid according to S3 spec + /// - bucket does not exist or is deleted + /// - key does not exist or is deleted + /// - alias already exists and points to another bucket + pub async fn set_local_bucket_alias( + &self, + bucket_id: Uuid, + key_id: &String, + alias_name: &String, + ) -> Result<(), Error> { + if !is_valid_bucket_name(alias_name) { + return Err(Error::BadRequest(format!( + "{}: {}", + alias_name, INVALID_BUCKET_NAME_MESSAGE + ))); + } + + let mut bucket = self.get_existing_bucket(bucket_id).await?; + let mut key = self.get_existing_key(key_id).await?; + + let mut key_param = key.state.as_option_mut().unwrap(); + + if let Some(Deletable::Present(existing_alias)) = key_param.local_aliases.get(alias_name) { + if *existing_alias != bucket_id { + return Err(Error::BadRequest(format!("Alias {} already exists in namespace of key {} and points to different bucket: {:?}", alias_name, key.key_id, existing_alias))); + } + } + + // Checks ok, add alias + let mut bucket_p = bucket.state.as_option_mut().unwrap(); + let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone()); + + // Calculate the timestamp to assign to this aliasing in the two local_aliases maps + // (the one from key to bucket, and the reverse one stored in the bucket iself) + // so that merges on both maps in case of a concurrent operation resolve + // to the same alias being set + let alias_ts = increment_logical_clock_2( + key_param.local_aliases.get_timestamp(alias_name), + bucket_p + .local_aliases + .get_timestamp(&bucket_p_local_alias_key), + ); + + // ---- timestamp-ensured causality barrier ---- + // writes are now done and all writes use timestamp alias_ts + + key_param.local_aliases = + LwwMap::raw_item(alias_name.clone(), alias_ts, Deletable::present(bucket_id)); + self.0.key_table.insert(&key).await?; + + bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true); + self.0.bucket_table.insert(&bucket).await?; + + Ok(()) + } + + /// Unsets an alias for a bucket in the local namespace of a key. + /// This function fails if: + /// - bucket does not exist or is deleted + /// - key does not exist or is deleted + /// - alias does not exist or maps to another bucket (-> internal error) + /// - bucket has no other aliases (global or local) + pub async fn unset_local_bucket_alias( + &self, + bucket_id: Uuid, + key_id: &String, + alias_name: &String, + ) -> Result<(), Error> { + let mut bucket = self.get_existing_bucket(bucket_id).await?; + let mut key = self.get_existing_key(key_id).await?; + + let mut bucket_p = bucket.state.as_option_mut().unwrap(); + + if key + .state + .as_option() + .unwrap() + .local_aliases + .get(alias_name) + .map(|x| x.as_option()) + .flatten() != Some(&bucket_id) + { + return Err(GarageError::Message(format!( + "Bucket {:?} does not have alias {} in namespace of key {}", + bucket_id, alias_name, key_id + )) + .into()); + } + + let has_other_global_aliases = bucket_p + .aliases + .items() + .iter() + .any(|(_, _, active)| *active); + let has_other_local_aliases = bucket_p + .local_aliases + .items() + .iter() + .any(|((k, n), _, active)| *k == key.key_id && n == alias_name && *active); + if !has_other_global_aliases && !has_other_local_aliases { + return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", alias_name))); + } + + // Checks ok, remove alias + let mut key_param = key.state.as_option_mut().unwrap(); + let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone()); + + let alias_ts = increment_logical_clock_2( + key_param.local_aliases.get_timestamp(alias_name), + bucket_p + .local_aliases + .get_timestamp(&bucket_p_local_alias_key), + ); + + // ---- timestamp-ensured causality barrier ---- + // writes are now done and all writes use timestamp alias_ts + + key_param.local_aliases = + LwwMap::raw_item(alias_name.clone(), alias_ts, Deletable::delete()); + self.0.key_table.insert(&key).await?; + + bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false); + self.0.bucket_table.insert(&bucket).await?; + + Ok(()) + } + + /// Sets permissions for a key on a bucket. + /// This function fails if: + /// - bucket or key cannot be found at all (its ok if they are in deleted state) + /// - bucket or key is in deleted state and we are trying to set permissions other than "deny + /// all" + pub async fn set_bucket_key_permissions( + &self, + bucket_id: Uuid, + key_id: &String, + mut perm: BucketKeyPerm, + ) -> Result<(), Error> { + let mut bucket = self.get_internal_bucket(bucket_id).await?; + let mut key = self.get_internal_key(key_id).await?; + + let allow_any = perm.allow_read || perm.allow_write || perm.allow_owner; + + if let Some(bstate) = bucket.state.as_option() { + if let Some(kp) = bstate.authorized_keys.get(key_id) { + perm.timestamp = increment_logical_clock_2(perm.timestamp, kp.timestamp); + } + } else if allow_any { + return Err(Error::BadRequest( + "Trying to give permissions on a deleted bucket".into(), + )); + } + + if let Some(kstate) = key.state.as_option() { + if let Some(bp) = kstate.authorized_buckets.get(&bucket_id) { + perm.timestamp = increment_logical_clock_2(perm.timestamp, bp.timestamp); + } + } else if allow_any { + return Err(Error::BadRequest( + "Trying to give permissions to a deleted key".into(), + )); + } + + // ---- timestamp-ensured causality barrier ---- + + if let Some(bstate) = bucket.state.as_option_mut() { + bstate.authorized_keys = Map::put_mutator(key_id.clone(), perm); + self.0.bucket_table.insert(&bucket).await?; + } + + if let Some(kstate) = key.state.as_option_mut() { + kstate.authorized_buckets = Map::put_mutator(bucket_id, perm); + self.0.key_table.insert(&key).await?; + } + + Ok(()) } } diff --git a/src/model/migrate.rs b/src/model/migrate.rs index a5508c4d..65140c4b 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -1,9 +1,8 @@ use std::sync::Arc; -use garage_table::util::EmptyKey; use garage_util::crdt::*; use garage_util::data::*; -use garage_util::error::*; +use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_model_050::bucket_table as old_bucket; @@ -11,6 +10,7 @@ use garage_model_050::bucket_table as old_bucket; use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::garage::Garage; +use crate::helper::error::*; use crate::permission::*; pub struct Migrate { @@ -19,11 +19,16 @@ pub struct Migrate { impl Migrate { pub async fn migrate_buckets050(&self) -> Result<(), Error> { - let tree = self.garage.db.open_tree("bucket:table")?; + let tree = self + .garage + .db + .open_tree("bucket:table") + .map_err(GarageError::from)?; for res in tree.iter() { - let (_k, v) = res?; - let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])?; + let (_k, v) = res.map_err(GarageError::from)?; + let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) + .map_err(GarageError::from)?; if let old_bucket::BucketState::Present(p) = bucket.state.get() { self.migrate_buckets050_do_bucket(&bucket, p).await?; @@ -48,27 +53,6 @@ impl Migrate { hex::encode(&bucket_id.as_slice()[..16]) }; - let new_ak = old_bucket_p - .authorized_keys - .items() - .iter() - .map(|(k, ts, perm)| { - ( - k.to_string(), - BucketKeyPerm { - timestamp: *ts, - allow_read: perm.allow_read, - allow_write: perm.allow_write, - allow_owner: false, - }, - ) - }) - .collect::<Map<_, _>>(); - - let mut aliases = LwwMap::new(); - aliases.update_in_place(new_name.clone(), true); - let alias_ts = aliases.get_timestamp(&new_name); - let website = if *old_bucket_p.website.get() { Some(WebsiteConfig { index_document: "index.html".into(), @@ -78,32 +62,39 @@ impl Migrate { None }; - let new_bucket = Bucket { - id: bucket_id, - state: Deletable::Present(BucketParams { - creation_date: now_msec(), - authorized_keys: new_ak.clone(), - website_config: Lww::new(website), - aliases, - local_aliases: LwwMap::new(), - }), - }; - self.garage.bucket_table.insert(&new_bucket).await?; - - let new_alias = BucketAlias::raw(new_name.clone(), alias_ts, new_bucket.id).unwrap(); - self.garage.bucket_alias_table.insert(&new_alias).await?; - - for (k, perm) in new_ak.items().iter() { - let mut key = self - .garage - .key_table - .get(&EmptyKey, k) - .await? - .ok_or_message(format!("Missing key: {}", k))?; - if let Some(p) = key.state.as_option_mut() { - p.authorized_buckets.put(new_bucket.id, *perm); - } - self.garage.key_table.insert(&key).await?; + self.garage + .bucket_table + .insert(&Bucket { + id: bucket_id, + state: Deletable::Present(BucketParams { + creation_date: now_msec(), + authorized_keys: Map::new(), + website_config: Lww::new(website), + aliases: LwwMap::new(), + local_aliases: LwwMap::new(), + }), + }) + .await?; + + self.garage + .bucket_helper() + .set_global_bucket_alias(bucket_id, &new_name) + .await?; + + for (k, ts, perm) in old_bucket_p.authorized_keys.items().iter() { + self.garage + .bucket_helper() + .set_bucket_key_permissions( + bucket_id, + k, + BucketKeyPerm { + timestamp: *ts, + allow_read: perm.allow_read, + allow_write: perm.allow_write, + allow_owner: false, + }, + ) + .await?; } Ok(()) diff --git a/src/model/permission.rs b/src/model/permission.rs index ebb24a32..b8f7dd71 100644 --- a/src/model/permission.rs +++ b/src/model/permission.rs @@ -20,6 +20,17 @@ pub struct BucketKeyPerm { pub allow_owner: bool, } +impl BucketKeyPerm { + pub fn no_permissions() -> Self { + Self { + timestamp: 0, + allow_read: false, + allow_write: false, + allow_owner: false, + } + } +} + impl Crdt for BucketKeyPerm { fn merge(&mut self, other: &Self) { match other.timestamp.cmp(&self.timestamp) { |