diff options
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/admin_rpc.rs | 46 | ||||
-rw-r--r-- | src/garage/cli.rs | 235 | ||||
-rw-r--r-- | src/garage/main.rs | 3 | ||||
-rw-r--r-- | src/garage/server.rs | 8 |
4 files changed, 156 insertions, 136 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index b9e57c40..339d5bdb 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -31,15 +31,14 @@ pub enum AdminRpc { // Replies Ok(String), - Error(String), BucketList(Vec<String>), BucketInfo(Bucket), KeyList(Vec<(String, String)>), KeyInfo(Key), } -impl Message for AdminRpc { - type Response = AdminRpc; +impl Rpc for AdminRpc { + type Response = Result<AdminRpc, Error>; } pub struct AdminRpcHandler { @@ -341,17 +340,20 @@ impl AdminRpcHandler { let mut failures = vec![]; let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { - let node = NodeID::from_slice(node.as_slice()).unwrap(); - if self + let node = (*node).into(); + let resp = self .endpoint .call( &node, &AdminRpc::LaunchRepair(opt_to_send.clone()), PRIO_NORMAL, ) - .await - .is_err() - { + .await; + let is_err = match resp { + Ok(Ok(_)) => false, + _ => true, + }; + if is_err { failures.push(node); } } @@ -386,17 +388,17 @@ impl AdminRpcHandler { let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { - let node = NodeID::from_slice(node.as_slice()).unwrap(); - let mut opt = opt.clone(); opt.all_nodes = false; writeln!(&mut ret, "\n======================").unwrap(); writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); + + let node_id = (*node).into(); match self .endpoint - .call(&node, &AdminRpc::Stats(opt), PRIO_NORMAL) - .await + .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL) + .await? { Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), @@ -486,9 +488,16 @@ impl AdminRpcHandler { .unwrap(); writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); } +} - async fn handle_rpc(self: &Arc<Self>, msg: &AdminRpc) -> Result<AdminRpc, Error> { - match msg { +#[async_trait] +impl EndpointHandler<AdminRpc> for AdminRpcHandler { + async fn handle( + self: &Arc<Self>, + message: &AdminRpc, + _from: NodeID, + ) -> Result<AdminRpc, Error> { + match message { AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, @@ -497,12 +506,3 @@ impl AdminRpcHandler { } } } - -#[async_trait] -impl EndpointHandler<AdminRpc> for AdminRpcHandler { - async fn handle(self: &Arc<Self>, message: &AdminRpc, _from: NodeID) -> AdminRpc { - self.handle_rpc(message) - .await - .unwrap_or_else(|e| AdminRpc::Error(format!("{}", e))) - } -} diff --git a/src/garage/cli.rs b/src/garage/cli.rs index 91ec5ab2..940a5a85 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -1,5 +1,4 @@ -//use std::cmp::max; -//use std::collections::HashSet; +use std::collections::HashSet; use std::path::PathBuf; use serde::{Deserialize, Serialize}; @@ -7,7 +6,7 @@ use structopt::StructOpt; use garage_util::data::Uuid; use garage_util::error::Error; -//use garage_util::time::*; +use garage_util::time::*; use garage_rpc::ring::*; use garage_rpc::system::*; @@ -58,6 +57,10 @@ pub struct ServerOpt { #[derive(StructOpt, Debug)] pub enum NodeOperation { + /// Connect to Garage node that is currently isolated from the system + #[structopt(name = "connect")] + Connect(ConnectNodeOpt), + /// Configure Garage node #[structopt(name = "configure")] Configure(ConfigureNodeOpt), @@ -68,6 +71,13 @@ pub enum NodeOperation { } #[derive(StructOpt, Debug)] +pub struct ConnectNodeOpt { + /// Node public key and address, in the format: + /// `<public key hexadecimal>@<ip or hostname>:<port>` + node: String, +} + +#[derive(StructOpt, Debug)] pub struct ConfigureNodeOpt { /// Node to configure (prefix of hexadecimal node id) node_id: String, @@ -303,6 +313,9 @@ pub async fn cli_cmd( ) -> Result<(), Error> { match cmd { Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await, + Command::Node(NodeOperation::Connect(connect_opt)) => { + cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await + } Command::Node(NodeOperation::Configure(configure_opt)) => { cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await } @@ -326,142 +339,96 @@ pub async fn cli_cmd( pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await? + .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; let config = match rpc_cli .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await? + .await?? { SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - println!("STATUS:"); - for node in status { - println!("{:?}", node); - } - println!("CONFIG: (v{})", config.version); - for (id, node) in config.members { - println!("{} {:?}", hex::encode(id.as_slice()), node); - } - - /* TODO - let (hostname_len, addr_len, tag_len, zone_len) = status - .iter() - .map(|(id, addr, _)| (addr, config.members.get(&adv.id))) - .map(|(addr, cfg)| { - ( - 8, - addr.to_string().len(), - cfg.map(|c| c.tag.len()).unwrap_or(0), - cfg.map(|c| c.zone.len()).unwrap_or(0), - ) - }) - .fold((0, 0, 0, 0), |(h, a, t, z), (mh, ma, mt, mz)| { - (max(h, mh), max(a, ma), max(t, mt), max(z, mz)) - }); - println!("Healthy nodes:"); - for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) { + let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()]; + for adv in status.iter().filter(|adv| adv.is_up) { if let Some(cfg) = config.members.get(&adv.id) { - println!( - "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}", - id = id, - host = "", - addr = addr, + healthy_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, tag = cfg.tag, zone = cfg.zone, capacity = cfg.capacity_string(), - h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()), - a_pad = " ".repeat(addr_len - adv.addr.to_string().len()), - t_pad = " ".repeat(tag_len - cfg.tag.len()), - z_pad = " ".repeat(zone_len - cfg.zone.len()), - ); + )); } else { - println!( - "{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED", - id = id, - h = "", - addr = addr, - h_pad = " ".repeat(hostname_len - "".len()), - a_pad = " ".repeat(addr_len - addr.to_string().len()), - ); + healthy_nodes.push(format!( + "{id:?}\t{h}\t{addr}\tUNCONFIGURED/REMOVED", + id = adv.id, + h = adv.status.hostname, + addr = adv.addr, + )); } } + format_table(healthy_nodes); - let status_keys = status.iter().map(|(id, _, _)| id).collect::<HashSet<_>>(); - let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up); + let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); + let failure_case_1 = status.iter().any(|adv| !adv.is_up); let failure_case_2 = config .members .iter() .any(|(id, _)| !status_keys.contains(id)); if failure_case_1 || failure_case_2 { println!("\nFailed nodes:"); - for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) { - if let Some(cfg) = config.members.get(&id) { - println!( - "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago", - id=id, - host="", - addr=addr, - tag=cfg.tag, - zone=cfg.zone, - capacity=cfg.capacity_string(), - last_seen=(now_msec() - 0) / 1000, - h_pad=" ".repeat(hostname_len - "".len()), - a_pad=" ".repeat(addr_len - addr.to_string().len()), - t_pad=" ".repeat(tag_len - cfg.tag.len()), - z_pad=" ".repeat(zone_len - cfg.zone.len()), - ); + let mut failed_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()]; + for adv in status.iter().filter(|adv| !adv.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + failed_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, + tag = cfg.tag, + zone = cfg.zone, + capacity = cfg.capacity_string(), + last_seen = (now_msec() - 0) / 1000, + )); } } - let (tag_len, zone_len) = config - .members - .iter() - .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id)) - .map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len())) - .fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz))); - for (id, cfg) in config.members.iter() { - if !status.iter().any(|(xid, _, _)| xid == *id) { - println!( - "{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen", + if !status.iter().any(|adv| adv.id == *id) { + failed_nodes.push(format!( + "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen", id = id, tag = cfg.tag, zone = cfg.zone, capacity = cfg.capacity_string(), - t_pad = " ".repeat(tag_len - cfg.tag.len()), - z_pad = " ".repeat(zone_len - cfg.zone.len()), - ); + )); } } + format_table(failed_nodes); } - */ Ok(()) } -pub fn find_matching_node( - cand: impl std::iter::Iterator<Item = Uuid>, - pattern: &str, -) -> Result<Uuid, Error> { - let mut candidates = vec![]; - for c in cand { - if hex::encode(&c).starts_with(&pattern) { - candidates.push(c); +pub async fn cmd_connect( + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, + args: ConnectNodeOpt, +) -> Result<(), Error> { + match rpc_cli.call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL).await?? { + SystemRpc::Ok => { + println!("Success."); + Ok(()) + } + r => { + Err(Error::BadRpc(format!("Unexpected response: {:?}", r))) } - } - if candidates.len() != 1 { - Err(Error::Message(format!( - "{} nodes match '{}'", - candidates.len(), - pattern, - ))) - } else { - Ok(candidates[0]) } } @@ -472,22 +439,17 @@ pub async fn cmd_configure( ) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await? + .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let added_node = find_matching_node( - status - .iter() - .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()), - &args.node_id, - )?; + let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; let mut config = match rpc_cli .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await? + .await?? { SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -544,7 +506,7 @@ pub async fn cmd_configure( rpc_cli .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await?; + .await??; Ok(()) } @@ -555,7 +517,7 @@ pub async fn cmd_remove( ) -> Result<(), Error> { let mut config = match rpc_cli .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await? + .await?? { SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -575,7 +537,7 @@ pub async fn cmd_remove( rpc_cli .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await?; + .await??; Ok(()) } @@ -584,7 +546,7 @@ pub async fn cmd_admin( rpc_host: NodeID, args: AdminRpc, ) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? { + match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { AdminRpc::Ok(msg) => { println!("{}", msg); } @@ -613,6 +575,8 @@ pub async fn cmd_admin( Ok(()) } +// --- Utility functions ---- + fn print_key_info(key: &Key) { println!("Key name: {}", key.name.get()); println!("Key ID: {}", key.key_id); @@ -640,3 +604,54 @@ fn print_bucket_info(bucket: &Bucket) { } }; } + +fn format_table(data: Vec<String>) { + let data = data + .iter() + .map(|s| s.split('\t').collect::<Vec<_>>()) + .collect::<Vec<_>>(); + + let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max); + let mut column_size = vec![0; columns]; + + let mut out = String::new(); + + for row in data.iter() { + for (i, col) in row.iter().enumerate() { + column_size[i] = std::cmp::max(column_size[i], col.chars().count()); + } + } + + for row in data.iter() { + for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) { + out.push_str(col); + (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' ')); + } + out.push_str(&row[row.len() - 1]); + out.push('\n'); + } + + print!("{}", out); +} + +pub fn find_matching_node( + cand: impl std::iter::Iterator<Item = Uuid>, + pattern: &str, +) -> Result<Uuid, Error> { + let mut candidates = vec![]; + for c in cand { + if hex::encode(&c).starts_with(&pattern) { + candidates.push(c); + } + } + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} nodes match '{}'", + candidates.len(), + pattern, + ))) + } else { + Ok(candidates[0]) + } +} + diff --git a/src/garage/main.rs b/src/garage/main.rs index 7fe791b8..543860ca 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -9,8 +9,6 @@ mod cli; mod repair; mod server; -use std::net::SocketAddr; - use structopt::StructOpt; use netapp::util::parse_peer_addr; @@ -43,6 +41,7 @@ struct Opt { #[tokio::main] async fn main() { pretty_env_logger::init(); + sodiumoxide::init().expect("Unable to init sodiumoxide"); let opt = Opt::from_args(); diff --git a/src/garage/server.rs b/src/garage/server.rs index 0edf3e2d..cd92d157 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -71,8 +71,14 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { // Remove RPC handlers for system to break reference cycles garage.system.netapp.drop_all_handlers(); - // Await for last parts to end + // Await for netapp RPC system to end run_system.await?; + + // Break last reference cycles so that stuff can terminate properly + garage.break_reference_cycles(); + drop(garage); + + // Await for all background tasks to end await_background_done.await?; info!("Cleaning up..."); |