diff options
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/admin.rs (renamed from src/garage/admin_rpc.rs) | 6 | ||||
-rw-r--r-- | src/garage/cli.rs | 657 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 287 | ||||
-rw-r--r-- | src/garage/cli/init.rs | 65 | ||||
-rw-r--r-- | src/garage/cli/mod.rs | 9 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 296 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 83 | ||||
-rw-r--r-- | src/garage/main.rs | 80 | ||||
-rw-r--r-- | src/garage/server.rs | 2 |
9 files changed, 800 insertions, 685 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin.rs index 339d5bdb..b5fc9a7e 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin.rs @@ -349,11 +349,7 @@ impl AdminRpcHandler { PRIO_NORMAL, ) .await; - let is_err = match resp { - Ok(Ok(_)) => false, - _ => true, - }; - if is_err { + if !matches!(resp, Ok(Ok(_))) { failures.push(node); } } diff --git a/src/garage/cli.rs b/src/garage/cli.rs deleted file mode 100644 index 940a5a85..00000000 --- a/src/garage/cli.rs +++ /dev/null @@ -1,657 +0,0 @@ -use std::collections::HashSet; -use std::path::PathBuf; - -use serde::{Deserialize, Serialize}; -use structopt::StructOpt; - -use garage_util::data::Uuid; -use garage_util::error::Error; -use garage_util::time::*; - -use garage_rpc::ring::*; -use garage_rpc::system::*; -use garage_rpc::*; - -use garage_model::bucket_table::*; -use garage_model::key_table::*; - -use crate::admin_rpc::*; - -#[derive(StructOpt, Debug)] -pub enum Command { - /// Run Garage server - #[structopt(name = "server")] - Server(ServerOpt), - - /// Get network status - #[structopt(name = "status")] - Status, - - /// Garage node operations - #[structopt(name = "node")] - Node(NodeOperation), - - /// Bucket operations - #[structopt(name = "bucket")] - Bucket(BucketOperation), - - /// Key operations - #[structopt(name = "key")] - Key(KeyOperation), - - /// Start repair of node data - #[structopt(name = "repair")] - Repair(RepairOpt), - - /// Gather node statistics - #[structopt(name = "stats")] - Stats(StatsOpt), -} - -#[derive(StructOpt, Debug)] -pub struct ServerOpt { - /// Configuration file - #[structopt(short = "c", long = "config", default_value = "./config.toml")] - pub config_file: PathBuf, -} - -#[derive(StructOpt, Debug)] -pub enum NodeOperation { - /// Connect to Garage node that is currently isolated from the system - #[structopt(name = "connect")] - Connect(ConnectNodeOpt), - - /// Configure Garage node - #[structopt(name = "configure")] - Configure(ConfigureNodeOpt), - - /// Remove Garage node from cluster - #[structopt(name = "remove")] - Remove(RemoveNodeOpt), -} - -#[derive(StructOpt, Debug)] -pub struct ConnectNodeOpt { - /// Node public key and address, in the format: - /// `<public key hexadecimal>@<ip or hostname>:<port>` - node: String, -} - -#[derive(StructOpt, Debug)] -pub struct ConfigureNodeOpt { - /// Node to configure (prefix of hexadecimal node id) - node_id: String, - - /// Location (zone or datacenter) of the node - #[structopt(short = "z", long = "zone")] - zone: Option<String>, - - /// Capacity (in relative terms, use 1 to represent your smallest server) - #[structopt(short = "c", long = "capacity")] - capacity: Option<u32>, - - /// Gateway-only node - #[structopt(short = "g", long = "gateway")] - gateway: bool, - - /// Optional node tag - #[structopt(short = "t", long = "tag")] - tag: Option<String>, - - /// Replaced node(s): list of node IDs that will be removed from the current cluster - #[structopt(long = "replace")] - replace: Vec<String>, -} - -#[derive(StructOpt, Debug)] -pub struct RemoveNodeOpt { - /// Node to configure (prefix of hexadecimal node id) - node_id: String, - - /// If this flag is not given, the node won't be removed - #[structopt(long = "yes")] - yes: bool, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub enum BucketOperation { - /// List buckets - #[structopt(name = "list")] - List, - - /// Get bucket info - #[structopt(name = "info")] - Info(BucketOpt), - - /// Create bucket - #[structopt(name = "create")] - Create(BucketOpt), - - /// Delete bucket - #[structopt(name = "delete")] - Delete(DeleteBucketOpt), - - /// Allow key to read or write to bucket - #[structopt(name = "allow")] - Allow(PermBucketOpt), - - /// Deny key from reading or writing to bucket - #[structopt(name = "deny")] - Deny(PermBucketOpt), - - /// Expose as website or not - #[structopt(name = "website")] - Website(WebsiteOpt), -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct WebsiteOpt { - /// Create - #[structopt(long = "allow")] - pub allow: bool, - - /// Delete - #[structopt(long = "deny")] - pub deny: bool, - - /// Bucket name - pub bucket: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct BucketOpt { - /// Bucket name - pub name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct DeleteBucketOpt { - /// Bucket name - pub name: String, - - /// If this flag is not given, the bucket won't be deleted - #[structopt(long = "yes")] - pub yes: bool, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct PermBucketOpt { - /// Access key name or ID - #[structopt(long = "key")] - pub key_pattern: String, - - /// Allow/deny read operations - #[structopt(long = "read")] - pub read: bool, - - /// Allow/deny write operations - #[structopt(long = "write")] - pub write: bool, - - /// Bucket name - pub bucket: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub enum KeyOperation { - /// List keys - #[structopt(name = "list")] - List, - - /// Get key info - #[structopt(name = "info")] - Info(KeyOpt), - - /// Create new key - #[structopt(name = "new")] - New(KeyNewOpt), - - /// Rename key - #[structopt(name = "rename")] - Rename(KeyRenameOpt), - - /// Delete key - #[structopt(name = "delete")] - Delete(KeyDeleteOpt), - - /// Import key - #[structopt(name = "import")] - Import(KeyImportOpt), -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyOpt { - /// ID or name of the key - pub key_pattern: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyNewOpt { - /// Name of the key - #[structopt(long = "name", default_value = "Unnamed key")] - pub name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyRenameOpt { - /// ID or name of the key - pub key_pattern: String, - - /// New name of the key - pub new_name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyDeleteOpt { - /// ID or name of the key - pub key_pattern: String, - - /// Confirm deletion - #[structopt(long = "yes")] - pub yes: bool, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyImportOpt { - /// Access key ID - pub key_id: String, - - /// Secret access key - pub secret_key: String, - - /// Key name - #[structopt(short = "n", default_value = "Imported key")] - pub name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] -pub struct RepairOpt { - /// Launch repair operation on all nodes - #[structopt(short = "a", long = "all-nodes")] - pub all_nodes: bool, - - /// Confirm the launch of the repair operation - #[structopt(long = "yes")] - pub yes: bool, - - #[structopt(subcommand)] - pub what: Option<RepairWhat>, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] -pub enum RepairWhat { - /// Only do a full sync of metadata tables - #[structopt(name = "tables")] - Tables, - /// Only repair (resync/rebalance) the set of stored blocks - #[structopt(name = "blocks")] - Blocks, - /// Only redo the propagation of object deletions to the version table (slow) - #[structopt(name = "versions")] - Versions, - /// Only redo the propagation of version deletions to the block ref table (extremely slow) - #[structopt(name = "block_refs")] - BlockRefs, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] -pub struct StatsOpt { - /// Gather statistics from all nodes - #[structopt(short = "a", long = "all-nodes")] - pub all_nodes: bool, - - /// Gather detailed statistics (this can be long) - #[structopt(short = "d", long = "detailed")] - pub detailed: bool, -} - -pub async fn cli_cmd( - cmd: Command, - system_rpc_endpoint: &Endpoint<SystemRpc, ()>, - admin_rpc_endpoint: &Endpoint<AdminRpc, ()>, - rpc_host: NodeID, -) -> Result<(), Error> { - match cmd { - Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await, - Command::Node(NodeOperation::Connect(connect_opt)) => { - cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await - } - Command::Node(NodeOperation::Configure(configure_opt)) => { - cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await - } - Command::Node(NodeOperation::Remove(remove_opt)) => { - cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await - } - Command::Bucket(bo) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await - } - Command::Key(ko) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await - } - Command::Repair(ro) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await - } - Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, - _ => unreachable!(), - } -} - -pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { - let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - let config = match rpc_cli - .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await?? - { - SystemRpc::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - println!("Healthy nodes:"); - let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()]; - for adv in status.iter().filter(|adv| adv.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { - healthy_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}", - id = adv.id, - host = adv.status.hostname, - addr = adv.addr, - tag = cfg.tag, - zone = cfg.zone, - capacity = cfg.capacity_string(), - )); - } else { - healthy_nodes.push(format!( - "{id:?}\t{h}\t{addr}\tUNCONFIGURED/REMOVED", - id = adv.id, - h = adv.status.hostname, - addr = adv.addr, - )); - } - } - format_table(healthy_nodes); - - let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); - let failure_case_1 = status.iter().any(|adv| !adv.is_up); - let failure_case_2 = config - .members - .iter() - .any(|(id, _)| !status_keys.contains(id)); - if failure_case_1 || failure_case_2 { - println!("\nFailed nodes:"); - let mut failed_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()]; - for adv in status.iter().filter(|adv| !adv.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { - failed_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago", - id = adv.id, - host = adv.status.hostname, - addr = adv.addr, - tag = cfg.tag, - zone = cfg.zone, - capacity = cfg.capacity_string(), - last_seen = (now_msec() - 0) / 1000, - )); - } - } - for (id, cfg) in config.members.iter() { - if !status.iter().any(|adv| adv.id == *id) { - failed_nodes.push(format!( - "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen", - id = id, - tag = cfg.tag, - zone = cfg.zone, - capacity = cfg.capacity_string(), - )); - } - } - format_table(failed_nodes); - } - - Ok(()) -} - -pub async fn cmd_connect( - rpc_cli: &Endpoint<SystemRpc, ()>, - rpc_host: NodeID, - args: ConnectNodeOpt, -) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL).await?? { - SystemRpc::Ok => { - println!("Success."); - Ok(()) - } - r => { - Err(Error::BadRpc(format!("Unexpected response: {:?}", r))) - } - } -} - -pub async fn cmd_configure( - rpc_cli: &Endpoint<SystemRpc, ()>, - rpc_host: NodeID, - args: ConfigureNodeOpt, -) -> Result<(), Error> { - let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; - - let mut config = match rpc_cli - .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await?? - { - SystemRpc::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - for replaced in args.replace.iter() { - let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?; - if config.members.remove(&replaced_node).is_none() { - return Err(Error::Message(format!( - "Cannot replace node {:?} as it is not in current configuration", - replaced_node - ))); - } - } - - if args.capacity.is_some() && args.gateway { - return Err(Error::Message( - "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); - } - if args.capacity == Some(0) { - return Err(Error::Message("Invalid capacity value: 0".into())); - } - - let new_entry = match config.members.get(&added_node) { - None => { - let capacity = match args.capacity { - Some(c) => Some(c), - None if args.gateway => None, - _ => return Err(Error::Message( - "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), - }; - NetworkConfigEntry { - zone: args.zone.expect("Please specifiy a zone with the -z flag"), - capacity, - tag: args.tag.unwrap_or_default(), - } - } - Some(old) => { - let capacity = match args.capacity { - Some(c) => Some(c), - None if args.gateway => None, - _ => old.capacity, - }; - NetworkConfigEntry { - zone: args.zone.unwrap_or_else(|| old.zone.to_string()), - capacity, - tag: args.tag.unwrap_or_else(|| old.tag.to_string()), - } - } - }; - - config.members.insert(added_node, new_entry); - config.version += 1; - - rpc_cli - .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await??; - Ok(()) -} - -pub async fn cmd_remove( - rpc_cli: &Endpoint<SystemRpc, ()>, - rpc_host: NodeID, - args: RemoveNodeOpt, -) -> Result<(), Error> { - let mut config = match rpc_cli - .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await?? - { - SystemRpc::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?; - - if !args.yes { - return Err(Error::Message(format!( - "Add the flag --yes to really remove {:?} from the cluster", - deleted_node - ))); - } - - config.members.remove(&deleted_node); - config.version += 1; - - rpc_cli - .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await??; - Ok(()) -} - -pub async fn cmd_admin( - rpc_cli: &Endpoint<AdminRpc, ()>, - rpc_host: NodeID, - args: AdminRpc, -) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { - AdminRpc::Ok(msg) => { - println!("{}", msg); - } - AdminRpc::BucketList(bl) => { - println!("List of buckets:"); - for bucket in bl { - println!("{}", bucket); - } - } - AdminRpc::BucketInfo(bucket) => { - print_bucket_info(&bucket); - } - AdminRpc::KeyList(kl) => { - println!("List of keys:"); - for key in kl { - println!("{}\t{}", key.0, key.1); - } - } - AdminRpc::KeyInfo(key) => { - print_key_info(&key); - } - r => { - error!("Unexpected response: {:?}", r); - } - } - Ok(()) -} - -// --- Utility functions ---- - -fn print_key_info(key: &Key) { - println!("Key name: {}", key.name.get()); - println!("Key ID: {}", key.key_id); - println!("Secret key: {}", key.secret_key); - if key.deleted.get() { - println!("Key is deleted."); - } else { - println!("Authorized buckets:"); - for (b, _, perm) in key.authorized_buckets.items().iter() { - println!("- {} R:{} W:{}", b, perm.allow_read, perm.allow_write); - } - } -} - -fn print_bucket_info(bucket: &Bucket) { - println!("Bucket name: {}", bucket.name); - match bucket.state.get() { - BucketState::Deleted => println!("Bucket is deleted."), - BucketState::Present(p) => { - println!("Authorized keys:"); - for (k, _, perm) in p.authorized_keys.items().iter() { - println!("- {} R:{} W:{}", k, perm.allow_read, perm.allow_write); - } - println!("Website access: {}", p.website.get()); - } - }; -} - -fn format_table(data: Vec<String>) { - let data = data - .iter() - .map(|s| s.split('\t').collect::<Vec<_>>()) - .collect::<Vec<_>>(); - - let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max); - let mut column_size = vec![0; columns]; - - let mut out = String::new(); - - for row in data.iter() { - for (i, col) in row.iter().enumerate() { - column_size[i] = std::cmp::max(column_size[i], col.chars().count()); - } - } - - for row in data.iter() { - for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) { - out.push_str(col); - (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' ')); - } - out.push_str(&row[row.len() - 1]); - out.push('\n'); - } - - print!("{}", out); -} - -pub fn find_matching_node( - cand: impl std::iter::Iterator<Item = Uuid>, - pattern: &str, -) -> Result<Uuid, Error> { - let mut candidates = vec![]; - for c in cand { - if hex::encode(&c).starts_with(&pattern) { - candidates.push(c); - } - } - if candidates.len() != 1 { - Err(Error::Message(format!( - "{} nodes match '{}'", - candidates.len(), - pattern, - ))) - } else { - Ok(candidates[0]) - } -} - diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs new file mode 100644 index 00000000..2ff46088 --- /dev/null +++ b/src/garage/cli/cmd.rs @@ -0,0 +1,287 @@ +use std::collections::HashSet; + +use garage_util::error::*; + +use garage_rpc::ring::*; +use garage_rpc::system::*; +use garage_rpc::*; + +use crate::admin::*; +use crate::cli::*; + +pub async fn cli_command_dispatch( + cmd: Command, + system_rpc_endpoint: &Endpoint<SystemRpc, ()>, + admin_rpc_endpoint: &Endpoint<AdminRpc, ()>, + rpc_host: NodeID, +) -> Result<(), Error> { + match cmd { + Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await, + Command::Node(NodeOperation::Connect(connect_opt)) => { + cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await + } + Command::Node(NodeOperation::Configure(configure_opt)) => { + cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await + } + Command::Node(NodeOperation::Remove(remove_opt)) => { + cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await + } + Command::Bucket(bo) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await + } + Command::Key(ko) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await + } + Command::Repair(ro) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await + } + Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, + _ => unreachable!(), + } +} + +pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { + let status = match rpc_cli + .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await?? + { + SystemRpc::ReturnKnownNodes(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + let config = match rpc_cli + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await?? + { + SystemRpc::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + println!("==== HEALTHY NODES ===="); + let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()]; + for adv in status.iter().filter(|adv| adv.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + healthy_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, + tag = cfg.tag, + zone = cfg.zone, + capacity = cfg.capacity_string(), + )); + } else { + healthy_nodes.push(format!( + "{id:?}\t{h}\t{addr}\tNO ROLE ASSIGNED", + id = adv.id, + h = adv.status.hostname, + addr = adv.addr, + )); + } + } + format_table(healthy_nodes); + + let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); + let failure_case_1 = status.iter().any(|adv| !adv.is_up); + let failure_case_2 = config + .members + .iter() + .any(|(id, _)| !status_keys.contains(id)); + if failure_case_1 || failure_case_2 { + println!("\n==== FAILED NODES ===="); + let mut failed_nodes = + vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()]; + for adv in status.iter().filter(|adv| !adv.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + failed_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, + tag = cfg.tag, + zone = cfg.zone, + capacity = cfg.capacity_string(), + last_seen = adv + .last_seen_secs_ago + .map(|s| format!("{}s ago", s)) + .unwrap_or_else(|| "never seen".into()), + )); + } + } + for (id, cfg) in config.members.iter() { + if !status_keys.contains(id) { + failed_nodes.push(format!( + "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen", + id = id, + tag = cfg.tag, + zone = cfg.zone, + capacity = cfg.capacity_string(), + )); + } + } + format_table(failed_nodes); + } + + Ok(()) +} + +pub async fn cmd_connect( + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, + args: ConnectNodeOpt, +) -> Result<(), Error> { + match rpc_cli + .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL) + .await?? + { + SystemRpc::Ok => { + println!("Success."); + Ok(()) + } + r => Err(Error::BadRpc(format!("Unexpected response: {:?}", r))), + } +} + +pub async fn cmd_configure( + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, + args: ConfigureNodeOpt, +) -> Result<(), Error> { + let status = match rpc_cli + .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await?? + { + SystemRpc::ReturnKnownNodes(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; + + let mut config = match rpc_cli + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await?? + { + SystemRpc::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + for replaced in args.replace.iter() { + let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?; + if config.members.remove(&replaced_node).is_none() { + return Err(Error::Message(format!( + "Cannot replace node {:?} as it is not in current configuration", + replaced_node + ))); + } + } + + if args.capacity.is_some() && args.gateway { + return Err(Error::Message( + "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); + } + if args.capacity == Some(0) { + return Err(Error::Message("Invalid capacity value: 0".into())); + } + + let new_entry = match config.members.get(&added_node) { + None => { + let capacity = match args.capacity { + Some(c) => Some(c), + None if args.gateway => None, + _ => return Err(Error::Message( + "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), + }; + NetworkConfigEntry { + zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?, + capacity, + tag: args.tag.unwrap_or_default(), + } + } + Some(old) => { + let capacity = match args.capacity { + Some(c) => Some(c), + None if args.gateway => None, + _ => old.capacity, + }; + NetworkConfigEntry { + zone: args.zone.unwrap_or_else(|| old.zone.to_string()), + capacity, + tag: args.tag.unwrap_or_else(|| old.tag.to_string()), + } + } + }; + + config.members.insert(added_node, new_entry); + config.version += 1; + + rpc_cli + .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) + .await??; + Ok(()) +} + +pub async fn cmd_remove( + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, + args: RemoveNodeOpt, +) -> Result<(), Error> { + let mut config = match rpc_cli + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await?? + { + SystemRpc::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?; + + if !args.yes { + return Err(Error::Message(format!( + "Add the flag --yes to really remove {:?} from the cluster", + deleted_node + ))); + } + + config.members.remove(&deleted_node); + config.version += 1; + + rpc_cli + .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) + .await??; + Ok(()) +} + +pub async fn cmd_admin( + rpc_cli: &Endpoint<AdminRpc, ()>, + rpc_host: NodeID, + args: AdminRpc, +) -> Result<(), Error> { + match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { + AdminRpc::Ok(msg) => { + println!("{}", msg); + } + AdminRpc::BucketList(bl) => { + println!("List of buckets:"); + for bucket in bl { + println!("{}", bucket); + } + } + AdminRpc::BucketInfo(bucket) => { + print_bucket_info(&bucket); + } + AdminRpc::KeyList(kl) => { + println!("List of keys:"); + for key in kl { + println!("{}\t{}", key.0, key.1); + } + } + AdminRpc::KeyInfo(key) => { + print_key_info(&key); + } + r => { + error!("Unexpected response: {:?}", r); + } + } + Ok(()) +} + +// --- Utility functions ---- diff --git a/src/garage/cli/init.rs b/src/garage/cli/init.rs new file mode 100644 index 00000000..9eda085f --- /dev/null +++ b/src/garage/cli/init.rs @@ -0,0 +1,65 @@ +use std::path::PathBuf; + +use garage_util::error::*; + +pub const READ_KEY_ERROR: &str = "Unable to read node key. It will be generated by your garage node the first time is it launched. Ensure that your garage node is currently running. (The node key is supposed to be stored in your metadata directory.)"; + +pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> { + let config = garage_util::config::read_config(config_file.clone()).err_context(format!( + "Unable to read configuration file {}", + config_file.to_string_lossy(), + ))?; + + let node_id = + garage_rpc::system::read_node_id(&config.metadata_dir).err_context(READ_KEY_ERROR)?; + + let idstr = if let Some(addr) = config.rpc_public_addr { + let idstr = format!("{}@{}", hex::encode(&node_id), addr); + println!("{}", idstr); + idstr + } else { + let idstr = hex::encode(&node_id); + println!("{}", idstr); + + if !quiet { + eprintln!("WARNING: I don't know the public address to reach this node."); + eprintln!("In all of the instructions below, replace 127.0.0.1:3901 by the appropriate address and port."); + } + + format!("{}@127.0.0.1:3901", idstr) + }; + + if !quiet { + eprintln!(); + eprintln!( + "To instruct a node to connect to this node, run the following command on that node:" + ); + eprintln!(" garage [-c <config file path>] node connect {}", idstr); + eprintln!(); + eprintln!("Or instruct them to connect from here by running:"); + eprintln!( + " garage -c {} -h <remote node> node connect {}", + config_file.to_string_lossy(), + idstr + ); + eprintln!( + "where <remote_node> is their own node identifier in the format: <pubkey>@<ip>:<port>" + ); + eprintln!(); + eprintln!("This node identifier can also be added as a bootstrap node in other node's garage.toml files:"); + eprintln!(" bootstrap_peers = ["); + eprintln!(" \"{}\",", idstr); + eprintln!(" ..."); + eprintln!(" ]"); + eprintln!(); + eprintln!( + r#"Security notice: Garage's intra-cluster communications are secured primarily by the shared +secret value rpc_secret. However, an attacker that knows rpc_secret (for example if it +leaks) cannot connect if they do not know any of the identifiers of the nodes in the +cluster. It is thus a good security measure to try to keep them secret if possible. + "# + ); + } + + Ok(()) +} diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs new file mode 100644 index 00000000..1567f377 --- /dev/null +++ b/src/garage/cli/mod.rs @@ -0,0 +1,9 @@ +pub(crate) mod cmd; +pub(crate) mod init; +pub(crate) mod structs; +pub(crate) mod util; + +pub(crate) use cmd::*; +pub(crate) use init::*; +pub(crate) use structs::*; +pub(crate) use util::*; diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs new file mode 100644 index 00000000..f134cd49 --- /dev/null +++ b/src/garage/cli/structs.rs @@ -0,0 +1,296 @@ +use serde::{Deserialize, Serialize}; + +use structopt::StructOpt; + +#[derive(StructOpt, Debug)] +pub enum Command { + /// Run Garage server + #[structopt(name = "server")] + Server, + + /// Print identifier (public key) of this garage node. + /// Generates a new keypair if necessary. + #[structopt(name = "node-id")] + NodeId(NodeIdOpt), + + /// Get network status + #[structopt(name = "status")] + Status, + + /// Garage node operations + #[structopt(name = "node")] + Node(NodeOperation), + + /// Bucket operations + #[structopt(name = "bucket")] + Bucket(BucketOperation), + + /// Key operations + #[structopt(name = "key")] + Key(KeyOperation), + + /// Start repair of node data + #[structopt(name = "repair")] + Repair(RepairOpt), + + /// Gather node statistics + #[structopt(name = "stats")] + Stats(StatsOpt), +} + +#[derive(StructOpt, Debug)] +pub enum NodeOperation { + /// Connect to Garage node that is currently isolated from the system + #[structopt(name = "connect")] + Connect(ConnectNodeOpt), + + /// Configure Garage node + #[structopt(name = "configure")] + Configure(ConfigureNodeOpt), + + /// Remove Garage node from cluster + #[structopt(name = "remove")] + Remove(RemoveNodeOpt), +} + +#[derive(StructOpt, Debug)] +pub struct NodeIdOpt { + /// Do not print usage instructions to stderr + #[structopt(short = "q", long = "quiet")] + pub(crate) quiet: bool, +} + +#[derive(StructOpt, Debug)] +pub struct ConnectNodeOpt { + /// Node public key and address, in the format: + /// `<public key hexadecimal>@<ip or hostname>:<port>` + pub(crate) node: String, +} + +#[derive(StructOpt, Debug)] +pub struct ConfigureNodeOpt { + /// Node to configure (prefix of hexadecimal node id) + pub(crate) node_id: String, + + /// Location (zone or datacenter) of the node + #[structopt(short = "z", long = "zone")] + pub(crate) zone: Option<String>, + + /// Capacity (in relative terms, use 1 to represent your smallest server) + #[structopt(short = "c", long = "capacity")] + pub(crate) capacity: Option<u32>, + + /// Gateway-only node + #[structopt(short = "g", long = "gateway")] + pub(crate) gateway: bool, + + /// Optional node tag + #[structopt(short = "t", long = "tag")] + pub(crate) tag: Option<String>, + + /// Replaced node(s): list of node IDs that will be removed from the current cluster + #[structopt(long = "replace")] + pub(crate) replace: Vec<String>, +} + +#[derive(StructOpt, Debug)] +pub struct RemoveNodeOpt { + /// Node to configure (prefix of hexadecimal node id) + pub(crate) node_id: String, + + /// If this flag is not given, the node won't be removed + #[structopt(long = "yes")] + pub(crate) yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub enum BucketOperation { + /// List buckets + #[structopt(name = "list")] + List, + + /// Get bucket info + #[structopt(name = "info")] + Info(BucketOpt), + + /// Create bucket + #[structopt(name = "create")] + Create(BucketOpt), + + /// Delete bucket + #[structopt(name = "delete")] + Delete(DeleteBucketOpt), + + /// Allow key to read or write to bucket + #[structopt(name = "allow")] + Allow(PermBucketOpt), + + /// Deny key from reading or writing to bucket + #[structopt(name = "deny")] + Deny(PermBucketOpt), + + /// Expose as website or not + #[structopt(name = "website")] + Website(WebsiteOpt), +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct WebsiteOpt { + /// Create + #[structopt(long = "allow")] + pub allow: bool, + + /// Delete + #[structopt(long = "deny")] + pub deny: bool, + + /// Bucket name + pub bucket: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct BucketOpt { + /// Bucket name + pub name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct DeleteBucketOpt { + /// Bucket name + pub name: String, + + /// If this flag is not given, the bucket won't be deleted + #[structopt(long = "yes")] + pub yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct PermBucketOpt { + /// Access key name or ID + #[structopt(long = "key")] + pub key_pattern: String, + + /// Allow/deny read operations + #[structopt(long = "read")] + pub read: bool, + + /// Allow/deny write operations + #[structopt(long = "write")] + pub write: bool, + + /// Bucket name + pub bucket: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub enum KeyOperation { + /// List keys + #[structopt(name = "list")] + List, + + /// Get key info + #[structopt(name = "info")] + Info(KeyOpt), + + /// Create new key + #[structopt(name = "new")] + New(KeyNewOpt), + + /// Rename key + #[structopt(name = "rename")] + Rename(KeyRenameOpt), + + /// Delete key + #[structopt(name = "delete")] + Delete(KeyDeleteOpt), + + /// Import key + #[structopt(name = "import")] + Import(KeyImportOpt), +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyOpt { + /// ID or name of the key + pub key_pattern: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyNewOpt { + /// Name of the key + #[structopt(long = "name", default_value = "Unnamed key")] + pub name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyRenameOpt { + /// ID or name of the key + pub key_pattern: String, + + /// New name of the key + pub new_name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyDeleteOpt { + /// ID or name of the key + pub key_pattern: String, + + /// Confirm deletion + #[structopt(long = "yes")] + pub yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyImportOpt { + /// Access key ID + pub key_id: String, + + /// Secret access key + pub secret_key: String, + + /// Key name + #[structopt(short = "n", default_value = "Imported key")] + pub name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct RepairOpt { + /// Launch repair operation on all nodes + #[structopt(short = "a", long = "all-nodes")] + pub all_nodes: bool, + + /// Confirm the launch of the repair operation + #[structopt(long = "yes")] + pub yes: bool, + + #[structopt(subcommand)] + pub what: Option<RepairWhat>, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum RepairWhat { + /// Only do a full sync of metadata tables + #[structopt(name = "tables")] + Tables, + /// Only repair (resync/rebalance) the set of stored blocks + #[structopt(name = "blocks")] + Blocks, + /// Only redo the propagation of object deletions to the version table (slow) + #[structopt(name = "versions")] + Versions, + /// Only redo the propagation of version deletions to the block ref table (extremely slow) + #[structopt(name = "block_refs")] + BlockRefs, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct StatsOpt { + /// Gather statistics from all nodes + #[structopt(short = "a", long = "all-nodes")] + pub all_nodes: bool, + + /// Gather detailed statistics (this can be long) + #[structopt(short = "d", long = "detailed")] + pub detailed: bool, +} diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs new file mode 100644 index 00000000..28b4d8ea --- /dev/null +++ b/src/garage/cli/util.rs @@ -0,0 +1,83 @@ +use garage_util::data::Uuid; +use garage_util::error::*; + +use garage_model::bucket_table::*; +use garage_model::key_table::*; + +pub fn print_key_info(key: &Key) { + println!("Key name: {}", key.name.get()); + println!("Key ID: {}", key.key_id); + println!("Secret key: {}", key.secret_key); + if key.deleted.get() { + println!("Key is deleted."); + } else { + println!("Authorized buckets:"); + for (b, _, perm) in key.authorized_buckets.items().iter() { + println!("- {} R:{} W:{}", b, perm.allow_read, perm.allow_write); + } + } +} + +pub fn print_bucket_info(bucket: &Bucket) { + println!("Bucket name: {}", bucket.name); + match bucket.state.get() { + BucketState::Deleted => println!("Bucket is deleted."), + BucketState::Present(p) => { + println!("Authorized keys:"); + for (k, _, perm) in p.authorized_keys.items().iter() { + println!("- {} R:{} W:{}", k, perm.allow_read, perm.allow_write); + } + println!("Website access: {}", p.website.get()); + } + }; +} + +pub fn format_table(data: Vec<String>) { + let data = data + .iter() + .map(|s| s.split('\t').collect::<Vec<_>>()) + .collect::<Vec<_>>(); + + let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max); + let mut column_size = vec![0; columns]; + + let mut out = String::new(); + + for row in data.iter() { + for (i, col) in row.iter().enumerate() { + column_size[i] = std::cmp::max(column_size[i], col.chars().count()); + } + } + + for row in data.iter() { + for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) { + out.push_str(col); + (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' ')); + } + out.push_str(&row[row.len() - 1]); + out.push('\n'); + } + + print!("{}", out); +} + +pub fn find_matching_node( + cand: impl std::iter::Iterator<Item = Uuid>, + pattern: &str, +) -> Result<Uuid, Error> { + let mut candidates = vec![]; + for c in cand { + if hex::encode(&c).starts_with(&pattern) { + candidates.push(c); + } + } + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} nodes match '{}'", + candidates.len(), + pattern, + ))) + } else { + Ok(candidates[0]) + } +} diff --git a/src/garage/main.rs b/src/garage/main.rs index 543860ca..7f7196d9 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -4,22 +4,24 @@ #[macro_use] extern crate log; -mod admin_rpc; +mod admin; mod cli; mod repair; mod server; +use std::path::PathBuf; + use structopt::StructOpt; -use netapp::util::parse_peer_addr; +use netapp::util::parse_and_resolve_peer_addr; use netapp::NetworkKey; -use garage_util::error::Error; +use garage_util::error::*; use garage_rpc::system::*; use garage_rpc::*; -use admin_rpc::*; +use admin::*; use cli::*; #[derive(StructOpt, Debug)] @@ -34,6 +36,10 @@ struct Opt { #[structopt(short = "s", long = "rpc-secret")] pub rpc_secret: Option<String>, + /// Configuration file (garage.toml) + #[structopt(short = "c", long = "config", default_value = "/etc/garage.toml")] + pub config_file: PathBuf, + #[structopt(subcommand)] cmd: Command, } @@ -45,38 +51,68 @@ async fn main() { let opt = Opt::from_args(); - let res = if let Command::Server(server_opt) = opt.cmd { - // Abort on panic (same behavior as in Go) - std::panic::set_hook(Box::new(|panic_info| { - error!("{}", panic_info.to_string()); - std::process::abort(); - })); - - server::run_server(server_opt.config_file).await - } else { - cli_command(opt).await + let res = match opt.cmd { + Command::Server => { + // Abort on panic (same behavior as in Go) + std::panic::set_hook(Box::new(|panic_info| { + error!("{}", panic_info.to_string()); + std::process::abort(); + })); + + server::run_server(opt.config_file).await + } + Command::NodeId(node_id_opt) => node_id_command(opt.config_file, node_id_opt.quiet), + _ => cli_command(opt).await, }; if let Err(e) = res { - error!("{}", e); + eprintln!("Error: {}", e); + std::process::exit(1); } } async fn cli_command(opt: Opt) -> Result<(), Error> { - let net_key_hex_str = &opt.rpc_secret.expect("No RPC secret provided"); + let config = if opt.rpc_secret.is_none() || opt.rpc_host.is_none() { + Some(garage_util::config::read_config(opt.config_file.clone()) + .err_context(format!("Unable to read configuration file {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy()))?) + } else { + None + }; + + // Find and parse network RPC secret + let net_key_hex_str = opt + .rpc_secret + .as_ref() + .or_else(|| config.as_ref().map(|c| &c.rpc_secret)) + .ok_or("No RPC secret provided")?; let network_key = NetworkKey::from_slice( - &hex::decode(net_key_hex_str).expect("Invalid RPC secret key (bad hex)")[..], + &hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..], ) - .expect("Invalid RPC secret provided (wrong length)"); + .ok_or("Invalid RPC secret provided (wrong length)")?; + + // Generate a temporary keypair for our RPC client let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair(); let netapp = NetApp::new(network_key, sk); - let (id, addr) = - parse_peer_addr(&opt.rpc_host.expect("No RPC host provided")).expect("Invalid RPC host"); - netapp.clone().try_connect(addr, id).await?; + + // Find and parse the address of the target host + let (id, addr) = if let Some(h) = opt.rpc_host { + let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?; + (id, addrs[0]) + } else if let Some(a) = config.as_ref().map(|c| c.rpc_public_addr).flatten() { + let node_id = garage_rpc::system::read_node_id(&config.unwrap().metadata_dir) + .err_context(READ_KEY_ERROR)?; + (node_id, a) + } else { + return Err(Error::Message("No RPC host provided".into())); + }; + + // Connect to target host + netapp.clone().try_connect(addr, id).await + .err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?; let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into()); let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into()); - cli_cmd(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await + cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await } diff --git a/src/garage/server.rs b/src/garage/server.rs index cd92d157..f4d62e91 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -10,7 +10,7 @@ use garage_api::run_api_server; use garage_model::garage::Garage; use garage_web::run_web_server; -use crate::admin_rpc::*; +use crate::admin::*; async fn wait_from(mut chan: watch::Receiver<bool>) { while !*chan.borrow() { |