aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock11
-rw-r--r--Cargo.toml1
-rw-r--r--src/main.rs6
-rw-r--r--src/membership.rs36
-rw-r--r--src/rpc_client.rs9
5 files changed, 52 insertions, 11 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 52e67f5f..a1ae7289 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -304,6 +304,7 @@ dependencies = [
"futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
+ "gethostname 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.13.4 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -333,6 +334,15 @@ dependencies = [
]
[[package]]
+name = "gethostname"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1273,6 +1283,7 @@ dependencies = [
"checksum futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5"
"checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
+"checksum gethostname 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e692e296bfac1d2533ef168d0b60ff5897b8b70a4009276834014dd8924cc028"
"checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
"checksum h2 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
diff --git a/Cargo.toml b/Cargo.toml
index a66a712f..72528b98 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ async-trait = "0.1.30"
reduce = "0.1.2"
serde_json = "1.0"
arc-swap = "0.4"
+gethostname = "0.2"
rustls = "0.17"
tokio-rustls = "0.13"
diff --git a/src/main.rs b/src/main.rs
index 08f37dd5..0aab9e2a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -247,8 +247,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
for adv in status.iter() {
if let Some(cfg) = config.members.get(&adv.id) {
println!(
- "{:?}\t{}\t{}\t{}",
- adv.id, cfg.datacenter, cfg.n_tokens, adv.addr
+ "{:?}\t{}\t{}\t{}\t{}",
+ adv.id, adv.state_info.hostname, adv.addr, cfg.datacenter, cfg.n_tokens
);
}
}
@@ -274,7 +274,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
println!("\nUnconfigured nodes:");
for adv in status.iter() {
if !config.members.contains_key(&adv.id) {
- println!("{:?}\t{}", adv.id, adv.addr);
+ println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr);
}
}
}
diff --git a/src/membership.rs b/src/membership.rs
index 99b0388d..412a83f8 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -48,12 +48,15 @@ pub struct PingMessage {
pub status_hash: Hash,
pub config_version: u64,
+
+ pub state_info: StateInfo,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
pub id: UUID,
pub addr: SocketAddr,
+ pub state_info: StateInfo,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -72,6 +75,8 @@ pub struct System {
pub config: Config,
pub id: UUID,
+ pub state_info: StateInfo,
+
pub rpc_http_client: Arc<RpcHttpClient>,
rpc_client: Arc<RpcClient<Message>>,
@@ -85,14 +90,20 @@ pub struct System {
#[derive(Debug, Clone)]
pub struct Status {
- pub nodes: HashMap<UUID, NodeStatus>,
+ pub nodes: HashMap<UUID, StatusEntry>,
pub hash: Hash,
}
#[derive(Debug, Clone)]
-pub struct NodeStatus {
+pub struct StatusEntry {
pub addr: SocketAddr,
pub remaining_ping_attempts: usize,
+ pub state_info: StateInfo,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StateInfo {
+ pub hostname: String,
}
#[derive(Clone)]
@@ -114,9 +125,10 @@ impl Status {
let addr = SocketAddr::new(ip, info.rpc_port);
let old_status = self.nodes.insert(
info.id.clone(),
- NodeStatus {
+ StatusEntry {
addr: addr.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS,
+ state_info: info.state_info.clone(),
},
);
match old_status {
@@ -268,6 +280,12 @@ impl System {
status.recalculate_hash();
let (update_status, status) = watch::channel(Arc::new(status));
+ let state_info = StateInfo {
+ hostname: gethostname::gethostname()
+ .into_string()
+ .unwrap_or("<invalid utf-8>".to_string()),
+ };
+
let mut ring = Ring {
config: net_config,
ring: Vec::new(),
@@ -289,6 +307,7 @@ impl System {
let sys = Arc::new(System {
config,
id,
+ state_info,
rpc_http_client,
rpc_client,
status,
@@ -346,6 +365,7 @@ impl System {
rpc_port: self.config.rpc_port,
status_hash: status.hash.clone(),
config_version: ring.config.version,
+ state_info: self.state_info.clone(),
})
}
@@ -408,6 +428,7 @@ impl System {
to_advertise.push(AdvertisedNode {
id: info.id.clone(),
addr: addr.clone(),
+ state_info: info.state_info.clone(),
});
}
if is_new || status.hash != info.status_hash {
@@ -486,9 +507,15 @@ impl System {
let status = self.status.borrow().clone();
let mut mem = vec![];
for (node, status) in status.nodes.iter() {
+ let state_info = if *node == self.id {
+ self.state_info.clone()
+ } else {
+ status.state_info.clone()
+ };
mem.push(AdvertisedNode {
id: node.clone(),
addr: status.addr.clone(),
+ state_info,
});
}
Ok(Message::AdvertiseNodesUp(mem))
@@ -515,9 +542,10 @@ impl System {
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
let old_self = status.nodes.insert(
node.id.clone(),
- NodeStatus {
+ StatusEntry {
addr: self_addr,
remaining_ping_attempts: MAX_FAILED_PINGS,
+ state_info: self.state_info.clone(),
},
);
has_changed = match old_self {
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 35debb53..b2a0cf22 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -231,10 +231,11 @@ impl RpcHttpClient {
let status = resp.status();
let body = hyper::body::to_bytes(resp.into_body()).await?;
match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) {
- Err(e) =>
- Err(Error::RPCError(format!("Invalid reply"), status)),
- Ok(Err(e)) =>
- Err(Error::RPCError(e, status)),
+ Err(e) => Err(Error::RPCError(
+ format!("Invalid reply (deserialize error: {})", e),
+ status,
+ )),
+ Ok(Err(e)) => Err(Error::RPCError(e, status)),
Ok(Ok(x)) => Ok(x),
}
}