diff options
-rw-r--r-- | TODO | 6 | ||||
-rw-r--r-- | src/error.rs | 9 | ||||
-rw-r--r-- | src/main.rs | 98 | ||||
-rw-r--r-- | src/membership.rs | 79 | ||||
-rw-r--r-- | src/rpc_client.rs | 67 |
5 files changed, 164 insertions, 95 deletions
@@ -5,12 +5,6 @@ How are we going to test that our replication method works correctly? We will have to introduce lots of dummy data and then add/remove nodes many times. -Improvements ------------- - -Membership: keep IP addresses of failed nodes and try to reping them regularly - - Attaining S3 compatibility -------------------------- diff --git a/src/error.rs b/src/error.rs index 50a0a44b..d0a46f39 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,6 +3,7 @@ use hyper::StatusCode; use std::io; use crate::data::Hash; +use crate::rpc_client::RPCError; #[derive(Debug, Error)] pub enum Error { @@ -33,7 +34,6 @@ pub enum Error { RMPDecode(#[error(source)] rmp_serde::decode::Error), #[error(display = "JSON error: {}", _0)] JSON(#[error(source)] serde_json::error::Error), - #[error(display = "TOML decode error: {}", _0)] TomlDecode(#[error(source)] toml::de::Error), @@ -43,8 +43,11 @@ pub enum Error { #[error(display = "Tokio join error: {}", _0)] TokioJoin(#[error(source)] tokio::task::JoinError), - #[error(display = "RPC error: {} (status code {})", _0, _1)] - RPCError(String, StatusCode), + #[error(display = "RPC call error: {}", _0)] + RPC(#[error(source)] RPCError), + + #[error(display = "Remote error: {} (status code {})", _0, _1)] + RemoteError(String, StatusCode), #[error(display = "Bad request: {}", _0)] BadRequest(String), diff --git a/src/main.rs b/src/main.rs index 8985f181..06f0fe98 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,14 +27,16 @@ mod rpc_server; mod server; mod tls_util; -use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; + +use serde::{Deserialize, Serialize}; use structopt::StructOpt; +use data::*; use error::Error; use membership::*; use rpc_client::*; @@ -107,14 +109,16 @@ pub struct ConfigureNodeOpt { node_id: String, /// Location (datacenter) of the node - datacenter: String, + #[structopt(short = "d", long = "datacenter")] + datacenter: Option<String>, /// Number of tokens - n_tokens: u32, + #[structopt(short = "n", long = "n-tokens")] + n_tokens: Option<u32>, /// Optionnal node tag - #[structopt(long = "tag", default_value = "")] - tag: String, + #[structopt(short = "t", long = "tag")] + tag: Option<String>, } #[derive(StructOpt, Debug)] @@ -276,58 +280,66 @@ async fn main() { async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseNodesUp(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; let config = match rpc_cli .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; println!("Healthy nodes:"); - for adv in status.iter() { + for adv in status.iter().filter(|x| x.is_up) { if let Some(cfg) = config.members.get(&adv.id) { println!( - "{:?}\t{}\t{}\t{}\t{}\t{}", + "{:?}\t{}\t{}\t[{}]\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens ); + } else { + println!( + "{:?}\t{}\t{}\tUNCONFIGURED/REMOVED", + adv.id, adv.state_info.hostname, adv.addr + ); } } let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>(); - if config + let failure_case_1 = status.iter().any(|x| !x.is_up); + let failure_case_2 = config .members .iter() - .any(|(id, _)| !status_keys.contains(id)) - { + .any(|(id, _)| !status_keys.contains(id)); + if failure_case_1 || failure_case_2 { println!("\nFailed nodes:"); + for adv in status.iter().filter(|x| !x.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + println!( + "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago", + adv.id, + adv.state_info.hostname, + adv.addr, + cfg.tag, + cfg.datacenter, + cfg.n_tokens, + (now_msec() - adv.last_seen)/1000, + ); + } + } for (id, cfg) in config.members.iter() { if !status.iter().any(|x| x.id == *id) { println!( - "{:?}\t{}\t{}\t{}", + "{:?}\t{}\t{}\t{}\tnever seen", id, cfg.tag, cfg.datacenter, cfg.n_tokens ); } } } - if status - .iter() - .any(|adv| !config.members.contains_key(&adv.id)) - { - println!("\nUnconfigured nodes:"); - for adv in status.iter() { - if !config.members.contains_key(&adv.id) { - println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr); - } - } - } - Ok(()) } @@ -338,7 +350,7 @@ async fn cmd_configure( ) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseNodesUp(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -359,20 +371,30 @@ async fn cmd_configure( let mut config = match rpc_cli .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - config.members.insert( - candidates[0].clone(), - NetworkConfigEntry { - datacenter: args.datacenter, - n_tokens: args.n_tokens, - tag: args.tag, + let new_entry = match config.members.get(&candidates[0]) { + None => NetworkConfigEntry { + datacenter: args + .datacenter + .expect("Please specifiy a datacenter with the -d flag"), + n_tokens: args + .n_tokens + .expect("Please specifiy a number of tokens with the -n flag"), + tag: args.tag.unwrap_or("".to_string()), + }, + Some(old) => NetworkConfigEntry { + datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()), + n_tokens: args.n_tokens.unwrap_or(old.n_tokens), + tag: args.tag.unwrap_or(old.tag.to_string()), }, - ); + }; + + config.members.insert(candidates[0].clone(), new_entry); config.version += 1; rpc_cli @@ -381,7 +403,7 @@ async fn cmd_configure( &Message::AdvertiseConfig(config), ADMIN_RPC_TIMEOUT, ) - .await?; + .await??; Ok(()) } @@ -392,7 +414,7 @@ async fn cmd_remove( ) -> Result<(), Error> { let mut config = match rpc_cli .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await? + .await?? { Message::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -427,7 +449,7 @@ async fn cmd_remove( &Message::AdvertiseConfig(config), ADMIN_RPC_TIMEOUT, ) - .await?; + .await??; Ok(()) } @@ -436,7 +458,7 @@ async fn cmd_admin( rpc_host: SocketAddr, args: AdminRPC, ) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await? { + match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? { AdminRPC::Ok(msg) => { println!("{}", msg); } diff --git a/src/membership.rs b/src/membership.rs index c0c88a43..87b065a7 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -4,6 +4,7 @@ use std::hash::Hasher; use std::io::Read; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -25,7 +26,7 @@ use crate::server::Config; const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); -const MAX_FAILED_PINGS: usize = 3; +const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; @@ -56,6 +57,10 @@ pub struct PingMessage { pub struct AdvertisedNode { pub id: UUID, pub addr: SocketAddr, + + pub is_up: bool, + pub last_seen: u64, + pub state_info: StateInfo, } @@ -91,17 +96,24 @@ pub struct System { #[derive(Debug, Clone)] pub struct Status { - pub nodes: HashMap<UUID, StatusEntry>, + pub nodes: HashMap<UUID, Arc<StatusEntry>>, pub hash: Hash, } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct StatusEntry { pub addr: SocketAddr, - pub remaining_ping_attempts: usize, + pub last_seen: u64, + pub num_failures: AtomicUsize, pub state_info: StateInfo, } +impl StatusEntry { + pub fn is_up(&self) -> bool { + self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StateInfo { pub hostname: String, @@ -126,11 +138,12 @@ impl Status { let addr = SocketAddr::new(ip, info.rpc_port); let old_status = self.nodes.insert( info.id, - StatusEntry { + Arc::new(StatusEntry { addr, - remaining_ping_attempts: MAX_FAILED_PINGS, + last_seen: now_msec(), + num_failures: AtomicUsize::from(0), state_info: info.state_info.clone(), - }, + }), ); match old_status { None => { @@ -427,13 +440,15 @@ impl System { let mut to_advertise = vec![]; for (id_option, addr, ping_resp) in ping_resps { - if let Ok(Message::Ping(info)) = ping_resp { + if let Ok(Ok(Message::Ping(info))) = ping_resp { let is_new = status.handle_ping(addr.ip(), &info); if is_new { has_changes = true; to_advertise.push(AdvertisedNode { id: info.id, addr: *addr, + is_up: true, + last_seen: now_msec(), state_info: info.state_info.clone(), }); } @@ -446,21 +461,16 @@ impl System { .spawn_cancellable(self.clone().pull_config(info.id).map(Ok)); } } else if let Some(id) = id_option { - let remaining_attempts = status - .nodes - .get(id) - .map(|x| x.remaining_ping_attempts) - .unwrap_or(0); - if remaining_attempts == 0 { - warn!( - "Removing node {} after too many failed pings", - hex::encode(&id) - ); - status.nodes.remove(&id); - has_changes = true; - } else { - if let Some(st) = status.nodes.get_mut(id) { - st.remaining_ping_attempts = remaining_attempts - 1; + if let Some(st) = status.nodes.get_mut(id) { + st.num_failures.fetch_add(1, Ordering::SeqCst); + if !st.is_up() { + warn!("Node {:?} seems to be down.", id); + if !ring.config.members.contains_key(id) { + info!("Removing node {:?} from status (not in config and not responding to pings anymore)", id); + drop(st); + status.nodes.remove(&id); + has_changes = true; + } } } } @@ -521,6 +531,8 @@ impl System { mem.push(AdvertisedNode { id: *node, addr: status.addr, + is_up: status.is_up(), + last_seen: status.last_seen, state_info, }); } @@ -548,18 +560,27 @@ impl System { let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port()); let old_self = status.nodes.insert( node.id, - StatusEntry { + Arc::new(StatusEntry { addr: self_addr, - remaining_ping_attempts: MAX_FAILED_PINGS, + last_seen: now_msec(), + num_failures: AtomicUsize::from(0), state_info: self.state_info.clone(), - }, + }), ); has_changed = match old_self { None => true, Some(x) => x.addr != self_addr, }; - } else if !status.nodes.contains_key(&node.id) { - to_ping.push((node.addr, Some(node.id))); + } else { + let ping_them = match status.nodes.get(&node.id) { + // Case 1: new node + None => true, + // Case 2: the node might have changed address + Some(our_node) => node.is_up && !our_node.is_up() && our_node.addr != node.addr, + }; + if ping_them { + to_ping.push((node.addr, Some(node.id))); + } } } if has_changed { @@ -580,8 +601,8 @@ impl System { self: Arc<Self>, adv: &NetworkConfig, ) -> Result<Message, Error> { - let mut ring: Ring = self.ring.borrow().as_ref().clone(); let update_lock = self.update_lock.lock().await; + let mut ring: Ring = self.ring.borrow().as_ref().clone(); if adv.version > ring.config.version { ring.config = adv.clone(); diff --git a/src/rpc_client.rs b/src/rpc_client.rs index e78079c2..c083fcfd 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -2,11 +2,13 @@ use std::borrow::Borrow; use std::marker::PhantomData; use std::net::SocketAddr; use std::pin::Pin; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use arc_swap::ArcSwapOption; use bytes::IntoBuf; +use err_derive::Error; use futures::future::Future; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; @@ -25,6 +27,22 @@ use crate::tls_util; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +#[derive(Debug, Error)] +pub enum RPCError { + #[error(display = "Node is down: {:?}.", _0)] + NodeDown(UUID), + #[error(display = "Timeout: {}", _0)] + Timeout(#[error(source)] tokio::time::Elapsed), + #[error(display = "HTTP error: {}", _0)] + HTTP(#[error(source)] http::Error), + #[error(display = "Hyper error: {}", _0)] + Hyper(#[error(source)] hyper::Error), + #[error(display = "Messagepack encode error: {}", _0)] + RMPEncode(#[error(source)] rmp_serde::encode::Error), + #[error(display = "Messagepack decode error: {}", _0)] + RMPDecode(#[error(source)] rmp_serde::decode::Error), +} + #[derive(Copy, Clone)] pub struct RequestStrategy { pub rs_timeout: Duration, @@ -104,19 +122,34 @@ impl<M: RpcMessage + 'static> RpcClient<M> { return local_handler(msg).await; } } - let addr = { - let status = self.status.borrow().clone(); - match status.nodes.get(to.borrow()) { - Some(status) => status.addr, - None => { - return Err(Error::Message(format!( - "Peer ID not found: {:?}", - to.borrow() - ))) + let status = self.status.borrow().clone(); + let node_status = match status.nodes.get(&to) { + Some(node_status) => { + if node_status.is_up() { + node_status + } else { + return Err(Error::from(RPCError::NodeDown(to))); } } + None => { + return Err(Error::Message(format!( + "Peer ID not found: {:?}", + to.borrow() + ))) + } }; - self.rpc_addr_client.call(&addr, msg, timeout).await + match self + .rpc_addr_client + .call(&node_status.addr, msg, timeout) + .await + { + Err(rpc_error) => { + node_status.num_failures.fetch_add(1, Ordering::SeqCst); + // TODO: Save failure info somewhere + Err(Error::from(rpc_error)) + } + Ok(x) => x, + } } pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { @@ -219,7 +252,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { to_addr: &SocketAddr, msg: MB, timeout: Duration, - ) -> Result<M, Error> + ) -> Result<Result<M, Error>, RPCError> where MB: Borrow<M>, { @@ -276,7 +309,7 @@ impl RpcHttpClient { to_addr: &SocketAddr, msg: MB, timeout: Duration, - ) -> Result<M, Error> + ) -> Result<Result<M, Error>, RPCError> where MB: Borrow<M>, M: RpcMessage, @@ -318,13 +351,9 @@ impl RpcHttpClient { let status = resp.status(); let body = hyper::body::to_bytes(resp.into_body()).await?; - match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) { - Err(e) => Err(Error::RPCError( - format!("Invalid reply (deserialize error: {})", e), - status, - )), - Ok(Err(e)) => Err(Error::RPCError(e, status)), - Ok(Ok(x)) => Ok(x), + match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf())? { + Err(e) => Ok(Err(Error::RemoteError(e, status))), + Ok(x) => Ok(Ok(x)), } } } |