aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs6
-rw-r--r--src/membership.rs36
-rw-r--r--src/rpc_client.rs9
3 files changed, 40 insertions, 11 deletions
diff --git a/src/main.rs b/src/main.rs
index 08f37dd5..0aab9e2a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -247,8 +247,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
for adv in status.iter() {
if let Some(cfg) = config.members.get(&adv.id) {
println!(
- "{:?}\t{}\t{}\t{}",
- adv.id, cfg.datacenter, cfg.n_tokens, adv.addr
+ "{:?}\t{}\t{}\t{}\t{}",
+ adv.id, adv.state_info.hostname, adv.addr, cfg.datacenter, cfg.n_tokens
);
}
}
@@ -274,7 +274,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
println!("\nUnconfigured nodes:");
for adv in status.iter() {
if !config.members.contains_key(&adv.id) {
- println!("{:?}\t{}", adv.id, adv.addr);
+ println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr);
}
}
}
diff --git a/src/membership.rs b/src/membership.rs
index 99b0388d..412a83f8 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -48,12 +48,15 @@ pub struct PingMessage {
pub status_hash: Hash,
pub config_version: u64,
+
+ pub state_info: StateInfo,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
pub id: UUID,
pub addr: SocketAddr,
+ pub state_info: StateInfo,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -72,6 +75,8 @@ pub struct System {
pub config: Config,
pub id: UUID,
+ pub state_info: StateInfo,
+
pub rpc_http_client: Arc<RpcHttpClient>,
rpc_client: Arc<RpcClient<Message>>,
@@ -85,14 +90,20 @@ pub struct System {
#[derive(Debug, Clone)]
pub struct Status {
- pub nodes: HashMap<UUID, NodeStatus>,
+ pub nodes: HashMap<UUID, StatusEntry>,
pub hash: Hash,
}
#[derive(Debug, Clone)]
-pub struct NodeStatus {
+pub struct StatusEntry {
pub addr: SocketAddr,
pub remaining_ping_attempts: usize,
+ pub state_info: StateInfo,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StateInfo {
+ pub hostname: String,
}
#[derive(Clone)]
@@ -114,9 +125,10 @@ impl Status {
let addr = SocketAddr::new(ip, info.rpc_port);
let old_status = self.nodes.insert(
info.id.clone(),
- NodeStatus {
+ StatusEntry {
addr: addr.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS,
+ state_info: info.state_info.clone(),
},
);
match old_status {
@@ -268,6 +280,12 @@ impl System {
status.recalculate_hash();
let (update_status, status) = watch::channel(Arc::new(status));
+ let state_info = StateInfo {
+ hostname: gethostname::gethostname()
+ .into_string()
+ .unwrap_or("<invalid utf-8>".to_string()),
+ };
+
let mut ring = Ring {
config: net_config,
ring: Vec::new(),
@@ -289,6 +307,7 @@ impl System {
let sys = Arc::new(System {
config,
id,
+ state_info,
rpc_http_client,
rpc_client,
status,
@@ -346,6 +365,7 @@ impl System {
rpc_port: self.config.rpc_port,
status_hash: status.hash.clone(),
config_version: ring.config.version,
+ state_info: self.state_info.clone(),
})
}
@@ -408,6 +428,7 @@ impl System {
to_advertise.push(AdvertisedNode {
id: info.id.clone(),
addr: addr.clone(),
+ state_info: info.state_info.clone(),
});
}
if is_new || status.hash != info.status_hash {
@@ -486,9 +507,15 @@ impl System {
let status = self.status.borrow().clone();
let mut mem = vec![];
for (node, status) in status.nodes.iter() {
+ let state_info = if *node == self.id {
+ self.state_info.clone()
+ } else {
+ status.state_info.clone()
+ };
mem.push(AdvertisedNode {
id: node.clone(),
addr: status.addr.clone(),
+ state_info,
});
}
Ok(Message::AdvertiseNodesUp(mem))
@@ -515,9 +542,10 @@ impl System {
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
let old_self = status.nodes.insert(
node.id.clone(),
- NodeStatus {
+ StatusEntry {
addr: self_addr,
remaining_ping_attempts: MAX_FAILED_PINGS,
+ state_info: self.state_info.clone(),
},
);
has_changed = match old_self {
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 35debb53..b2a0cf22 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -231,10 +231,11 @@ impl RpcHttpClient {
let status = resp.status();
let body = hyper::body::to_bytes(resp.into_body()).await?;
match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) {
- Err(e) =>
- Err(Error::RPCError(format!("Invalid reply"), status)),
- Ok(Err(e)) =>
- Err(Error::RPCError(e, status)),
+ Err(e) => Err(Error::RPCError(
+ format!("Invalid reply (deserialize error: {})", e),
+ status,
+ )),
+ Ok(Err(e)) => Err(Error::RPCError(e, status)),
Ok(Ok(x)) => Ok(x),
}
}