author    Alex <lx@deuxfleurs.fr>  2025-01-30 16:18:38 +0000
committer Alex <lx@deuxfleurs.fr>  2025-01-30 16:18:38 +0000
commit    3192088aac0e1795401304a8dec715cd343538cf
tree      bfea9c13130481a3463f8bba2c2fe85fe3be6cd4
parent    6ed78abb5cd09f4e5ca5effe5d6137faf33133f8
parent    5a89350b382f9a24d4e81b056f88dc16a5daa080
Merge pull request 'admin refactoring: refactor CLI to use Admin API requests (step 2)' (#943) from refactor-admin into next-v2
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/943
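
Context for the diff below (editorial sketch, not part of the commit): step 2 of the admin refactoring makes the CLI express its commands as typed Admin API requests and proxy them to the daemon over RPC, replacing most per-command AdminRpc variants. A minimal sketch of the assumed round-trip, using only names that appear in this diff; the real client-side helper (Cli::api_request in src/garage/cli_v2/mod.rs) is truncated in this view, so the helper name and exact signature here are an assumption:

    // Hedged illustration, not code from this commit. The CLI wraps a typed
    // request in AdminRpc::ApiRequest and sends it to the daemon, whose
    // handle_api_request() (src/garage/admin/mod.rs below) executes it and
    // replies with ApiOkResponse or ApiErrorResponse.
    async fn proxy_admin_request(
        rpc_cli: &Endpoint<AdminRpc, ()>,
        rpc_host: NodeID,
        req: AdminApiRequest,
    ) -> Result<TaggedAdminApiResponse, Error> {
        match rpc_cli
            .call(&rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
            .await??
        {
            AdminRpc::ApiOkResponse(resp) => Ok(resp),
            AdminRpc::ApiErrorResponse {
                http_code,
                error_code,
                message,
            } => Err(Error::Message(format!(
                "{} ({}): {}",
                error_code, http_code, message
            ))),
            m => Err(Error::unexpected_rpc_message(m)),
        }
    }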
Diffstat (limited to 'src/garage')
-rw-r--r--  src/garage/admin/bucket.rs    449
-rw-r--r--  src/garage/admin/key.rs       161
-rw-r--r--  src/garage/admin/mod.rs        47
-rw-r--r--  src/garage/cli/cmd.rs         220
-rw-r--r--  src/garage/cli/layout.rs      210
-rw-r--r--  src/garage/cli/mod.rs           1
-rw-r--r--  src/garage/cli/util.rs        243
-rw-r--r--  src/garage/cli_v2/bucket.rs   523
-rw-r--r--  src/garage/cli_v2/cluster.rs  158
-rw-r--r--  src/garage/cli_v2/key.rs      227
-rw-r--r--  src/garage/cli_v2/layout.rs   284
-rw-r--r--  src/garage/cli_v2/mod.rs      106
-rw-r--r--  src/garage/main.rs             16
13 files changed, 1341 insertions(+), 1304 deletions(-)
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
index 1bdc6086..26d54084 100644
--- a/src/garage/admin/bucket.rs
+++ b/src/garage/admin/bucket.rs
@@ -1,15 +1,6 @@
-use std::collections::HashMap;
 use std::fmt::Write;
 
-use garage_util::crdt::*;
-use garage_util::time::*;
-
-use garage_table::*;
-
-use garage_model::bucket_alias_table::*;
-use garage_model::bucket_table::*;
 use garage_model::helper::error::{Error, OkOrBadRequest};
-use garage_model::permission::*;
 
 use crate::cli::*;
 
@@ -18,451 +9,13 @@ use super::*;
 impl AdminRpcHandler {
 	pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
 		match cmd {
-			BucketOperation::List => self.handle_list_buckets().await,
-			BucketOperation::Info(query) => self.handle_bucket_info(query).await,
-			BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
-			BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
-			BucketOperation::Alias(query) => self.handle_alias_bucket(query).await,
-			BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await,
-			BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
-			BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
-			BucketOperation::Website(query) => self.handle_bucket_website(query).await,
-			BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
 			BucketOperation::CleanupIncompleteUploads(query) => {
 				self.handle_bucket_cleanup_incomplete_uploads(query).await
 			}
+			_ => unreachable!(),
 		}
 	}
 
-	async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
-		let buckets = self
-			.garage
-			.bucket_table
-			.get_range(
-				&EmptyKey,
-				None,
-				Some(DeletedFilter::NotDeleted),
-				10000,
-				EnumerationOrder::Forward,
-			)
-			.await?;
-
-		Ok(AdminRpc::BucketList(buckets))
-	}
-
-	async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
-		let bucket_id = self
-			.garage
-			.bucket_helper()
-			.admin_get_existing_matching_bucket(&query.name)
-			.await?;
-
-		let bucket = self
-			.garage
-			.bucket_helper()
-			.get_existing_bucket(bucket_id)
-			.await?;
-
-		let counters = self
-			.garage
-			.object_counter_table
-			.table
-			.get(&bucket_id, &EmptyKey)
-			.await?
-			.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
-			.unwrap_or_default();
-
-		let mpu_counters = self
-			.garage
-			.mpu_counter_table
-			.table
-			.get(&bucket_id, &EmptyKey)
-			.await?
-			.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
-			.unwrap_or_default();
-
-		let mut relevant_keys = HashMap::new();
-		for (k, _) in bucket
-			.state
-			.as_option()
-			.unwrap()
-			.authorized_keys
-			.items()
-			.iter()
-		{
-			if let Some(key) = self
-				.garage
-				.key_table
-				.get(&EmptyKey, k)
-				.await?
-				.filter(|k| !k.is_deleted())
-			{
-				relevant_keys.insert(k.clone(), key);
-			}
-		}
-		for ((k, _), _, _) in bucket
-			.state
-			.as_option()
-			.unwrap()
-			.local_aliases
-			.items()
-			.iter()
-		{
-			if relevant_keys.contains_key(k) {
-				continue;
-			}
-			if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
-				relevant_keys.insert(k.clone(), key);
-			}
-		}
-
-		Ok(AdminRpc::BucketInfo {
-			bucket,
-			relevant_keys,
-			counters,
-			mpu_counters,
-		})
-	}
-
-	#[allow(clippy::ptr_arg)]
-	async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
-		if !is_valid_bucket_name(name) {
-			return Err(Error::BadRequest(format!(
-				"{}: {}",
-				name, INVALID_BUCKET_NAME_MESSAGE
-			)));
-		}
-
-		let helper = self.garage.locked_helper().await;
-
-		if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
-			if alias.state.get().is_some() {
-				return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
-			}
-		}
-
-		// ---- done checking, now commit ----
-
-		let bucket = Bucket::new();
-		self.garage.bucket_table.insert(&bucket).await?;
-
-		helper.set_global_bucket_alias(bucket.id, name).await?;
-
-		Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
-	}
-
-	async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
-		let helper = self.garage.locked_helper().await;
-
-		let bucket_id = helper
-			.bucket()
-			.admin_get_existing_matching_bucket(&query.name)
-			.await?;
-
-		// Get the alias, but keep in minde here the bucket name
-		// given in parameter can also be directly the bucket's ID.
-		// In that case bucket_alias will be None, and
-		// we can still delete the bucket if it has zero aliases
-		// (a condition which we try to prevent but that could still happen somehow).
-		// We just won't try to delete an alias entry because there isn't one.
-		let bucket_alias = self
-			.garage
-			.bucket_alias_table
-			.get(&EmptyKey, &query.name)
-			.await?;
-
-		// Check bucket doesn't have other aliases
-		let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
-		let bucket_state = bucket.state.as_option().unwrap();
-		if bucket_state
-			.aliases
-			.items()
-			.iter()
-			.filter(|(_, _, active)| *active)
-			.any(|(name, _, _)| name != &query.name)
-		{
-			return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
-		}
-		if bucket_state
-			.local_aliases
-			.items()
-			.iter()
-			.any(|(_, _, active)| *active)
-		{
-			return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
-		}
-
-		// Check bucket is empty
-		if !helper.bucket().is_bucket_empty(bucket_id).await? {
-			return Err(Error::BadRequest(format!(
-				"Bucket {} is not empty",
-				query.name
-			)));
-		}
-
-		if !query.yes {
-			return Err(Error::BadRequest(
-				"Add --yes flag to really perform this operation".to_string(),
-			));
-		}
-
-		// --- done checking, now commit ---
-		// 1. delete authorization from keys that had access
-		for (key_id, _) in bucket.authorized_keys() {
-			helper
-				.set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
-				.await?;
-		}
-
-		// 2. delete bucket alias
-		if bucket_alias.is_some() {
-			helper
-				.purge_global_bucket_alias(bucket_id, &query.name)
-				.await?;
-		}
-
-		// 3. delete bucket
-		bucket.state = Deletable::delete();
-		self.garage.bucket_table.insert(&bucket).await?;
-
-		Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
-	}
-
-	async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
-		let helper = self.garage.locked_helper().await;
-
-		let bucket_id = helper
-			.bucket()
-			.admin_get_existing_matching_bucket(&query.existing_bucket)
-			.await?;
-
-		if let Some(key_pattern) = &query.local {
-			let key = helper.key().get_existing_matching_key(key_pattern).await?;
-
-			helper
-				.set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
-				.await?;
-			Ok(AdminRpc::Ok(format!(
-				"Alias {} now points to bucket {:?} in namespace of key {}",
-				query.new_name, bucket_id, key.key_id
-			)))
-		} else {
-			helper
-				.set_global_bucket_alias(bucket_id, &query.new_name)
-				.await?;
-			Ok(AdminRpc::Ok(format!(
-				"Alias {} now points to bucket {:?}",
-				query.new_name, bucket_id
-			)))
-		}
-	}
-
-	async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
-		let helper = self.garage.locked_helper().await;
-
-		if let Some(key_pattern) = &query.local {
-			let key = helper.key().get_existing_matching_key(key_pattern).await?;
-
-			let bucket_id = key
-				.state
-				.as_option()
-				.unwrap()
-				.local_aliases
-				.get(&query.name)
-				.cloned()
-				.flatten()
-				.ok_or_bad_request("Bucket not found")?;
-
-			helper
-				.unset_local_bucket_alias(bucket_id, &key.key_id, &query.name)
-				.await?;
-
-			Ok(AdminRpc::Ok(format!(
-				"Alias {} no longer points to bucket {:?} in namespace of key {}",
-				&query.name, bucket_id, key.key_id
-			)))
-		} else {
-			let bucket_id = helper
-				.bucket()
-				.resolve_global_bucket_name(&query.name)
-				.await?
-				.ok_or_bad_request("Bucket not found")?;
-
-			helper
-				.unset_global_bucket_alias(bucket_id, &query.name)
-				.await?;
-
-			Ok(AdminRpc::Ok(format!(
-				"Alias {} no longer points to bucket {:?}",
-				&query.name, bucket_id
-			)))
-		}
-	}
-
-	async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
-		let helper = self.garage.locked_helper().await;
-
-		let bucket_id = helper
-			.bucket()
-			.admin_get_existing_matching_bucket(&query.bucket)
-			.await?;
-		let key = helper
-			.key()
-			.get_existing_matching_key(&query.key_pattern)
-			.await?;
-
-		let allow_read = query.read || key.allow_read(&bucket_id);
-		let allow_write = query.write || key.allow_write(&bucket_id);
-		let allow_owner = query.owner || key.allow_owner(&bucket_id);
-
-		helper
-			.set_bucket_key_permissions(
-				bucket_id,
-				&key.key_id,
-				BucketKeyPerm {
-					timestamp: now_msec(),
-					allow_read,
-					allow_write,
-					allow_owner,
-				},
-			)
-			.await?;
-
-		Ok(AdminRpc::Ok(format!(
-			"New permissions for {} on {}: read {}, write {}, owner {}.",
-			&key.key_id, &query.bucket, allow_read, allow_write, allow_owner
-		)))
-	}
-
-	async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
-		let helper = self.garage.locked_helper().await;
-
-		let bucket_id = helper
-			.bucket()
-			.admin_get_existing_matching_bucket(&query.bucket)
-			.await?;
-		let key = helper
-			.key()
-			.get_existing_matching_key(&query.key_pattern)
-			.await?;
-
-		let allow_read = !query.read && key.allow_read(&bucket_id);
-		let allow_write = !query.write && key.allow_write(&bucket_id);
-		let allow_owner = !query.owner && key.allow_owner(&bucket_id);
-
-		helper
-			.set_bucket_key_permissions(
-				bucket_id,
-				&key.key_id,
-				BucketKeyPerm {
-					timestamp: now_msec(),
-					allow_read,
-					allow_write,
-					allow_owner,
-				},
-			)
-			.await?;
-
-		Ok(AdminRpc::Ok(format!(
-			"New permissions for {} on {}: read {}, write {}, owner {}.",
-			&key.key_id, &query.bucket, allow_read, allow_write, allow_owner
-		)))
-	}
-
-	async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
-		let bucket_id = self
-			.garage
-			.bucket_helper()
-			.admin_get_existing_matching_bucket(&query.bucket)
-			.await?;
-
-		let mut bucket = self
-			.garage
-			.bucket_helper()
-			.get_existing_bucket(bucket_id)
-			.await?;
-		let bucket_state = bucket.state.as_option_mut().unwrap();
-
-		if !(query.allow ^ query.deny) {
-			return Err(Error::BadRequest(
-				"You must specify exactly one flag, either --allow or --deny".to_string(),
-			));
-		}
-
-		let website = if query.allow {
-			Some(WebsiteConfig {
-				index_document: query.index_document.clone(),
-				error_document: query.error_document.clone(),
-			})
-		} else {
-			None
-		};
-
-		bucket_state.website_config.update(website);
-		self.garage.bucket_table.insert(&bucket).await?;
-
-		let msg = if query.allow {
-			format!("Website access allowed for {}", &query.bucket)
-		} else {
-			format!("Website access denied for {}", &query.bucket)
-		};
-
-		Ok(AdminRpc::Ok(msg))
-	}
-
-	async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
-		let bucket_id = self
-			.garage
-			.bucket_helper()
-			.admin_get_existing_matching_bucket(&query.bucket)
-			.await?;
-
-		let mut bucket = self
-			.garage
-			.bucket_helper()
-			.get_existing_bucket(bucket_id)
-			.await?;
-		let bucket_state = bucket.state.as_option_mut().unwrap();
-
-		if query.max_size.is_none() && query.max_objects.is_none() {
-			return Err(Error::BadRequest(
-				"You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
-			));
-		}
-
-		let mut quotas = bucket_state.quotas.get().clone();
-
-		match query.max_size.as_ref().map(String::as_ref) {
-			Some("none") => quotas.max_size = None,
-			Some(v) => {
-				let bs = v
-					.parse::<bytesize::ByteSize>()
-					.ok_or_bad_request(format!("Invalid size specified: {}", v))?;
-				quotas.max_size = Some(bs.as_u64());
-			}
-			_ => (),
-		}
-
-		match query.max_objects.as_ref().map(String::as_ref) {
-			Some("none") => quotas.max_objects = None,
-			Some(v) => {
-				let mo = v
-					.parse::<u64>()
-					.ok_or_bad_request(format!("Invalid number specified: {}", v))?;
-				quotas.max_objects = Some(mo);
-			}
-			_ => (),
-		}
-
-		bucket_state.quotas.update(quotas);
-		self.garage.bucket_table.insert(&bucket).await?;
-
-		Ok(AdminRpc::Ok(format!(
-			"Quotas updated for {}",
-			&query.bucket
-		)))
-	}
-
 	async fn handle_bucket_cleanup_incomplete_uploads(
 		&self,
 		query: &CleanupIncompleteUploadsOpt,
diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs
deleted file mode 100644
index bd010d2c..00000000
--- a/src/garage/admin/key.rs
+++ /dev/null
@@ -1,161 +0,0 @@
-use std::collections::HashMap;
-
-use garage_table::*;
-
-use garage_model::helper::error::*;
-use garage_model::key_table::*;
-
-use crate::cli::*;
-
-use super::*;
-
-impl AdminRpcHandler {
-	pub(super) async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
-		match cmd {
-			KeyOperation::List => self.handle_list_keys().await,
-			KeyOperation::Info(query) => self.handle_key_info(query).await,
-			KeyOperation::Create(query) => self.handle_create_key(query).await,
-			KeyOperation::Rename(query) => self.handle_rename_key(query).await,
-			KeyOperation::Delete(query) => self.handle_delete_key(query).await,
-			KeyOperation::Allow(query) => self.handle_allow_key(query).await,
-			KeyOperation::Deny(query) => self.handle_deny_key(query).await,
-			KeyOperation::Import(query) => self.handle_import_key(query).await,
-		}
-	}
-
-	async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
-		let key_ids = self
-			.garage
-			.key_table
-			.get_range(
-				&EmptyKey,
-				None,
-				Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
-				10000,
-				EnumerationOrder::Forward,
-			)
-			.await?
-			.iter()
-			.map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone()))
-			.collect::<Vec<_>>();
-		Ok(AdminRpc::KeyList(key_ids))
-	}
-
-	async fn handle_key_info(&self, query: &KeyInfoOpt) -> Result<AdminRpc, Error> {
-		let mut key = self
-			.garage
-			.key_helper()
-			.get_existing_matching_key(&query.key_pattern)
-			.await?;
-
-		if !query.show_secret {
-			key.state.as_option_mut().unwrap().secret_key = "(redacted)".into();
-		}
-
-		self.key_info_result(key).await
-	}
-
-	async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
-		let key = Key::new(&query.name);
-		self.garage.key_table.insert(&key).await?;
-		self.key_info_result(key).await
-	}
-
-	async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
-		let mut key = self
-			.garage
-			.key_helper()
-			.get_existing_matching_key(&query.key_pattern)
-			.await?;
-		key.params_mut()
-			.unwrap()
-			.name
-			.update(query.new_name.clone());
-		self.garage.key_table.insert(&key).await?;
-		self.key_info_result(key).await
-	}
-
-	async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
-		let helper = self.garage.locked_helper().await;
-
-		let mut key = helper
-			.key()
-			.get_existing_matching_key(&query.key_pattern)
-			.await?;
-
-		if !query.yes {
-			return Err(Error::BadRequest(
-				"Add --yes flag to really perform this operation".to_string(),
-			));
-		}
-
-		helper.delete_key(&mut key).await?;
-
-		Ok(AdminRpc::Ok(format!(
-			"Key {} was deleted successfully.",
-			key.key_id
-		)))
-	}
-
-	async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
-		let mut key = self
-			.garage
-			.key_helper()
-			.get_existing_matching_key(&query.key_pattern)
-			.await?;
-		if query.create_bucket {
-			key.params_mut().unwrap().allow_create_bucket.update(true);
-		}
-		self.garage.key_table.insert(&key).await?;
-		self.key_info_result(key).await
-	}
-
-	async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
-		let mut key = self
-			.garage
-			.key_helper()
-			.get_existing_matching_key(&query.key_pattern)
-			.await?;
-		if query.create_bucket {
-			key.params_mut().unwrap().allow_create_bucket.update(false);
-		}
-		self.garage.key_table.insert(&key).await?;
-		self.key_info_result(key).await
-	}
-
-	async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
-		if !query.yes {
-			return Err(Error::BadRequest("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. Add the --yes flag if you really want to re-import a key.".to_string()));
-		}
-
-		let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
-		if prev_key.is_some() {
-			return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
-		}
-
-		let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name)
-			.ok_or_bad_request("Invalid key format")?;
-		self.garage.key_table.insert(&imported_key).await?;
-
-		self.key_info_result(imported_key).await
-	}
-
-	async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> {
-		let mut relevant_buckets = HashMap::new();
-
-		for (id, _) in key
-			.state
-			.as_option()
-			.unwrap()
-			.authorized_buckets
-			.items()
-			.iter()
-		{
-			if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
-				relevant_buckets.insert(*id, b);
-			}
-		}
-
-		Ok(AdminRpc::KeyInfo(key, relevant_buckets))
-	}
-}
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index e2468143..1888a208 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -1,6 +1,5 @@
 mod block;
 mod bucket;
-mod key;
 
 use std::collections::HashMap;
 use std::fmt::Write;
@@ -23,13 +22,15 @@ use garage_rpc::*;
 
 use garage_block::manager::BlockResyncErrorInfo;
 
-use garage_model::bucket_table::*;
 use garage_model::garage::Garage;
 use garage_model::helper::error::{Error, OkOrBadRequest};
-use garage_model::key_table::*;
 use garage_model::s3::mpu_table::MultipartUpload;
 use garage_model::s3::version_table::Version;
 
+use garage_api::admin::api::{AdminApiRequest, TaggedAdminApiResponse};
+use garage_api::admin::EndpointHandler as AdminApiEndpoint;
+use garage_api::generic_server::ApiError;
+
 use crate::cli::*;
 use crate::repair::online::launch_online_repair;
@@ -39,7 +40,6 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
 #[allow(clippy::large_enum_variant)]
 pub enum AdminRpc {
 	BucketOperation(BucketOperation),
-	KeyOperation(KeyOperation),
 	LaunchRepair(RepairOpt),
 	Stats(StatsOpt),
 	Worker(WorkerOperation),
@@ -48,15 +48,6 @@ pub enum AdminRpc {
 
 	// Replies
 	Ok(String),
-	BucketList(Vec<Bucket>),
-	BucketInfo {
-		bucket: Bucket,
-		relevant_keys: HashMap<String, Key>,
-		counters: HashMap<String, i64>,
-		mpu_counters: HashMap<String, i64>,
-	},
-	KeyList(Vec<(String, String)>),
-	KeyInfo(Key, HashMap<Uuid, Bucket>),
 	WorkerList(
 		HashMap<usize, garage_util::background::WorkerInfo>,
 		WorkerListOpt,
@@ -70,6 +61,15 @@ pub enum AdminRpc {
 		versions: Vec<Result<Version, Uuid>>,
 		uploads: Vec<MultipartUpload>,
 	},
+
+	// Proxying HTTP Admin API endpoints
+	ApiRequest(AdminApiRequest),
+	ApiOkResponse(TaggedAdminApiResponse),
+	ApiErrorResponse {
+		http_code: u16,
+		error_code: String,
+		message: String,
+	},
 }
 
 impl Rpc for AdminRpc {
@@ -503,6 +503,25 @@ impl AdminRpcHandler {
 			}
 		}
 	}
+
+	// ================== PROXYING ADMIN API REQUESTS ===================
+
+	async fn handle_api_request(
+		self: &Arc<Self>,
+		req: &AdminApiRequest,
+	) -> Result<AdminRpc, Error> {
+		let req = req.clone();
+		info!("Proxied admin API request: {}", req.name());
+		let res = req.handle(&self.garage).await;
+		match res {
+			Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())),
+			Err(e) => Ok(AdminRpc::ApiErrorResponse {
+				http_code: e.http_status_code().as_u16(),
+				error_code: e.code().to_string(),
+				message: e.to_string(),
+			}),
+		}
+	}
 }
 
 #[async_trait]
@@ -514,12 +533,12 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
 	) -> Result<AdminRpc, Error> {
 		match message {
 			AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
-			AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
 			AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
 			AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
 			AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
 			AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
 			AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
+			AdminRpc::ApiRequest(r) => self.handle_api_request(r).await,
 			m => Err(GarageError::unexpected_rpc_message(m).into()),
 		}
 	}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 44d3d96c..a6540c65 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,10 +1,5 @@
-use std::collections::{HashMap, HashSet};
-use std::time::Duration;
-
-use format_table::format_table;
 use garage_util::error::*;
 
-use garage_rpc::layout::*;
 use garage_rpc::system::*;
 use garage_rpc::*;
 
@@ -13,204 +8,6 @@ use garage_model::helper::error::Error as HelperError;
 use crate::admin::*;
 use crate::cli::*;
 
-pub async fn cli_command_dispatch(
-	cmd: Command,
-	system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
-	admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
-	rpc_host: NodeID,
-) -> Result<(), HelperError> {
-	match cmd {
-		Command::Status => Ok(cmd_status(system_rpc_endpoint, rpc_host).await?),
-		Command::Node(NodeOperation::Connect(connect_opt)) => {
-			Ok(cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await?)
-		}
-		Command::Layout(layout_opt) => {
-			Ok(cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await?)
-		}
-		Command::Bucket(bo) => {
-			cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
-		}
-		Command::Key(ko) => {
-			cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await
-		}
-		Command::Repair(ro) => {
-			cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
-		}
-		Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
-		Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
-		Command::Block(bo) => {
-			cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
-		}
-		Command::Meta(mo) => {
-			cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
-		}
-		_ => unreachable!(),
-	}
-}
-
-pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
-	let status = fetch_status(rpc_cli, rpc_host).await?;
-	let layout = fetch_layout(rpc_cli, rpc_host).await?;
-
-	println!("==== HEALTHY NODES ====");
-	let mut healthy_nodes =
-		vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
-	for adv in status.iter().filter(|adv| adv.is_up) {
-		let host = adv.status.hostname.as_deref().unwrap_or("?");
-		let addr = match adv.addr {
-			Some(addr) => addr.to_string(),
-			None => "N/A".to_string(),
-		};
-		if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
-			let data_avail = match &adv.status.data_disk_avail {
-				_ if cfg.capacity.is_none() => "N/A".into(),
-				Some((avail, total)) => {
-					let pct = (*avail as f64) / (*total as f64) * 100.;
-					let avail = bytesize::ByteSize::b(*avail);
-					format!("{} ({:.1}%)", avail, pct)
-				}
-				None => "?".into(),
-			};
-			healthy_nodes.push(format!(
-				"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
-				id = adv.id,
-				host = host,
-				addr = addr,
-				tags = cfg.tags.join(","),
-				zone = cfg.zone,
-				capacity = cfg.capacity_string(),
-				data_avail = data_avail,
-			));
-		} else {
-			let prev_role = layout
-				.versions
-				.iter()
-				.rev()
-				.find_map(|x| match x.roles.get(&adv.id) {
-					Some(NodeRoleV(Some(cfg))) => Some(cfg),
-					_ => None,
-				});
-			if let Some(cfg) = prev_role {
-				healthy_nodes.push(format!(
-					"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
-					id = adv.id,
-					host = host,
-					addr = addr,
-					tags = cfg.tags.join(","),
-					zone = cfg.zone,
-				));
-			} else {
-				let new_role = match layout.staging.get().roles.get(&adv.id) {
-					Some(NodeRoleV(Some(_))) => "pending...",
-					_ => "NO ROLE ASSIGNED",
-				};
-				healthy_nodes.push(format!(
-					"{id:?}\t{h}\t{addr}\t\t\t{new_role}",
-					id = adv.id,
-					h = host,
-					addr = addr,
-					new_role = new_role,
-				));
-			}
-		}
-	}
-	format_table(healthy_nodes);
-
-	// Determine which nodes are unhealthy and print that to stdout
-	let status_map = status
-		.iter()
-		.map(|adv| (adv.id, adv))
-		.collect::<HashMap<_, _>>();
-
-	let tf = timeago::Formatter::new();
-	let mut drain_msg = false;
-	let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()];
-	let mut listed = HashSet::new();
-	for ver in layout.versions.iter().rev() {
-		for (node, _, role) in ver.roles.items().iter() {
-			let cfg = match role {
-				NodeRoleV(Some(role)) if role.capacity.is_some() => role,
-				_ => continue,
-			};
-
-			if listed.contains(node) {
-				continue;
-			}
-			listed.insert(*node);
-
-			let adv = status_map.get(node);
-			if adv.map(|x| x.is_up).unwrap_or(false) {
-				continue;
-			}
-
-			// Node is in a layout version, is not a gateway node, and is not up:
-			// it is in a failed state, add proper line to the output
-			let (host, last_seen) = match adv {
-				Some(adv) => (
-					adv.status.hostname.as_deref().unwrap_or("?"),
-					adv.last_seen_secs_ago
-						.map(|s| tf.convert(Duration::from_secs(s)))
-						.unwrap_or_else(|| "never seen".into()),
-				),
-				None => ("??", "never seen".into()),
-			};
-			let capacity = if ver.version == layout.current().version {
-				cfg.capacity_string()
-			} else {
-				drain_msg = true;
-				"draining metadata...".to_string()
-			};
-			failed_nodes.push(format!(
-				"{id:?}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
-				id = node,
-				host = host,
-				tags = cfg.tags.join(","),
-				zone = cfg.zone,
-				capacity = capacity,
-				last_seen = last_seen,
-			));
-		}
-	}
-
-	if failed_nodes.len() > 1 {
-		println!("\n==== FAILED NODES ====");
-		format_table(failed_nodes);
-		if drain_msg {
-			println!();
-			println!("Your cluster is expecting to drain data from nodes that are currently unavailable.");
-			println!("If these nodes are definitely dead, please review the layout history with");
-			println!(
-				"`garage layout history` and use `garage layout skip-dead-nodes` to force progress."
-			);
-		}
-	}
-
-	if print_staging_role_changes(&layout) {
-		println!();
-		println!("Please use `garage layout show` to check the proposed new layout and apply it.");
-		println!();
-	}
-
-	Ok(())
-}
-
-pub async fn cmd_connect(
-	rpc_cli: &Endpoint<SystemRpc, ()>,
-	rpc_host: NodeID,
-	args: ConnectNodeOpt,
-) -> Result<(), Error> {
-	match rpc_cli
-		.call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
-		.await??
-	{
-		SystemRpc::Ok => {
-			println!("Success.");
-			Ok(())
-		}
-		m => Err(Error::unexpected_rpc_message(m)),
-	}
-}
-
 pub async fn cmd_admin(
 	rpc_cli: &Endpoint<AdminRpc, ()>,
 	rpc_host: NodeID,
@@ -220,23 +17,6 @@ pub async fn cmd_admin(
 		AdminRpc::Ok(msg) => {
 			println!("{}", msg);
 		}
-		AdminRpc::BucketList(bl) => {
-			print_bucket_list(bl);
-		}
-		AdminRpc::BucketInfo {
-			bucket,
-			relevant_keys,
-			counters,
-			mpu_counters,
-		} => {
-			print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters);
-		}
-		AdminRpc::KeyList(kl) => {
-			print_key_list(kl);
-		}
-		AdminRpc::KeyInfo(key, rb) => {
-			print_key_info(&key, &rb);
-		}
 		AdminRpc::WorkerList(wi, wlo) => {
 			print_worker_list(wi, wlo);
 		}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index f053eef4..bb81d144 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,7 +1,6 @@
 use bytesize::ByteSize;
 use format_table::format_table;
 
-use garage_util::crdt::Crdt;
 use garage_util::error::*;
 
 use garage_rpc::layout::*;
@@ -10,174 +9,6 @@ use garage_rpc::*;
 
 use crate::cli::*;
 
-pub async fn cli_layout_command_dispatch(
-	cmd: LayoutOperation,
-	system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
-	rpc_host: NodeID,
-) -> Result<(), Error> {
-	match cmd {
-		LayoutOperation::Assign(assign_opt) => {
-			cmd_assign_role(system_rpc_endpoint, rpc_host, assign_opt).await
-		}
-		LayoutOperation::Remove(remove_opt) => {
-			cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
-		}
-		LayoutOperation::Show => cmd_show_layout(system_rpc_endpoint, rpc_host).await,
-		LayoutOperation::Apply(apply_opt) => {
-			cmd_apply_layout(system_rpc_endpoint, rpc_host, apply_opt).await
-		}
-		LayoutOperation::Revert(revert_opt) => {
-			cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
-		}
-		LayoutOperation::Config(config_opt) => {
-			cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
-		}
-		LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await,
-		LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
-			cmd_layout_skip_dead_nodes(system_rpc_endpoint, rpc_host, assume_sync_opt).await
-		}
-	}
-}
-
-pub async fn cmd_assign_role(
-	rpc_cli: &Endpoint<SystemRpc, ()>,
-	rpc_host: NodeID,
-	args: AssignRoleOpt,
-) -> Result<(), Error> {
-	let status = match rpc_cli
-		.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
-		.await??
-	{
-		SystemRpc::ReturnKnownNodes(nodes) => nodes,
-		resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
-	};
-
-	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-	let all_nodes = layout.get_all_nodes();
-
-	let added_nodes = args
-		.node_ids
-		.iter()
-		.map(|node_id| {
-			find_matching_node(
-				status
-					.iter()
-					.map(|adv| adv.id)
-					.chain(all_nodes.iter().cloned()),
-				node_id,
-			)
-		})
-		.collect::<Result<Vec<_>, _>>()?;
-
-	let mut roles = layout.current().roles.clone();
-	roles.merge(&layout.staging.get().roles);
-
-	for replaced in args.replace.iter() {
-		let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?;
-		match roles.get(&replaced_node) {
-			Some(NodeRoleV(Some(_))) => {
-				layout
-					.staging
-					.get_mut()
-					.roles
-					.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
-			}
-			_ => {
-				return Err(Error::Message(format!(
-					"Cannot replace node {:?} as it is not currently in planned layout",
-					replaced_node
-				)));
-			}
-		}
-	}
-
-	if args.capacity.is_some() && args.gateway {
-		return Err(Error::Message(
-			"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
-	}
-	if args.capacity == Some(ByteSize::b(0)) {
-		return Err(Error::Message("Invalid capacity value: 0".into()));
-	}
-
-	for added_node in added_nodes {
-		let new_entry = match roles.get(&added_node) {
-			Some(NodeRoleV(Some(old))) => {
-				let capacity = match args.capacity {
-					Some(c) => Some(c.as_u64()),
-					None if args.gateway => None,
-					None => old.capacity,
-				};
-				let tags = if args.tags.is_empty() {
-					old.tags.clone()
-				} else {
-					args.tags.clone()
-				};
-				NodeRole {
-					zone: args.zone.clone().unwrap_or_else(|| old.zone.to_string()),
-					capacity,
-					tags,
-				}
-			}
-			_ => {
-				let capacity = match args.capacity {
-					Some(c) => Some(c.as_u64()),
-					None if args.gateway => None,
-					None => return Err(Error::Message(
-						"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
-				};
-				NodeRole {
-					zone: args
-						.zone
-						.clone()
-						.ok_or("Please specify a zone with the -z flag")?,
-					capacity,
-					tags: args.tags.clone(),
-				}
-			}
-		};
-
-		layout
-			.staging
-			.get_mut()
-			.roles
-			.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
-	}
-
-	send_layout(rpc_cli, rpc_host, layout).await?;
-
-	println!("Role changes are staged but not yet committed.");
-	println!("Use `garage layout show` to view staged role changes,");
-	println!("and `garage layout apply` to enact staged changes.");
-	Ok(())
-}
-
-pub async fn cmd_remove_role(
-	rpc_cli: &Endpoint<SystemRpc, ()>,
-	rpc_host: NodeID,
-	args: RemoveRoleOpt,
-) -> Result<(), Error> {
-	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
-	let mut roles = layout.current().roles.clone();
-	roles.merge(&layout.staging.get().roles);
-
-	let deleted_node =
-		find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
-
-	layout
-		.staging
-		.get_mut()
-		.roles
-		.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
-
-	send_layout(rpc_cli, rpc_host, layout).await?;
-
-	println!("Role removal is staged but not yet committed.");
-	println!("Use `garage layout show` to view staged role changes,");
-	println!("and `garage layout apply` to enact staged changes.");
-	Ok(())
-}
-
 pub async fn cmd_show_layout(
 	rpc_cli: &Endpoint<SystemRpc, ()>,
 	rpc_host: NodeID,
@@ -226,47 +57,6 @@ pub async fn cmd_show_layout(
 	Ok(())
 }
 
-pub async fn cmd_apply_layout(
-	rpc_cli: &Endpoint<SystemRpc, ()>,
-	rpc_host: NodeID,
-	apply_opt: ApplyLayoutOpt,
-) -> Result<(), Error> {
-	let layout = fetch_layout(rpc_cli, rpc_host).await?;
-
-	let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?;
-	for line in msg.iter() {
-		println!("{}", line);
-	}
-
-	send_layout(rpc_cli, rpc_host, layout).await?;
-
-	println!("New cluster layout with updated role assignment has been applied in cluster.");
-	println!("Data will now be moved around between nodes accordingly.");
-
-	Ok(())
-}
-
-pub async fn cmd_revert_layout(
-	rpc_cli: &Endpoint<SystemRpc, ()>,
-	rpc_host: NodeID,
-	revert_opt: RevertLayoutOpt,
-) -> Result<(), Error> {
-	if !revert_opt.yes {
-		return Err(Error::Message(
-			"Please add the --yes flag to run the layout revert operation".into(),
-		));
-	}
-
-	let layout = fetch_layout(rpc_cli, rpc_host).await?;
-
-	let layout = layout.revert_staged_changes()?;
-
-	send_layout(rpc_cli, rpc_host, layout).await?;
-
-	println!("All proposed role changes in cluster layout have been canceled.");
-	Ok(())
-}
-
 pub async fn cmd_config_layout(
 	rpc_cli: &Endpoint<SystemRpc, ()>,
 	rpc_host: NodeID,
diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs
index e131f62c..30f566e2 100644
--- a/src/garage/cli/mod.rs
+++ b/src/garage/cli/mod.rs
@@ -8,6 +8,5 @@ pub(crate) mod convert_db;
 
 pub(crate) use cmd::*;
 pub(crate) use init::*;
-pub(crate) use layout::*;
 pub(crate) use structs::*;
 pub(crate) use util::*;
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 21c14f42..a3a1480e 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -3,257 +3,16 @@ use std::time::Duration;
 
 use format_table::format_table;
 
 use garage_util::background::*;
-use garage_util::crdt::*;
 use garage_util::data::*;
-use garage_util::error::*;
 use garage_util::time::*;
 
 use garage_block::manager::BlockResyncErrorInfo;
 
-use garage_model::bucket_table::*;
-use garage_model::key_table::*;
-use garage_model::s3::mpu_table::{self, MultipartUpload};
-use garage_model::s3::object_table;
+use garage_model::s3::mpu_table::MultipartUpload;
 use garage_model::s3::version_table::*;
 
 use crate::cli::structs::WorkerListOpt;
 
-pub fn print_bucket_list(bl: Vec<Bucket>) {
-	println!("List of buckets:");
-
-	let mut table = vec![];
-	for bucket in bl {
-		let aliases = bucket
-			.aliases()
-			.iter()
-			.filter(|(_, _, active)| *active)
-			.map(|(name, _, _)| name.to_string())
-			.collect::<Vec<_>>();
-		let local_aliases_n = match &bucket
-			.local_aliases()
-			.iter()
-			.filter(|(_, _, active)| *active)
-			.collect::<Vec<_>>()[..]
-		{
-			[] => "".into(),
-			[((k, n), _, _)] => format!("{}:{}", k, n),
-			s => format!("[{} local aliases]", s.len()),
-		};
-
-		table.push(format!(
-			"\t{}\t{}\t{}",
-			aliases.join(","),
-			local_aliases_n,
-			hex::encode(bucket.id),
-		));
-	}
-	format_table(table);
-}
-
-pub fn print_key_list(kl: Vec<(String, String)>) {
-	println!("List of keys:");
-	let mut table = vec![];
-	for key in kl {
-		table.push(format!("\t{}\t{}", key.0, key.1));
-	}
-	format_table(table);
-}
-
-pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
-	let bucket_global_aliases = |b: &Uuid| {
-		if let Some(bucket) = relevant_buckets.get(b) {
-			if let Some(p) = bucket.state.as_option() {
-				return p
-					.aliases
-					.items()
-					.iter()
-					.filter(|(_, _, active)| *active)
-					.map(|(a, _, _)| a.clone())
-					.collect::<Vec<_>>()
-					.join(", ");
-			}
-		}
-
-		"".to_string()
-	};
-
-	match &key.state {
-		Deletable::Present(p) => {
-			println!("Key name: {}", p.name.get());
-			println!("Key ID: {}", key.key_id);
-			println!("Secret key: {}", p.secret_key);
-			println!("Can create buckets: {}", p.allow_create_bucket.get());
-			println!("\nKey-specific bucket aliases:");
-			let mut table = vec![];
-			for (alias_name, _, alias) in p.local_aliases.items().iter() {
-				if let Some(bucket_id) = alias {
-					table.push(format!(
-						"\t{}\t{}\t{}",
-						alias_name,
-						bucket_global_aliases(bucket_id),
-						hex::encode(bucket_id)
-					));
-				}
-			}
-			format_table(table);
-
-			println!("\nAuthorized buckets:");
-			let mut table = vec![];
-			for (bucket_id, perm) in p.authorized_buckets.items().iter() {
-				if !perm.is_any() {
-					continue;
-				}
-				let rflag = if perm.allow_read { "R" } else { " " };
-				let wflag = if perm.allow_write { "W" } else { " " };
-				let oflag = if perm.allow_owner { "O" } else { " " };
-				let local_aliases = p
-					.local_aliases
-					.items()
-					.iter()
-					.filter(|(_, _, a)| *a == Some(*bucket_id))
-					.map(|(a, _, _)| a.clone())
-					.collect::<Vec<_>>()
-					.join(", ");
-				table.push(format!(
-					"\t{}{}{}\t{}\t{}\t{:?}",
-					rflag,
-					wflag,
-					oflag,
-					bucket_global_aliases(bucket_id),
-					local_aliases,
-					bucket_id
-				));
-			}
-			format_table(table);
-		}
-		Deletable::Deleted => {
-			println!("Key {} is deleted.", key.key_id);
-		}
-	}
-}
-
-pub fn print_bucket_info(
-	bucket: &Bucket,
-	relevant_keys: &HashMap<String, Key>,
-	counters: &HashMap<String, i64>,
-	mpu_counters: &HashMap<String, i64>,
-) {
-	let key_name = |k| {
-		relevant_keys
-			.get(k)
-			.map(|k| k.params().unwrap().name.get().as_str())
-			.unwrap_or("<deleted>")
-	};
-
-	println!("Bucket: {}", hex::encode(bucket.id));
-	match &bucket.state {
-		Deletable::Deleted => println!("Bucket is deleted."),
-		Deletable::Present(p) => {
-			let size =
-				bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64);
-			println!(
-				"\nSize: {} ({})",
-				size.to_string_as(true),
-				size.to_string_as(false)
-			);
-			println!(
-				"Objects: {}",
-				*counters.get(object_table::OBJECTS).unwrap_or(&0)
-			);
-			println!(
-				"Unfinished uploads (multipart and non-multipart): {}",
-				*counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0)
-			);
-			println!(
-				"Unfinished multipart uploads: {}",
-				*mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0)
-			);
-			let mpu_size =
-				bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64);
-			println!(
-				"Size of unfinished multipart uploads: {} ({})",
-				mpu_size.to_string_as(true),
-				mpu_size.to_string_as(false),
-			);
-
-			println!("\nWebsite access: {}", p.website_config.get().is_some());
-
-			let quotas = p.quotas.get();
-			if quotas.max_size.is_some() || quotas.max_objects.is_some() {
-				println!("\nQuotas:");
-				if let Some(ms) = quotas.max_size {
-					let ms = bytesize::ByteSize::b(ms);
-					println!(
-						" maximum size: {} ({})",
-						ms.to_string_as(true),
-						ms.to_string_as(false)
-					);
-				}
-				if let Some(mo) = quotas.max_objects {
-					println!(" maximum number of objects: {}", mo);
-				}
-			}
-
-			println!("\nGlobal aliases:");
-			for (alias, _, active) in p.aliases.items().iter() {
-				if *active {
-					println!(" {}", alias);
-				}
-			}
-
-			println!("\nKey-specific aliases:");
-			let mut table = vec![];
-			for ((key_id, alias), _, active) in p.local_aliases.items().iter() {
-				if *active {
-					table.push(format!("\t{} ({})\t{}", key_id, key_name(key_id), alias));
-				}
-			}
-			format_table(table);
-
-			println!("\nAuthorized keys:");
-			let mut table = vec![];
-			for (k, perm) in p.authorized_keys.items().iter() {
-				if !perm.is_any() {
-					continue;
-				}
-				let rflag = if perm.allow_read { "R" } else { " " };
-				let wflag = if perm.allow_write { "W" } else { " " };
-				let oflag = if perm.allow_owner { "O" } else { " " };
-				table.push(format!(
-					"\t{}{}{}\t{}\t{}",
-					rflag,
-					wflag,
-					oflag,
-					k,
-					key_name(k)
-				));
-			}
-			format_table(table);
-		}
-	};
-}
-
-pub fn find_matching_node(
-	cand: impl std::iter::Iterator<Item = Uuid>,
-	pattern: &str,
-) -> Result<Uuid, Error> {
-	let mut candidates = vec![];
-	for c in cand {
-		if hex::encode(c).starts_with(pattern) && !candidates.contains(&c) {
-			candidates.push(c);
-		}
-	}
-	if candidates.len() != 1 {
-		Err(Error::Message(format!(
-			"{} nodes match '{}'",
-			candidates.len(),
-			pattern,
-		)))
-	} else {
-		Ok(candidates[0])
-	}
-}
-
 pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
 	let mut wi = wi.into_iter().collect::<Vec<_>>();
 	wi.sort_by_key(|(tid, info)| {
diff --git a/src/garage/cli_v2/bucket.rs b/src/garage/cli_v2/bucket.rs
new file mode 100644
index 00000000..837ce783
--- /dev/null
+++ b/src/garage/cli_v2/bucket.rs
@@ -0,0 +1,523 @@
+//use bytesize::ByteSize;
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api::admin::api::*;
+
+use crate::cli as cli_v1;
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+	pub async fn cmd_bucket(&self, cmd: BucketOperation) -> Result<(), Error> {
+		match cmd {
+			BucketOperation::List => self.cmd_list_buckets().await,
+			BucketOperation::Info(query) => self.cmd_bucket_info(query).await,
+			BucketOperation::Create(query) => self.cmd_create_bucket(query).await,
+			BucketOperation::Delete(query) => self.cmd_delete_bucket(query).await,
+			BucketOperation::Alias(query) => self.cmd_alias_bucket(query).await,
+			BucketOperation::Unalias(query) => self.cmd_unalias_bucket(query).await,
+			BucketOperation::Allow(query) => self.cmd_bucket_allow(query).await,
+			BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await,
+			BucketOperation::Website(query) => self.cmd_bucket_website(query).await,
+			BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await,
+
+			// TODO
+			x => cli_v1::cmd_admin(
+				&self.admin_rpc_endpoint,
+				self.rpc_host,
+				AdminRpc::BucketOperation(x),
+			)
+			.await
+			.ok_or_message("old error"),
+		}
+	}
+
+	pub async fn cmd_list_buckets(&self) -> Result<(), Error> {
+		let buckets = self.api_request(ListBucketsRequest).await?;
+
+		println!("List of buckets:");
+
+		let mut table = vec![];
+		for bucket in buckets.0.iter() {
+			let local_aliases_n = match &bucket.local_aliases[..] {
+				[] => "".into(),
+				[alias] => format!("{}:{}", alias.access_key_id, alias.alias),
+				s => format!("[{} local aliases]", s.len()),
+			};
+
+			table.push(format!(
+				"\t{}\t{}\t{}",
+				bucket.global_aliases.join(","),
+				local_aliases_n,
+				bucket.id,
+			));
+		}
+		format_table(table);
+
+		Ok(())
+	}
+
+	pub async fn cmd_bucket_info(&self, opt: BucketOpt) -> Result<(), Error> {
+		let bucket = self
+			.api_request(GetBucketInfoRequest {
+				id: None,
+				global_alias: None,
+				search: Some(opt.name),
+			})
+			.await?;
+
+		println!("Bucket: {}", bucket.id);
+
+		let size = bytesize::ByteSize::b(bucket.bytes as u64);
+		println!(
+			"\nSize: {} ({})",
+			size.to_string_as(true),
+			size.to_string_as(false)
+		);
+		println!("Objects: {}", bucket.objects);
+		println!(
+			"Unfinished uploads (multipart and non-multipart): {}",
+			bucket.unfinished_uploads,
+		);
+		println!(
+			"Unfinished multipart uploads: {}",
+			bucket.unfinished_multipart_uploads
+		);
+		let mpu_size = bytesize::ByteSize::b(bucket.unfinished_multipart_uploads as u64);
+		println!(
+			"Size of unfinished multipart uploads: {} ({})",
+			mpu_size.to_string_as(true),
+			mpu_size.to_string_as(false),
+		);
+
+		println!("\nWebsite access: {}", bucket.website_access);
+
+		if bucket.quotas.max_size.is_some() || bucket.quotas.max_objects.is_some() {
+			println!("\nQuotas:");
+			if let Some(ms) = bucket.quotas.max_size {
+				let ms = bytesize::ByteSize::b(ms);
+				println!(
+					" maximum size: {} ({})",
+					ms.to_string_as(true),
+					ms.to_string_as(false)
+				);
+			}
+			if let Some(mo) = bucket.quotas.max_objects {
+				println!(" maximum number of objects: {}", mo);
+			}
+		}
+
+		println!("\nGlobal aliases:");
+		for alias in bucket.global_aliases {
+			println!(" {}", alias);
+		}
+
+		println!("\nKey-specific aliases:");
+		let mut table = vec![];
+		for key in bucket.keys.iter() {
+			for alias in key.bucket_local_aliases.iter() {
+				table.push(format!("\t{} ({})\t{}", key.access_key_id, key.name, alias));
+			}
+		}
+		format_table(table);
+
+		println!("\nAuthorized keys:");
+		let mut table = vec![];
+		for key in bucket.keys.iter() {
+			if !(key.permissions.read || key.permissions.write || key.permissions.owner) {
+				continue;
+			}
+			let rflag = if key.permissions.read { "R" } else { " " };
+			let wflag = if key.permissions.write { "W" } else { " " };
+			let oflag = if key.permissions.owner { "O" } else { " " };
+			table.push(format!(
+				"\t{}{}{}\t{}\t{}",
+				rflag, wflag, oflag, key.access_key_id, key.name
+			));
+		}
+		format_table(table);
+
+		Ok(())
+	}
+
+	pub async fn cmd_create_bucket(&self, opt: BucketOpt) -> Result<(), Error> {
+		self.api_request(CreateBucketRequest {
+			global_alias: Some(opt.name.clone()),
+			local_alias: None,
+		})
+		.await?;
+
+		println!("Bucket {} was created.", opt.name);
+
+		Ok(())
+	}
+
+	pub async fn cmd_delete_bucket(&self, opt: DeleteBucketOpt) -> Result<(), Error> {
+		let bucket = self
+			.api_request(GetBucketInfoRequest {
+				id: None,
+				global_alias: None,
+				search: Some(opt.name.clone()),
+			})
+			.await?;
+
+		// CLI-only checks: the bucket must not have other aliases
+		if bucket
+			.global_aliases
+			.iter()
+			.find(|a| **a != opt.name)
+			.is_some()
+		{
+			return Err(Error::Message(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", opt.name)));
+		}
+
+		if bucket
+			.keys
+			.iter()
+			.any(|k| !k.bucket_local_aliases.is_empty())
+		{
+			return Err(Error::Message(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", opt.name)));
+		}
+
+		if !opt.yes {
+			println!("About to delete bucket {}.", bucket.id);
+			return Err(Error::Message(
+				"Add --yes flag to really perform this operation".to_string(),
+			));
+		}
+
+		self.api_request(DeleteBucketRequest {
+			id: bucket.id.clone(),
+		})
+		.await?;
+
+		println!("Bucket {} has been deleted.", bucket.id);
+
+		Ok(())
+	}
+
+	pub async fn cmd_alias_bucket(&self, opt: AliasBucketOpt) -> Result<(), Error> {
+		let bucket = self
+			.api_request(GetBucketInfoRequest {
+				id: None,
+				global_alias: None,
+				search: Some(opt.existing_bucket.clone()),
+			})
+			.await?;
+
+		if let Some(key_pat) = &opt.local {
+			let key = self
+				.api_request(GetKeyInfoRequest {
+					search: Some(key_pat.clone()),
+					id: None,
+					show_secret_key: false,
+				})
+				.await?;
+
+			self.api_request(AddBucketAliasRequest {
+				bucket_id: bucket.id.clone(),
+				alias: BucketAliasEnum::Local {
+					local_alias: opt.new_name.clone(),
+					access_key_id: key.access_key_id.clone(),
+				},
+			})
+			.await?;
+
+			println!(
+				"Alias {} now points to bucket {:.16} in namespace of key {}",
+				opt.new_name, bucket.id, key.access_key_id
+			)
+		} else {
+			self.api_request(AddBucketAliasRequest {
+				bucket_id: bucket.id.clone(),
+				alias: BucketAliasEnum::Global {
+					global_alias: opt.new_name.clone(),
+				},
+			})
+			.await?;
+
+			println!(
+				"Alias {} now points to bucket {:.16}",
+				opt.new_name, bucket.id
+			)
+		}
+
+		Ok(())
+	}
+
+	pub async fn cmd_unalias_bucket(&self, opt: UnaliasBucketOpt) -> Result<(), Error> {
+		if let Some(key_pat) = &opt.local {
+			let key = self
+				.api_request(GetKeyInfoRequest {
+					search: Some(key_pat.clone()),
+					id: None,
+					show_secret_key: false,
+				})
+				.await?;
+
+			let bucket = key
+				.buckets
+				.iter()
+				.find(|x| x.local_aliases.contains(&opt.name))
+				.ok_or_message(format!(
+					"No bucket called {} in namespace of key {}",
+					opt.name, key.access_key_id
+				))?;
+
+			self.api_request(RemoveBucketAliasRequest {
+				bucket_id: bucket.id.clone(),
+				alias: BucketAliasEnum::Local {
+					access_key_id: key.access_key_id.clone(),
+					local_alias: opt.name.clone(),
+				},
+			})
+			.await?;
+
+			println!(
+				"Alias {} no longer points to bucket {:.16} in namespace of key {}",
+				&opt.name, bucket.id, key.access_key_id
+			)
+		} else {
+			let bucket = self
+				.api_request(GetBucketInfoRequest {
+					id: None,
+					global_alias: Some(opt.name.clone()),
+					search: None,
+				})
+				.await?;
+
+			self.api_request(RemoveBucketAliasRequest {
+				bucket_id: bucket.id.clone(),
+				alias: BucketAliasEnum::Global {
+					global_alias: opt.name.clone(),
+				},
+			})
+			.await?;
+
+			println!(
+				"Alias {} no longer points to bucket {:.16}",
+				opt.name, bucket.id
+			)
+		}
+
+		Ok(())
+	}
+
+	pub async fn cmd_bucket_allow(&self, opt: PermBucketOpt) -> Result<(), Error> {
+		let bucket = self
+			.api_request(GetBucketInfoRequest {
+				id: None,
+				global_alias: None,
+				search: Some(opt.bucket.clone()),
+			})
+			.await?;
+
+		let key = self
+			.api_request(GetKeyInfoRequest {
+				id: None,
+				search: Some(opt.key_pattern.clone()),
+				show_secret_key: false,
+			})
+			.await?;
+
+		self.api_request(AllowBucketKeyRequest(BucketKeyPermChangeRequest {
+			bucket_id: bucket.id.clone(),
+			access_key_id: key.access_key_id.clone(),
+			permissions: ApiBucketKeyPerm {
+				read: opt.read,
+				write: opt.write,
+				owner: opt.owner,
+			},
+		}))
+		.await?;
+
+		let new_bucket = self
+			.api_request(GetBucketInfoRequest {
+				id: Some(bucket.id),
+				global_alias: None,
+				search: None,
+			})
+			.await?;
+
+		if let Some(new_key) = new_bucket
+			.keys
+			.iter()
+			.find(|k| k.access_key_id == key.access_key_id)
+		{
+			println!(
+				"New permissions for key {} on bucket {:.16}:\n read {}\n write {}\n owner {}",
+				key.access_key_id,
+				new_bucket.id,
+				new_key.permissions.read,
+				new_key.permissions.write,
+				new_key.permissions.owner
+			);
+		} else {
+			println!(
+				"Access key {} has no permissions on bucket {:.16}",
+				key.access_key_id, new_bucket.id
+			);
+		}
+
+		Ok(())
+	}
+
+	pub async fn cmd_bucket_deny(&self, opt: PermBucketOpt) -> Result<(), Error> {
+		let bucket = self
+			.api_request(GetBucketInfoRequest {
+				id: None,
+				global_alias: None,
+				search: Some(opt.bucket.clone()),
+			})
+			.await?;
+
+		let key = self
+			.api_request(GetKeyInfoRequest {
+				id: None,
+				search: Some(opt.key_pattern.clone()),
+				show_secret_key: false,
+			})
+			.await?;
+
+		self.api_request(DenyBucketKeyRequest(BucketKeyPermChangeRequest {
+			bucket_id: bucket.id.clone(),
+			access_key_id: key.access_key_id.clone(),
+			permissions: ApiBucketKeyPerm {
+				read: opt.read,
+				write: opt.write,
+				owner: opt.owner,
+			},
+		}))
+		.await?;
+
+		let new_bucket = self
+			.api_request(GetBucketInfoRequest {
+				id: Some(bucket.id),
+				global_alias: None,
+				search: None,
+			})
+			.await?;
+
+		if let Some(new_key) = new_bucket
+			.keys
+			.iter()
+			.find(|k| k.access_key_id == key.access_key_id)
+		{
+			println!(
+				"New permissions for key {} on bucket {:.16}:\n read {}\n write {}\n owner {}",
+				key.access_key_id,
+				new_bucket.id,
+				new_key.permissions.read,
+				new_key.permissions.write,
+				new_key.permissions.owner
+			);
+		} else {
+			println!(
+				"Access key {} no longer has permissions on bucket {:.16}",
+				key.access_key_id, new_bucket.id
+			);
+		}
+
+		Ok(())
+	}
+
+	pub async fn cmd_bucket_website(&self, opt: WebsiteOpt) -> Result<(), Error> {
+		let bucket = self
+			.api_request(GetBucketInfoRequest {
+				id: None,
+				global_alias: None,
+				search: Some(opt.bucket.clone()),
+			})
+			.await?;
+
+		if !(opt.allow ^ opt.deny) {
+			return Err(Error::Message(
+				"You must specify exactly one flag, either --allow or --deny".to_string(),
+			));
+		}
+
+		let wa = if opt.allow {
+			UpdateBucketWebsiteAccess {
+				enabled: true,
+				index_document: Some(opt.index_document.clone()),
+				error_document: opt
+					.error_document
+					.or(bucket.website_config.and_then(|x| x.error_document.clone())),
+			}
+		} else {
+			UpdateBucketWebsiteAccess {
+				enabled: false,
+				index_document: None,
+				error_document: None,
+			}
+		};
+
+		self.api_request(UpdateBucketRequest {
+			id: bucket.id,
+			body: UpdateBucketRequestBody {
+				website_access: Some(wa),
+				quotas: None,
+			},
+		})
+		.await?;
+
+		if opt.allow {
+			println!("Website access allowed for {}", &opt.bucket);
+		} else {
+			println!("Website access denied for {}", &opt.bucket);
+		}
+
+		Ok(())
+	}
+
+	pub async fn cmd_bucket_set_quotas(&self, opt: SetQuotasOpt) -> Result<(), Error> {
+		let bucket = self
+			.api_request(GetBucketInfoRequest {
+				id: None,
+				global_alias: None,
+				search: Some(opt.bucket.clone()),
+			})
+			.await?;
+
+		if opt.max_size.is_none() && opt.max_objects.is_none() {
+			return Err(Error::Message(
+				"You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
+			));
+		}
+
+		let new_quotas = ApiBucketQuotas {
+			max_size: match opt.max_size.as_deref() {
+				Some("none") => None,
+				Some(v) => Some(
+					v.parse::<bytesize::ByteSize>()
+						.ok_or_message(format!("Invalid size specified: {}", v))?
+						.as_u64(),
+				),
+				None => bucket.quotas.max_size,
+			},
+			max_objects: match opt.max_objects.as_deref() {
+				Some("none") => None,
+				Some(v) => Some(
+					v.parse::<u64>()
+						.ok_or_message(format!("Invalid number: {}", v))?,
+				),
+				None => bucket.quotas.max_objects,
+			},
+		};
+
+		self.api_request(UpdateBucketRequest {
+			id: bucket.id.clone(),
+			body: UpdateBucketRequestBody {
+				website_access: None,
+				quotas: Some(new_quotas),
+			},
+		})
+		.await?;
+
+		println!("Quotas updated for bucket {:.16}", bucket.id);
+
+		Ok(())
+	}
+}
diff --git a/src/garage/cli_v2/cluster.rs b/src/garage/cli_v2/cluster.rs
new file mode 100644
index 00000000..34a28674
--- /dev/null
+++ b/src/garage/cli_v2/cluster.rs
@@ -0,0 +1,158 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api::admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::layout::*;
+use crate::cli_v2::*;
+
+impl Cli {
+	pub async fn cmd_status(&self) -> Result<(), Error> {
+		let status = self.api_request(GetClusterStatusRequest).await?;
+		let layout = self.api_request(GetClusterLayoutRequest).await?;
+
+		println!("==== HEALTHY NODES ====");
+
+		let mut healthy_nodes =
+			vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
+
+		for adv in status.nodes.iter().filter(|adv| adv.is_up) {
+			let host = adv.hostname.as_deref().unwrap_or("?");
+			let addr = match adv.addr {
+				Some(addr) => addr.to_string(),
+				None => "N/A".to_string(),
+			};
+			if let Some(cfg) = &adv.role {
+				let data_avail = match &adv.data_partition {
+					_ if cfg.capacity.is_none() => "N/A".into(),
+					Some(FreeSpaceResp { available, total }) => {
+						let pct = (*available as f64) / (*total as f64) * 100.;
+						let avail_str = bytesize::ByteSize::b(*available);
+						format!("{} ({:.1}%)", avail_str, pct)
+					}
+					None => "?".into(),
+				};
+				healthy_nodes.push(format!(
+					"{id:.16}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
+					id = adv.id,
+					host = host,
+					addr = addr,
+					tags = cfg.tags.join(","),
+					zone = cfg.zone,
+					capacity = capacity_string(cfg.capacity),
+					data_avail = data_avail,
+				));
+			} else {
+				let status = match layout.staged_role_changes.iter().find(|x| x.id == adv.id) {
+					Some(NodeRoleChange {
+						action: NodeRoleChangeEnum::Update { .. },
+						..
+					}) => "pending...",
+					_ if adv.draining => "draining metadata..",
+					_ => "NO ROLE ASSIGNED",
+				};
+				healthy_nodes.push(format!(
+					"{id:.16}\t{h}\t{addr}\t\t\t{status}",
+					id = adv.id,
+					h = host,
+					addr = addr,
+					status = status,
+				));
+			}
+		}
+		format_table(healthy_nodes);
+
+		let tf = timeago::Formatter::new();
+		let mut drain_msg = false;
+		let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()];
+		for adv in status.nodes.iter().filter(|x| !x.is_up) {
+			let node = &adv.id;
+
+			let host = adv.hostname.as_deref().unwrap_or("?");
+			let last_seen = adv
+				.last_seen_secs_ago
+				.map(|s| tf.convert(Duration::from_secs(s)))
+				.unwrap_or_else(|| "never seen".into());
+
+			if let Some(cfg) = &adv.role {
+				let capacity = capacity_string(cfg.capacity);
+
+				failed_nodes.push(format!(
+					"{id:.16}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
+					id = node,
+					host = host,
+					tags = cfg.tags.join(","),
+					zone = cfg.zone,
+					capacity = capacity,
+					last_seen = last_seen,
+				));
+			} else {
+				let status = match layout.staged_role_changes.iter().find(|x| x.id == adv.id) {
+					Some(NodeRoleChange {
+						action: NodeRoleChangeEnum::Update { .. },
+						..
+					}) => "pending...",
+					_ if adv.draining => {
+						drain_msg = true;
+						"draining metadata.."
+                    }
+                    _ => unreachable!(),
+                };
+
+                failed_nodes.push(format!(
+                    "{id:.16}\t{host}\t\t\t{status}\t{last_seen}",
+                    id = node,
+                    host = host,
+                    status = status,
+                    last_seen = last_seen,
+                ));
+            }
+        }
+
+        if failed_nodes.len() > 1 {
+            println!("\n==== FAILED NODES ====");
+            format_table(failed_nodes);
+            if drain_msg {
+                println!();
+                println!("Your cluster is expecting to drain data from nodes that are currently unavailable.");
+                println!(
+                    "If these nodes are definitely dead, please review the layout history with"
+                );
+                println!(
+                    "`garage layout history` and use `garage layout skip-dead-nodes` to force progress."
+                );
+            }
+        }
+
+        if print_staging_role_changes(&layout) {
+            println!();
+            println!(
+                "Please use `garage layout show` to check the proposed new layout and apply it."
+            );
+            println!();
+        }
+
+        Ok(())
+    }
+
+    pub async fn cmd_connect(&self, opt: ConnectNodeOpt) -> Result<(), Error> {
+        let res = self
+            .api_request(ConnectClusterNodesRequest(vec![opt.node]))
+            .await?;
+        if res.0.len() != 1 {
+            return Err(Error::Message(format!("unexpected response: {:?}", res)));
+        }
+        let res = res.0.into_iter().next().unwrap();
+        if res.success {
+            println!("Success.");
+            Ok(())
+        } else {
+            Err(Error::Message(format!(
+                "Failure: {}",
+                res.error.unwrap_or_default()
+            )))
+        }
+    }
+}
diff --git a/src/garage/cli_v2/key.rs b/src/garage/cli_v2/key.rs
new file mode 100644
index 00000000..ff403a9a
--- /dev/null
+++ b/src/garage/cli_v2/key.rs
@@ -0,0 +1,227 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api::admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+    pub async fn cmd_key(&self, cmd: KeyOperation) -> Result<(), Error> {
+        match cmd {
+            KeyOperation::List => self.cmd_list_keys().await,
+            KeyOperation::Info(query) => self.cmd_key_info(query).await,
+            KeyOperation::Create(query) => self.cmd_create_key(query).await,
+            KeyOperation::Rename(query) => self.cmd_rename_key(query).await,
+            KeyOperation::Delete(query) => self.cmd_delete_key(query).await,
+            KeyOperation::Allow(query) => self.cmd_allow_key(query).await,
+            KeyOperation::Deny(query) => self.cmd_deny_key(query).await,
+            KeyOperation::Import(query) => self.cmd_import_key(query).await,
+        }
+    }
+
+    pub async fn cmd_list_keys(&self) -> Result<(), Error> {
+        let keys = self.api_request(ListKeysRequest).await?;
+
+        println!("List of keys:");
+        let mut table = vec![];
+        for key in keys.0.iter() {
+            table.push(format!("\t{}\t{}", key.id, key.name));
+        }
+        format_table(table);
+
+        Ok(())
+    }
+
+    pub async fn cmd_key_info(&self, opt: KeyInfoOpt) -> Result<(), Error> {
+        let key = self
+            .api_request(GetKeyInfoRequest {
+                id: None,
+                search: Some(opt.key_pattern),
+                show_secret_key: opt.show_secret,
+            })
+            .await?;
+
+        print_key_info(&key);
+
+        Ok(())
+    }
+
+    pub async fn cmd_create_key(&self, opt: KeyNewOpt) -> Result<(), Error> {
+        let key = self
+            .api_request(CreateKeyRequest {
+                name: Some(opt.name),
+            })
+            .await?;
+
+        print_key_info(&key.0);
+
+        Ok(())
+    }
+
+    pub async fn cmd_rename_key(&self, opt: KeyRenameOpt) -> Result<(), Error> {
+        let key = self
+            .api_request(GetKeyInfoRequest {
+                id: None,
+                search: Some(opt.key_pattern),
+                show_secret_key: false,
+            })
+            .await?;
+
+        let new_key = self
+            .api_request(UpdateKeyRequest {
+                id: key.access_key_id,
+                body: UpdateKeyRequestBody {
+                    name: Some(opt.new_name),
+                    allow: None,
+                    deny: None,
+                },
+            })
+            .await?;
+
+        print_key_info(&new_key.0);
+
+        Ok(())
+    }
+
+    pub async fn cmd_delete_key(&self, opt: KeyDeleteOpt) -> Result<(), Error> {
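+        // Destructive operation: the pattern is first resolved to a single
+        // key, and the deletion below is refused unless --yes was given.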
+        let key = self
+            .api_request(GetKeyInfoRequest {
+                id: None,
+                search: Some(opt.key_pattern),
+                show_secret_key: false,
+            })
+            .await?;
+
+        if !opt.yes {
+            println!("About to delete key {}...", key.access_key_id);
+            return Err(Error::Message(
+                "Add --yes flag to really perform this operation".to_string(),
+            ));
+        }
+
+        self.api_request(DeleteKeyRequest {
+            id: key.access_key_id.clone(),
+        })
+        .await?;
+
+        println!("Access key {} has been deleted.", key.access_key_id);
+
+        Ok(())
+    }
+
+    pub async fn cmd_allow_key(&self, opt: KeyPermOpt) -> Result<(), Error> {
+        let key = self
+            .api_request(GetKeyInfoRequest {
+                id: None,
+                search: Some(opt.key_pattern),
+                show_secret_key: false,
+            })
+            .await?;
+
+        let new_key = self
+            .api_request(UpdateKeyRequest {
+                id: key.access_key_id,
+                body: UpdateKeyRequestBody {
+                    name: None,
+                    allow: Some(KeyPerm {
+                        create_bucket: opt.create_bucket,
+                    }),
+                    deny: None,
+                },
+            })
+            .await?;
+
+        print_key_info(&new_key.0);
+
+        Ok(())
+    }
+
+    pub async fn cmd_deny_key(&self, opt: KeyPermOpt) -> Result<(), Error> {
+        let key = self
+            .api_request(GetKeyInfoRequest {
+                id: None,
+                search: Some(opt.key_pattern),
+                show_secret_key: false,
+            })
+            .await?;
+
+        let new_key = self
+            .api_request(UpdateKeyRequest {
+                id: key.access_key_id,
+                body: UpdateKeyRequestBody {
+                    name: None,
+                    allow: None,
+                    deny: Some(KeyPerm {
+                        create_bucket: opt.create_bucket,
+                    }),
+                },
+            })
+            .await?;
+
+        print_key_info(&new_key.0);
+
+        Ok(())
+    }
+
+    pub async fn cmd_import_key(&self, opt: KeyImportOpt) -> Result<(), Error> {
+        if !opt.yes {
+            return Err(Error::Message("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. Add the --yes flag if you really want to re-import a key.".to_string()));
+        }
+
+        let new_key = self
+            .api_request(ImportKeyRequest {
+                name: Some(opt.name),
+                access_key_id: opt.key_id,
+                secret_access_key: opt.secret_key,
+            })
+            .await?;
+
+        print_key_info(&new_key.0);
+
+        Ok(())
+    }
+}
+
+fn print_key_info(key: &GetKeyInfoResponse) {
+    println!("Key name: {}", key.name);
+    println!("Key ID: {}", key.access_key_id);
+    println!(
+        "Secret key: {}",
+        key.secret_access_key.as_deref().unwrap_or("(redacted)")
+    );
+    println!("Can create buckets: {}", key.permissions.create_bucket);
+
+    println!("\nKey-specific bucket aliases:");
+    let mut table = vec![];
+    for bucket in key.buckets.iter() {
+        for la in bucket.local_aliases.iter() {
+            table.push(format!(
+                "\t{}\t{}\t{}",
+                la,
+                bucket.global_aliases.join(","),
+                bucket.id
+            ));
+        }
+    }
+    format_table(table);
+
+    println!("\nAuthorized buckets:");
+    let mut table = vec![];
+    for bucket in key.buckets.iter() {
+        let rflag = if bucket.permissions.read { "R" } else { " " };
+        let wflag = if bucket.permissions.write { "W" } else { " " };
+        let oflag = if bucket.permissions.owner { "O" } else { " " };
+        table.push(format!(
+            "\t{}{}{}\t{}\t{}\t{:.16}",
+            rflag,
+            wflag,
+            oflag,
+            bucket.global_aliases.join(","),
+            bucket.local_aliases.join(","),
+            bucket.id
+        ));
+    }
+    format_table(table);
+}
diff --git a/src/garage/cli_v2/layout.rs b/src/garage/cli_v2/layout.rs
new file mode 100644
index 00000000..d44771c7
--- /dev/null
+++ b/src/garage/cli_v2/layout.rs
@@ -0,0 +1,284 @@
+use bytesize::ByteSize;
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api::admin::api::*;
+
+use crate::cli::layout as cli_v1;
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+    pub async fn layout_command_dispatch(&self, cmd: LayoutOperation) -> Result<(), Error> {
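+        // Assign/Remove/Apply/Revert go through the new admin API request
+        // types; Show/Config/History/SkipDeadNodes still fall back to the
+        // v1 CLI (see the TODO below) until equivalent endpoints exist.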
+        match cmd {
+            LayoutOperation::Assign(assign_opt) => self.cmd_assign_role(assign_opt).await,
+            LayoutOperation::Remove(remove_opt) => self.cmd_remove_role(remove_opt).await,
+            LayoutOperation::Apply(apply_opt) => self.cmd_apply_layout(apply_opt).await,
+            LayoutOperation::Revert(revert_opt) => self.cmd_revert_layout(revert_opt).await,
+
+            // TODO
+            LayoutOperation::Show => {
+                cli_v1::cmd_show_layout(&self.system_rpc_endpoint, self.rpc_host).await
+            }
+            LayoutOperation::Config(config_opt) => {
+                cli_v1::cmd_config_layout(&self.system_rpc_endpoint, self.rpc_host, config_opt)
+                    .await
+            }
+            LayoutOperation::History => {
+                cli_v1::cmd_layout_history(&self.system_rpc_endpoint, self.rpc_host).await
+            }
+            LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
+                cli_v1::cmd_layout_skip_dead_nodes(
+                    &self.system_rpc_endpoint,
+                    self.rpc_host,
+                    assume_sync_opt,
+                )
+                .await
+            }
+        }
+    }
+
+    pub async fn cmd_assign_role(&self, opt: AssignRoleOpt) -> Result<(), Error> {
+        let status = self.api_request(GetClusterStatusRequest).await?;
+        let layout = self.api_request(GetClusterLayoutRequest).await?;
+
+        let all_node_ids_iter = status
+            .nodes
+            .iter()
+            .map(|x| x.id.as_str())
+            .chain(layout.roles.iter().map(|x| x.id.as_str()));
+
+        let mut actions = vec![];
+
+        for node in opt.replace.iter() {
+            let id = find_matching_node(all_node_ids_iter.clone(), &node)?;
+
+            actions.push(NodeRoleChange {
+                id,
+                action: NodeRoleChangeEnum::Remove { remove: true },
+            });
+        }
+
+        for node in opt.node_ids.iter() {
+            let id = find_matching_node(all_node_ids_iter.clone(), &node)?;
+
+            let current = get_staged_or_current_role(&id, &layout);
+
+            let zone = opt
+                .zone
+                .clone()
+                .or_else(|| current.as_ref().map(|c| c.zone.clone()))
+                .ok_or_message("Please specify a zone with the -z flag")?;
+
+            let capacity = if opt.gateway {
+                if opt.capacity.is_some() {
+                    return Err(Error::Message("Please specify only -c or -g".into()));
+                }
+                None
+            } else if let Some(cap) = opt.capacity {
+                Some(cap.as_u64())
+            } else {
+                current.as_ref().ok_or_message("Please specify a capacity with the -c flag, or set node explicitly as gateway with -g")?.capacity
+            };
+
+            let tags = if !opt.tags.is_empty() {
+                opt.tags.clone()
+            } else if let Some(cur) = current.as_ref() {
+                cur.tags.clone()
+            } else {
+                vec![]
+            };
+
+            actions.push(NodeRoleChange {
+                id,
+                action: NodeRoleChangeEnum::Update {
+                    zone,
+                    capacity,
+                    tags,
+                },
+            });
+        }
+
+        self.api_request(UpdateClusterLayoutRequest(actions))
+            .await?;
+
+        println!("Role changes are staged but not yet committed.");
+        println!("Use `garage layout show` to view staged role changes,");
+        println!("and `garage layout apply` to enact staged changes.");
+        Ok(())
+    }
+
+    pub async fn cmd_remove_role(&self, opt: RemoveRoleOpt) -> Result<(), Error> {
+        let status = self.api_request(GetClusterStatusRequest).await?;
+        let layout = self.api_request(GetClusterLayoutRequest).await?;
+
+        let all_node_ids_iter = status
+            .nodes
+            .iter()
+            .map(|x| x.id.as_str())
+            .chain(layout.roles.iter().map(|x| x.id.as_str()));
+
+        let id = find_matching_node(all_node_ids_iter.clone(), &opt.node_id)?;
+
+        let actions = vec![NodeRoleChange {
+            id,
+            action: NodeRoleChangeEnum::Remove { remove: true },
+        }];
+
+        self.api_request(UpdateClusterLayoutRequest(actions))
+            .await?;
+
+        println!("Role removal is staged but not yet committed.");
+        println!("Use `garage layout show` to view staged role changes,");
+        println!("and `garage layout apply` to enact staged changes.");
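+        // Like assignment, removal only stages a change; nothing moves in
+        // the cluster until `garage layout apply` commits it.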
+        Ok(())
+    }
+
+    pub async fn cmd_apply_layout(&self, apply_opt: ApplyLayoutOpt) -> Result<(), Error> {
+        let missing_version_error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+        "#;
+
+        let req = ApplyClusterLayoutRequest {
+            version: apply_opt.version.ok_or_message(missing_version_error)?,
+        };
+        let res = self.api_request(req).await?;
+
+        for line in res.message.iter() {
+            println!("{}", line);
+        }
+
+        println!("New cluster layout with updated role assignment has been applied in cluster.");
+        println!("Data will now be moved around between nodes accordingly.");
+
+        Ok(())
+    }
+
+    pub async fn cmd_revert_layout(&self, revert_opt: RevertLayoutOpt) -> Result<(), Error> {
+        if !revert_opt.yes {
+            return Err(Error::Message(
+                "Please add the --yes flag to run the layout revert operation".into(),
+            ));
+        }
+
+        self.api_request(RevertClusterLayoutRequest).await?;
+
+        println!("All proposed role changes in cluster layout have been canceled.");
+        Ok(())
+    }
+}
+
+// --------------------------
+// ---- helper functions ----
+// --------------------------
+
+pub fn capacity_string(v: Option<u64>) -> String {
+    match v {
+        Some(c) => ByteSize::b(c).to_string_as(false),
+        None => "gateway".to_string(),
+    }
+}
+
+pub fn get_staged_or_current_role(
+    id: &str,
+    layout: &GetClusterLayoutResponse,
+) -> Option<NodeRoleResp> {
+    for node in layout.staged_role_changes.iter() {
+        if node.id == id {
+            return match &node.action {
+                NodeRoleChangeEnum::Remove { .. } => None,
+                NodeRoleChangeEnum::Update {
+                    zone,
+                    capacity,
+                    tags,
+                } => Some(NodeRoleResp {
+                    id: id.to_string(),
+                    zone: zone.to_string(),
+                    capacity: *capacity,
+                    tags: tags.clone(),
+                }),
+            };
+        }
+    }
+
+    for node in layout.roles.iter() {
+        if node.id == id {
+            return Some(node.clone());
+        }
+    }
+
+    None
+}
+
+pub fn find_matching_node<'a>(
+    cand: impl std::iter::Iterator<Item = &'a str>,
+    pattern: &'a str,
+) -> Result<String, Error> {
+    let mut candidates = vec![];
+    for c in cand {
+        if c.starts_with(pattern) && !candidates.contains(&c) {
+            candidates.push(c);
+        }
+    }
+    if candidates.len() != 1 {
+        Err(Error::Message(format!(
+            "{} nodes match '{}'",
+            candidates.len(),
+            pattern,
+        )))
+    } else {
+        Ok(candidates[0].to_string())
+    }
+}
+
+pub fn print_staging_role_changes(layout: &GetClusterLayoutResponse) -> bool {
+    let has_role_changes = !layout.staged_role_changes.is_empty();
+
+    // TODO!! Layout parameters
+    let has_layout_changes = false;
+
+    if has_role_changes || has_layout_changes {
+        println!();
+        println!("==== STAGED ROLE CHANGES ====");
+        if has_role_changes {
+            let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+            for change in layout.staged_role_changes.iter() {
+                match &change.action {
+                    NodeRoleChangeEnum::Update {
+                        tags,
+                        zone,
+                        capacity,
+                    } => {
+                        let tags = tags.join(",");
+                        table.push(format!(
+                            "{:.16}\t{}\t{}\t{}",
+                            change.id,
+                            tags,
+                            zone,
+                            capacity_string(*capacity),
+                        ));
+                    }
+                    NodeRoleChangeEnum::Remove { .. } => {
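+                        // A staged removal carries no zone/capacity/tags, so
+                        // only the truncated node ID and a REMOVED marker are
+                        // shown.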
+                        table.push(format!("{:.16}\tREMOVED", change.id));
+                    }
+                }
+            }
+            format_table(table);
+            println!();
+        }
+        //TODO
+        /*
+        if has_layout_changes {
+            println!(
+                "Zone redundancy: {}",
+                staging.parameters.get().zone_redundancy
+            );
+        }
+        */
+        true
+    } else {
+        false
+    }
+}
diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs
new file mode 100644
index 00000000..692b7c1c
--- /dev/null
+++ b/src/garage/cli_v2/mod.rs
@@ -0,0 +1,106 @@
+pub mod bucket;
+pub mod cluster;
+pub mod key;
+pub mod layout;
+
+use std::convert::TryFrom;
+use std::sync::Arc;
+use std::time::Duration;
+
+use garage_util::error::*;
+
+use garage_rpc::system::*;
+use garage_rpc::*;
+
+use garage_api::admin::api::*;
+use garage_api::admin::EndpointHandler as AdminApiEndpoint;
+
+use crate::admin::*;
+use crate::cli as cli_v1;
+use crate::cli::structs::*;
+use crate::cli::Command;
+
+pub struct Cli {
+    pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
+    pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
+    pub rpc_host: NodeID,
+}
+
+impl Cli {
+    pub async fn handle(&self, cmd: Command) -> Result<(), Error> {
+        match cmd {
+            Command::Status => self.cmd_status().await,
+            Command::Node(NodeOperation::Connect(connect_opt)) => {
+                self.cmd_connect(connect_opt).await
+            }
+            Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
+            Command::Bucket(bo) => self.cmd_bucket(bo).await,
+            Command::Key(ko) => self.cmd_key(ko).await,
+
+            // TODO
+            Command::Repair(ro) => cli_v1::cmd_admin(
+                &self.admin_rpc_endpoint,
+                self.rpc_host,
+                AdminRpc::LaunchRepair(ro),
+            )
+            .await
+            .ok_or_message("cli_v1"),
+            Command::Stats(so) => {
+                cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so))
+                    .await
+                    .ok_or_message("cli_v1")
+            }
+            Command::Worker(wo) => cli_v1::cmd_admin(
+                &self.admin_rpc_endpoint,
+                self.rpc_host,
+                AdminRpc::Worker(wo),
+            )
+            .await
+            .ok_or_message("cli_v1"),
+            Command::Block(bo) => cli_v1::cmd_admin(
+                &self.admin_rpc_endpoint,
+                self.rpc_host,
+                AdminRpc::BlockOperation(bo),
+            )
+            .await
+            .ok_or_message("cli_v1"),
+            Command::Meta(mo) => cli_v1::cmd_admin(
+                &self.admin_rpc_endpoint,
+                self.rpc_host,
+                AdminRpc::MetaOperation(mo),
+            )
+            .await
+            .ok_or_message("cli_v1"),
+
+            _ => unreachable!(),
+        }
+    }
+
+    pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error>
+    where
+        T: AdminApiEndpoint,
+        AdminApiRequest: From<T>,
+        <T as AdminApiEndpoint>::Response: TryFrom<TaggedAdminApiResponse>,
+    {
+        let req = AdminApiRequest::from(req);
+        let req_name = req.name();
+        match self
+            .admin_rpc_endpoint
+            .call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
+            .await?
+            .ok_or_message("rpc")?
+        {
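+            // The typed request is shipped over Garage's internal RPC to the
+            // daemon, which runs it through its local admin API handler and
+            // returns a tagged response that is converted back to the
+            // expected response type below.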
+            AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
+                .map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
+            AdminRpc::ApiErrorResponse {
+                http_code,
+                error_code,
+                message,
+            } => Err(Error::Message(format!(
+                "{} returned {} ({}): {}",
+                req_name, error_code, http_code, message
+            ))),
+            m => Err(Error::unexpected_rpc_message(m)),
+        }
+    }
+}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index ac95e854..08c7cee7 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -6,6 +6,7 @@ extern crate tracing;
 
 mod admin;
 mod cli;
+mod cli_v2;
 mod repair;
 mod secrets;
 mod server;
@@ -34,8 +35,6 @@ use garage_util::error::*;
 use garage_rpc::system::*;
 use garage_rpc::*;
 
-use garage_model::helper::error::Error as HelperError;
-
 use admin::*;
 use cli::*;
 use secrets::Secrets;
@@ -284,10 +283,11 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
 	let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
 	let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
 
-	match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await {
-		Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))),
-		Err(HelperError::BadRequest(b)) => Err(Error::Message(b)),
-		Err(e) => Err(Error::Message(format!("{}", e))),
-		Ok(x) => Ok(x),
-	}
+	let cli = cli_v2::Cli {
+		system_rpc_endpoint,
+		admin_rpc_endpoint,
+		rpc_host: id,
+	};
+
+	cli.handle(opt.cmd).await
 }
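Side note on the pattern above: because `api_request` is generic over any admin API request type, a new subcommand reduces to building a request struct and printing the typed response. A minimal sketch of what such an addition could look like, reusing only types that appear in this patch (the `KeySecretOpt` struct and the `cmd_key_secret` wiring are hypothetical, not part of this commit):

    impl Cli {
        // Hypothetical subcommand: print only the secret of the key matching
        // a pattern, built on the same GetKeyInfoRequest used by cmd_key_info.
        pub async fn cmd_key_secret(&self, opt: KeySecretOpt) -> Result<(), Error> {
            let key = self
                .api_request(GetKeyInfoRequest {
                    id: None,
                    search: Some(opt.key_pattern),
                    show_secret_key: true,
                })
                .await?;
            println!(
                "{}",
                key.secret_access_key.as_deref().unwrap_or("(redacted)")
            );
            Ok(())
        }
    }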