From bff5333c622ce5c8c3a151b4bd2d7ad2d04004c2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Oct 2021 17:29:27 +0200 Subject: Move things around, improvements to CLI --- src/api/error.rs | 4 +- src/garage/admin.rs | 504 +++++++++++++++++++++++++++++++++++ src/garage/admin_rpc.rs | 504 ----------------------------------- src/garage/cli.rs | 661 ---------------------------------------------- src/garage/cli/cmd.rs | 284 ++++++++++++++++++++ src/garage/cli/init.rs | 65 +++++ src/garage/cli/mod.rs | 9 + src/garage/cli/structs.rs | 296 +++++++++++++++++++++ src/garage/cli/util.rs | 83 ++++++ src/garage/main.rs | 85 ++---- src/garage/server.rs | 2 +- src/rpc/rpc_helper.rs | 2 +- src/rpc/system.rs | 63 ++++- src/util/error.rs | 42 ++- src/web/error.rs | 2 +- 15 files changed, 1357 insertions(+), 1249 deletions(-) create mode 100644 src/garage/admin.rs delete mode 100644 src/garage/admin_rpc.rs delete mode 100644 src/garage/cli.rs create mode 100644 src/garage/cli/cmd.rs create mode 100644 src/garage/cli/init.rs create mode 100644 src/garage/cli/mod.rs create mode 100644 src/garage/cli/structs.rs create mode 100644 src/garage/cli/util.rs (limited to 'src') diff --git a/src/api/error.rs b/src/api/error.rs index 35fa404f..31ba7230 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -83,7 +83,7 @@ impl Error { Error::NotFound => StatusCode::NOT_FOUND, Error::Forbidden(_) => StatusCode::FORBIDDEN, Error::InternalError( - GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_), + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(_, _, _), ) => StatusCode::SERVICE_UNAVAILABLE, Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => { StatusCode::INTERNAL_SERVER_ERROR @@ -98,7 +98,7 @@ impl Error { Error::Forbidden(_) => "AccessDenied", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::InternalError( - GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_), + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(_, _, _), ) => "ServiceUnavailable", Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError", _ => "InvalidRequest", diff --git a/src/garage/admin.rs b/src/garage/admin.rs new file mode 100644 index 00000000..b5fc9a7e --- /dev/null +++ b/src/garage/admin.rs @@ -0,0 +1,504 @@ +use std::collections::HashMap; +use std::fmt::Write; +use std::sync::Arc; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use garage_util::error::Error; + +use garage_table::crdt::Crdt; +use garage_table::replication::*; +use garage_table::*; + +use garage_rpc::*; + +use garage_model::bucket_table::*; +use garage_model::garage::Garage; +use garage_model::key_table::*; + +use crate::cli::*; +use crate::repair::Repair; + +pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRpc { + BucketOperation(BucketOperation), + KeyOperation(KeyOperation), + LaunchRepair(RepairOpt), + Stats(StatsOpt), + + // Replies + Ok(String), + BucketList(Vec), + BucketInfo(Bucket), + KeyList(Vec<(String, String)>), + KeyInfo(Key), +} + +impl Rpc for AdminRpc { + type Response = Result; +} + +pub struct AdminRpcHandler { + garage: Arc, + endpoint: Arc>, +} + +impl AdminRpcHandler { + pub fn new(garage: Arc) -> Arc { + let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); + let admin = Arc::new(Self { garage, endpoint }); + admin.endpoint.set_handler(admin.clone()); + admin + } + + async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result { + match cmd { + BucketOperation::List => { + let bucket_names = self + .garage + .bucket_table + .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) + .await? + .iter() + .map(|b| b.name.to_string()) + .collect::>(); + Ok(AdminRpc::BucketList(bucket_names)) + } + BucketOperation::Info(query) => { + let bucket = self.get_existing_bucket(&query.name).await?; + Ok(AdminRpc::BucketInfo(bucket)) + } + BucketOperation::Create(query) => { + let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? { + Some(mut bucket) => { + if !bucket.is_deleted() { + return Err(Error::BadRpc(format!( + "Bucket {} already exists", + query.name + ))); + } + bucket + .state + .update(BucketState::Present(BucketParams::new())); + bucket + } + None => Bucket::new(query.name.clone()), + }; + self.garage.bucket_table.insert(&bucket).await?; + Ok(AdminRpc::Ok(format!("Bucket {} was created.", query.name))) + } + BucketOperation::Delete(query) => { + let mut bucket = self.get_existing_bucket(&query.name).await?; + let objects = self + .garage + .object_table + .get_range(&query.name, None, Some(DeletedFilter::NotDeleted), 10) + .await?; + if !objects.is_empty() { + return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name))); + } + if !query.yes { + return Err(Error::BadRpc( + "Add --yes flag to really perform this operation".to_string(), + )); + } + // --- done checking, now commit --- + for (key_id, _, _) in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { + if !key.deleted.get() { + self.update_key_bucket(&key, &bucket.name, false, false) + .await?; + } + } else { + return Err(Error::Message(format!("Key not found: {}", key_id))); + } + } + bucket.state.update(BucketState::Deleted); + self.garage.bucket_table.insert(&bucket).await?; + Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) + } + BucketOperation::Allow(query) => { + let key = self.get_existing_key(&query.key_pattern).await?; + let bucket = self.get_existing_bucket(&query.bucket).await?; + let allow_read = query.read || key.allow_read(&query.bucket); + let allow_write = query.write || key.allow_write(&query.bucket); + self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) + .await?; + Ok(AdminRpc::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &key.key_id, &query.bucket, allow_read, allow_write + ))) + } + BucketOperation::Deny(query) => { + let key = self.get_existing_key(&query.key_pattern).await?; + let bucket = self.get_existing_bucket(&query.bucket).await?; + let allow_read = !query.read && key.allow_read(&query.bucket); + let allow_write = !query.write && key.allow_write(&query.bucket); + self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) + .await?; + Ok(AdminRpc::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &key.key_id, &query.bucket, allow_read, allow_write + ))) + } + BucketOperation::Website(query) => { + let mut bucket = self.get_existing_bucket(&query.bucket).await?; + + if !(query.allow ^ query.deny) { + return Err(Error::Message( + "You must specify exactly one flag, either --allow or --deny".to_string(), + )); + } + + if let BucketState::Present(state) = bucket.state.get_mut() { + state.website.update(query.allow); + self.garage.bucket_table.insert(&bucket).await?; + let msg = if query.allow { + format!("Website access allowed for {}", &query.bucket) + } else { + format!("Website access denied for {}", &query.bucket) + }; + + Ok(AdminRpc::Ok(msg)) + } else { + unreachable!(); + } + } + } + } + + async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result { + match cmd { + KeyOperation::List => { + let key_ids = self + .garage + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), + 10000, + ) + .await? + .iter() + .map(|k| (k.key_id.to_string(), k.name.get().clone())) + .collect::>(); + Ok(AdminRpc::KeyList(key_ids)) + } + KeyOperation::Info(query) => { + let key = self.get_existing_key(&query.key_pattern).await?; + Ok(AdminRpc::KeyInfo(key)) + } + KeyOperation::New(query) => { + let key = Key::new(query.name.clone()); + self.garage.key_table.insert(&key).await?; + Ok(AdminRpc::KeyInfo(key)) + } + KeyOperation::Rename(query) => { + let mut key = self.get_existing_key(&query.key_pattern).await?; + key.name.update(query.new_name.clone()); + self.garage.key_table.insert(&key).await?; + Ok(AdminRpc::KeyInfo(key)) + } + KeyOperation::Delete(query) => { + let key = self.get_existing_key(&query.key_pattern).await?; + if !query.yes { + return Err(Error::BadRpc( + "Add --yes flag to really perform this operation".to_string(), + )); + } + // --- done checking, now commit --- + for (ab_name, _, _) in key.authorized_buckets.items().iter() { + if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? { + if !bucket.is_deleted() { + self.update_bucket_key(bucket, &key.key_id, false, false) + .await?; + } + } else { + return Err(Error::Message(format!("Bucket not found: {}", ab_name))); + } + } + let del_key = Key::delete(key.key_id.to_string()); + self.garage.key_table.insert(&del_key).await?; + Ok(AdminRpc::Ok(format!( + "Key {} was deleted successfully.", + key.key_id + ))) + } + KeyOperation::Import(query) => { + let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; + if prev_key.is_some() { + return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); + } + let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); + self.garage.key_table.insert(&imported_key).await?; + Ok(AdminRpc::KeyInfo(imported_key)) + } + } + } + + #[allow(clippy::ptr_arg)] + async fn get_existing_bucket(&self, bucket: &String) -> Result { + self.garage + .bucket_table + .get(&EmptyKey, bucket) + .await? + .filter(|b| !b.is_deleted()) + .map(Ok) + .unwrap_or_else(|| Err(Error::BadRpc(format!("Bucket {} does not exist", bucket)))) + } + + async fn get_existing_key(&self, pattern: &str) -> Result { + let candidates = self + .garage + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Matches(pattern.to_string())), + 10, + ) + .await? + .into_iter() + .filter(|k| !k.deleted.get()) + .collect::>(); + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} matching keys", + candidates.len() + ))) + } else { + Ok(candidates.into_iter().next().unwrap()) + } + } + + /// Update **bucket table** to inform of the new linked key + async fn update_bucket_key( + &self, + mut bucket: Bucket, + key_id: &str, + allow_read: bool, + allow_write: bool, + ) -> Result<(), Error> { + if let BucketState::Present(params) = bucket.state.get_mut() { + let ak = &mut params.authorized_keys; + let old_ak = ak.take_and_clear(); + ak.merge(&old_ak.update_mutator( + key_id.to_string(), + PermissionSet { + allow_read, + allow_write, + }, + )); + } else { + return Err(Error::Message( + "Bucket is deleted in update_bucket_key".to_string(), + )); + } + self.garage.bucket_table.insert(&bucket).await?; + Ok(()) + } + + /// Update **key table** to inform of the new linked bucket + async fn update_key_bucket( + &self, + key: &Key, + bucket: &str, + allow_read: bool, + allow_write: bool, + ) -> Result<(), Error> { + let mut key = key.clone(); + let old_map = key.authorized_buckets.take_and_clear(); + key.authorized_buckets.merge(&old_map.update_mutator( + bucket.to_string(), + PermissionSet { + allow_read, + allow_write, + }, + )); + self.garage.key_table.insert(&key).await?; + Ok(()) + } + + async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { + if !opt.yes { + return Err(Error::BadRpc( + "Please provide the --yes flag to initiate repair operations.".to_string(), + )); + } + if opt.all_nodes { + let mut opt_to_send = opt.clone(); + opt_to_send.all_nodes = false; + + let mut failures = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.config.members.keys() { + let node = (*node).into(); + let resp = self + .endpoint + .call( + &node, + &AdminRpc::LaunchRepair(opt_to_send.clone()), + PRIO_NORMAL, + ) + .await; + if !matches!(resp, Ok(Ok(_))) { + failures.push(node); + } + } + if failures.is_empty() { + Ok(AdminRpc::Ok("Repair launched on all nodes".to_string())) + } else { + Err(Error::Message(format!( + "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", + failures + ))) + } + } else { + let repair = Repair { + garage: self.garage.clone(), + }; + self.garage + .system + .background + .spawn_worker("Repair worker".into(), move |must_exit| async move { + repair.repair_worker(opt, must_exit).await + }); + Ok(AdminRpc::Ok(format!( + "Repair launched on {:?}", + self.garage.system.id + ))) + } + } + + async fn handle_stats(&self, opt: StatsOpt) -> Result { + if opt.all_nodes { + let mut ret = String::new(); + let ring = self.garage.system.ring.borrow().clone(); + + for node in ring.config.members.keys() { + let mut opt = opt.clone(); + opt.all_nodes = false; + + writeln!(&mut ret, "\n======================").unwrap(); + writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); + + let node_id = (*node).into(); + match self + .endpoint + .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL) + .await? + { + Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), + Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), + Err(e) => writeln!(&mut ret, "Error: {}", e).unwrap(), + } + } + Ok(AdminRpc::Ok(ret)) + } else { + Ok(AdminRpc::Ok(self.gather_stats_local(opt))) + } + } + + fn gather_stats_local(&self, opt: StatsOpt) -> String { + let mut ret = String::new(); + writeln!( + &mut ret, + "\nGarage version: {}", + option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( + prefix = "git:", + cargo_prefix = "cargo:", + fallback = "unknown" + )) + ) + .unwrap(); + + // Gather ring statistics + let ring = self.garage.system.ring.borrow().clone(); + let mut ring_nodes = HashMap::new(); + for (_i, loc) in ring.partitions().iter() { + for n in ring.get_nodes(loc, ring.replication_factor).iter() { + if !ring_nodes.contains_key(n) { + ring_nodes.insert(*n, 0usize); + } + *ring_nodes.get_mut(n).unwrap() += 1; + } + } + writeln!(&mut ret, "\nRing nodes & partition count:").unwrap(); + for (n, c) in ring_nodes.iter() { + writeln!(&mut ret, " {:?} {}", n, c).unwrap(); + } + + self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.key_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.object_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.version_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt); + + writeln!(&mut ret, "\nBlock manager stats:").unwrap(); + if opt.detailed { + writeln!( + &mut ret, + " number of blocks: {}", + self.garage.block_manager.rc_len() + ) + .unwrap(); + } + writeln!( + &mut ret, + " resync queue length: {}", + self.garage.block_manager.resync_queue_len() + ) + .unwrap(); + + ret + } + + fn gather_table_stats(&self, to: &mut String, t: &Arc>, opt: &StatsOpt) + where + F: TableSchema + 'static, + R: TableReplication + 'static, + { + writeln!(to, "\nTable stats for {}", t.data.name).unwrap(); + if opt.detailed { + writeln!(to, " number of items: {}", t.data.store.len()).unwrap(); + writeln!( + to, + " Merkle tree size: {}", + t.merkle_updater.merkle_tree_len() + ) + .unwrap(); + } + writeln!( + to, + " Merkle updater todo queue length: {}", + t.merkle_updater.todo_len() + ) + .unwrap(); + writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); + } +} + +#[async_trait] +impl EndpointHandler for AdminRpcHandler { + async fn handle( + self: &Arc, + message: &AdminRpc, + _from: NodeID, + ) -> Result { + match message { + AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, + AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, + AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, + AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, + _ => Err(Error::BadRpc("Invalid RPC".to_string())), + } + } +} diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs deleted file mode 100644 index b5fc9a7e..00000000 --- a/src/garage/admin_rpc.rs +++ /dev/null @@ -1,504 +0,0 @@ -use std::collections::HashMap; -use std::fmt::Write; -use std::sync::Arc; - -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use garage_util::error::Error; - -use garage_table::crdt::Crdt; -use garage_table::replication::*; -use garage_table::*; - -use garage_rpc::*; - -use garage_model::bucket_table::*; -use garage_model::garage::Garage; -use garage_model::key_table::*; - -use crate::cli::*; -use crate::repair::Repair; - -pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; - -#[derive(Debug, Serialize, Deserialize)] -pub enum AdminRpc { - BucketOperation(BucketOperation), - KeyOperation(KeyOperation), - LaunchRepair(RepairOpt), - Stats(StatsOpt), - - // Replies - Ok(String), - BucketList(Vec), - BucketInfo(Bucket), - KeyList(Vec<(String, String)>), - KeyInfo(Key), -} - -impl Rpc for AdminRpc { - type Response = Result; -} - -pub struct AdminRpcHandler { - garage: Arc, - endpoint: Arc>, -} - -impl AdminRpcHandler { - pub fn new(garage: Arc) -> Arc { - let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { garage, endpoint }); - admin.endpoint.set_handler(admin.clone()); - admin - } - - async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result { - match cmd { - BucketOperation::List => { - let bucket_names = self - .garage - .bucket_table - .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) - .await? - .iter() - .map(|b| b.name.to_string()) - .collect::>(); - Ok(AdminRpc::BucketList(bucket_names)) - } - BucketOperation::Info(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; - Ok(AdminRpc::BucketInfo(bucket)) - } - BucketOperation::Create(query) => { - let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? { - Some(mut bucket) => { - if !bucket.is_deleted() { - return Err(Error::BadRpc(format!( - "Bucket {} already exists", - query.name - ))); - } - bucket - .state - .update(BucketState::Present(BucketParams::new())); - bucket - } - None => Bucket::new(query.name.clone()), - }; - self.garage.bucket_table.insert(&bucket).await?; - Ok(AdminRpc::Ok(format!("Bucket {} was created.", query.name))) - } - BucketOperation::Delete(query) => { - let mut bucket = self.get_existing_bucket(&query.name).await?; - let objects = self - .garage - .object_table - .get_range(&query.name, None, Some(DeletedFilter::NotDeleted), 10) - .await?; - if !objects.is_empty() { - return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name))); - } - if !query.yes { - return Err(Error::BadRpc( - "Add --yes flag to really perform this operation".to_string(), - )); - } - // --- done checking, now commit --- - for (key_id, _, _) in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { - if !key.deleted.get() { - self.update_key_bucket(&key, &bucket.name, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Key not found: {}", key_id))); - } - } - bucket.state.update(BucketState::Deleted); - self.garage.bucket_table.insert(&bucket).await?; - Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) - } - BucketOperation::Allow(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = query.read || key.allow_read(&query.bucket); - let allow_write = query.write || key.allow_write(&query.bucket); - self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) - .await?; - Ok(AdminRpc::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &key.key_id, &query.bucket, allow_read, allow_write - ))) - } - BucketOperation::Deny(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = !query.read && key.allow_read(&query.bucket); - let allow_write = !query.write && key.allow_write(&query.bucket); - self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) - .await?; - Ok(AdminRpc::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &key.key_id, &query.bucket, allow_read, allow_write - ))) - } - BucketOperation::Website(query) => { - let mut bucket = self.get_existing_bucket(&query.bucket).await?; - - if !(query.allow ^ query.deny) { - return Err(Error::Message( - "You must specify exactly one flag, either --allow or --deny".to_string(), - )); - } - - if let BucketState::Present(state) = bucket.state.get_mut() { - state.website.update(query.allow); - self.garage.bucket_table.insert(&bucket).await?; - let msg = if query.allow { - format!("Website access allowed for {}", &query.bucket) - } else { - format!("Website access denied for {}", &query.bucket) - }; - - Ok(AdminRpc::Ok(msg)) - } else { - unreachable!(); - } - } - } - } - - async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result { - match cmd { - KeyOperation::List => { - let key_ids = self - .garage - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), - 10000, - ) - .await? - .iter() - .map(|k| (k.key_id.to_string(), k.name.get().clone())) - .collect::>(); - Ok(AdminRpc::KeyList(key_ids)) - } - KeyOperation::Info(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - Ok(AdminRpc::KeyInfo(key)) - } - KeyOperation::New(query) => { - let key = Key::new(query.name.clone()); - self.garage.key_table.insert(&key).await?; - Ok(AdminRpc::KeyInfo(key)) - } - KeyOperation::Rename(query) => { - let mut key = self.get_existing_key(&query.key_pattern).await?; - key.name.update(query.new_name.clone()); - self.garage.key_table.insert(&key).await?; - Ok(AdminRpc::KeyInfo(key)) - } - KeyOperation::Delete(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - if !query.yes { - return Err(Error::BadRpc( - "Add --yes flag to really perform this operation".to_string(), - )); - } - // --- done checking, now commit --- - for (ab_name, _, _) in key.authorized_buckets.items().iter() { - if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? { - if !bucket.is_deleted() { - self.update_bucket_key(bucket, &key.key_id, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Bucket not found: {}", ab_name))); - } - } - let del_key = Key::delete(key.key_id.to_string()); - self.garage.key_table.insert(&del_key).await?; - Ok(AdminRpc::Ok(format!( - "Key {} was deleted successfully.", - key.key_id - ))) - } - KeyOperation::Import(query) => { - let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; - if prev_key.is_some() { - return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); - } - let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); - self.garage.key_table.insert(&imported_key).await?; - Ok(AdminRpc::KeyInfo(imported_key)) - } - } - } - - #[allow(clippy::ptr_arg)] - async fn get_existing_bucket(&self, bucket: &String) -> Result { - self.garage - .bucket_table - .get(&EmptyKey, bucket) - .await? - .filter(|b| !b.is_deleted()) - .map(Ok) - .unwrap_or_else(|| Err(Error::BadRpc(format!("Bucket {} does not exist", bucket)))) - } - - async fn get_existing_key(&self, pattern: &str) -> Result { - let candidates = self - .garage - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::Matches(pattern.to_string())), - 10, - ) - .await? - .into_iter() - .filter(|k| !k.deleted.get()) - .collect::>(); - if candidates.len() != 1 { - Err(Error::Message(format!( - "{} matching keys", - candidates.len() - ))) - } else { - Ok(candidates.into_iter().next().unwrap()) - } - } - - /// Update **bucket table** to inform of the new linked key - async fn update_bucket_key( - &self, - mut bucket: Bucket, - key_id: &str, - allow_read: bool, - allow_write: bool, - ) -> Result<(), Error> { - if let BucketState::Present(params) = bucket.state.get_mut() { - let ak = &mut params.authorized_keys; - let old_ak = ak.take_and_clear(); - ak.merge(&old_ak.update_mutator( - key_id.to_string(), - PermissionSet { - allow_read, - allow_write, - }, - )); - } else { - return Err(Error::Message( - "Bucket is deleted in update_bucket_key".to_string(), - )); - } - self.garage.bucket_table.insert(&bucket).await?; - Ok(()) - } - - /// Update **key table** to inform of the new linked bucket - async fn update_key_bucket( - &self, - key: &Key, - bucket: &str, - allow_read: bool, - allow_write: bool, - ) -> Result<(), Error> { - let mut key = key.clone(); - let old_map = key.authorized_buckets.take_and_clear(); - key.authorized_buckets.merge(&old_map.update_mutator( - bucket.to_string(), - PermissionSet { - allow_read, - allow_write, - }, - )); - self.garage.key_table.insert(&key).await?; - Ok(()) - } - - async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { - if !opt.yes { - return Err(Error::BadRpc( - "Please provide the --yes flag to initiate repair operations.".to_string(), - )); - } - if opt.all_nodes { - let mut opt_to_send = opt.clone(); - opt_to_send.all_nodes = false; - - let mut failures = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.config.members.keys() { - let node = (*node).into(); - let resp = self - .endpoint - .call( - &node, - &AdminRpc::LaunchRepair(opt_to_send.clone()), - PRIO_NORMAL, - ) - .await; - if !matches!(resp, Ok(Ok(_))) { - failures.push(node); - } - } - if failures.is_empty() { - Ok(AdminRpc::Ok("Repair launched on all nodes".to_string())) - } else { - Err(Error::Message(format!( - "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", - failures - ))) - } - } else { - let repair = Repair { - garage: self.garage.clone(), - }; - self.garage - .system - .background - .spawn_worker("Repair worker".into(), move |must_exit| async move { - repair.repair_worker(opt, must_exit).await - }); - Ok(AdminRpc::Ok(format!( - "Repair launched on {:?}", - self.garage.system.id - ))) - } - } - - async fn handle_stats(&self, opt: StatsOpt) -> Result { - if opt.all_nodes { - let mut ret = String::new(); - let ring = self.garage.system.ring.borrow().clone(); - - for node in ring.config.members.keys() { - let mut opt = opt.clone(); - opt.all_nodes = false; - - writeln!(&mut ret, "\n======================").unwrap(); - writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); - - let node_id = (*node).into(); - match self - .endpoint - .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL) - .await? - { - Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), - Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), - Err(e) => writeln!(&mut ret, "Error: {}", e).unwrap(), - } - } - Ok(AdminRpc::Ok(ret)) - } else { - Ok(AdminRpc::Ok(self.gather_stats_local(opt))) - } - } - - fn gather_stats_local(&self, opt: StatsOpt) -> String { - let mut ret = String::new(); - writeln!( - &mut ret, - "\nGarage version: {}", - option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( - prefix = "git:", - cargo_prefix = "cargo:", - fallback = "unknown" - )) - ) - .unwrap(); - - // Gather ring statistics - let ring = self.garage.system.ring.borrow().clone(); - let mut ring_nodes = HashMap::new(); - for (_i, loc) in ring.partitions().iter() { - for n in ring.get_nodes(loc, ring.replication_factor).iter() { - if !ring_nodes.contains_key(n) { - ring_nodes.insert(*n, 0usize); - } - *ring_nodes.get_mut(n).unwrap() += 1; - } - } - writeln!(&mut ret, "\nRing nodes & partition count:").unwrap(); - for (n, c) in ring_nodes.iter() { - writeln!(&mut ret, " {:?} {}", n, c).unwrap(); - } - - self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.key_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.object_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.version_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt); - - writeln!(&mut ret, "\nBlock manager stats:").unwrap(); - if opt.detailed { - writeln!( - &mut ret, - " number of blocks: {}", - self.garage.block_manager.rc_len() - ) - .unwrap(); - } - writeln!( - &mut ret, - " resync queue length: {}", - self.garage.block_manager.resync_queue_len() - ) - .unwrap(); - - ret - } - - fn gather_table_stats(&self, to: &mut String, t: &Arc>, opt: &StatsOpt) - where - F: TableSchema + 'static, - R: TableReplication + 'static, - { - writeln!(to, "\nTable stats for {}", t.data.name).unwrap(); - if opt.detailed { - writeln!(to, " number of items: {}", t.data.store.len()).unwrap(); - writeln!( - to, - " Merkle tree size: {}", - t.merkle_updater.merkle_tree_len() - ) - .unwrap(); - } - writeln!( - to, - " Merkle updater todo queue length: {}", - t.merkle_updater.todo_len() - ) - .unwrap(); - writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); - } -} - -#[async_trait] -impl EndpointHandler for AdminRpcHandler { - async fn handle( - self: &Arc, - message: &AdminRpc, - _from: NodeID, - ) -> Result { - match message { - AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, - AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, - AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, - _ => Err(Error::BadRpc("Invalid RPC".to_string())), - } - } -} diff --git a/src/garage/cli.rs b/src/garage/cli.rs deleted file mode 100644 index 63bb506a..00000000 --- a/src/garage/cli.rs +++ /dev/null @@ -1,661 +0,0 @@ -use std::collections::HashSet; - -use serde::{Deserialize, Serialize}; -use structopt::StructOpt; - -use garage_util::data::Uuid; -use garage_util::error::Error; - -use garage_rpc::ring::*; -use garage_rpc::system::*; -use garage_rpc::*; - -use garage_model::bucket_table::*; -use garage_model::key_table::*; - -use crate::admin_rpc::*; - -#[derive(StructOpt, Debug)] -pub enum Command { - /// Run Garage server - #[structopt(name = "server")] - Server, - - /// Print identifier (public key) of this garage node. - /// Generates a new keypair if necessary. - #[structopt(name = "node-id")] - NodeId(NodeIdOpt), - - /// Get network status - #[structopt(name = "status")] - Status, - - /// Garage node operations - #[structopt(name = "node")] - Node(NodeOperation), - - /// Bucket operations - #[structopt(name = "bucket")] - Bucket(BucketOperation), - - /// Key operations - #[structopt(name = "key")] - Key(KeyOperation), - - /// Start repair of node data - #[structopt(name = "repair")] - Repair(RepairOpt), - - /// Gather node statistics - #[structopt(name = "stats")] - Stats(StatsOpt), -} - -#[derive(StructOpt, Debug)] -pub enum NodeOperation { - /// Connect to Garage node that is currently isolated from the system - #[structopt(name = "connect")] - Connect(ConnectNodeOpt), - - /// Configure Garage node - #[structopt(name = "configure")] - Configure(ConfigureNodeOpt), - - /// Remove Garage node from cluster - #[structopt(name = "remove")] - Remove(RemoveNodeOpt), -} - -#[derive(StructOpt, Debug)] -pub struct NodeIdOpt { - /// Do not print usage instructions to stderr - #[structopt(short = "q", long = "quiet")] - pub(crate) quiet: bool, -} - -#[derive(StructOpt, Debug)] -pub struct ConnectNodeOpt { - /// Node public key and address, in the format: - /// `@:` - node: String, -} - -#[derive(StructOpt, Debug)] -pub struct ConfigureNodeOpt { - /// Node to configure (prefix of hexadecimal node id) - node_id: String, - - /// Location (zone or datacenter) of the node - #[structopt(short = "z", long = "zone")] - zone: Option, - - /// Capacity (in relative terms, use 1 to represent your smallest server) - #[structopt(short = "c", long = "capacity")] - capacity: Option, - - /// Gateway-only node - #[structopt(short = "g", long = "gateway")] - gateway: bool, - - /// Optional node tag - #[structopt(short = "t", long = "tag")] - tag: Option, - - /// Replaced node(s): list of node IDs that will be removed from the current cluster - #[structopt(long = "replace")] - replace: Vec, -} - -#[derive(StructOpt, Debug)] -pub struct RemoveNodeOpt { - /// Node to configure (prefix of hexadecimal node id) - node_id: String, - - /// If this flag is not given, the node won't be removed - #[structopt(long = "yes")] - yes: bool, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub enum BucketOperation { - /// List buckets - #[structopt(name = "list")] - List, - - /// Get bucket info - #[structopt(name = "info")] - Info(BucketOpt), - - /// Create bucket - #[structopt(name = "create")] - Create(BucketOpt), - - /// Delete bucket - #[structopt(name = "delete")] - Delete(DeleteBucketOpt), - - /// Allow key to read or write to bucket - #[structopt(name = "allow")] - Allow(PermBucketOpt), - - /// Deny key from reading or writing to bucket - #[structopt(name = "deny")] - Deny(PermBucketOpt), - - /// Expose as website or not - #[structopt(name = "website")] - Website(WebsiteOpt), -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct WebsiteOpt { - /// Create - #[structopt(long = "allow")] - pub allow: bool, - - /// Delete - #[structopt(long = "deny")] - pub deny: bool, - - /// Bucket name - pub bucket: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct BucketOpt { - /// Bucket name - pub name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct DeleteBucketOpt { - /// Bucket name - pub name: String, - - /// If this flag is not given, the bucket won't be deleted - #[structopt(long = "yes")] - pub yes: bool, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct PermBucketOpt { - /// Access key name or ID - #[structopt(long = "key")] - pub key_pattern: String, - - /// Allow/deny read operations - #[structopt(long = "read")] - pub read: bool, - - /// Allow/deny write operations - #[structopt(long = "write")] - pub write: bool, - - /// Bucket name - pub bucket: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub enum KeyOperation { - /// List keys - #[structopt(name = "list")] - List, - - /// Get key info - #[structopt(name = "info")] - Info(KeyOpt), - - /// Create new key - #[structopt(name = "new")] - New(KeyNewOpt), - - /// Rename key - #[structopt(name = "rename")] - Rename(KeyRenameOpt), - - /// Delete key - #[structopt(name = "delete")] - Delete(KeyDeleteOpt), - - /// Import key - #[structopt(name = "import")] - Import(KeyImportOpt), -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyOpt { - /// ID or name of the key - pub key_pattern: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyNewOpt { - /// Name of the key - #[structopt(long = "name", default_value = "Unnamed key")] - pub name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyRenameOpt { - /// ID or name of the key - pub key_pattern: String, - - /// New name of the key - pub new_name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyDeleteOpt { - /// ID or name of the key - pub key_pattern: String, - - /// Confirm deletion - #[structopt(long = "yes")] - pub yes: bool, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyImportOpt { - /// Access key ID - pub key_id: String, - - /// Secret access key - pub secret_key: String, - - /// Key name - #[structopt(short = "n", default_value = "Imported key")] - pub name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] -pub struct RepairOpt { - /// Launch repair operation on all nodes - #[structopt(short = "a", long = "all-nodes")] - pub all_nodes: bool, - - /// Confirm the launch of the repair operation - #[structopt(long = "yes")] - pub yes: bool, - - #[structopt(subcommand)] - pub what: Option, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] -pub enum RepairWhat { - /// Only do a full sync of metadata tables - #[structopt(name = "tables")] - Tables, - /// Only repair (resync/rebalance) the set of stored blocks - #[structopt(name = "blocks")] - Blocks, - /// Only redo the propagation of object deletions to the version table (slow) - #[structopt(name = "versions")] - Versions, - /// Only redo the propagation of version deletions to the block ref table (extremely slow) - #[structopt(name = "block_refs")] - BlockRefs, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] -pub struct StatsOpt { - /// Gather statistics from all nodes - #[structopt(short = "a", long = "all-nodes")] - pub all_nodes: bool, - - /// Gather detailed statistics (this can be long) - #[structopt(short = "d", long = "detailed")] - pub detailed: bool, -} - -pub async fn cli_cmd( - cmd: Command, - system_rpc_endpoint: &Endpoint, - admin_rpc_endpoint: &Endpoint, - rpc_host: NodeID, -) -> Result<(), Error> { - match cmd { - Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await, - Command::Node(NodeOperation::Connect(connect_opt)) => { - cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await - } - Command::Node(NodeOperation::Configure(configure_opt)) => { - cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await - } - Command::Node(NodeOperation::Remove(remove_opt)) => { - cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await - } - Command::Bucket(bo) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await - } - Command::Key(ko) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await - } - Command::Repair(ro) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await - } - Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, - _ => unreachable!(), - } -} - -pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> Result<(), Error> { - let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - let config = match rpc_cli - .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await?? - { - SystemRpc::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - println!("==== HEALTHY NODES ===="); - let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()]; - for adv in status.iter().filter(|adv| adv.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { - healthy_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}", - id = adv.id, - host = adv.status.hostname, - addr = adv.addr, - tag = cfg.tag, - zone = cfg.zone, - capacity = cfg.capacity_string(), - )); - } else { - healthy_nodes.push(format!( - "{id:?}\t{h}\t{addr}\tNO ROLE ASSIGNED", - id = adv.id, - h = adv.status.hostname, - addr = adv.addr, - )); - } - } - format_table(healthy_nodes); - - let status_keys = status.iter().map(|adv| adv.id).collect::>(); - let failure_case_1 = status.iter().any(|adv| !adv.is_up); - let failure_case_2 = config - .members - .iter() - .any(|(id, _)| !status_keys.contains(id)); - if failure_case_1 || failure_case_2 { - println!("\n==== FAILED NODES ===="); - let mut failed_nodes = - vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()]; - for adv in status.iter().filter(|adv| !adv.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { - failed_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago", - id = adv.id, - host = adv.status.hostname, - addr = adv.addr, - tag = cfg.tag, - zone = cfg.zone, - capacity = cfg.capacity_string(), - last_seen = "FIXME", // FIXME was (now_msec() - adv.last_seen) / 1000, - )); - } - } - for (id, cfg) in config.members.iter() { - if !status.iter().any(|adv| adv.id == *id) { - failed_nodes.push(format!( - "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen", - id = id, - tag = cfg.tag, - zone = cfg.zone, - capacity = cfg.capacity_string(), - )); - } - } - format_table(failed_nodes); - } - - Ok(()) -} - -pub async fn cmd_connect( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: ConnectNodeOpt, -) -> Result<(), Error> { - match rpc_cli - .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL) - .await?? - { - SystemRpc::Ok => { - println!("Success."); - Ok(()) - } - r => Err(Error::BadRpc(format!("Unexpected response: {:?}", r))), - } -} - -pub async fn cmd_configure( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: ConfigureNodeOpt, -) -> Result<(), Error> { - let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; - - let mut config = match rpc_cli - .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await?? - { - SystemRpc::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - for replaced in args.replace.iter() { - let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?; - if config.members.remove(&replaced_node).is_none() { - return Err(Error::Message(format!( - "Cannot replace node {:?} as it is not in current configuration", - replaced_node - ))); - } - } - - if args.capacity.is_some() && args.gateway { - return Err(Error::Message( - "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); - } - if args.capacity == Some(0) { - return Err(Error::Message("Invalid capacity value: 0".into())); - } - - let new_entry = match config.members.get(&added_node) { - None => { - let capacity = match args.capacity { - Some(c) => Some(c), - None if args.gateway => None, - _ => return Err(Error::Message( - "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), - }; - NetworkConfigEntry { - zone: args.zone.expect("Please specifiy a zone with the -z flag"), - capacity, - tag: args.tag.unwrap_or_default(), - } - } - Some(old) => { - let capacity = match args.capacity { - Some(c) => Some(c), - None if args.gateway => None, - _ => old.capacity, - }; - NetworkConfigEntry { - zone: args.zone.unwrap_or_else(|| old.zone.to_string()), - capacity, - tag: args.tag.unwrap_or_else(|| old.tag.to_string()), - } - } - }; - - config.members.insert(added_node, new_entry); - config.version += 1; - - rpc_cli - .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await??; - Ok(()) -} - -pub async fn cmd_remove( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: RemoveNodeOpt, -) -> Result<(), Error> { - let mut config = match rpc_cli - .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await?? - { - SystemRpc::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?; - - if !args.yes { - return Err(Error::Message(format!( - "Add the flag --yes to really remove {:?} from the cluster", - deleted_node - ))); - } - - config.members.remove(&deleted_node); - config.version += 1; - - rpc_cli - .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await??; - Ok(()) -} - -pub async fn cmd_admin( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: AdminRpc, -) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { - AdminRpc::Ok(msg) => { - println!("{}", msg); - } - AdminRpc::BucketList(bl) => { - println!("List of buckets:"); - for bucket in bl { - println!("{}", bucket); - } - } - AdminRpc::BucketInfo(bucket) => { - print_bucket_info(&bucket); - } - AdminRpc::KeyList(kl) => { - println!("List of keys:"); - for key in kl { - println!("{}\t{}", key.0, key.1); - } - } - AdminRpc::KeyInfo(key) => { - print_key_info(&key); - } - r => { - error!("Unexpected response: {:?}", r); - } - } - Ok(()) -} - -// --- Utility functions ---- - -fn print_key_info(key: &Key) { - println!("Key name: {}", key.name.get()); - println!("Key ID: {}", key.key_id); - println!("Secret key: {}", key.secret_key); - if key.deleted.get() { - println!("Key is deleted."); - } else { - println!("Authorized buckets:"); - for (b, _, perm) in key.authorized_buckets.items().iter() { - println!("- {} R:{} W:{}", b, perm.allow_read, perm.allow_write); - } - } -} - -fn print_bucket_info(bucket: &Bucket) { - println!("Bucket name: {}", bucket.name); - match bucket.state.get() { - BucketState::Deleted => println!("Bucket is deleted."), - BucketState::Present(p) => { - println!("Authorized keys:"); - for (k, _, perm) in p.authorized_keys.items().iter() { - println!("- {} R:{} W:{}", k, perm.allow_read, perm.allow_write); - } - println!("Website access: {}", p.website.get()); - } - }; -} - -fn format_table(data: Vec) { - let data = data - .iter() - .map(|s| s.split('\t').collect::>()) - .collect::>(); - - let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max); - let mut column_size = vec![0; columns]; - - let mut out = String::new(); - - for row in data.iter() { - for (i, col) in row.iter().enumerate() { - column_size[i] = std::cmp::max(column_size[i], col.chars().count()); - } - } - - for row in data.iter() { - for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) { - out.push_str(col); - (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' ')); - } - out.push_str(&row[row.len() - 1]); - out.push('\n'); - } - - print!("{}", out); -} - -pub fn find_matching_node( - cand: impl std::iter::Iterator, - pattern: &str, -) -> Result { - let mut candidates = vec![]; - for c in cand { - if hex::encode(&c).starts_with(&pattern) { - candidates.push(c); - } - } - if candidates.len() != 1 { - Err(Error::Message(format!( - "{} nodes match '{}'", - candidates.len(), - pattern, - ))) - } else { - Ok(candidates[0]) - } -} diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs new file mode 100644 index 00000000..c386b141 --- /dev/null +++ b/src/garage/cli/cmd.rs @@ -0,0 +1,284 @@ +use std::collections::HashSet; + +use garage_util::error::*; + +use garage_rpc::ring::*; +use garage_rpc::system::*; +use garage_rpc::*; + +use crate::admin::*; +use crate::cli::*; + +pub async fn cli_command_dispatch( + cmd: Command, + system_rpc_endpoint: &Endpoint, + admin_rpc_endpoint: &Endpoint, + rpc_host: NodeID, +) -> Result<(), Error> { + match cmd { + Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await, + Command::Node(NodeOperation::Connect(connect_opt)) => { + cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await + } + Command::Node(NodeOperation::Configure(configure_opt)) => { + cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await + } + Command::Node(NodeOperation::Remove(remove_opt)) => { + cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await + } + Command::Bucket(bo) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await + } + Command::Key(ko) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await + } + Command::Repair(ro) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await + } + Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, + _ => unreachable!(), + } +} + +pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> Result<(), Error> { + let status = match rpc_cli + .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await?? + { + SystemRpc::ReturnKnownNodes(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + let config = match rpc_cli + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await?? + { + SystemRpc::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + println!("==== HEALTHY NODES ===="); + let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()]; + for adv in status.iter().filter(|adv| adv.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + healthy_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, + tag = cfg.tag, + zone = cfg.zone, + capacity = cfg.capacity_string(), + )); + } else { + healthy_nodes.push(format!( + "{id:?}\t{h}\t{addr}\tNO ROLE ASSIGNED", + id = adv.id, + h = adv.status.hostname, + addr = adv.addr, + )); + } + } + format_table(healthy_nodes); + + let status_keys = status.iter().map(|adv| adv.id).collect::>(); + let failure_case_1 = status.iter().any(|adv| !adv.is_up); + let failure_case_2 = config + .members + .iter() + .any(|(id, _)| !status_keys.contains(id)); + if failure_case_1 || failure_case_2 { + println!("\n==== FAILED NODES ===="); + let mut failed_nodes = + vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()]; + for adv in status.iter().filter(|adv| !adv.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + failed_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, + tag = cfg.tag, + zone = cfg.zone, + capacity = cfg.capacity_string(), + last_seen = "FIXME", // FIXME was (now_msec() - adv.last_seen) / 1000, + )); + } + } + for (id, cfg) in config.members.iter() { + if !status_keys.contains(id) { + failed_nodes.push(format!( + "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen", + id = id, + tag = cfg.tag, + zone = cfg.zone, + capacity = cfg.capacity_string(), + )); + } + } + format_table(failed_nodes); + } + + Ok(()) +} + +pub async fn cmd_connect( + rpc_cli: &Endpoint, + rpc_host: NodeID, + args: ConnectNodeOpt, +) -> Result<(), Error> { + match rpc_cli + .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL) + .await?? + { + SystemRpc::Ok => { + println!("Success."); + Ok(()) + } + r => Err(Error::BadRpc(format!("Unexpected response: {:?}", r))), + } +} + +pub async fn cmd_configure( + rpc_cli: &Endpoint, + rpc_host: NodeID, + args: ConfigureNodeOpt, +) -> Result<(), Error> { + let status = match rpc_cli + .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await?? + { + SystemRpc::ReturnKnownNodes(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; + + let mut config = match rpc_cli + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await?? + { + SystemRpc::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + for replaced in args.replace.iter() { + let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?; + if config.members.remove(&replaced_node).is_none() { + return Err(Error::Message(format!( + "Cannot replace node {:?} as it is not in current configuration", + replaced_node + ))); + } + } + + if args.capacity.is_some() && args.gateway { + return Err(Error::Message( + "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); + } + if args.capacity == Some(0) { + return Err(Error::Message("Invalid capacity value: 0".into())); + } + + let new_entry = match config.members.get(&added_node) { + None => { + let capacity = match args.capacity { + Some(c) => Some(c), + None if args.gateway => None, + _ => return Err(Error::Message( + "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), + }; + NetworkConfigEntry { + zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?, + capacity, + tag: args.tag.unwrap_or_default(), + } + } + Some(old) => { + let capacity = match args.capacity { + Some(c) => Some(c), + None if args.gateway => None, + _ => old.capacity, + }; + NetworkConfigEntry { + zone: args.zone.unwrap_or_else(|| old.zone.to_string()), + capacity, + tag: args.tag.unwrap_or_else(|| old.tag.to_string()), + } + } + }; + + config.members.insert(added_node, new_entry); + config.version += 1; + + rpc_cli + .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) + .await??; + Ok(()) +} + +pub async fn cmd_remove( + rpc_cli: &Endpoint, + rpc_host: NodeID, + args: RemoveNodeOpt, +) -> Result<(), Error> { + let mut config = match rpc_cli + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await?? + { + SystemRpc::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?; + + if !args.yes { + return Err(Error::Message(format!( + "Add the flag --yes to really remove {:?} from the cluster", + deleted_node + ))); + } + + config.members.remove(&deleted_node); + config.version += 1; + + rpc_cli + .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) + .await??; + Ok(()) +} + +pub async fn cmd_admin( + rpc_cli: &Endpoint, + rpc_host: NodeID, + args: AdminRpc, +) -> Result<(), Error> { + match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { + AdminRpc::Ok(msg) => { + println!("{}", msg); + } + AdminRpc::BucketList(bl) => { + println!("List of buckets:"); + for bucket in bl { + println!("{}", bucket); + } + } + AdminRpc::BucketInfo(bucket) => { + print_bucket_info(&bucket); + } + AdminRpc::KeyList(kl) => { + println!("List of keys:"); + for key in kl { + println!("{}\t{}", key.0, key.1); + } + } + AdminRpc::KeyInfo(key) => { + print_key_info(&key); + } + r => { + error!("Unexpected response: {:?}", r); + } + } + Ok(()) +} + +// --- Utility functions ---- diff --git a/src/garage/cli/init.rs b/src/garage/cli/init.rs new file mode 100644 index 00000000..9eda085f --- /dev/null +++ b/src/garage/cli/init.rs @@ -0,0 +1,65 @@ +use std::path::PathBuf; + +use garage_util::error::*; + +pub const READ_KEY_ERROR: &str = "Unable to read node key. It will be generated by your garage node the first time is it launched. Ensure that your garage node is currently running. (The node key is supposed to be stored in your metadata directory.)"; + +pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> { + let config = garage_util::config::read_config(config_file.clone()).err_context(format!( + "Unable to read configuration file {}", + config_file.to_string_lossy(), + ))?; + + let node_id = + garage_rpc::system::read_node_id(&config.metadata_dir).err_context(READ_KEY_ERROR)?; + + let idstr = if let Some(addr) = config.rpc_public_addr { + let idstr = format!("{}@{}", hex::encode(&node_id), addr); + println!("{}", idstr); + idstr + } else { + let idstr = hex::encode(&node_id); + println!("{}", idstr); + + if !quiet { + eprintln!("WARNING: I don't know the public address to reach this node."); + eprintln!("In all of the instructions below, replace 127.0.0.1:3901 by the appropriate address and port."); + } + + format!("{}@127.0.0.1:3901", idstr) + }; + + if !quiet { + eprintln!(); + eprintln!( + "To instruct a node to connect to this node, run the following command on that node:" + ); + eprintln!(" garage [-c ] node connect {}", idstr); + eprintln!(); + eprintln!("Or instruct them to connect from here by running:"); + eprintln!( + " garage -c {} -h node connect {}", + config_file.to_string_lossy(), + idstr + ); + eprintln!( + "where is their own node identifier in the format: @:" + ); + eprintln!(); + eprintln!("This node identifier can also be added as a bootstrap node in other node's garage.toml files:"); + eprintln!(" bootstrap_peers = ["); + eprintln!(" \"{}\",", idstr); + eprintln!(" ..."); + eprintln!(" ]"); + eprintln!(); + eprintln!( + r#"Security notice: Garage's intra-cluster communications are secured primarily by the shared +secret value rpc_secret. However, an attacker that knows rpc_secret (for example if it +leaks) cannot connect if they do not know any of the identifiers of the nodes in the +cluster. It is thus a good security measure to try to keep them secret if possible. + "# + ); + } + + Ok(()) +} diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs new file mode 100644 index 00000000..1567f377 --- /dev/null +++ b/src/garage/cli/mod.rs @@ -0,0 +1,9 @@ +pub(crate) mod cmd; +pub(crate) mod init; +pub(crate) mod structs; +pub(crate) mod util; + +pub(crate) use cmd::*; +pub(crate) use init::*; +pub(crate) use structs::*; +pub(crate) use util::*; diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs new file mode 100644 index 00000000..f134cd49 --- /dev/null +++ b/src/garage/cli/structs.rs @@ -0,0 +1,296 @@ +use serde::{Deserialize, Serialize}; + +use structopt::StructOpt; + +#[derive(StructOpt, Debug)] +pub enum Command { + /// Run Garage server + #[structopt(name = "server")] + Server, + + /// Print identifier (public key) of this garage node. + /// Generates a new keypair if necessary. + #[structopt(name = "node-id")] + NodeId(NodeIdOpt), + + /// Get network status + #[structopt(name = "status")] + Status, + + /// Garage node operations + #[structopt(name = "node")] + Node(NodeOperation), + + /// Bucket operations + #[structopt(name = "bucket")] + Bucket(BucketOperation), + + /// Key operations + #[structopt(name = "key")] + Key(KeyOperation), + + /// Start repair of node data + #[structopt(name = "repair")] + Repair(RepairOpt), + + /// Gather node statistics + #[structopt(name = "stats")] + Stats(StatsOpt), +} + +#[derive(StructOpt, Debug)] +pub enum NodeOperation { + /// Connect to Garage node that is currently isolated from the system + #[structopt(name = "connect")] + Connect(ConnectNodeOpt), + + /// Configure Garage node + #[structopt(name = "configure")] + Configure(ConfigureNodeOpt), + + /// Remove Garage node from cluster + #[structopt(name = "remove")] + Remove(RemoveNodeOpt), +} + +#[derive(StructOpt, Debug)] +pub struct NodeIdOpt { + /// Do not print usage instructions to stderr + #[structopt(short = "q", long = "quiet")] + pub(crate) quiet: bool, +} + +#[derive(StructOpt, Debug)] +pub struct ConnectNodeOpt { + /// Node public key and address, in the format: + /// `@:` + pub(crate) node: String, +} + +#[derive(StructOpt, Debug)] +pub struct ConfigureNodeOpt { + /// Node to configure (prefix of hexadecimal node id) + pub(crate) node_id: String, + + /// Location (zone or datacenter) of the node + #[structopt(short = "z", long = "zone")] + pub(crate) zone: Option, + + /// Capacity (in relative terms, use 1 to represent your smallest server) + #[structopt(short = "c", long = "capacity")] + pub(crate) capacity: Option, + + /// Gateway-only node + #[structopt(short = "g", long = "gateway")] + pub(crate) gateway: bool, + + /// Optional node tag + #[structopt(short = "t", long = "tag")] + pub(crate) tag: Option, + + /// Replaced node(s): list of node IDs that will be removed from the current cluster + #[structopt(long = "replace")] + pub(crate) replace: Vec, +} + +#[derive(StructOpt, Debug)] +pub struct RemoveNodeOpt { + /// Node to configure (prefix of hexadecimal node id) + pub(crate) node_id: String, + + /// If this flag is not given, the node won't be removed + #[structopt(long = "yes")] + pub(crate) yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub enum BucketOperation { + /// List buckets + #[structopt(name = "list")] + List, + + /// Get bucket info + #[structopt(name = "info")] + Info(BucketOpt), + + /// Create bucket + #[structopt(name = "create")] + Create(BucketOpt), + + /// Delete bucket + #[structopt(name = "delete")] + Delete(DeleteBucketOpt), + + /// Allow key to read or write to bucket + #[structopt(name = "allow")] + Allow(PermBucketOpt), + + /// Deny key from reading or writing to bucket + #[structopt(name = "deny")] + Deny(PermBucketOpt), + + /// Expose as website or not + #[structopt(name = "website")] + Website(WebsiteOpt), +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct WebsiteOpt { + /// Create + #[structopt(long = "allow")] + pub allow: bool, + + /// Delete + #[structopt(long = "deny")] + pub deny: bool, + + /// Bucket name + pub bucket: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct BucketOpt { + /// Bucket name + pub name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct DeleteBucketOpt { + /// Bucket name + pub name: String, + + /// If this flag is not given, the bucket won't be deleted + #[structopt(long = "yes")] + pub yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct PermBucketOpt { + /// Access key name or ID + #[structopt(long = "key")] + pub key_pattern: String, + + /// Allow/deny read operations + #[structopt(long = "read")] + pub read: bool, + + /// Allow/deny write operations + #[structopt(long = "write")] + pub write: bool, + + /// Bucket name + pub bucket: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub enum KeyOperation { + /// List keys + #[structopt(name = "list")] + List, + + /// Get key info + #[structopt(name = "info")] + Info(KeyOpt), + + /// Create new key + #[structopt(name = "new")] + New(KeyNewOpt), + + /// Rename key + #[structopt(name = "rename")] + Rename(KeyRenameOpt), + + /// Delete key + #[structopt(name = "delete")] + Delete(KeyDeleteOpt), + + /// Import key + #[structopt(name = "import")] + Import(KeyImportOpt), +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyOpt { + /// ID or name of the key + pub key_pattern: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyNewOpt { + /// Name of the key + #[structopt(long = "name", default_value = "Unnamed key")] + pub name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyRenameOpt { + /// ID or name of the key + pub key_pattern: String, + + /// New name of the key + pub new_name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyDeleteOpt { + /// ID or name of the key + pub key_pattern: String, + + /// Confirm deletion + #[structopt(long = "yes")] + pub yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyImportOpt { + /// Access key ID + pub key_id: String, + + /// Secret access key + pub secret_key: String, + + /// Key name + #[structopt(short = "n", default_value = "Imported key")] + pub name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct RepairOpt { + /// Launch repair operation on all nodes + #[structopt(short = "a", long = "all-nodes")] + pub all_nodes: bool, + + /// Confirm the launch of the repair operation + #[structopt(long = "yes")] + pub yes: bool, + + #[structopt(subcommand)] + pub what: Option, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum RepairWhat { + /// Only do a full sync of metadata tables + #[structopt(name = "tables")] + Tables, + /// Only repair (resync/rebalance) the set of stored blocks + #[structopt(name = "blocks")] + Blocks, + /// Only redo the propagation of object deletions to the version table (slow) + #[structopt(name = "versions")] + Versions, + /// Only redo the propagation of version deletions to the block ref table (extremely slow) + #[structopt(name = "block_refs")] + BlockRefs, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct StatsOpt { + /// Gather statistics from all nodes + #[structopt(short = "a", long = "all-nodes")] + pub all_nodes: bool, + + /// Gather detailed statistics (this can be long) + #[structopt(short = "d", long = "detailed")] + pub detailed: bool, +} diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs new file mode 100644 index 00000000..28b4d8ea --- /dev/null +++ b/src/garage/cli/util.rs @@ -0,0 +1,83 @@ +use garage_util::data::Uuid; +use garage_util::error::*; + +use garage_model::bucket_table::*; +use garage_model::key_table::*; + +pub fn print_key_info(key: &Key) { + println!("Key name: {}", key.name.get()); + println!("Key ID: {}", key.key_id); + println!("Secret key: {}", key.secret_key); + if key.deleted.get() { + println!("Key is deleted."); + } else { + println!("Authorized buckets:"); + for (b, _, perm) in key.authorized_buckets.items().iter() { + println!("- {} R:{} W:{}", b, perm.allow_read, perm.allow_write); + } + } +} + +pub fn print_bucket_info(bucket: &Bucket) { + println!("Bucket name: {}", bucket.name); + match bucket.state.get() { + BucketState::Deleted => println!("Bucket is deleted."), + BucketState::Present(p) => { + println!("Authorized keys:"); + for (k, _, perm) in p.authorized_keys.items().iter() { + println!("- {} R:{} W:{}", k, perm.allow_read, perm.allow_write); + } + println!("Website access: {}", p.website.get()); + } + }; +} + +pub fn format_table(data: Vec) { + let data = data + .iter() + .map(|s| s.split('\t').collect::>()) + .collect::>(); + + let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max); + let mut column_size = vec![0; columns]; + + let mut out = String::new(); + + for row in data.iter() { + for (i, col) in row.iter().enumerate() { + column_size[i] = std::cmp::max(column_size[i], col.chars().count()); + } + } + + for row in data.iter() { + for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) { + out.push_str(col); + (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' ')); + } + out.push_str(&row[row.len() - 1]); + out.push('\n'); + } + + print!("{}", out); +} + +pub fn find_matching_node( + cand: impl std::iter::Iterator, + pattern: &str, +) -> Result { + let mut candidates = vec![]; + for c in cand { + if hex::encode(&c).starts_with(&pattern) { + candidates.push(c); + } + } + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} nodes match '{}'", + candidates.len(), + pattern, + ))) + } else { + Ok(candidates[0]) + } +} diff --git a/src/garage/main.rs b/src/garage/main.rs index 57767df1..7f7196d9 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -4,7 +4,7 @@ #[macro_use] extern crate log; -mod admin_rpc; +mod admin; mod cli; mod repair; mod server; @@ -16,12 +16,12 @@ use structopt::StructOpt; use netapp::util::parse_and_resolve_peer_addr; use netapp::NetworkKey; -use garage_util::error::Error; +use garage_util::error::*; use garage_rpc::system::*; use garage_rpc::*; -use admin_rpc::*; +use admin::*; use cli::*; #[derive(StructOpt, Debug)] @@ -66,7 +66,7 @@ async fn main() { }; if let Err(e) = res { - error!("{}", e); + eprintln!("Error: {}", e); std::process::exit(1); } } @@ -74,7 +74,7 @@ async fn main() { async fn cli_command(opt: Opt) -> Result<(), Error> { let config = if opt.rpc_secret.is_none() || opt.rpc_host.is_none() { Some(garage_util::config::read_config(opt.config_file.clone()) - .map_err(|e| Error::Message(format!("Unable to read configuration file {}: {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy(), e)))?) + .err_context(format!("Unable to read configuration file {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy()))?) } else { None }; @@ -84,11 +84,11 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { .rpc_secret .as_ref() .or_else(|| config.as_ref().map(|c| &c.rpc_secret)) - .expect("No RPC secret provided"); + .ok_or("No RPC secret provided")?; let network_key = NetworkKey::from_slice( - &hex::decode(net_key_hex_str).expect("Invalid RPC secret key (bad hex)")[..], + &hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..], ) - .expect("Invalid RPC secret provided (wrong length)"); + .ok_or("Invalid RPC secret provided (wrong length)")?; // Generate a temporary keypair for our RPC client let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair(); @@ -97,77 +97,22 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { // Find and parse the address of the target host let (id, addr) = if let Some(h) = opt.rpc_host { - let (id, addrs) = parse_and_resolve_peer_addr(&h).expect("Invalid RPC host"); + let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is @:.", h))?; (id, addrs[0]) } else if let Some(a) = config.as_ref().map(|c| c.rpc_public_addr).flatten() { - let node_key = garage_rpc::system::gen_node_key(&config.unwrap().metadata_dir) - .map_err(|e| Error::Message(format!("Unable to read or generate node key: {}", e)))?; - (node_key.public_key(), a) + let node_id = garage_rpc::system::read_node_id(&config.unwrap().metadata_dir) + .err_context(READ_KEY_ERROR)?; + (node_id, a) } else { return Err(Error::Message("No RPC host provided".into())); }; // Connect to target host - netapp.clone().try_connect(addr, id).await?; + netapp.clone().try_connect(addr, id).await + .err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?; let system_rpc_endpoint = netapp.endpoint::(SYSTEM_RPC_PATH.into()); let admin_rpc_endpoint = netapp.endpoint::(ADMIN_RPC_PATH.into()); - cli_cmd(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await -} - -fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> { - let config = garage_util::config::read_config(config_file.clone()).map_err(|e| { - Error::Message(format!( - "Unable to read configuration file {}: {}", - config_file.to_string_lossy(), - e - )) - })?; - - let node_key = garage_rpc::system::gen_node_key(&config.metadata_dir) - .map_err(|e| Error::Message(format!("Unable to read or generate node key: {}", e)))?; - - let idstr = if let Some(addr) = config.rpc_public_addr { - let idstr = format!("{}@{}", hex::encode(&node_key.public_key()), addr); - println!("{}", idstr); - idstr - } else { - let idstr = hex::encode(&node_key.public_key()); - println!("{}", idstr); - - if !quiet { - eprintln!("WARNING: I don't know the public address to reach this node."); - eprintln!("In all of the instructions below, replace 127.0.0.1:3901 by the appropriate address and port."); - } - - format!("{}@127.0.0.1:3901", idstr) - }; - - if !quiet { - eprintln!(); - eprintln!( - "To instruct a node to connect to this node, run the following command on that node:" - ); - eprintln!(" garage [-c ] node connect {}", idstr); - eprintln!(); - eprintln!("Or instruct them to connect from here by running:"); - eprintln!( - " garage -c {} -h node connect {}", - config_file.to_string_lossy(), - idstr - ); - eprintln!( - "where is their own node identifier in the format: @:" - ); - eprintln!(); - eprintln!("This node identifier can also be added as a bootstrap node in other node's garage.toml files:"); - eprintln!(" bootstrap_peers = ["); - eprintln!(" \"{}\",", idstr); - eprintln!(" ..."); - eprintln!(" ]"); - eprintln!(); - } - - Ok(()) + cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await } diff --git a/src/garage/server.rs b/src/garage/server.rs index cd92d157..f4d62e91 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -10,7 +10,7 @@ use garage_api::run_api_server; use garage_model::garage::Garage; use garage_web::run_web_server; -use crate::admin_rpc::*; +use crate::admin::*; async fn wait_from(mut chan: watch::Receiver) { while !*chan.borrow() { diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 90b3cf15..7413508e 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -203,7 +203,7 @@ impl RpcHelper { Ok(results) } else { let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); - Err(Error::TooManyErrors(errors)) + Err(Error::Quorum(results.len(), to.len(), errors)) } } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 3031063d..a62ee6f5 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -24,7 +24,7 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use garage_util::background::BackgroundRunner; use garage_util::config::Config; use garage_util::data::Uuid; -use garage_util::error::Error; +use garage_util::error::*; use garage_util::persister::Persister; use garage_util::time::*; @@ -39,6 +39,8 @@ const PING_TIMEOUT: Duration = Duration::from_secs(2); /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; +pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret"; + /// RPC messages related to membership #[derive(Debug, Serialize, Deserialize, Clone)] pub enum SystemRpc { @@ -113,6 +115,22 @@ pub struct KnownNodeInfo { pub status: NodeStatus, } +pub fn read_node_id(metadata_dir: &Path) -> Result { + let mut pubkey_file = metadata_dir.to_path_buf(); + pubkey_file.push("node_key.pub"); + + let mut f = std::fs::File::open(pubkey_file.as_path())?; + let mut d = vec![]; + f.read_to_end(&mut d)?; + if d.len() != 32 { + return Err(Error::Message("Corrupt node_key.pub file".to_string())); + } + + let mut key = [0u8; 32]; + key.copy_from_slice(&d[..]); + Ok(NodeID::from_slice(&key[..]).unwrap()) +} + pub fn gen_node_key(metadata_dir: &Path) -> Result { let mut key_file = metadata_dir.to_path_buf(); key_file.push("node_key"); @@ -128,10 +146,30 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result { key.copy_from_slice(&d[..]); Ok(NodeKey::from_slice(&key[..]).unwrap()) } else { - let (_, key) = ed25519::gen_keypair(); + if !metadata_dir.exists() { + info!("Metadata directory does not exist, creating it."); + std::fs::create_dir(&metadata_dir)?; + } + + info!("Generating new node key pair."); + let (pubkey, key) = ed25519::gen_keypair(); + + { + use std::os::unix::fs::PermissionsExt; + let mut f = std::fs::File::create(key_file.as_path())?; + let mut perm = f.metadata()?.permissions(); + perm.set_mode(0o600); + std::fs::set_permissions(key_file.as_path(), perm)?; + f.write_all(&key[..])?; + } + + { + let mut pubkey_file = metadata_dir.to_path_buf(); + pubkey_file.push("node_key.pub"); + let mut f2 = std::fs::File::create(pubkey_file.as_path())?; + f2.write_all(&pubkey[..])?; + } - let mut f = std::fs::File::create(key_file.as_path())?; - f.write_all(&key[..])?; Ok(key) } } @@ -252,7 +290,7 @@ impl System { rpc_public_addr, ) .await - .map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e))) + .err_context("Error while publishing Consul service") } /// Save network configuration to disc @@ -282,7 +320,13 @@ impl System { })?; let mut errors = vec![]; for ip in addrs.iter() { - match self.netapp.clone().try_connect(*ip, pubkey).await { + match self + .netapp + .clone() + .try_connect(*ip, pubkey) + .await + .err_context(CONNECT_ERROR_MESSAGE) + { Ok(()) => return Ok(SystemRpc::Ok), Err(e) => { errors.push((*ip, e)); @@ -445,7 +489,12 @@ impl System { } for (node_id, node_addr) in ping_list { - tokio::spawn(self.netapp.clone().try_connect(node_addr, node_id)); + tokio::spawn( + self.netapp + .clone() + .try_connect(node_addr, node_id) + .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)), + ); } } diff --git a/src/util/error.rs b/src/util/error.rs index 390327f1..c8d3c680 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -47,8 +47,13 @@ pub enum Error { #[error(display = "Timeout")] Timeout, - #[error(display = "Too many errors: {:?}", _0)] - TooManyErrors(Vec), + #[error( + display = "Could not reach quorum. {} of {} request succeeded, others returned errors: {:?}", + _0, + _1, + _2 + )] + Quorum(usize, usize, Vec), #[error(display = "Bad RPC: {}", _0)] BadRpc(String), @@ -81,6 +86,39 @@ impl From> for Error { } } +impl<'a> From<&'a str> for Error { + fn from(v: &'a str) -> Error { + Error::Message(v.to_string()) + } +} + +impl From for Error { + fn from(v: String) -> Error { + Error::Message(v) + } +} + +pub trait ErrorContext { + fn err_context>(self, ctx: C) -> Result; +} + +impl ErrorContext for Result +where + E: std::fmt::Display, +{ + #[inline] + fn err_context>(self, ctx: C) -> Result { + match self { + Ok(x) => Ok(x), + Err(e) => Err(Error::Message(format!( + "{}\nOriginal error: {}", + ctx.borrow(), + e + ))), + } + } +} + // Custom serialization for our error type, for use in RPC. // Errors are serialized as a string of their Display representation. // Upon deserialization, they all become a RemoteError with the diff --git a/src/web/error.rs b/src/web/error.rs index 5ac27914..2ed7139f 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -39,7 +39,7 @@ impl Error { Error::NotFound => StatusCode::NOT_FOUND, Error::ApiError(e) => e.http_status_code(), Error::InternalError( - GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_), + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(_, _, _), ) => StatusCode::SERVICE_UNAVAILABLE, Error::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::BAD_REQUEST, -- cgit v1.2.3