diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-23 16:05:43 +0000 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-23 16:06:11 +0000 |
commit | 82f4cd87195e61552348ae9d1cd27096253a4187 (patch) | |
tree | 09e150420a8071e1381b72847a692e720428f1e5 /src/main.rs | |
parent | 2fe82be3bcb326af04c4c862431237c576ed1152 (diff) | |
download | garage-82f4cd87195e61552348ae9d1cd27096253a4187.tar.gz garage-82f4cd87195e61552348ae9d1cd27096253a4187.zip |
Continue pinging nodes when they are down ; overall better handling of down nodes
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 98 |
1 files changed, 60 insertions, 38 deletions
diff --git a/src/main.rs b/src/main.rs index 8985f181..06f0fe98 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,14 +27,16 @@ mod rpc_server; mod server; mod tls_util; -use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; + +use serde::{Deserialize, Serialize}; use structopt::StructOpt; +use data::*; use error::Error; use membership::*; use rpc_client::*; @@ -107,14 +109,16 @@ pub struct ConfigureNodeOpt { node_id: String, /// Location (datacenter) of the node - datacenter: String, + #[structopt(short = "d", long = "datacenter")] + datacenter: Option<String>, /// Number of tokens - n_tokens: u32, + #[structopt(short = "n", long = "n-tokens")] + n_tokens: Option<u32>, /// Optionnal node tag - #[structopt(long = "tag", default_value = "")] - tag: String, + #[structopt(short = "t", long = "tag")] + tag: Option<String>, } #[derive(StructOpt, Debug)] @@ -276,58 +280,66 @@ async fn main() { async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseNodesUp(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; let config = match rpc_cli .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; println!("Healthy nodes:"); - for adv in status.iter() { + for adv in status.iter().filter(|x| x.is_up) { if let Some(cfg) = config.members.get(&adv.id) { println!( - "{:?}\t{}\t{}\t{}\t{}\t{}", + "{:?}\t{}\t{}\t[{}]\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens ); + } else { + println!( + "{:?}\t{}\t{}\tUNCONFIGURED/REMOVED", + adv.id, adv.state_info.hostname, adv.addr + ); } } let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>(); - if config + let failure_case_1 = status.iter().any(|x| !x.is_up); + let failure_case_2 = config .members .iter() - .any(|(id, _)| !status_keys.contains(id)) - { + .any(|(id, _)| !status_keys.contains(id)); + if failure_case_1 || failure_case_2 { println!("\nFailed nodes:"); + for adv in status.iter().filter(|x| !x.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + println!( + "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago", + adv.id, + adv.state_info.hostname, + adv.addr, + cfg.tag, + cfg.datacenter, + cfg.n_tokens, + (now_msec() - adv.last_seen)/1000, + ); + } + } for (id, cfg) in config.members.iter() { if !status.iter().any(|x| x.id == *id) { println!( - "{:?}\t{}\t{}\t{}", + "{:?}\t{}\t{}\t{}\tnever seen", id, cfg.tag, cfg.datacenter, cfg.n_tokens ); } } } - if status - .iter() - .any(|adv| !config.members.contains_key(&adv.id)) - { - println!("\nUnconfigured nodes:"); - for adv in status.iter() { - if !config.members.contains_key(&adv.id) { - println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr); - } - } - } - Ok(()) } @@ -338,7 +350,7 @@ async fn cmd_configure( ) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseNodesUp(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -359,20 +371,30 @@ async fn cmd_configure( let mut config = match rpc_cli .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - config.members.insert( - candidates[0].clone(), - NetworkConfigEntry { - datacenter: args.datacenter, - n_tokens: args.n_tokens, - tag: args.tag, + let new_entry = match config.members.get(&candidates[0]) { + None => NetworkConfigEntry { + datacenter: args + .datacenter + .expect("Please specifiy a datacenter with the -d flag"), + n_tokens: args + .n_tokens + .expect("Please specifiy a number of tokens with the -n flag"), + tag: args.tag.unwrap_or("".to_string()), + }, + Some(old) => NetworkConfigEntry { + datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()), + n_tokens: args.n_tokens.unwrap_or(old.n_tokens), + tag: args.tag.unwrap_or(old.tag.to_string()), }, - ); + }; + + config.members.insert(candidates[0].clone(), new_entry); config.version += 1; rpc_cli @@ -381,7 +403,7 @@ async fn cmd_configure( &Message::AdvertiseConfig(config), ADMIN_RPC_TIMEOUT, ) - .await?; + .await??; Ok(()) } @@ -392,7 +414,7 @@ async fn cmd_remove( ) -> Result<(), Error> { let mut config = match rpc_cli .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -427,7 +449,7 @@ async fn cmd_remove( &Message::AdvertiseConfig(config), ADMIN_RPC_TIMEOUT, ) - .await?; + .await??; Ok(()) } @@ -436,7 +458,7 @@ async fn cmd_admin( rpc_host: SocketAddr, args: AdminRPC, ) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await? { + match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? { AdminRPC::Ok(msg) => { println!("{}", msg); } |