aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TODO6
-rw-r--r--src/error.rs9
-rw-r--r--src/main.rs98
-rw-r--r--src/membership.rs79
-rw-r--r--src/rpc_client.rs67
5 files changed, 164 insertions, 95 deletions
diff --git a/TODO b/TODO
index 3c3682cc..e47be9ab 100644
--- a/TODO
+++ b/TODO
@@ -5,12 +5,6 @@ How are we going to test that our replication method works correctly?
We will have to introduce lots of dummy data and then add/remove nodes many times.
-Improvements
-------------
-
-Membership: keep IP addresses of failed nodes and try to reping them regularly
-
-
Attaining S3 compatibility
--------------------------
diff --git a/src/error.rs b/src/error.rs
index 50a0a44b..d0a46f39 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -3,6 +3,7 @@ use hyper::StatusCode;
use std::io;
use crate::data::Hash;
+use crate::rpc_client::RPCError;
#[derive(Debug, Error)]
pub enum Error {
@@ -33,7 +34,6 @@ pub enum Error {
RMPDecode(#[error(source)] rmp_serde::decode::Error),
#[error(display = "JSON error: {}", _0)]
JSON(#[error(source)] serde_json::error::Error),
-
#[error(display = "TOML decode error: {}", _0)]
TomlDecode(#[error(source)] toml::de::Error),
@@ -43,8 +43,11 @@ pub enum Error {
#[error(display = "Tokio join error: {}", _0)]
TokioJoin(#[error(source)] tokio::task::JoinError),
- #[error(display = "RPC error: {} (status code {})", _0, _1)]
- RPCError(String, StatusCode),
+ #[error(display = "RPC call error: {}", _0)]
+ RPC(#[error(source)] RPCError),
+
+ #[error(display = "Remote error: {} (status code {})", _0, _1)]
+ RemoteError(String, StatusCode),
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
diff --git a/src/main.rs b/src/main.rs
index 8985f181..06f0fe98 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -27,14 +27,16 @@ mod rpc_server;
mod server;
mod tls_util;
-use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
use structopt::StructOpt;
+use data::*;
use error::Error;
use membership::*;
use rpc_client::*;
@@ -107,14 +109,16 @@ pub struct ConfigureNodeOpt {
node_id: String,
/// Location (datacenter) of the node
- datacenter: String,
+ #[structopt(short = "d", long = "datacenter")]
+ datacenter: Option<String>,
/// Number of tokens
- n_tokens: u32,
+ #[structopt(short = "n", long = "n-tokens")]
+ n_tokens: Option<u32>,
/// Optionnal node tag
- #[structopt(long = "tag", default_value = "")]
- tag: String,
+ #[structopt(short = "t", long = "tag")]
+ tag: Option<String>,
}
#[derive(StructOpt, Debug)]
@@ -276,58 +280,66 @@ async fn main() {
async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseNodesUp(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
println!("Healthy nodes:");
- for adv in status.iter() {
+ for adv in status.iter().filter(|x| x.is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
println!(
- "{:?}\t{}\t{}\t{}\t{}\t{}",
+ "{:?}\t{}\t{}\t[{}]\t{}\t{}",
adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens
);
+ } else {
+ println!(
+ "{:?}\t{}\t{}\tUNCONFIGURED/REMOVED",
+ adv.id, adv.state_info.hostname, adv.addr
+ );
}
}
let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>();
- if config
+ let failure_case_1 = status.iter().any(|x| !x.is_up);
+ let failure_case_2 = config
.members
.iter()
- .any(|(id, _)| !status_keys.contains(id))
- {
+ .any(|(id, _)| !status_keys.contains(id));
+ if failure_case_1 || failure_case_2 {
println!("\nFailed nodes:");
+ for adv in status.iter().filter(|x| !x.is_up) {
+ if let Some(cfg) = config.members.get(&adv.id) {
+ println!(
+ "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago",
+ adv.id,
+ adv.state_info.hostname,
+ adv.addr,
+ cfg.tag,
+ cfg.datacenter,
+ cfg.n_tokens,
+ (now_msec() - adv.last_seen)/1000,
+ );
+ }
+ }
for (id, cfg) in config.members.iter() {
if !status.iter().any(|x| x.id == *id) {
println!(
- "{:?}\t{}\t{}\t{}",
+ "{:?}\t{}\t{}\t{}\tnever seen",
id, cfg.tag, cfg.datacenter, cfg.n_tokens
);
}
}
}
- if status
- .iter()
- .any(|adv| !config.members.contains_key(&adv.id))
- {
- println!("\nUnconfigured nodes:");
- for adv in status.iter() {
- if !config.members.contains_key(&adv.id) {
- println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr);
- }
- }
- }
-
Ok(())
}
@@ -338,7 +350,7 @@ async fn cmd_configure(
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseNodesUp(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@@ -359,20 +371,30 @@ async fn cmd_configure(
let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- config.members.insert(
- candidates[0].clone(),
- NetworkConfigEntry {
- datacenter: args.datacenter,
- n_tokens: args.n_tokens,
- tag: args.tag,
+ let new_entry = match config.members.get(&candidates[0]) {
+ None => NetworkConfigEntry {
+ datacenter: args
+ .datacenter
+ .expect("Please specifiy a datacenter with the -d flag"),
+ n_tokens: args
+ .n_tokens
+ .expect("Please specifiy a number of tokens with the -n flag"),
+ tag: args.tag.unwrap_or("".to_string()),
+ },
+ Some(old) => NetworkConfigEntry {
+ datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()),
+ n_tokens: args.n_tokens.unwrap_or(old.n_tokens),
+ tag: args.tag.unwrap_or(old.tag.to_string()),
},
- );
+ };
+
+ config.members.insert(candidates[0].clone(), new_entry);
config.version += 1;
rpc_cli
@@ -381,7 +403,7 @@ async fn cmd_configure(
&Message::AdvertiseConfig(config),
ADMIN_RPC_TIMEOUT,
)
- .await?;
+ .await??;
Ok(())
}
@@ -392,7 +414,7 @@ async fn cmd_remove(
) -> Result<(), Error> {
let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await?
+ .await??
{
Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@@ -427,7 +449,7 @@ async fn cmd_remove(
&Message::AdvertiseConfig(config),
ADMIN_RPC_TIMEOUT,
)
- .await?;
+ .await??;
Ok(())
}
@@ -436,7 +458,7 @@ async fn cmd_admin(
rpc_host: SocketAddr,
args: AdminRPC,
) -> Result<(), Error> {
- match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await? {
+ match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? {
AdminRPC::Ok(msg) => {
println!("{}", msg);
}
diff --git a/src/membership.rs b/src/membership.rs
index c0c88a43..87b065a7 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -4,6 +4,7 @@ use std::hash::Hasher;
use std::io::Read;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -25,7 +26,7 @@ use crate::server::Config;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
-const MAX_FAILED_PINGS: usize = 3;
+const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
@@ -56,6 +57,10 @@ pub struct PingMessage {
pub struct AdvertisedNode {
pub id: UUID,
pub addr: SocketAddr,
+
+ pub is_up: bool,
+ pub last_seen: u64,
+
pub state_info: StateInfo,
}
@@ -91,17 +96,24 @@ pub struct System {
#[derive(Debug, Clone)]
pub struct Status {
- pub nodes: HashMap<UUID, StatusEntry>,
+ pub nodes: HashMap<UUID, Arc<StatusEntry>>,
pub hash: Hash,
}
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub struct StatusEntry {
pub addr: SocketAddr,
- pub remaining_ping_attempts: usize,
+ pub last_seen: u64,
+ pub num_failures: AtomicUsize,
pub state_info: StateInfo,
}
+impl StatusEntry {
+ pub fn is_up(&self) -> bool {
+ self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
+ }
+}
+
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateInfo {
pub hostname: String,
@@ -126,11 +138,12 @@ impl Status {
let addr = SocketAddr::new(ip, info.rpc_port);
let old_status = self.nodes.insert(
info.id,
- StatusEntry {
+ Arc::new(StatusEntry {
addr,
- remaining_ping_attempts: MAX_FAILED_PINGS,
+ last_seen: now_msec(),
+ num_failures: AtomicUsize::from(0),
state_info: info.state_info.clone(),
- },
+ }),
);
match old_status {
None => {
@@ -427,13 +440,15 @@ impl System {
let mut to_advertise = vec![];
for (id_option, addr, ping_resp) in ping_resps {
- if let Ok(Message::Ping(info)) = ping_resp {
+ if let Ok(Ok(Message::Ping(info))) = ping_resp {
let is_new = status.handle_ping(addr.ip(), &info);
if is_new {
has_changes = true;
to_advertise.push(AdvertisedNode {
id: info.id,
addr: *addr,
+ is_up: true,
+ last_seen: now_msec(),
state_info: info.state_info.clone(),
});
}
@@ -446,21 +461,16 @@ impl System {
.spawn_cancellable(self.clone().pull_config(info.id).map(Ok));
}
} else if let Some(id) = id_option {
- let remaining_attempts = status
- .nodes
- .get(id)
- .map(|x| x.remaining_ping_attempts)
- .unwrap_or(0);
- if remaining_attempts == 0 {
- warn!(
- "Removing node {} after too many failed pings",
- hex::encode(&id)
- );
- status.nodes.remove(&id);
- has_changes = true;
- } else {
- if let Some(st) = status.nodes.get_mut(id) {
- st.remaining_ping_attempts = remaining_attempts - 1;
+ if let Some(st) = status.nodes.get_mut(id) {
+ st.num_failures.fetch_add(1, Ordering::SeqCst);
+ if !st.is_up() {
+ warn!("Node {:?} seems to be down.", id);
+ if !ring.config.members.contains_key(id) {
+ info!("Removing node {:?} from status (not in config and not responding to pings anymore)", id);
+ drop(st);
+ status.nodes.remove(&id);
+ has_changes = true;
+ }
}
}
}
@@ -521,6 +531,8 @@ impl System {
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
+ is_up: status.is_up(),
+ last_seen: status.last_seen,
state_info,
});
}
@@ -548,18 +560,27 @@ impl System {
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port());
let old_self = status.nodes.insert(
node.id,
- StatusEntry {
+ Arc::new(StatusEntry {
addr: self_addr,
- remaining_ping_attempts: MAX_FAILED_PINGS,
+ last_seen: now_msec(),
+ num_failures: AtomicUsize::from(0),
state_info: self.state_info.clone(),
- },
+ }),
);
has_changed = match old_self {
None => true,
Some(x) => x.addr != self_addr,
};
- } else if !status.nodes.contains_key(&node.id) {
- to_ping.push((node.addr, Some(node.id)));
+ } else {
+ let ping_them = match status.nodes.get(&node.id) {
+ // Case 1: new node
+ None => true,
+ // Case 2: the node might have changed address
+ Some(our_node) => node.is_up && !our_node.is_up() && our_node.addr != node.addr,
+ };
+ if ping_them {
+ to_ping.push((node.addr, Some(node.id)));
+ }
}
}
if has_changed {
@@ -580,8 +601,8 @@ impl System {
self: Arc<Self>,
adv: &NetworkConfig,
) -> Result<Message, Error> {
- let mut ring: Ring = self.ring.borrow().as_ref().clone();
let update_lock = self.update_lock.lock().await;
+ let mut ring: Ring = self.ring.borrow().as_ref().clone();
if adv.version > ring.config.version {
ring.config = adv.clone();
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index e78079c2..c083fcfd 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -2,11 +2,13 @@ use std::borrow::Borrow;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::pin::Pin;
+use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
use bytes::IntoBuf;
+use err_derive::Error;
use futures::future::Future;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
@@ -25,6 +27,22 @@ use crate::tls_util;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+#[derive(Debug, Error)]
+pub enum RPCError {
+ #[error(display = "Node is down: {:?}.", _0)]
+ NodeDown(UUID),
+ #[error(display = "Timeout: {}", _0)]
+ Timeout(#[error(source)] tokio::time::Elapsed),
+ #[error(display = "HTTP error: {}", _0)]
+ HTTP(#[error(source)] http::Error),
+ #[error(display = "Hyper error: {}", _0)]
+ Hyper(#[error(source)] hyper::Error),
+ #[error(display = "Messagepack encode error: {}", _0)]
+ RMPEncode(#[error(source)] rmp_serde::encode::Error),
+ #[error(display = "Messagepack decode error: {}", _0)]
+ RMPDecode(#[error(source)] rmp_serde::decode::Error),
+}
+
#[derive(Copy, Clone)]
pub struct RequestStrategy {
pub rs_timeout: Duration,
@@ -104,19 +122,34 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
return local_handler(msg).await;
}
}
- let addr = {
- let status = self.status.borrow().clone();
- match status.nodes.get(to.borrow()) {
- Some(status) => status.addr,
- None => {
- return Err(Error::Message(format!(
- "Peer ID not found: {:?}",
- to.borrow()
- )))
+ let status = self.status.borrow().clone();
+ let node_status = match status.nodes.get(&to) {
+ Some(node_status) => {
+ if node_status.is_up() {
+ node_status
+ } else {
+ return Err(Error::from(RPCError::NodeDown(to)));
}
}
+ None => {
+ return Err(Error::Message(format!(
+ "Peer ID not found: {:?}",
+ to.borrow()
+ )))
+ }
};
- self.rpc_addr_client.call(&addr, msg, timeout).await
+ match self
+ .rpc_addr_client
+ .call(&node_status.addr, msg, timeout)
+ .await
+ {
+ Err(rpc_error) => {
+ node_status.num_failures.fetch_add(1, Ordering::SeqCst);
+ // TODO: Save failure info somewhere
+ Err(Error::from(rpc_error))
+ }
+ Ok(x) => x,
+ }
}
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
@@ -219,7 +252,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
to_addr: &SocketAddr,
msg: MB,
timeout: Duration,
- ) -> Result<M, Error>
+ ) -> Result<Result<M, Error>, RPCError>
where
MB: Borrow<M>,
{
@@ -276,7 +309,7 @@ impl RpcHttpClient {
to_addr: &SocketAddr,
msg: MB,
timeout: Duration,
- ) -> Result<M, Error>
+ ) -> Result<Result<M, Error>, RPCError>
where
MB: Borrow<M>,
M: RpcMessage,
@@ -318,13 +351,9 @@ impl RpcHttpClient {
let status = resp.status();
let body = hyper::body::to_bytes(resp.into_body()).await?;
- match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) {
- Err(e) => Err(Error::RPCError(
- format!("Invalid reply (deserialize error: {})", e),
- status,
- )),
- Ok(Err(e)) => Err(Error::RPCError(e, status)),
- Ok(Ok(x)) => Ok(x),
+ match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf())? {
+ Err(e) => Ok(Err(Error::RemoteError(e, status))),
+ Ok(x) => Ok(Ok(x)),
}
}
}