aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs2
-rw-r--r--src/membership.rs110
-rw-r--r--src/rpc.rs5
3 files changed, 91 insertions, 26 deletions
diff --git a/src/main.rs b/src/main.rs
index 4448d535..0fcda4e0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -103,7 +103,7 @@ async fn main() {
let api_server = api::run_api_server(sys.clone(), wait_from(rx2));
tokio::spawn(shutdown_signal(vec![tx1, tx2]));
- tokio::spawn(membership::bootstrap(sys));
+ tokio::spawn(sys.bootstrap());
let (e1, e2) = futures::join![rpc_server, api_server];
diff --git a/src/membership.rs b/src/membership.rs
index c14d370b..2c9cd1ca 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -1,10 +1,13 @@
use std::sync::Arc;
use std::collections::HashMap;
use std::time::Duration;
-use std::net::SocketAddr;
+use std::net::{IpAddr, SocketAddr};
+use futures::future::join_all;
+use futures::stream::StreamExt;
use hyper::client::Client;
use tokio::sync::RwLock;
+use sha2::{Sha256, Digest};
use crate::Config;
use crate::error::Error;
@@ -28,14 +31,39 @@ pub struct System {
pub struct Members {
pub present: Vec<UUID>,
pub status: HashMap<UUID, NodeStatus>,
+ pub status_hash: Hash,
pub config: HashMap<UUID, NodeConfig>,
pub config_version: u64,
}
+impl Members {
+ fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) {
+ match self.present.binary_search(&info.id) {
+ Ok(pos) => {}
+ Err(pos) => self.present.insert(pos, info.id.clone()),
+ }
+ self.status.insert(info.id.clone(),
+ NodeStatus{
+ addr: SocketAddr::new(ip, info.rpc_port),
+ remaining_ping_attempts: MAX_FAILED_PINGS,
+ });
+ }
+
+ fn recalculate_status_hash(&mut self) {
+ let mut hasher = Sha256::new();
+ for node in self.present.iter() {
+ if let Some(status) = self.status.get(node) {
+ hasher.input(format!("{} {}\n", hex::encode(node), status.addr));
+ }
+ }
+ self.status_hash.copy_from_slice(&hasher.result()[..]);
+ }
+}
+
pub struct NodeStatus {
pub addr: SocketAddr,
- remaining_ping_attempts: usize,
+ pub remaining_ping_attempts: usize,
}
pub struct NodeConfig {
@@ -51,36 +79,74 @@ impl System {
members: RwLock::new(Members{
present: Vec::new(),
status: HashMap::new(),
+ status_hash: [0u8; 32],
config: HashMap::new(),
config_version: 0,
}),
}
}
+ pub async fn make_ping(&self) -> Message {
+ Message::Ping(PingMessage{
+ id: self.id,
+ rpc_port: self.config.rpc_port,
+ present_hash: self.members.read().await.status_hash.clone(),
+ config_version: 0,
+ })
+ }
+
pub async fn broadcast(&self) -> Vec<UUID> {
self.members.read().await.present.clone()
}
-}
-pub async fn bootstrap(system: Arc<System>) {
- rpc_call_many_addr(system.clone(),
- &system.config.bootstrap_peers,
- &Message::Ping(PingMessage{
- id: system.id,
- rpc_port: system.config.rpc_port,
- present_hash: [0u8; 32],
- config_version: 0,
- }),
- None,
- PING_TIMEOUT).await;
-
- unimplemented!() //TODO
-}
+ pub async fn bootstrap(self: Arc<Self>) {
+ let ping_msg = self.make_ping().await;
+ let ping_resps = join_all(
+ self.config.bootstrap_peers.iter().cloned()
+ .map(|to| {
+ let sys = self.clone();
+ let ping_msg_ref = &ping_msg;
+ async move {
+ (to.clone(), rpc_call_addr(sys, &to, ping_msg_ref, PING_TIMEOUT).await)
+ }
+ })).await;
+
+ let mut members = self.members.write().await;
+ for (addr, ping_resp) in ping_resps {
+ if let Ok(Message::Ping(info)) = ping_resp {
+ members.handle_ping(addr.ip(), &info);
+
+ }
+ }
+ members.recalculate_status_hash();
+ drop(members);
-pub async fn handle_ping(sys: Arc<System>, from: &SocketAddr, ping: &PingMessage) -> Result<Message, Error> {
- unimplemented!() //TODO
-}
+ let resps = rpc_call_many_addr(self.clone(),
+ &self.config.bootstrap_peers,
+ &ping_msg,
+ None,
+ PING_TIMEOUT).await;
-pub async fn handle_advertise_node(sys: Arc<System>, ping: &AdvertiseNodeMessage) -> Result<Message, Error> {
- unimplemented!() //TODO
+ unimplemented!() //TODO
+ }
+
+ pub async fn handle_ping(self: Arc<Self>,
+ from: &SocketAddr,
+ ping: &PingMessage)
+ -> Result<Message, Error>
+ {
+ let mut members = self.members.write().await;
+ members.handle_ping(from.ip(), ping);
+ members.recalculate_status_hash();
+ drop(members);
+
+ Ok(self.make_ping().await)
+ }
+
+ pub async fn handle_advertise_node(self: Arc<Self>,
+ ping: &AdvertiseNodeMessage)
+ -> Result<Message, Error>
+ {
+ unimplemented!() //TODO
+ }
}
diff --git a/src/rpc.rs b/src/rpc.rs
index 4ba32c4c..5e72d0f0 100644
--- a/src/rpc.rs
+++ b/src/rpc.rs
@@ -14,7 +14,6 @@ use crate::data::*;
use crate::error::Error;
use crate::proto::Message;
use crate::membership::System;
-use crate::membership;
// ---- CLIENT PART ----
@@ -133,8 +132,8 @@ async fn handler(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Resu
eprintln!("RPC from {}: {:?}", addr, msg);
let resp = err_to_msg(match &msg {
- Message::Ping(ping) => membership::handle_ping(sys, &addr, ping).await,
- Message::AdvertiseNode(adv) => membership::handle_advertise_node(sys, adv).await,
+ Message::Ping(ping) => sys.handle_ping(&addr, ping).await,
+ Message::AdvertiseNode(adv) => sys.handle_advertise_node(adv).await,
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
});