diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 6 | ||||
-rw-r--r-- | src/membership.rs | 36 | ||||
-rw-r--r-- | src/rpc_client.rs | 9 |
3 files changed, 40 insertions, 11 deletions
diff --git a/src/main.rs b/src/main.rs index 08f37dd5..0aab9e2a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -247,8 +247,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re for adv in status.iter() { if let Some(cfg) = config.members.get(&adv.id) { println!( - "{:?}\t{}\t{}\t{}", - adv.id, cfg.datacenter, cfg.n_tokens, adv.addr + "{:?}\t{}\t{}\t{}\t{}", + adv.id, adv.state_info.hostname, adv.addr, cfg.datacenter, cfg.n_tokens ); } } @@ -274,7 +274,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re println!("\nUnconfigured nodes:"); for adv in status.iter() { if !config.members.contains_key(&adv.id) { - println!("{:?}\t{}", adv.id, adv.addr); + println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr); } } } diff --git a/src/membership.rs b/src/membership.rs index 99b0388d..412a83f8 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -48,12 +48,15 @@ pub struct PingMessage { pub status_hash: Hash, pub config_version: u64, + + pub state_info: StateInfo, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AdvertisedNode { pub id: UUID, pub addr: SocketAddr, + pub state_info: StateInfo, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -72,6 +75,8 @@ pub struct System { pub config: Config, pub id: UUID, + pub state_info: StateInfo, + pub rpc_http_client: Arc<RpcHttpClient>, rpc_client: Arc<RpcClient<Message>>, @@ -85,14 +90,20 @@ pub struct System { #[derive(Debug, Clone)] pub struct Status { - pub nodes: HashMap<UUID, NodeStatus>, + pub nodes: HashMap<UUID, StatusEntry>, pub hash: Hash, } #[derive(Debug, Clone)] -pub struct NodeStatus { +pub struct StatusEntry { pub addr: SocketAddr, pub remaining_ping_attempts: usize, + pub state_info: StateInfo, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateInfo { + pub hostname: String, } #[derive(Clone)] @@ -114,9 +125,10 @@ impl Status { let addr = SocketAddr::new(ip, info.rpc_port); let old_status = self.nodes.insert( info.id.clone(), - NodeStatus { + StatusEntry { addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, + state_info: info.state_info.clone(), }, ); match old_status { @@ -268,6 +280,12 @@ impl System { status.recalculate_hash(); let (update_status, status) = watch::channel(Arc::new(status)); + let state_info = StateInfo { + hostname: gethostname::gethostname() + .into_string() + .unwrap_or("<invalid utf-8>".to_string()), + }; + let mut ring = Ring { config: net_config, ring: Vec::new(), @@ -289,6 +307,7 @@ impl System { let sys = Arc::new(System { config, id, + state_info, rpc_http_client, rpc_client, status, @@ -346,6 +365,7 @@ impl System { rpc_port: self.config.rpc_port, status_hash: status.hash.clone(), config_version: ring.config.version, + state_info: self.state_info.clone(), }) } @@ -408,6 +428,7 @@ impl System { to_advertise.push(AdvertisedNode { id: info.id.clone(), addr: addr.clone(), + state_info: info.state_info.clone(), }); } if is_new || status.hash != info.status_hash { @@ -486,9 +507,15 @@ impl System { let status = self.status.borrow().clone(); let mut mem = vec![]; for (node, status) in status.nodes.iter() { + let state_info = if *node == self.id { + self.state_info.clone() + } else { + status.state_info.clone() + }; mem.push(AdvertisedNode { id: node.clone(), addr: status.addr.clone(), + state_info, }); } Ok(Message::AdvertiseNodesUp(mem)) @@ -515,9 +542,10 @@ impl System { let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port); let old_self = status.nodes.insert( node.id.clone(), - NodeStatus { + StatusEntry { addr: self_addr, remaining_ping_attempts: MAX_FAILED_PINGS, + state_info: self.state_info.clone(), }, ); has_changed = match old_self { diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 35debb53..b2a0cf22 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -231,10 +231,11 @@ impl RpcHttpClient { let status = resp.status(); let body = hyper::body::to_bytes(resp.into_body()).await?; match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) { - Err(e) => - Err(Error::RPCError(format!("Invalid reply"), status)), - Ok(Err(e)) => - Err(Error::RPCError(e, status)), + Err(e) => Err(Error::RPCError( + format!("Invalid reply (deserialize error: {})", e), + status, + )), + Ok(Err(e)) => Err(Error::RPCError(e, status)), Ok(Ok(x)) => Ok(x), } } |