diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-15 11:05:09 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-22 16:55:24 +0200 |
commit | 1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch) | |
tree | d6437f105a630fa197b67446b5c3b2902335c34a /src/garage/cli.rs | |
parent | 4067797d0142ee7860aff8da95d65820d6cc0889 (diff) | |
download | garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.tar.gz garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.zip |
Improvements to CLI and various fixes for netapp version
Discovery via consul, persist peer list to file
Diffstat (limited to 'src/garage/cli.rs')
-rw-r--r-- | src/garage/cli.rs | 235 |
1 files changed, 125 insertions, 110 deletions
diff --git a/src/garage/cli.rs b/src/garage/cli.rs index 91ec5ab2..940a5a85 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -1,5 +1,4 @@ -//use std::cmp::max; -//use std::collections::HashSet; +use std::collections::HashSet; use std::path::PathBuf; use serde::{Deserialize, Serialize}; @@ -7,7 +6,7 @@ use structopt::StructOpt; use garage_util::data::Uuid; use garage_util::error::Error; -//use garage_util::time::*; +use garage_util::time::*; use garage_rpc::ring::*; use garage_rpc::system::*; @@ -58,6 +57,10 @@ pub struct ServerOpt { #[derive(StructOpt, Debug)] pub enum NodeOperation { + /// Connect to Garage node that is currently isolated from the system + #[structopt(name = "connect")] + Connect(ConnectNodeOpt), + /// Configure Garage node #[structopt(name = "configure")] Configure(ConfigureNodeOpt), @@ -68,6 +71,13 @@ pub enum NodeOperation { } #[derive(StructOpt, Debug)] +pub struct ConnectNodeOpt { + /// Node public key and address, in the format: + /// `<public key hexadecimal>@<ip or hostname>:<port>` + node: String, +} + +#[derive(StructOpt, Debug)] pub struct ConfigureNodeOpt { /// Node to configure (prefix of hexadecimal node id) node_id: String, @@ -303,6 +313,9 @@ pub async fn cli_cmd( ) -> Result<(), Error> { match cmd { Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await, + Command::Node(NodeOperation::Connect(connect_opt)) => { + cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await + } Command::Node(NodeOperation::Configure(configure_opt)) => { cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await } @@ -326,142 +339,96 @@ pub async fn cli_cmd( pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await? + .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; let config = match rpc_cli .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await? + .await?? { SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - println!("STATUS:"); - for node in status { - println!("{:?}", node); - } - println!("CONFIG: (v{})", config.version); - for (id, node) in config.members { - println!("{} {:?}", hex::encode(id.as_slice()), node); - } - - /* TODO - let (hostname_len, addr_len, tag_len, zone_len) = status - .iter() - .map(|(id, addr, _)| (addr, config.members.get(&adv.id))) - .map(|(addr, cfg)| { - ( - 8, - addr.to_string().len(), - cfg.map(|c| c.tag.len()).unwrap_or(0), - cfg.map(|c| c.zone.len()).unwrap_or(0), - ) - }) - .fold((0, 0, 0, 0), |(h, a, t, z), (mh, ma, mt, mz)| { - (max(h, mh), max(a, ma), max(t, mt), max(z, mz)) - }); - println!("Healthy nodes:"); - for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) { + let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()]; + for adv in status.iter().filter(|adv| adv.is_up) { if let Some(cfg) = config.members.get(&adv.id) { - println!( - "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}", - id = id, - host = "", - addr = addr, + healthy_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, tag = cfg.tag, zone = cfg.zone, capacity = cfg.capacity_string(), - h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()), - a_pad = " ".repeat(addr_len - adv.addr.to_string().len()), - t_pad = " ".repeat(tag_len - cfg.tag.len()), - z_pad = " ".repeat(zone_len - cfg.zone.len()), - ); + )); } else { - println!( - "{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED", - id = id, - h = "", - addr = addr, - h_pad = " ".repeat(hostname_len - "".len()), - a_pad = " ".repeat(addr_len - addr.to_string().len()), - ); + healthy_nodes.push(format!( + "{id:?}\t{h}\t{addr}\tUNCONFIGURED/REMOVED", + id = adv.id, + h = adv.status.hostname, + addr = adv.addr, + )); } } + format_table(healthy_nodes); - let status_keys = status.iter().map(|(id, _, _)| id).collect::<HashSet<_>>(); - let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up); + let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); + let failure_case_1 = status.iter().any(|adv| !adv.is_up); let failure_case_2 = config .members .iter() .any(|(id, _)| !status_keys.contains(id)); if failure_case_1 || failure_case_2 { println!("\nFailed nodes:"); - for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) { - if let Some(cfg) = config.members.get(&id) { - println!( - "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago", - id=id, - host="", - addr=addr, - tag=cfg.tag, - zone=cfg.zone, - capacity=cfg.capacity_string(), - last_seen=(now_msec() - 0) / 1000, - h_pad=" ".repeat(hostname_len - "".len()), - a_pad=" ".repeat(addr_len - addr.to_string().len()), - t_pad=" ".repeat(tag_len - cfg.tag.len()), - z_pad=" ".repeat(zone_len - cfg.zone.len()), - ); + let mut failed_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()]; + for adv in status.iter().filter(|adv| !adv.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + failed_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, + tag = cfg.tag, + zone = cfg.zone, + capacity = cfg.capacity_string(), + last_seen = (now_msec() - 0) / 1000, + )); } } - let (tag_len, zone_len) = config - .members - .iter() - .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id)) - .map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len())) - .fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz))); - for (id, cfg) in config.members.iter() { - if !status.iter().any(|(xid, _, _)| xid == *id) { - println!( - "{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen", + if !status.iter().any(|adv| adv.id == *id) { + failed_nodes.push(format!( + "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen", id = id, tag = cfg.tag, zone = cfg.zone, capacity = cfg.capacity_string(), - t_pad = " ".repeat(tag_len - cfg.tag.len()), - z_pad = " ".repeat(zone_len - cfg.zone.len()), - ); + )); } } + format_table(failed_nodes); } - */ Ok(()) } -pub fn find_matching_node( - cand: impl std::iter::Iterator<Item = Uuid>, - pattern: &str, -) -> Result<Uuid, Error> { - let mut candidates = vec![]; - for c in cand { - if hex::encode(&c).starts_with(&pattern) { - candidates.push(c); +pub async fn cmd_connect( + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, + args: ConnectNodeOpt, +) -> Result<(), Error> { + match rpc_cli.call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL).await?? { + SystemRpc::Ok => { + println!("Success."); + Ok(()) + } + r => { + Err(Error::BadRpc(format!("Unexpected response: {:?}", r))) } - } - if candidates.len() != 1 { - Err(Error::Message(format!( - "{} nodes match '{}'", - candidates.len(), - pattern, - ))) - } else { - Ok(candidates[0]) } } @@ -472,22 +439,17 @@ pub async fn cmd_configure( ) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await? + .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let added_node = find_matching_node( - status - .iter() - .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()), - &args.node_id, - )?; + let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; let mut config = match rpc_cli .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await? + .await?? { SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -544,7 +506,7 @@ pub async fn cmd_configure( rpc_cli .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await?; + .await??; Ok(()) } @@ -555,7 +517,7 @@ pub async fn cmd_remove( ) -> Result<(), Error> { let mut config = match rpc_cli .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await? + .await?? { SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -575,7 +537,7 @@ pub async fn cmd_remove( rpc_cli .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await?; + .await??; Ok(()) } @@ -584,7 +546,7 @@ pub async fn cmd_admin( rpc_host: NodeID, args: AdminRpc, ) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? { + match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { AdminRpc::Ok(msg) => { println!("{}", msg); } @@ -613,6 +575,8 @@ pub async fn cmd_admin( Ok(()) } +// --- Utility functions ---- + fn print_key_info(key: &Key) { println!("Key name: {}", key.name.get()); println!("Key ID: {}", key.key_id); @@ -640,3 +604,54 @@ fn print_bucket_info(bucket: &Bucket) { } }; } + +fn format_table(data: Vec<String>) { + let data = data + .iter() + .map(|s| s.split('\t').collect::<Vec<_>>()) + .collect::<Vec<_>>(); + + let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max); + let mut column_size = vec![0; columns]; + + let mut out = String::new(); + + for row in data.iter() { + for (i, col) in row.iter().enumerate() { + column_size[i] = std::cmp::max(column_size[i], col.chars().count()); + } + } + + for row in data.iter() { + for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) { + out.push_str(col); + (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' ')); + } + out.push_str(&row[row.len() - 1]); + out.push('\n'); + } + + print!("{}", out); +} + +pub fn find_matching_node( + cand: impl std::iter::Iterator<Item = Uuid>, + pattern: &str, +) -> Result<Uuid, Error> { + let mut candidates = vec![]; + for c in cand { + if hex::encode(&c).starts_with(&pattern) { + candidates.push(c); + } + } + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} nodes match '{}'", + candidates.len(), + pattern, + ))) + } else { + Ok(candidates[0]) + } +} + |