aboutsummaryrefslogtreecommitdiff
path: root/src/garage/cli.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-15 11:05:09 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-22 16:55:24 +0200
commit1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch)
treed6437f105a630fa197b67446b5c3b2902335c34a /src/garage/cli.rs
parent4067797d0142ee7860aff8da95d65820d6cc0889 (diff)
downloadgarage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.tar.gz
garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.zip
Improvements to CLI and various fixes for netapp version
Discovery via consul, persist peer list to file
Diffstat (limited to 'src/garage/cli.rs')
-rw-r--r--src/garage/cli.rs235
1 files changed, 125 insertions, 110 deletions
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
index 91ec5ab2..940a5a85 100644
--- a/src/garage/cli.rs
+++ b/src/garage/cli.rs
@@ -1,5 +1,4 @@
-//use std::cmp::max;
-//use std::collections::HashSet;
+use std::collections::HashSet;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
@@ -7,7 +6,7 @@ use structopt::StructOpt;
use garage_util::data::Uuid;
use garage_util::error::Error;
-//use garage_util::time::*;
+use garage_util::time::*;
use garage_rpc::ring::*;
use garage_rpc::system::*;
@@ -58,6 +57,10 @@ pub struct ServerOpt {
#[derive(StructOpt, Debug)]
pub enum NodeOperation {
+ /// Connect to Garage node that is currently isolated from the system
+ #[structopt(name = "connect")]
+ Connect(ConnectNodeOpt),
+
/// Configure Garage node
#[structopt(name = "configure")]
Configure(ConfigureNodeOpt),
@@ -68,6 +71,13 @@ pub enum NodeOperation {
}
#[derive(StructOpt, Debug)]
+pub struct ConnectNodeOpt {
+ /// Node public key and address, in the format:
+ /// `<public key hexadecimal>@<ip or hostname>:<port>`
+ node: String,
+}
+
+#[derive(StructOpt, Debug)]
pub struct ConfigureNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
node_id: String,
@@ -303,6 +313,9 @@ pub async fn cli_cmd(
) -> Result<(), Error> {
match cmd {
Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await,
+ Command::Node(NodeOperation::Connect(connect_opt)) => {
+ cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await
+ }
Command::Node(NodeOperation::Configure(configure_opt)) => {
cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
}
@@ -326,142 +339,96 @@ pub async fn cli_cmd(
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- println!("STATUS:");
- for node in status {
- println!("{:?}", node);
- }
- println!("CONFIG: (v{})", config.version);
- for (id, node) in config.members {
- println!("{} {:?}", hex::encode(id.as_slice()), node);
- }
-
- /* TODO
- let (hostname_len, addr_len, tag_len, zone_len) = status
- .iter()
- .map(|(id, addr, _)| (addr, config.members.get(&adv.id)))
- .map(|(addr, cfg)| {
- (
- 8,
- addr.to_string().len(),
- cfg.map(|c| c.tag.len()).unwrap_or(0),
- cfg.map(|c| c.zone.len()).unwrap_or(0),
- )
- })
- .fold((0, 0, 0, 0), |(h, a, t, z), (mh, ma, mt, mz)| {
- (max(h, mh), max(a, ma), max(t, mt), max(z, mz))
- });
-
println!("Healthy nodes:");
- for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) {
+ let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()];
+ for adv in status.iter().filter(|adv| adv.is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
- println!(
- "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}",
- id = id,
- host = "",
- addr = addr,
+ healthy_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}",
+ id = adv.id,
+ host = adv.status.hostname,
+ addr = adv.addr,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
- h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()),
- a_pad = " ".repeat(addr_len - adv.addr.to_string().len()),
- t_pad = " ".repeat(tag_len - cfg.tag.len()),
- z_pad = " ".repeat(zone_len - cfg.zone.len()),
- );
+ ));
} else {
- println!(
- "{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED",
- id = id,
- h = "",
- addr = addr,
- h_pad = " ".repeat(hostname_len - "".len()),
- a_pad = " ".repeat(addr_len - addr.to_string().len()),
- );
+ healthy_nodes.push(format!(
+ "{id:?}\t{h}\t{addr}\tUNCONFIGURED/REMOVED",
+ id = adv.id,
+ h = adv.status.hostname,
+ addr = adv.addr,
+ ));
}
}
+ format_table(healthy_nodes);
- let status_keys = status.iter().map(|(id, _, _)| id).collect::<HashSet<_>>();
- let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up);
+ let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
+ let failure_case_1 = status.iter().any(|adv| !adv.is_up);
let failure_case_2 = config
.members
.iter()
.any(|(id, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\nFailed nodes:");
- for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) {
- if let Some(cfg) = config.members.get(&id) {
- println!(
- "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago",
- id=id,
- host="",
- addr=addr,
- tag=cfg.tag,
- zone=cfg.zone,
- capacity=cfg.capacity_string(),
- last_seen=(now_msec() - 0) / 1000,
- h_pad=" ".repeat(hostname_len - "".len()),
- a_pad=" ".repeat(addr_len - addr.to_string().len()),
- t_pad=" ".repeat(tag_len - cfg.tag.len()),
- z_pad=" ".repeat(zone_len - cfg.zone.len()),
- );
+ let mut failed_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()];
+ for adv in status.iter().filter(|adv| !adv.is_up) {
+ if let Some(cfg) = config.members.get(&adv.id) {
+ failed_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago",
+ id = adv.id,
+ host = adv.status.hostname,
+ addr = adv.addr,
+ tag = cfg.tag,
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ last_seen = (now_msec() - 0) / 1000,
+ ));
}
}
- let (tag_len, zone_len) = config
- .members
- .iter()
- .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id))
- .map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len()))
- .fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz)));
-
for (id, cfg) in config.members.iter() {
- if !status.iter().any(|(xid, _, _)| xid == *id) {
- println!(
- "{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen",
+ if !status.iter().any(|adv| adv.id == *id) {
+ failed_nodes.push(format!(
+ "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen",
id = id,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
- t_pad = " ".repeat(tag_len - cfg.tag.len()),
- z_pad = " ".repeat(zone_len - cfg.zone.len()),
- );
+ ));
}
}
+ format_table(failed_nodes);
}
- */
Ok(())
}
-pub fn find_matching_node(
- cand: impl std::iter::Iterator<Item = Uuid>,
- pattern: &str,
-) -> Result<Uuid, Error> {
- let mut candidates = vec![];
- for c in cand {
- if hex::encode(&c).starts_with(&pattern) {
- candidates.push(c);
+pub async fn cmd_connect(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ args: ConnectNodeOpt,
+) -> Result<(), Error> {
+ match rpc_cli.call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL).await?? {
+ SystemRpc::Ok => {
+ println!("Success.");
+ Ok(())
+ }
+ r => {
+ Err(Error::BadRpc(format!("Unexpected response: {:?}", r)))
}
- }
- if candidates.len() != 1 {
- Err(Error::Message(format!(
- "{} nodes match '{}'",
- candidates.len(),
- pattern,
- )))
- } else {
- Ok(candidates[0])
}
}
@@ -472,22 +439,17 @@ pub async fn cmd_configure(
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- let added_node = find_matching_node(
- status
- .iter()
- .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()),
- &args.node_id,
- )?;
+ let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
let mut config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@@ -544,7 +506,7 @@ pub async fn cmd_configure(
rpc_cli
.call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await?;
+ .await??;
Ok(())
}
@@ -555,7 +517,7 @@ pub async fn cmd_remove(
) -> Result<(), Error> {
let mut config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@@ -575,7 +537,7 @@ pub async fn cmd_remove(
rpc_cli
.call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await?;
+ .await??;
Ok(())
}
@@ -584,7 +546,7 @@ pub async fn cmd_admin(
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), Error> {
- match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? {
+ match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
@@ -613,6 +575,8 @@ pub async fn cmd_admin(
Ok(())
}
+// --- Utility functions ----
+
fn print_key_info(key: &Key) {
println!("Key name: {}", key.name.get());
println!("Key ID: {}", key.key_id);
@@ -640,3 +604,54 @@ fn print_bucket_info(bucket: &Bucket) {
}
};
}
+
+fn format_table(data: Vec<String>) {
+ let data = data
+ .iter()
+ .map(|s| s.split('\t').collect::<Vec<_>>())
+ .collect::<Vec<_>>();
+
+ let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max);
+ let mut column_size = vec![0; columns];
+
+ let mut out = String::new();
+
+ for row in data.iter() {
+ for (i, col) in row.iter().enumerate() {
+ column_size[i] = std::cmp::max(column_size[i], col.chars().count());
+ }
+ }
+
+ for row in data.iter() {
+ for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) {
+ out.push_str(col);
+ (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' '));
+ }
+ out.push_str(&row[row.len() - 1]);
+ out.push('\n');
+ }
+
+ print!("{}", out);
+}
+
+pub fn find_matching_node(
+ cand: impl std::iter::Iterator<Item = Uuid>,
+ pattern: &str,
+) -> Result<Uuid, Error> {
+ let mut candidates = vec![];
+ for c in cand {
+ if hex::encode(&c).starts_with(&pattern) {
+ candidates.push(c);
+ }
+ }
+ if candidates.len() != 1 {
+ Err(Error::Message(format!(
+ "{} nodes match '{}'",
+ candidates.len(),
+ pattern,
+ )))
+ } else {
+ Ok(candidates[0])
+ }
+}
+