diff options
Diffstat (limited to 'src/garage/cli.rs')
-rw-r--r-- | src/garage/cli.rs | 168 |
1 files changed, 88 insertions, 80 deletions
diff --git a/src/garage/cli.rs b/src/garage/cli.rs index f9e67fac..91ec5ab2 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -1,6 +1,5 @@ -use std::cmp::max; -use std::collections::HashSet; -use std::net::SocketAddr; +//use std::cmp::max; +//use std::collections::HashSet; use std::path::PathBuf; use serde::{Deserialize, Serialize}; @@ -8,11 +7,11 @@ use structopt::StructOpt; use garage_util::data::Uuid; use garage_util::error::Error; -use garage_util::time::*; +//use garage_util::time::*; -use garage_rpc::membership::*; use garage_rpc::ring::*; -use garage_rpc::rpc_client::*; +use garage_rpc::system::*; +use garage_rpc::*; use garage_model::bucket_table::*; use garage_model::key_table::*; @@ -298,54 +297,65 @@ pub struct StatsOpt { pub async fn cli_cmd( cmd: Command, - membership_rpc_cli: RpcAddrClient<Message>, - admin_rpc_cli: RpcAddrClient<AdminRpc>, - rpc_host: SocketAddr, + system_rpc_endpoint: &Endpoint<SystemRpc, ()>, + admin_rpc_endpoint: &Endpoint<AdminRpc, ()>, + rpc_host: NodeID, ) -> Result<(), Error> { match cmd { - Command::Status => cmd_status(membership_rpc_cli, rpc_host).await, + Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await, Command::Node(NodeOperation::Configure(configure_opt)) => { - cmd_configure(membership_rpc_cli, rpc_host, configure_opt).await + cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await } Command::Node(NodeOperation::Remove(remove_opt)) => { - cmd_remove(membership_rpc_cli, rpc_host, remove_opt).await + cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await } Command::Bucket(bo) => { - cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::BucketOperation(bo)).await + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await } - Command::Key(ko) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::KeyOperation(ko)).await, - Command::Repair(ro) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::LaunchRepair(ro)).await, - Command::Stats(so) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::Stats(so)).await, + Command::Key(ko) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await + } + Command::Repair(ro) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await + } + Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, _ => unreachable!(), } } -pub async fn cmd_status( - rpc_cli: RpcAddrClient<Message>, - rpc_host: SocketAddr, -) -> Result<(), Error> { +pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await? { - Message::AdvertiseNodesUp(nodes) => nodes, + SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; let config = match rpc_cli - .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await? { - Message::AdvertiseConfig(cfg) => cfg, + SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; + println!("STATUS:"); + for node in status { + println!("{:?}", node); + } + println!("CONFIG: (v{})", config.version); + for (id, node) in config.members { + println!("{} {:?}", hex::encode(id.as_slice()), node); + } + + /* TODO let (hostname_len, addr_len, tag_len, zone_len) = status .iter() - .map(|adv| (adv, config.members.get(&adv.id))) - .map(|(adv, cfg)| { + .map(|(id, addr, _)| (addr, config.members.get(&adv.id))) + .map(|(addr, cfg)| { ( - adv.state_info.hostname.len(), - adv.addr.to_string().len(), + 8, + addr.to_string().len(), cfg.map(|c| c.tag.len()).unwrap_or(0), cfg.map(|c| c.zone.len()).unwrap_or(0), ) @@ -355,13 +365,13 @@ pub async fn cmd_status( }); println!("Healthy nodes:"); - for adv in status.iter().filter(|x| x.is_up) { + for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) { if let Some(cfg) = config.members.get(&adv.id) { println!( "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}", - id = adv.id, - host = adv.state_info.hostname, - addr = adv.addr, + id = id, + host = "", + addr = addr, tag = cfg.tag, zone = cfg.zone, capacity = cfg.capacity_string(), @@ -373,36 +383,36 @@ pub async fn cmd_status( } else { println!( "{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED", - id = adv.id, - h = adv.state_info.hostname, - addr = adv.addr, - h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()), - a_pad = " ".repeat(addr_len - adv.addr.to_string().len()), + id = id, + h = "", + addr = addr, + h_pad = " ".repeat(hostname_len - "".len()), + a_pad = " ".repeat(addr_len - addr.to_string().len()), ); } } - let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>(); - let failure_case_1 = status.iter().any(|x| !x.is_up); + let status_keys = status.iter().map(|(id, _, _)| id).collect::<HashSet<_>>(); + let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up); let failure_case_2 = config .members .iter() .any(|(id, _)| !status_keys.contains(id)); if failure_case_1 || failure_case_2 { println!("\nFailed nodes:"); - for adv in status.iter().filter(|x| !x.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { + for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) { + if let Some(cfg) = config.members.get(&id) { println!( "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago", - id=adv.id, - host=adv.state_info.hostname, - addr=adv.addr, + id=id, + host="", + addr=addr, tag=cfg.tag, zone=cfg.zone, capacity=cfg.capacity_string(), - last_seen=(now_msec() - adv.last_seen) / 1000, - h_pad=" ".repeat(hostname_len - adv.state_info.hostname.len()), - a_pad=" ".repeat(addr_len - adv.addr.to_string().len()), + last_seen=(now_msec() - 0) / 1000, + h_pad=" ".repeat(hostname_len - "".len()), + a_pad=" ".repeat(addr_len - addr.to_string().len()), t_pad=" ".repeat(tag_len - cfg.tag.len()), z_pad=" ".repeat(zone_len - cfg.zone.len()), ); @@ -411,12 +421,12 @@ pub async fn cmd_status( let (tag_len, zone_len) = config .members .iter() - .filter(|(&id, _)| !status.iter().any(|x| x.id == id)) + .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id)) .map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len())) .fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz))); for (id, cfg) in config.members.iter() { - if !status.iter().any(|x| x.id == *id) { + if !status.iter().any(|(xid, _, _)| xid == *id) { println!( "{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen", id = id, @@ -429,6 +439,7 @@ pub async fn cmd_status( } } } + */ Ok(()) } @@ -455,25 +466,30 @@ pub fn find_matching_node( } pub async fn cmd_configure( - rpc_cli: RpcAddrClient<Message>, - rpc_host: SocketAddr, + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, args: ConfigureNodeOpt, ) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await? { - Message::AdvertiseNodesUp(nodes) => nodes, + SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let added_node = find_matching_node(status.iter().map(|x| x.id), &args.node_id)?; + let added_node = find_matching_node( + status + .iter() + .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()), + &args.node_id, + )?; let mut config = match rpc_cli - .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await? { - Message::AdvertiseConfig(cfg) => cfg, + SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; @@ -527,25 +543,21 @@ pub async fn cmd_configure( config.version += 1; rpc_cli - .call( - &rpc_host, - &Message::AdvertiseConfig(config), - ADMIN_RPC_TIMEOUT, - ) - .await??; + .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) + .await?; Ok(()) } pub async fn cmd_remove( - rpc_cli: RpcAddrClient<Message>, - rpc_host: SocketAddr, + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, args: RemoveNodeOpt, ) -> Result<(), Error> { let mut config = match rpc_cli - .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await? { - Message::AdvertiseConfig(cfg) => cfg, + SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; @@ -562,21 +574,17 @@ pub async fn cmd_remove( config.version += 1; rpc_cli - .call( - &rpc_host, - &Message::AdvertiseConfig(config), - ADMIN_RPC_TIMEOUT, - ) - .await??; + .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) + .await?; Ok(()) } pub async fn cmd_admin( - rpc_cli: RpcAddrClient<AdminRpc>, - rpc_host: SocketAddr, + rpc_cli: &Endpoint<AdminRpc, ()>, + rpc_host: NodeID, args: AdminRpc, ) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? { + match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? { AdminRpc::Ok(msg) => { println!("{}", msg); } |