aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-12-14 13:55:11 +0100
committerAlex Auvolat <alex@adnab.me>2022-01-04 12:45:46 +0100
commit5b1117e582db16cc5aa50840a685875cbd5501f4 (patch)
tree06fec47bf56cb08cb51334454dc15f98352c98f2 /src/garage/admin.rs
parent8f6026de5ecd44cbe0fc0bcd47638a1ece860439 (diff)
downloadgarage-5b1117e582db16cc5aa50840a685875cbd5501f4.tar.gz
garage-5b1117e582db16cc5aa50840a685875cbd5501f4.zip
New model for buckets
Diffstat (limited to 'src/garage/admin.rs')
-rw-r--r--src/garage/admin.rs545
1 files changed, 337 insertions, 208 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c7472670..6db8bfbe 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -5,17 +5,21 @@ use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use garage_util::error::Error;
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::time::*;
-use garage_table::crdt::Crdt;
use garage_table::replication::*;
use garage_table::*;
use garage_rpc::*;
+use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::*;
+use garage_model::permission::*;
use crate::cli::*;
use crate::repair::Repair;
@@ -31,7 +35,7 @@ pub enum AdminRpc {
// Replies
Ok(String),
- BucketList(Vec<String>),
+ BucketList(Vec<BucketAlias>),
BucketInfo(Bucket),
KeyList(Vec<(String, String)>),
KeyInfo(Key),
@@ -56,203 +60,331 @@ impl AdminRpcHandler {
async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
- BucketOperation::List => {
- let bucket_names = self
+ BucketOperation::List => self.handle_list_buckets().await,
+ BucketOperation::Info(query) => {
+ let bucket_id = self
.garage
- .bucket_table
- .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.name)
.await?
- .iter()
- .map(|b| b.name.to_string())
- .collect::<Vec<_>>();
- Ok(AdminRpc::BucketList(bucket_names))
- }
- BucketOperation::Info(query) => {
- let bucket = self.get_existing_bucket(&query.name).await?;
- Ok(AdminRpc::BucketInfo(bucket))
- }
- BucketOperation::Create(query) => {
- let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? {
- Some(mut bucket) => {
- if !bucket.is_deleted() {
- return Err(Error::BadRpc(format!(
- "Bucket {} already exists",
- query.name
- )));
- }
- bucket
- .state
- .update(BucketState::Present(BucketParams::new()));
- bucket
- }
- None => Bucket::new(query.name.clone()),
- };
- self.garage.bucket_table.insert(&bucket).await?;
- Ok(AdminRpc::Ok(format!("Bucket {} was created.", query.name)))
- }
- BucketOperation::Delete(query) => {
- let mut bucket = self.get_existing_bucket(&query.name).await?;
- let objects = self
+ .ok_or_message("Bucket not found")?;
+ let bucket = self
.garage
- .object_table
- .get_range(&query.name, None, Some(DeletedFilter::NotDeleted), 10)
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
.await?;
- if !objects.is_empty() {
- return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name)));
- }
- if !query.yes {
- return Err(Error::BadRpc(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
- // --- done checking, now commit ---
- for (key_id, _, _) in bucket.authorized_keys() {
- if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? {
- if !key.deleted.get() {
- self.update_key_bucket(&key, &bucket.name, false, false)
- .await?;
- }
- } else {
- return Err(Error::Message(format!("Key not found: {}", key_id)));
- }
- }
- bucket.state.update(BucketState::Deleted);
- self.garage.bucket_table.insert(&bucket).await?;
- Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
- }
- BucketOperation::Allow(query) => {
- let key = self.get_existing_key(&query.key_pattern).await?;
- let bucket = self.get_existing_bucket(&query.bucket).await?;
- let allow_read = query.read || key.allow_read(&query.bucket);
- let allow_write = query.write || key.allow_write(&query.bucket);
- self.update_key_bucket(&key, &query.bucket, allow_read, allow_write)
- .await?;
- self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}.",
- &key.key_id, &query.bucket, allow_read, allow_write
- )))
- }
- BucketOperation::Deny(query) => {
- let key = self.get_existing_key(&query.key_pattern).await?;
- let bucket = self.get_existing_bucket(&query.bucket).await?;
- let allow_read = !query.read && key.allow_read(&query.bucket);
- let allow_write = !query.write && key.allow_write(&query.bucket);
- self.update_key_bucket(&key, &query.bucket, allow_read, allow_write)
- .await?;
- self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}.",
- &key.key_id, &query.bucket, allow_read, allow_write
- )))
+ Ok(AdminRpc::BucketInfo(bucket))
}
- BucketOperation::Website(query) => {
- let mut bucket = self.get_existing_bucket(&query.bucket).await?;
+ BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
+ BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
+ BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
+ BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
+ BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+ }
+ }
+
+ async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
+ let bucket_aliases = self
+ .garage
+ .bucket_alias_table
+ .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
+ .await?;
+ Ok(AdminRpc::BucketList(bucket_aliases))
+ }
- if !(query.allow ^ query.deny) {
- return Err(Error::Message(
- "You must specify exactly one flag, either --allow or --deny".to_string(),
- ));
+ #[allow(clippy::ptr_arg)]
+ async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
+ let mut bucket = Bucket::new();
+ let alias = match self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
+ Some(mut alias) => {
+ if !alias.state.get().is_deleted() {
+ return Err(Error::BadRpc(format!("Bucket {} already exists", name)));
}
+ alias.state.update(Deletable::Present(AliasParams {
+ bucket_id: bucket.id,
+ website_access: false,
+ }));
+ alias
+ }
+ None => BucketAlias::new(name.clone(), bucket.id, false),
+ };
+ bucket
+ .state
+ .as_option_mut()
+ .unwrap()
+ .aliases
+ .update_in_place(name.clone(), true);
+ self.garage.bucket_table.insert(&bucket).await?;
+ self.garage.bucket_alias_table.insert(&alias).await?;
+ Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
+ }
- if let BucketState::Present(state) = bucket.state.get_mut() {
- state.website.update(query.allow);
- self.garage.bucket_table.insert(&bucket).await?;
- let msg = if query.allow {
- format!("Website access allowed for {}", &query.bucket)
- } else {
- format!("Website access denied for {}", &query.bucket)
- };
+ async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
+ let mut bucket_alias = self
+ .garage
+ .bucket_alias_table
+ .get(&EmptyKey, &query.name)
+ .await?
+ .filter(|a| !a.is_deleted())
+ .ok_or_message(format!("Bucket {} does not exist", query.name))?;
- Ok(AdminRpc::Ok(msg))
- } else {
- unreachable!();
+ let bucket_id = bucket_alias.state.get().as_option().unwrap().bucket_id;
+
+ // Check bucket doesn't have other aliases
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option().unwrap();
+ if bucket_state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|(name, _, _)| name != &query.name)
+ {
+ return Err(Error::Message(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+ if bucket_state
+ .local_aliases
+ .items()
+ .iter()
+ .any(|(_, _, active)| *active)
+ {
+ return Err(Error::Message(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+
+ // Check bucket is empty
+ let objects = self
+ .garage
+ .object_table
+ .get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10)
+ .await?;
+ if !objects.is_empty() {
+ return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name)));
+ }
+
+ if !query.yes {
+ return Err(Error::BadRpc(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, _) in bucket.authorized_keys() {
+ if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? {
+ if !key.state.is_deleted() {
+ self.update_key_bucket(&key, bucket.id, false, false)
+ .await?;
}
+ } else {
+ return Err(Error::Message(format!("Key not found: {}", key_id)));
}
}
+ // 2. delete bucket alias
+ bucket_alias.state.update(Deletable::Deleted);
+ self.garage.bucket_alias_table.insert(&bucket_alias).await?;
+ // 3. delete bucket alias
+ bucket.state = Deletable::delete();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
+ }
+
+ async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_message("Bucket not found")?;
+ let bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let key = self.get_existing_key(&query.key_pattern).await?;
+
+ let allow_read = query.read || key.allow_read(&bucket_id);
+ let allow_write = query.write || key.allow_write(&bucket_id);
+
+ let new_perm = self
+ .update_key_bucket(&key, bucket_id, allow_read, allow_write)
+ .await?;
+ self.update_bucket_key(bucket, &key.key_id, new_perm)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write
+ )))
+ }
+
+ async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_message("Bucket not found")?;
+ let bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let key = self.get_existing_key(&query.key_pattern).await?;
+
+ let allow_read = !query.read && key.allow_read(&bucket_id);
+ let allow_write = !query.write && key.allow_write(&bucket_id);
+
+ let new_perm = self
+ .update_key_bucket(&key, bucket_id, allow_read, allow_write)
+ .await?;
+ self.update_bucket_key(bucket, &key.key_id, new_perm)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write
+ )))
+ }
+
+ async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
+ let mut bucket_alias = self
+ .garage
+ .bucket_alias_table
+ .get(&EmptyKey, &query.bucket)
+ .await?
+ .filter(|a| !a.is_deleted())
+ .ok_or_message(format!("Bucket {} does not exist", query.bucket))?;
+
+ let mut state = bucket_alias.state.get().as_option().unwrap().clone();
+
+ if !(query.allow ^ query.deny) {
+ return Err(Error::Message(
+ "You must specify exactly one flag, either --allow or --deny".to_string(),
+ ));
+ }
+
+ state.website_access = query.allow;
+ bucket_alias.state.update(Deletable::present(state));
+ self.garage.bucket_alias_table.insert(&bucket_alias).await?;
+
+ let msg = if query.allow {
+ format!("Website access allowed for {}", &query.bucket)
+ } else {
+ format!("Website access denied for {}", &query.bucket)
+ };
+
+ Ok(AdminRpc::Ok(msg))
}
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
- KeyOperation::List => {
- let key_ids = self
- .garage
- .key_table
- .get_range(
- &EmptyKey,
- None,
- Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
- 10000,
- )
- .await?
- .iter()
- .map(|k| (k.key_id.to_string(), k.name.get().clone()))
- .collect::<Vec<_>>();
- Ok(AdminRpc::KeyList(key_ids))
- }
+ KeyOperation::List => self.handle_list_keys().await,
KeyOperation::Info(query) => {
let key = self.get_existing_key(&query.key_pattern).await?;
Ok(AdminRpc::KeyInfo(key))
}
- KeyOperation::New(query) => {
- let key = Key::new(query.name.clone());
- self.garage.key_table.insert(&key).await?;
- Ok(AdminRpc::KeyInfo(key))
- }
- KeyOperation::Rename(query) => {
- let mut key = self.get_existing_key(&query.key_pattern).await?;
- key.name.update(query.new_name.clone());
- self.garage.key_table.insert(&key).await?;
- Ok(AdminRpc::KeyInfo(key))
- }
- KeyOperation::Delete(query) => {
- let key = self.get_existing_key(&query.key_pattern).await?;
- if !query.yes {
- return Err(Error::BadRpc(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
- // --- done checking, now commit ---
- for (ab_name, _, _) in key.authorized_buckets.items().iter() {
- if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? {
- if !bucket.is_deleted() {
- self.update_bucket_key(bucket, &key.key_id, false, false)
- .await?;
- }
- } else {
- return Err(Error::Message(format!("Bucket not found: {}", ab_name)));
+ KeyOperation::New(query) => self.handle_create_key(query).await,
+ KeyOperation::Rename(query) => self.handle_rename_key(query).await,
+ KeyOperation::Delete(query) => self.handle_delete_key(query).await,
+ KeyOperation::Import(query) => self.handle_import_key(query).await,
+ }
+ }
+
+ async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
+ let key_ids = self
+ .garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ )
+ .await?
+ .iter()
+ .map(|k| (k.key_id.to_string(), k.name.get().clone()))
+ .collect::<Vec<_>>();
+ Ok(AdminRpc::KeyList(key_ids))
+ }
+
+ async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
+ let key = Key::new(query.name.clone());
+ self.garage.key_table.insert(&key).await?;
+ Ok(AdminRpc::KeyInfo(key))
+ }
+
+ async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
+ let mut key = self.get_existing_key(&query.key_pattern).await?;
+ key.name.update(query.new_name.clone());
+ self.garage.key_table.insert(&key).await?;
+ Ok(AdminRpc::KeyInfo(key))
+ }
+
+ async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
+ let mut key = self.get_existing_key(&query.key_pattern).await?;
+ if !query.yes {
+ return Err(Error::BadRpc(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+ let state = key.state.as_option_mut().unwrap();
+
+ // --- done checking, now commit ---
+ // 1. Delete local aliases
+ for (alias, _, to) in state.local_aliases.items().iter() {
+ if let Deletable::Present(bucket_id) = to {
+ if let Some(mut bucket) = self.garage.bucket_table.get(bucket_id, &EmptyKey).await?
+ {
+ if let Deletable::Present(bucket_state) = &mut bucket.state {
+ bucket_state.local_aliases = bucket_state
+ .local_aliases
+ .update_mutator((key.key_id.to_string(), alias.to_string()), false);
+ self.garage.bucket_table.insert(&bucket).await?;
}
+ } else {
+ // ignore
}
- let del_key = Key::delete(key.key_id.to_string());
- self.garage.key_table.insert(&del_key).await?;
- Ok(AdminRpc::Ok(format!(
- "Key {} was deleted successfully.",
- key.key_id
- )))
}
- KeyOperation::Import(query) => {
- let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
- if prev_key.is_some() {
- return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
+ }
+ // 2. Delete authorized buckets
+ for (ab_id, auth) in state.authorized_buckets.items().iter() {
+ if let Some(bucket) = self.garage.bucket_table.get(ab_id, &EmptyKey).await? {
+ let new_perm = BucketKeyPerm {
+ timestamp: increment_logical_clock(auth.timestamp),
+ allow_read: false,
+ allow_write: false,
+ };
+ if !bucket.is_deleted() {
+ self.update_bucket_key(bucket, &key.key_id, new_perm)
+ .await?;
}
- let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
- self.garage.key_table.insert(&imported_key).await?;
- Ok(AdminRpc::KeyInfo(imported_key))
+ } else {
+ // ignore
}
}
+ // 3. Actually delete key
+ key.state = Deletable::delete();
+ self.garage.key_table.insert(&key).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Key {} was deleted successfully.",
+ key.key_id
+ )))
}
- #[allow(clippy::ptr_arg)]
- async fn get_existing_bucket(&self, bucket: &String) -> Result<Bucket, Error> {
- self.garage
- .bucket_table
- .get(&EmptyKey, bucket)
- .await?
- .filter(|b| !b.is_deleted())
- .map(Ok)
- .unwrap_or_else(|| Err(Error::BadRpc(format!("Bucket {} does not exist", bucket))))
+ async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
+ let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
+ }
+ let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
+ self.garage.key_table.insert(&imported_key).await?;
+ Ok(AdminRpc::KeyInfo(imported_key))
}
async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> {
@@ -267,7 +399,7 @@ impl AdminRpcHandler {
)
.await?
.into_iter()
- .filter(|k| !k.deleted.get())
+ .filter(|k| !k.state.is_deleted())
.collect::<Vec<_>>();
if candidates.len() != 1 {
Err(Error::Message(format!(
@@ -279,51 +411,48 @@ impl AdminRpcHandler {
}
}
- /// Update **bucket table** to inform of the new linked key
- async fn update_bucket_key(
- &self,
- mut bucket: Bucket,
- key_id: &str,
- allow_read: bool,
- allow_write: bool,
- ) -> Result<(), Error> {
- if let BucketState::Present(params) = bucket.state.get_mut() {
- let ak = &mut params.authorized_keys;
- let old_ak = ak.take_and_clear();
- ak.merge(&old_ak.update_mutator(
- key_id.to_string(),
- PermissionSet {
- allow_read,
- allow_write,
- },
- ));
- } else {
- return Err(Error::Message(
- "Bucket is deleted in update_bucket_key".to_string(),
- ));
- }
- self.garage.bucket_table.insert(&bucket).await?;
- Ok(())
- }
-
/// Update **key table** to inform of the new linked bucket
async fn update_key_bucket(
&self,
key: &Key,
- bucket: &str,
+ bucket_id: Uuid,
allow_read: bool,
allow_write: bool,
- ) -> Result<(), Error> {
+ ) -> Result<BucketKeyPerm, Error> {
let mut key = key.clone();
- let old_map = key.authorized_buckets.take_and_clear();
- key.authorized_buckets.merge(&old_map.update_mutator(
- bucket.to_string(),
- PermissionSet {
+ let mut key_state = key.state.as_option_mut().unwrap();
+
+ let perm = key_state
+ .authorized_buckets
+ .get(&bucket_id)
+ .cloned()
+ .map(|old_perm| BucketKeyPerm {
+ timestamp: increment_logical_clock(old_perm.timestamp),
allow_read,
allow_write,
- },
- ));
+ })
+ .unwrap_or(BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ });
+
+ key_state.authorized_buckets = Map::put_mutator(bucket_id, perm);
+
self.garage.key_table.insert(&key).await?;
+ Ok(perm)
+ }
+
+ /// Update **bucket table** to inform of the new linked key
+ async fn update_bucket_key(
+ &self,
+ mut bucket: Bucket,
+ key_id: &str,
+ new_perm: BucketKeyPerm,
+ ) -> Result<(), Error> {
+ bucket.state.as_option_mut().unwrap().authorized_keys =
+ Map::put_mutator(key_id.to_string(), new_perm);
+ self.garage.bucket_table.insert(&bucket).await?;
Ok(())
}