aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-23 16:05:43 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-23 16:06:11 +0000
commit82f4cd87195e61552348ae9d1cd27096253a4187 (patch)
tree09e150420a8071e1381b72847a692e720428f1e5 /src/main.rs
parent2fe82be3bcb326af04c4c862431237c576ed1152 (diff)
downloadgarage-82f4cd87195e61552348ae9d1cd27096253a4187.tar.gz
garage-82f4cd87195e61552348ae9d1cd27096253a4187.zip
Continue pinging nodes when they are down ; overall better handling of down nodes
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs98
1 files changed, 60 insertions, 38 deletions
diff --git a/src/main.rs b/src/main.rs
index 8985f181..06f0fe98 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -27,14 +27,16 @@ mod rpc_server;
mod server;
mod tls_util;
-use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
use structopt::StructOpt;
+use data::*;
use error::Error;
use membership::*;
use rpc_client::*;
@@ -107,14 +109,16 @@ pub struct ConfigureNodeOpt {
node_id: String,
/// Location (datacenter) of the node
- datacenter: String,
+ #[structopt(short = "d", long = "datacenter")]
+ datacenter: Option<String>,
/// Number of tokens
- n_tokens: u32,
+ #[structopt(short = "n", long = "n-tokens")]
+ n_tokens: Option<u32>,
/// Optionnal node tag
- #[structopt(long = "tag", default_value = "")]
- tag: String,
+ #[structopt(short = "t", long = "tag")]
+ tag: Option<String>,
}
#[derive(StructOpt, Debug)]
@@ -276,58 +280,66 @@ async fn main() {
async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseNodesUp(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
println!("Healthy nodes:");
- for adv in status.iter() {
+ for adv in status.iter().filter(|x| x.is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
println!(
- "{:?}\t{}\t{}\t{}\t{}\t{}",
+ "{:?}\t{}\t{}\t[{}]\t{}\t{}",
adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens
);
+ } else {
+ println!(
+ "{:?}\t{}\t{}\tUNCONFIGURED/REMOVED",
+ adv.id, adv.state_info.hostname, adv.addr
+ );
}
}
let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>();
- if config
+ let failure_case_1 = status.iter().any(|x| !x.is_up);
+ let failure_case_2 = config
.members
.iter()
- .any(|(id, _)| !status_keys.contains(id))
- {
+ .any(|(id, _)| !status_keys.contains(id));
+ if failure_case_1 || failure_case_2 {
println!("\nFailed nodes:");
+ for adv in status.iter().filter(|x| !x.is_up) {
+ if let Some(cfg) = config.members.get(&adv.id) {
+ println!(
+ "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago",
+ adv.id,
+ adv.state_info.hostname,
+ adv.addr,
+ cfg.tag,
+ cfg.datacenter,
+ cfg.n_tokens,
+ (now_msec() - adv.last_seen)/1000,
+ );
+ }
+ }
for (id, cfg) in config.members.iter() {
if !status.iter().any(|x| x.id == *id) {
println!(
- "{:?}\t{}\t{}\t{}",
+ "{:?}\t{}\t{}\t{}\tnever seen",
id, cfg.tag, cfg.datacenter, cfg.n_tokens
);
}
}
}
- if status
- .iter()
- .any(|adv| !config.members.contains_key(&adv.id))
- {
- println!("\nUnconfigured nodes:");
- for adv in status.iter() {
- if !config.members.contains_key(&adv.id) {
- println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr);
- }
- }
- }
-
Ok(())
}
@@ -338,7 +350,7 @@ async fn cmd_configure(
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseNodesUp(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@@ -359,20 +371,30 @@ async fn cmd_configure(
let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- config.members.insert(
- candidates[0].clone(),
- NetworkConfigEntry {
- datacenter: args.datacenter,
- n_tokens: args.n_tokens,
- tag: args.tag,
+ let new_entry = match config.members.get(&candidates[0]) {
+ None => NetworkConfigEntry {
+ datacenter: args
+ .datacenter
+ .expect("Please specifiy a datacenter with the -d flag"),
+ n_tokens: args
+ .n_tokens
+ .expect("Please specifiy a number of tokens with the -n flag"),
+ tag: args.tag.unwrap_or("".to_string()),
+ },
+ Some(old) => NetworkConfigEntry {
+ datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()),
+ n_tokens: args.n_tokens.unwrap_or(old.n_tokens),
+ tag: args.tag.unwrap_or(old.tag.to_string()),
},
- );
+ };
+
+ config.members.insert(candidates[0].clone(), new_entry);
config.version += 1;
rpc_cli
@@ -381,7 +403,7 @@ async fn cmd_configure(
&Message::AdvertiseConfig(config),
ADMIN_RPC_TIMEOUT,
)
- .await?;
+ .await??;
Ok(())
}
@@ -392,7 +414,7 @@ async fn cmd_remove(
) -> Result<(), Error> {
let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@@ -427,7 +449,7 @@ async fn cmd_remove(
&Message::AdvertiseConfig(config),
ADMIN_RPC_TIMEOUT,
)
- .await?;
+ .await??;
Ok(())
}
@@ -436,7 +458,7 @@ async fn cmd_admin(
rpc_host: SocketAddr,
args: AdminRPC,
) -> Result<(), Error> {
- match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await? {
+ match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? {
AdminRPC::Ok(msg) => {
println!("{}", msg);
}