aboutsummaryrefslogtreecommitdiff
path: root/src/membership.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/membership.rs')
-rw-r--r--src/membership.rs63
1 files changed, 52 insertions, 11 deletions
diff --git a/src/membership.rs b/src/membership.rs
index 7aaa0759..aa51e0fa 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -1,18 +1,20 @@
use std::sync::Arc;
+use std::path::PathBuf;
+use std::io::{Read};
use std::collections::HashMap;
use std::time::Duration;
use std::net::{IpAddr, SocketAddr};
+use tokio::prelude::*;
use futures::future::join_all;
-use hyper::client::Client;
use tokio::sync::RwLock;
use sha2::{Sha256, Digest};
-use crate::Config;
+use crate::server::Config;
use crate::error::Error;
use crate::data::*;
use crate::proto::*;
-use crate::rpc::*;
+use crate::rpc_client::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
@@ -22,7 +24,7 @@ pub struct System {
pub config: Config,
pub id: UUID,
- pub rpc_client: Client<hyper::client::HttpConnector, hyper::Body>,
+ pub rpc_client: RpcClient,
pub members: RwLock<Members>,
}
@@ -73,26 +75,62 @@ pub struct NodeStatus {
pub remaining_ping_attempts: usize,
}
+fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
+ let mut path = metadata_dir.clone();
+ path.push("network_config");
+
+ let mut file = std::fs::OpenOptions::new()
+ .read(true)
+ .open(path.as_path())?;
+
+ let mut net_config_bytes = vec![];
+ file.read_to_end(&mut net_config_bytes)
+ .expect("Failure when reading network_config");
+
+ let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
+ .expect("Invalid or corrupt network_config file");
+
+ Ok(net_config)
+}
impl System {
pub fn new(config: Config, id: UUID) -> Self {
+ let net_config = match read_network_config(&config.metadata_dir) {
+ Ok(x) => x,
+ Err(_) => NetworkConfig{
+ members: HashMap::new(),
+ version: 0,
+ },
+ };
let mut members = Members{
status: HashMap::new(),
status_hash: [0u8; 32],
- config: NetworkConfig{
- members: HashMap::new(),
- version: 0,
- },
- };
+ config: net_config,
+ };
members.recalculate_status_hash();
System{
config,
id,
- rpc_client: Client::new(),
+ rpc_client: RpcClient::new(),
members: RwLock::new(members),
}
}
+ pub async fn save_network_config(&self) {
+ let mut path = self.config.metadata_dir.clone();
+ path.push("network_config");
+
+ let members = self.members.read().await;
+ let data = rmp_serde::encode::to_vec_named(&members.config)
+ .expect("Error while encoding network config");
+ drop(members);
+
+ let mut f = tokio::fs::File::create(path.as_path()).await
+ .expect("Could not create network_config");
+ f.write_all(&data[..]).await
+ .expect("Could not write network_config");
+ }
+
pub async fn make_ping(&self) -> Message {
let members = self.members.read().await;
Message::Ping(PingMessage{
@@ -129,7 +167,7 @@ impl System {
let sys = self.clone();
let ping_msg_ref = &ping_msg;
async move {
- (id_option, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await)
+ (id_option, addr.clone(), sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await)
}
})).await;
@@ -146,6 +184,7 @@ impl System {
to_advertise.push(AdvertisedNode{
id: info.id.clone(),
addr: addr.clone(),
+ datacenter: info.datacenter.clone(),
});
}
if is_new || members.status_hash != info.status_hash {
@@ -208,6 +247,7 @@ impl System {
mem.push(AdvertisedNode{
id: node.clone(),
addr: status.addr.clone(),
+ datacenter: status.datacenter.clone(),
});
}
Ok(Message::AdvertiseNodesUp(mem))
@@ -265,6 +305,7 @@ impl System {
if adv.version > members.config.version {
members.config = adv.clone();
tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT));
+ self.save_network_config().await;
}
Ok(Message::Ok)