diff options
Diffstat (limited to 'src/garage/admin.rs')
-rw-r--r-- | src/garage/admin.rs | 545 |
1 files changed, 337 insertions, 208 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c7472670..6db8bfbe 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -5,17 +5,21 @@ use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use garage_util::error::Error; +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::error::*; +use garage_util::time::*; -use garage_table::crdt::Crdt; use garage_table::replication::*; use garage_table::*; use garage_rpc::*; +use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; +use garage_model::permission::*; use crate::cli::*; use crate::repair::Repair; @@ -31,7 +35,7 @@ pub enum AdminRpc { // Replies Ok(String), - BucketList(Vec<String>), + BucketList(Vec<BucketAlias>), BucketInfo(Bucket), KeyList(Vec<(String, String)>), KeyInfo(Key), @@ -56,203 +60,331 @@ impl AdminRpcHandler { async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> { match cmd { - BucketOperation::List => { - let bucket_names = self + BucketOperation::List => self.handle_list_buckets().await, + BucketOperation::Info(query) => { + let bucket_id = self .garage - .bucket_table - .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) + .bucket_helper() + .resolve_global_bucket_name(&query.name) .await? - .iter() - .map(|b| b.name.to_string()) - .collect::<Vec<_>>(); - Ok(AdminRpc::BucketList(bucket_names)) - } - BucketOperation::Info(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; - Ok(AdminRpc::BucketInfo(bucket)) - } - BucketOperation::Create(query) => { - let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? { - Some(mut bucket) => { - if !bucket.is_deleted() { - return Err(Error::BadRpc(format!( - "Bucket {} already exists", - query.name - ))); - } - bucket - .state - .update(BucketState::Present(BucketParams::new())); - bucket - } - None => Bucket::new(query.name.clone()), - }; - self.garage.bucket_table.insert(&bucket).await?; - Ok(AdminRpc::Ok(format!("Bucket {} was created.", query.name))) - } - BucketOperation::Delete(query) => { - let mut bucket = self.get_existing_bucket(&query.name).await?; - let objects = self + .ok_or_message("Bucket not found")?; + let bucket = self .garage - .object_table - .get_range(&query.name, None, Some(DeletedFilter::NotDeleted), 10) + .bucket_helper() + .get_existing_bucket(bucket_id) .await?; - if !objects.is_empty() { - return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name))); - } - if !query.yes { - return Err(Error::BadRpc( - "Add --yes flag to really perform this operation".to_string(), - )); - } - // --- done checking, now commit --- - for (key_id, _, _) in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { - if !key.deleted.get() { - self.update_key_bucket(&key, &bucket.name, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Key not found: {}", key_id))); - } - } - bucket.state.update(BucketState::Deleted); - self.garage.bucket_table.insert(&bucket).await?; - Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) - } - BucketOperation::Allow(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = query.read || key.allow_read(&query.bucket); - let allow_write = query.write || key.allow_write(&query.bucket); - self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) - .await?; - Ok(AdminRpc::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &key.key_id, &query.bucket, allow_read, allow_write - ))) - } - BucketOperation::Deny(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = !query.read && key.allow_read(&query.bucket); - let allow_write = !query.write && key.allow_write(&query.bucket); - self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) - .await?; - Ok(AdminRpc::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &key.key_id, &query.bucket, allow_read, allow_write - ))) + Ok(AdminRpc::BucketInfo(bucket)) } - BucketOperation::Website(query) => { - let mut bucket = self.get_existing_bucket(&query.bucket).await?; + BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await, + BucketOperation::Delete(query) => self.handle_delete_bucket(query).await, + BucketOperation::Allow(query) => self.handle_bucket_allow(query).await, + BucketOperation::Deny(query) => self.handle_bucket_deny(query).await, + BucketOperation::Website(query) => self.handle_bucket_website(query).await, + } + } + + async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> { + let bucket_aliases = self + .garage + .bucket_alias_table + .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) + .await?; + Ok(AdminRpc::BucketList(bucket_aliases)) + } - if !(query.allow ^ query.deny) { - return Err(Error::Message( - "You must specify exactly one flag, either --allow or --deny".to_string(), - )); + #[allow(clippy::ptr_arg)] + async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> { + let mut bucket = Bucket::new(); + let alias = match self.garage.bucket_alias_table.get(&EmptyKey, name).await? { + Some(mut alias) => { + if !alias.state.get().is_deleted() { + return Err(Error::BadRpc(format!("Bucket {} already exists", name))); } + alias.state.update(Deletable::Present(AliasParams { + bucket_id: bucket.id, + website_access: false, + })); + alias + } + None => BucketAlias::new(name.clone(), bucket.id, false), + }; + bucket + .state + .as_option_mut() + .unwrap() + .aliases + .update_in_place(name.clone(), true); + self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_alias_table.insert(&alias).await?; + Ok(AdminRpc::Ok(format!("Bucket {} was created.", name))) + } - if let BucketState::Present(state) = bucket.state.get_mut() { - state.website.update(query.allow); - self.garage.bucket_table.insert(&bucket).await?; - let msg = if query.allow { - format!("Website access allowed for {}", &query.bucket) - } else { - format!("Website access denied for {}", &query.bucket) - }; + async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> { + let mut bucket_alias = self + .garage + .bucket_alias_table + .get(&EmptyKey, &query.name) + .await? + .filter(|a| !a.is_deleted()) + .ok_or_message(format!("Bucket {} does not exist", query.name))?; - Ok(AdminRpc::Ok(msg)) - } else { - unreachable!(); + let bucket_id = bucket_alias.state.get().as_option().unwrap().bucket_id; + + // Check bucket doesn't have other aliases + let mut bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let bucket_state = bucket.state.as_option().unwrap(); + if bucket_state + .aliases + .items() + .iter() + .filter(|(_, _, active)| *active) + .any(|(name, _, _)| name != &query.name) + { + return Err(Error::Message(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name))); + } + if bucket_state + .local_aliases + .items() + .iter() + .any(|(_, _, active)| *active) + { + return Err(Error::Message(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name))); + } + + // Check bucket is empty + let objects = self + .garage + .object_table + .get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10) + .await?; + if !objects.is_empty() { + return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name))); + } + + if !query.yes { + return Err(Error::BadRpc( + "Add --yes flag to really perform this operation".to_string(), + )); + } + + // --- done checking, now commit --- + // 1. delete authorization from keys that had access + for (key_id, _) in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { + if !key.state.is_deleted() { + self.update_key_bucket(&key, bucket.id, false, false) + .await?; } + } else { + return Err(Error::Message(format!("Key not found: {}", key_id))); } } + // 2. delete bucket alias + bucket_alias.state.update(Deletable::Deleted); + self.garage.bucket_alias_table.insert(&bucket_alias).await?; + // 3. delete bucket alias + bucket.state = Deletable::delete(); + self.garage.bucket_table.insert(&bucket).await?; + + Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) + } + + async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> { + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&query.bucket) + .await? + .ok_or_message("Bucket not found")?; + let bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let key = self.get_existing_key(&query.key_pattern).await?; + + let allow_read = query.read || key.allow_read(&bucket_id); + let allow_write = query.write || key.allow_write(&bucket_id); + + let new_perm = self + .update_key_bucket(&key, bucket_id, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &key.key_id, new_perm) + .await?; + + Ok(AdminRpc::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &key.key_id, &query.bucket, allow_read, allow_write + ))) + } + + async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> { + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&query.bucket) + .await? + .ok_or_message("Bucket not found")?; + let bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let key = self.get_existing_key(&query.key_pattern).await?; + + let allow_read = !query.read && key.allow_read(&bucket_id); + let allow_write = !query.write && key.allow_write(&bucket_id); + + let new_perm = self + .update_key_bucket(&key, bucket_id, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &key.key_id, new_perm) + .await?; + + Ok(AdminRpc::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &key.key_id, &query.bucket, allow_read, allow_write + ))) + } + + async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> { + let mut bucket_alias = self + .garage + .bucket_alias_table + .get(&EmptyKey, &query.bucket) + .await? + .filter(|a| !a.is_deleted()) + .ok_or_message(format!("Bucket {} does not exist", query.bucket))?; + + let mut state = bucket_alias.state.get().as_option().unwrap().clone(); + + if !(query.allow ^ query.deny) { + return Err(Error::Message( + "You must specify exactly one flag, either --allow or --deny".to_string(), + )); + } + + state.website_access = query.allow; + bucket_alias.state.update(Deletable::present(state)); + self.garage.bucket_alias_table.insert(&bucket_alias).await?; + + let msg = if query.allow { + format!("Website access allowed for {}", &query.bucket) + } else { + format!("Website access denied for {}", &query.bucket) + }; + + Ok(AdminRpc::Ok(msg)) } async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> { match cmd { - KeyOperation::List => { - let key_ids = self - .garage - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), - 10000, - ) - .await? - .iter() - .map(|k| (k.key_id.to_string(), k.name.get().clone())) - .collect::<Vec<_>>(); - Ok(AdminRpc::KeyList(key_ids)) - } + KeyOperation::List => self.handle_list_keys().await, KeyOperation::Info(query) => { let key = self.get_existing_key(&query.key_pattern).await?; Ok(AdminRpc::KeyInfo(key)) } - KeyOperation::New(query) => { - let key = Key::new(query.name.clone()); - self.garage.key_table.insert(&key).await?; - Ok(AdminRpc::KeyInfo(key)) - } - KeyOperation::Rename(query) => { - let mut key = self.get_existing_key(&query.key_pattern).await?; - key.name.update(query.new_name.clone()); - self.garage.key_table.insert(&key).await?; - Ok(AdminRpc::KeyInfo(key)) - } - KeyOperation::Delete(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - if !query.yes { - return Err(Error::BadRpc( - "Add --yes flag to really perform this operation".to_string(), - )); - } - // --- done checking, now commit --- - for (ab_name, _, _) in key.authorized_buckets.items().iter() { - if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? { - if !bucket.is_deleted() { - self.update_bucket_key(bucket, &key.key_id, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Bucket not found: {}", ab_name))); + KeyOperation::New(query) => self.handle_create_key(query).await, + KeyOperation::Rename(query) => self.handle_rename_key(query).await, + KeyOperation::Delete(query) => self.handle_delete_key(query).await, + KeyOperation::Import(query) => self.handle_import_key(query).await, + } + } + + async fn handle_list_keys(&self) -> Result<AdminRpc, Error> { + let key_ids = self + .garage + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), + 10000, + ) + .await? + .iter() + .map(|k| (k.key_id.to_string(), k.name.get().clone())) + .collect::<Vec<_>>(); + Ok(AdminRpc::KeyList(key_ids)) + } + + async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> { + let key = Key::new(query.name.clone()); + self.garage.key_table.insert(&key).await?; + Ok(AdminRpc::KeyInfo(key)) + } + + async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> { + let mut key = self.get_existing_key(&query.key_pattern).await?; + key.name.update(query.new_name.clone()); + self.garage.key_table.insert(&key).await?; + Ok(AdminRpc::KeyInfo(key)) + } + + async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> { + let mut key = self.get_existing_key(&query.key_pattern).await?; + if !query.yes { + return Err(Error::BadRpc( + "Add --yes flag to really perform this operation".to_string(), + )); + } + let state = key.state.as_option_mut().unwrap(); + + // --- done checking, now commit --- + // 1. Delete local aliases + for (alias, _, to) in state.local_aliases.items().iter() { + if let Deletable::Present(bucket_id) = to { + if let Some(mut bucket) = self.garage.bucket_table.get(bucket_id, &EmptyKey).await? + { + if let Deletable::Present(bucket_state) = &mut bucket.state { + bucket_state.local_aliases = bucket_state + .local_aliases + .update_mutator((key.key_id.to_string(), alias.to_string()), false); + self.garage.bucket_table.insert(&bucket).await?; } + } else { + // ignore } - let del_key = Key::delete(key.key_id.to_string()); - self.garage.key_table.insert(&del_key).await?; - Ok(AdminRpc::Ok(format!( - "Key {} was deleted successfully.", - key.key_id - ))) } - KeyOperation::Import(query) => { - let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; - if prev_key.is_some() { - return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); + } + // 2. Delete authorized buckets + for (ab_id, auth) in state.authorized_buckets.items().iter() { + if let Some(bucket) = self.garage.bucket_table.get(ab_id, &EmptyKey).await? { + let new_perm = BucketKeyPerm { + timestamp: increment_logical_clock(auth.timestamp), + allow_read: false, + allow_write: false, + }; + if !bucket.is_deleted() { + self.update_bucket_key(bucket, &key.key_id, new_perm) + .await?; } - let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); - self.garage.key_table.insert(&imported_key).await?; - Ok(AdminRpc::KeyInfo(imported_key)) + } else { + // ignore } } + // 3. Actually delete key + key.state = Deletable::delete(); + self.garage.key_table.insert(&key).await?; + + Ok(AdminRpc::Ok(format!( + "Key {} was deleted successfully.", + key.key_id + ))) } - #[allow(clippy::ptr_arg)] - async fn get_existing_bucket(&self, bucket: &String) -> Result<Bucket, Error> { - self.garage - .bucket_table - .get(&EmptyKey, bucket) - .await? - .filter(|b| !b.is_deleted()) - .map(Ok) - .unwrap_or_else(|| Err(Error::BadRpc(format!("Bucket {} does not exist", bucket)))) + async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> { + let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; + if prev_key.is_some() { + return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); + } + let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); + self.garage.key_table.insert(&imported_key).await?; + Ok(AdminRpc::KeyInfo(imported_key)) } async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> { @@ -267,7 +399,7 @@ impl AdminRpcHandler { ) .await? .into_iter() - .filter(|k| !k.deleted.get()) + .filter(|k| !k.state.is_deleted()) .collect::<Vec<_>>(); if candidates.len() != 1 { Err(Error::Message(format!( @@ -279,51 +411,48 @@ impl AdminRpcHandler { } } - /// Update **bucket table** to inform of the new linked key - async fn update_bucket_key( - &self, - mut bucket: Bucket, - key_id: &str, - allow_read: bool, - allow_write: bool, - ) -> Result<(), Error> { - if let BucketState::Present(params) = bucket.state.get_mut() { - let ak = &mut params.authorized_keys; - let old_ak = ak.take_and_clear(); - ak.merge(&old_ak.update_mutator( - key_id.to_string(), - PermissionSet { - allow_read, - allow_write, - }, - )); - } else { - return Err(Error::Message( - "Bucket is deleted in update_bucket_key".to_string(), - )); - } - self.garage.bucket_table.insert(&bucket).await?; - Ok(()) - } - /// Update **key table** to inform of the new linked bucket async fn update_key_bucket( &self, key: &Key, - bucket: &str, + bucket_id: Uuid, allow_read: bool, allow_write: bool, - ) -> Result<(), Error> { + ) -> Result<BucketKeyPerm, Error> { let mut key = key.clone(); - let old_map = key.authorized_buckets.take_and_clear(); - key.authorized_buckets.merge(&old_map.update_mutator( - bucket.to_string(), - PermissionSet { + let mut key_state = key.state.as_option_mut().unwrap(); + + let perm = key_state + .authorized_buckets + .get(&bucket_id) + .cloned() + .map(|old_perm| BucketKeyPerm { + timestamp: increment_logical_clock(old_perm.timestamp), allow_read, allow_write, - }, - )); + }) + .unwrap_or(BucketKeyPerm { + timestamp: now_msec(), + allow_read, + allow_write, + }); + + key_state.authorized_buckets = Map::put_mutator(bucket_id, perm); + self.garage.key_table.insert(&key).await?; + Ok(perm) + } + + /// Update **bucket table** to inform of the new linked key + async fn update_bucket_key( + &self, + mut bucket: Bucket, + key_id: &str, + new_perm: BucketKeyPerm, + ) -> Result<(), Error> { + bucket.state.as_option_mut().unwrap().authorized_keys = + Map::put_mutator(key_id.to_string(), new_perm); + self.garage.bucket_table.insert(&bucket).await?; Ok(()) } |