diff options
Diffstat (limited to 'src/membership.rs')
-rw-r--r-- | src/membership.rs | 63 |
1 files changed, 52 insertions, 11 deletions
diff --git a/src/membership.rs b/src/membership.rs index 7aaa0759..aa51e0fa 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -1,18 +1,20 @@ use std::sync::Arc; +use std::path::PathBuf; +use std::io::{Read}; use std::collections::HashMap; use std::time::Duration; use std::net::{IpAddr, SocketAddr}; +use tokio::prelude::*; use futures::future::join_all; -use hyper::client::Client; use tokio::sync::RwLock; use sha2::{Sha256, Digest}; -use crate::Config; +use crate::server::Config; use crate::error::Error; use crate::data::*; use crate::proto::*; -use crate::rpc::*; +use crate::rpc_client::*; const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); @@ -22,7 +24,7 @@ pub struct System { pub config: Config, pub id: UUID, - pub rpc_client: Client<hyper::client::HttpConnector, hyper::Body>, + pub rpc_client: RpcClient, pub members: RwLock<Members>, } @@ -73,26 +75,62 @@ pub struct NodeStatus { pub remaining_ping_attempts: usize, } +fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> { + let mut path = metadata_dir.clone(); + path.push("network_config"); + + let mut file = std::fs::OpenOptions::new() + .read(true) + .open(path.as_path())?; + + let mut net_config_bytes = vec![]; + file.read_to_end(&mut net_config_bytes) + .expect("Failure when reading network_config"); + + let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..]) + .expect("Invalid or corrupt network_config file"); + + Ok(net_config) +} impl System { pub fn new(config: Config, id: UUID) -> Self { + let net_config = match read_network_config(&config.metadata_dir) { + Ok(x) => x, + Err(_) => NetworkConfig{ + members: HashMap::new(), + version: 0, + }, + }; let mut members = Members{ status: HashMap::new(), status_hash: [0u8; 32], - config: NetworkConfig{ - members: HashMap::new(), - version: 0, - }, - }; + config: net_config, + }; members.recalculate_status_hash(); System{ config, id, - rpc_client: Client::new(), + rpc_client: RpcClient::new(), members: RwLock::new(members), } } + pub async fn save_network_config(&self) { + let mut path = self.config.metadata_dir.clone(); + path.push("network_config"); + + let members = self.members.read().await; + let data = rmp_serde::encode::to_vec_named(&members.config) + .expect("Error while encoding network config"); + drop(members); + + let mut f = tokio::fs::File::create(path.as_path()).await + .expect("Could not create network_config"); + f.write_all(&data[..]).await + .expect("Could not write network_config"); + } + pub async fn make_ping(&self) -> Message { let members = self.members.read().await; Message::Ping(PingMessage{ @@ -129,7 +167,7 @@ impl System { let sys = self.clone(); let ping_msg_ref = &ping_msg; async move { - (id_option, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await) + (id_option, addr.clone(), sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await) } })).await; @@ -146,6 +184,7 @@ impl System { to_advertise.push(AdvertisedNode{ id: info.id.clone(), addr: addr.clone(), + datacenter: info.datacenter.clone(), }); } if is_new || members.status_hash != info.status_hash { @@ -208,6 +247,7 @@ impl System { mem.push(AdvertisedNode{ id: node.clone(), addr: status.addr.clone(), + datacenter: status.datacenter.clone(), }); } Ok(Message::AdvertiseNodesUp(mem)) @@ -265,6 +305,7 @@ impl System { if adv.version > members.config.version { members.config = adv.clone(); tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)); + self.save_network_config().await; } Ok(Message::Ok) |