diff options
Diffstat (limited to 'src/model/helper/bucket.rs')
-rw-r--r-- | src/model/helper/bucket.rs | 386 |
1 files changed, 383 insertions, 3 deletions
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index e89a723d..bb4b5b24 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -1,14 +1,20 @@ use garage_table::util::EmptyKey; +use garage_util::crdt::*; use garage_util::data::*; +use garage_util::error::{Error as GarageError, OkOrMessage}; +use garage_util::time::*; -use crate::bucket_table::Bucket; +use crate::bucket_alias_table::*; +use crate::bucket_table::*; use crate::garage::Garage; use crate::helper::error::*; +use crate::key_table::{Key, KeyFilter}; +use crate::permission::BucketKeyPerm; pub struct BucketHelper<'a>(pub(crate) &'a Garage); +#[allow(clippy::ptr_arg)] impl<'a> BucketHelper<'a> { - #[allow(clippy::ptr_arg)] pub async fn resolve_global_bucket_name( &self, bucket_name: &String, @@ -45,12 +51,386 @@ impl<'a> BucketHelper<'a> { } } + /// Returns a Bucket if it is present in bucket table, + /// even if it is in deleted state. Querying a non-existing + /// bucket ID returns an internal error. + pub async fn get_internal_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> { + Ok(self + .0 + .bucket_table + .get(&bucket_id, &EmptyKey) + .await? + .ok_or_message(format!("Bucket {:?} does not exist", bucket_id))?) + } + + /// Returns a Bucket if it is present in bucket table, + /// only if it is in non-deleted state. + /// Querying a non-existing bucket ID or a deleted bucket + /// returns a bad request error. pub async fn get_existing_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> { self.0 .bucket_table .get(&bucket_id, &EmptyKey) .await? .filter(|b| !b.is_deleted()) - .ok_or_bad_request(format!("Bucket {:?} does not exist", bucket_id)) + .ok_or_bad_request(format!( + "Bucket {:?} does not exist or has been deleted", + bucket_id + )) + } + + /// Returns a Key if it is present in key table, + /// even if it is in deleted state. Querying a non-existing + /// key ID returns an internal error. + pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> { + Ok(self + .0 + .key_table + .get(&EmptyKey, key_id) + .await? + .ok_or_message(format!("Key {} does not exist", key_id))?) + } + + /// Returns a Key if it is present in key table, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> { + self.0 + .key_table + .get(&EmptyKey, key_id) + .await? + .filter(|b| !b.state.is_deleted()) + .ok_or_bad_request(format!("Key {} does not exist or has been deleted", key_id)) + } + + /// Returns a Key if it is present in key table, + /// looking it up by key ID or by a match on its name, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> { + let candidates = self + .0 + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Matches(pattern.to_string())), + 10, + ) + .await? + .into_iter() + .filter(|k| !k.state.is_deleted()) + .collect::<Vec<_>>(); + if candidates.len() != 1 { + Err(Error::BadRequest(format!( + "{} matching keys", + candidates.len() + ))) + } else { + Ok(candidates.into_iter().next().unwrap()) + } + } + + /// Sets a new alias for a bucket in global namespace. + /// This function fails if: + /// - alias name is not valid according to S3 spec + /// - bucket does not exist or is deleted + /// - alias already exists and points to another bucket + pub async fn set_global_bucket_alias( + &self, + bucket_id: Uuid, + alias_name: &String, + ) -> Result<(), Error> { + if !is_valid_bucket_name(alias_name) { + return Err(Error::BadRequest(format!( + "{}: {}", + alias_name, INVALID_BUCKET_NAME_MESSAGE + ))); + } + + let mut bucket = self.get_existing_bucket(bucket_id).await?; + + let alias = self.0.bucket_alias_table.get(&EmptyKey, alias_name).await?; + + if let Some(existing_alias) = alias.as_ref() { + if let Some(p) = existing_alias.state.get().as_option() { + if p.bucket_id != bucket_id { + return Err(Error::BadRequest(format!( + "Alias {} already exists and points to different bucket: {:?}", + alias_name, p.bucket_id + ))); + } + } + } + + // Checks ok, add alias + let mut bucket_p = bucket.state.as_option_mut().unwrap(); + + let alias_ts = increment_logical_clock_2( + bucket_p.aliases.get_timestamp(alias_name), + alias.as_ref().map(|a| a.state.timestamp()).unwrap_or(0), + ); + + // ---- timestamp-ensured causality barrier ---- + // writes are now done and all writes use timestamp alias_ts + + let alias = match alias { + None => BucketAlias::new(alias_name.clone(), alias_ts, bucket_id) + .ok_or_bad_request(format!("{}: {}", alias_name, INVALID_BUCKET_NAME_MESSAGE))?, + Some(mut a) => { + a.state = Lww::raw(alias_ts, Deletable::present(AliasParams { bucket_id })); + a + } + }; + self.0.bucket_alias_table.insert(&alias).await?; + + bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true); + self.0.bucket_table.insert(&bucket).await?; + + Ok(()) + } + + /// Unsets an alias for a bucket in global namespace. + /// This function fails if: + /// - bucket does not exist or is deleted + /// - alias does not exist or maps to another bucket (-> internal error) + /// - bucket has no other aliases (global or local) + pub async fn unset_global_bucket_alias( + &self, + bucket_id: Uuid, + alias_name: &String, + ) -> Result<(), Error> { + let mut bucket = self.get_existing_bucket(bucket_id).await?; + let mut bucket_state = bucket.state.as_option_mut().unwrap(); + + let mut alias = self + .0 + .bucket_alias_table + .get(&EmptyKey, alias_name) + .await? + .filter(|a| { + a.state + .get() + .as_option() + .map(|x| x.bucket_id == bucket_id) + .unwrap_or(false) + }) + .ok_or_message(format!( + "Internal error: alias not found or does not point to bucket {:?}", + bucket_id + ))?; + + let has_other_global_aliases = bucket_state + .aliases + .items() + .iter() + .any(|(name, _, active)| name != alias_name && *active); + let has_other_local_aliases = bucket_state + .local_aliases + .items() + .iter() + .any(|(_, _, active)| *active); + if !has_other_global_aliases && !has_other_local_aliases { + return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", alias_name))); + } + + // Checks ok, remove alias + let alias_ts = increment_logical_clock_2( + alias.state.timestamp(), + bucket_state.aliases.get_timestamp(alias_name), + ); + + // ---- timestamp-ensured causality barrier ---- + // writes are now done and all writes use timestamp alias_ts + + alias.state = Lww::raw(alias_ts, Deletable::delete()); + self.0.bucket_alias_table.insert(&alias).await?; + + bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false); + self.0.bucket_table.insert(&bucket).await?; + + Ok(()) + } + + /// Sets a new alias for a bucket in the local namespace of a key. + /// This function fails if: + /// - alias name is not valid according to S3 spec + /// - bucket does not exist or is deleted + /// - key does not exist or is deleted + /// - alias already exists and points to another bucket + pub async fn set_local_bucket_alias( + &self, + bucket_id: Uuid, + key_id: &String, + alias_name: &String, + ) -> Result<(), Error> { + if !is_valid_bucket_name(alias_name) { + return Err(Error::BadRequest(format!( + "{}: {}", + alias_name, INVALID_BUCKET_NAME_MESSAGE + ))); + } + + let mut bucket = self.get_existing_bucket(bucket_id).await?; + let mut key = self.get_existing_key(key_id).await?; + + let mut key_param = key.state.as_option_mut().unwrap(); + + if let Some(Deletable::Present(existing_alias)) = key_param.local_aliases.get(alias_name) { + if *existing_alias != bucket_id { + return Err(Error::BadRequest(format!("Alias {} already exists in namespace of key {} and points to different bucket: {:?}", alias_name, key.key_id, existing_alias))); + } + } + + // Checks ok, add alias + let mut bucket_p = bucket.state.as_option_mut().unwrap(); + let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone()); + + // Calculate the timestamp to assign to this aliasing in the two local_aliases maps + // (the one from key to bucket, and the reverse one stored in the bucket iself) + // so that merges on both maps in case of a concurrent operation resolve + // to the same alias being set + let alias_ts = increment_logical_clock_2( + key_param.local_aliases.get_timestamp(alias_name), + bucket_p + .local_aliases + .get_timestamp(&bucket_p_local_alias_key), + ); + + // ---- timestamp-ensured causality barrier ---- + // writes are now done and all writes use timestamp alias_ts + + key_param.local_aliases = + LwwMap::raw_item(alias_name.clone(), alias_ts, Deletable::present(bucket_id)); + self.0.key_table.insert(&key).await?; + + bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true); + self.0.bucket_table.insert(&bucket).await?; + + Ok(()) + } + + /// Unsets an alias for a bucket in the local namespace of a key. + /// This function fails if: + /// - bucket does not exist or is deleted + /// - key does not exist or is deleted + /// - alias does not exist or maps to another bucket (-> internal error) + /// - bucket has no other aliases (global or local) + pub async fn unset_local_bucket_alias( + &self, + bucket_id: Uuid, + key_id: &String, + alias_name: &String, + ) -> Result<(), Error> { + let mut bucket = self.get_existing_bucket(bucket_id).await?; + let mut key = self.get_existing_key(key_id).await?; + + let mut bucket_p = bucket.state.as_option_mut().unwrap(); + + if key + .state + .as_option() + .unwrap() + .local_aliases + .get(alias_name) + .map(|x| x.as_option()) + .flatten() != Some(&bucket_id) + { + return Err(GarageError::Message(format!( + "Bucket {:?} does not have alias {} in namespace of key {}", + bucket_id, alias_name, key_id + )) + .into()); + } + + let has_other_global_aliases = bucket_p + .aliases + .items() + .iter() + .any(|(_, _, active)| *active); + let has_other_local_aliases = bucket_p + .local_aliases + .items() + .iter() + .any(|((k, n), _, active)| *k == key.key_id && n == alias_name && *active); + if !has_other_global_aliases && !has_other_local_aliases { + return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", alias_name))); + } + + // Checks ok, remove alias + let mut key_param = key.state.as_option_mut().unwrap(); + let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone()); + + let alias_ts = increment_logical_clock_2( + key_param.local_aliases.get_timestamp(alias_name), + bucket_p + .local_aliases + .get_timestamp(&bucket_p_local_alias_key), + ); + + // ---- timestamp-ensured causality barrier ---- + // writes are now done and all writes use timestamp alias_ts + + key_param.local_aliases = + LwwMap::raw_item(alias_name.clone(), alias_ts, Deletable::delete()); + self.0.key_table.insert(&key).await?; + + bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false); + self.0.bucket_table.insert(&bucket).await?; + + Ok(()) + } + + /// Sets permissions for a key on a bucket. + /// This function fails if: + /// - bucket or key cannot be found at all (its ok if they are in deleted state) + /// - bucket or key is in deleted state and we are trying to set permissions other than "deny + /// all" + pub async fn set_bucket_key_permissions( + &self, + bucket_id: Uuid, + key_id: &String, + mut perm: BucketKeyPerm, + ) -> Result<(), Error> { + let mut bucket = self.get_internal_bucket(bucket_id).await?; + let mut key = self.get_internal_key(key_id).await?; + + let allow_any = perm.allow_read || perm.allow_write || perm.allow_owner; + + if let Some(bstate) = bucket.state.as_option() { + if let Some(kp) = bstate.authorized_keys.get(key_id) { + perm.timestamp = increment_logical_clock_2(perm.timestamp, kp.timestamp); + } + } else if allow_any { + return Err(Error::BadRequest( + "Trying to give permissions on a deleted bucket".into(), + )); + } + + if let Some(kstate) = key.state.as_option() { + if let Some(bp) = kstate.authorized_buckets.get(&bucket_id) { + perm.timestamp = increment_logical_clock_2(perm.timestamp, bp.timestamp); + } + } else if allow_any { + return Err(Error::BadRequest( + "Trying to give permissions to a deleted key".into(), + )); + } + + // ---- timestamp-ensured causality barrier ---- + + if let Some(bstate) = bucket.state.as_option_mut() { + bstate.authorized_keys = Map::put_mutator(key_id.clone(), perm); + self.0.bucket_table.insert(&bucket).await?; + } + + if let Some(kstate) = key.state.as_option_mut() { + kstate.authorized_buckets = Map::put_mutator(bucket_id, perm); + self.0.key_table.insert(&key).await?; + } + + Ok(()) } } |