aboutsummaryrefslogtreecommitdiff
path: root/src/garage/cli.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/cli.rs')
-rw-r--r--src/garage/cli.rs168
1 files changed, 88 insertions, 80 deletions
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
index f9e67fac..91ec5ab2 100644
--- a/src/garage/cli.rs
+++ b/src/garage/cli.rs
@@ -1,6 +1,5 @@
-use std::cmp::max;
-use std::collections::HashSet;
-use std::net::SocketAddr;
+//use std::cmp::max;
+//use std::collections::HashSet;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
@@ -8,11 +7,11 @@ use structopt::StructOpt;
use garage_util::data::Uuid;
use garage_util::error::Error;
-use garage_util::time::*;
+//use garage_util::time::*;
-use garage_rpc::membership::*;
use garage_rpc::ring::*;
-use garage_rpc::rpc_client::*;
+use garage_rpc::system::*;
+use garage_rpc::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
@@ -298,54 +297,65 @@ pub struct StatsOpt {
pub async fn cli_cmd(
cmd: Command,
- membership_rpc_cli: RpcAddrClient<Message>,
- admin_rpc_cli: RpcAddrClient<AdminRpc>,
- rpc_host: SocketAddr,
+ system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
+ admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
+ rpc_host: NodeID,
) -> Result<(), Error> {
match cmd {
- Command::Status => cmd_status(membership_rpc_cli, rpc_host).await,
+ Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await,
Command::Node(NodeOperation::Configure(configure_opt)) => {
- cmd_configure(membership_rpc_cli, rpc_host, configure_opt).await
+ cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
}
Command::Node(NodeOperation::Remove(remove_opt)) => {
- cmd_remove(membership_rpc_cli, rpc_host, remove_opt).await
+ cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await
}
Command::Bucket(bo) => {
- cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::BucketOperation(bo)).await
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
}
- Command::Key(ko) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::KeyOperation(ko)).await,
- Command::Repair(ro) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::LaunchRepair(ro)).await,
- Command::Stats(so) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::Stats(so)).await,
+ Command::Key(ko) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await
+ }
+ Command::Repair(ro) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
+ }
+ Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
_ => unreachable!(),
}
}
-pub async fn cmd_status(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
-) -> Result<(), Error> {
+pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseNodesUp(nodes) => nodes,
+ SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseConfig(cfg) => cfg,
+ SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
+ println!("STATUS:");
+ for node in status {
+ println!("{:?}", node);
+ }
+ println!("CONFIG: (v{})", config.version);
+ for (id, node) in config.members {
+ println!("{} {:?}", hex::encode(id.as_slice()), node);
+ }
+
+ /* TODO
let (hostname_len, addr_len, tag_len, zone_len) = status
.iter()
- .map(|adv| (adv, config.members.get(&adv.id)))
- .map(|(adv, cfg)| {
+ .map(|(id, addr, _)| (addr, config.members.get(&adv.id)))
+ .map(|(addr, cfg)| {
(
- adv.state_info.hostname.len(),
- adv.addr.to_string().len(),
+ 8,
+ addr.to_string().len(),
cfg.map(|c| c.tag.len()).unwrap_or(0),
cfg.map(|c| c.zone.len()).unwrap_or(0),
)
@@ -355,13 +365,13 @@ pub async fn cmd_status(
});
println!("Healthy nodes:");
- for adv in status.iter().filter(|x| x.is_up) {
+ for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
println!(
"{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}",
- id = adv.id,
- host = adv.state_info.hostname,
- addr = adv.addr,
+ id = id,
+ host = "",
+ addr = addr,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
@@ -373,36 +383,36 @@ pub async fn cmd_status(
} else {
println!(
"{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED",
- id = adv.id,
- h = adv.state_info.hostname,
- addr = adv.addr,
- h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()),
- a_pad = " ".repeat(addr_len - adv.addr.to_string().len()),
+ id = id,
+ h = "",
+ addr = addr,
+ h_pad = " ".repeat(hostname_len - "".len()),
+ a_pad = " ".repeat(addr_len - addr.to_string().len()),
);
}
}
- let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>();
- let failure_case_1 = status.iter().any(|x| !x.is_up);
+ let status_keys = status.iter().map(|(id, _, _)| id).collect::<HashSet<_>>();
+ let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up);
let failure_case_2 = config
.members
.iter()
.any(|(id, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\nFailed nodes:");
- for adv in status.iter().filter(|x| !x.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
+ for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) {
+ if let Some(cfg) = config.members.get(&id) {
println!(
"{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago",
- id=adv.id,
- host=adv.state_info.hostname,
- addr=adv.addr,
+ id=id,
+ host="",
+ addr=addr,
tag=cfg.tag,
zone=cfg.zone,
capacity=cfg.capacity_string(),
- last_seen=(now_msec() - adv.last_seen) / 1000,
- h_pad=" ".repeat(hostname_len - adv.state_info.hostname.len()),
- a_pad=" ".repeat(addr_len - adv.addr.to_string().len()),
+ last_seen=(now_msec() - 0) / 1000,
+ h_pad=" ".repeat(hostname_len - "".len()),
+ a_pad=" ".repeat(addr_len - addr.to_string().len()),
t_pad=" ".repeat(tag_len - cfg.tag.len()),
z_pad=" ".repeat(zone_len - cfg.zone.len()),
);
@@ -411,12 +421,12 @@ pub async fn cmd_status(
let (tag_len, zone_len) = config
.members
.iter()
- .filter(|(&id, _)| !status.iter().any(|x| x.id == id))
+ .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id))
.map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len()))
.fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz)));
for (id, cfg) in config.members.iter() {
- if !status.iter().any(|x| x.id == *id) {
+ if !status.iter().any(|(xid, _, _)| xid == *id) {
println!(
"{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen",
id = id,
@@ -429,6 +439,7 @@ pub async fn cmd_status(
}
}
}
+ */
Ok(())
}
@@ -455,25 +466,30 @@ pub fn find_matching_node(
}
pub async fn cmd_configure(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
args: ConfigureNodeOpt,
) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseNodesUp(nodes) => nodes,
+ SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- let added_node = find_matching_node(status.iter().map(|x| x.id), &args.node_id)?;
+ let added_node = find_matching_node(
+ status
+ .iter()
+ .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()),
+ &args.node_id,
+ )?;
let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseConfig(cfg) => cfg,
+ SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
@@ -527,25 +543,21 @@ pub async fn cmd_configure(
config.version += 1;
rpc_cli
- .call(
- &rpc_host,
- &Message::AdvertiseConfig(config),
- ADMIN_RPC_TIMEOUT,
- )
- .await??;
+ .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
+ .await?;
Ok(())
}
pub async fn cmd_remove(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
args: RemoveNodeOpt,
) -> Result<(), Error> {
let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseConfig(cfg) => cfg,
+ SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
@@ -562,21 +574,17 @@ pub async fn cmd_remove(
config.version += 1;
rpc_cli
- .call(
- &rpc_host,
- &Message::AdvertiseConfig(config),
- ADMIN_RPC_TIMEOUT,
- )
- .await??;
+ .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
+ .await?;
Ok(())
}
pub async fn cmd_admin(
- rpc_cli: RpcAddrClient<AdminRpc>,
- rpc_host: SocketAddr,
+ rpc_cli: &Endpoint<AdminRpc, ()>,
+ rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), Error> {
- match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? {
+ match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}