From 3c36b449a3786bb62fa023bde37bac24635b5717 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 6 Apr 2020 21:02:15 +0200 Subject: Some work --- src/main.rs | 2 +- src/membership.rs | 110 +++++++++++++++++++++++++++++++++++++++++++----------- src/rpc.rs | 5 +-- 3 files changed, 91 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 4448d535..0fcda4e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -103,7 +103,7 @@ async fn main() { let api_server = api::run_api_server(sys.clone(), wait_from(rx2)); tokio::spawn(shutdown_signal(vec![tx1, tx2])); - tokio::spawn(membership::bootstrap(sys)); + tokio::spawn(sys.bootstrap()); let (e1, e2) = futures::join![rpc_server, api_server]; diff --git a/src/membership.rs b/src/membership.rs index c14d370b..2c9cd1ca 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -1,10 +1,13 @@ use std::sync::Arc; use std::collections::HashMap; use std::time::Duration; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; +use futures::future::join_all; +use futures::stream::StreamExt; use hyper::client::Client; use tokio::sync::RwLock; +use sha2::{Sha256, Digest}; use crate::Config; use crate::error::Error; @@ -28,14 +31,39 @@ pub struct System { pub struct Members { pub present: Vec, pub status: HashMap, + pub status_hash: Hash, pub config: HashMap, pub config_version: u64, } +impl Members { + fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) { + match self.present.binary_search(&info.id) { + Ok(pos) => {} + Err(pos) => self.present.insert(pos, info.id.clone()), + } + self.status.insert(info.id.clone(), + NodeStatus{ + addr: SocketAddr::new(ip, info.rpc_port), + remaining_ping_attempts: MAX_FAILED_PINGS, + }); + } + + fn recalculate_status_hash(&mut self) { + let mut hasher = Sha256::new(); + for node in self.present.iter() { + if let Some(status) = self.status.get(node) { + hasher.input(format!("{} {}\n", hex::encode(node), status.addr)); + } + } + self.status_hash.copy_from_slice(&hasher.result()[..]); + } +} + pub struct NodeStatus { pub addr: SocketAddr, - remaining_ping_attempts: usize, + pub remaining_ping_attempts: usize, } pub struct NodeConfig { @@ -51,36 +79,74 @@ impl System { members: RwLock::new(Members{ present: Vec::new(), status: HashMap::new(), + status_hash: [0u8; 32], config: HashMap::new(), config_version: 0, }), } } + pub async fn make_ping(&self) -> Message { + Message::Ping(PingMessage{ + id: self.id, + rpc_port: self.config.rpc_port, + present_hash: self.members.read().await.status_hash.clone(), + config_version: 0, + }) + } + pub async fn broadcast(&self) -> Vec { self.members.read().await.present.clone() } -} -pub async fn bootstrap(system: Arc) { - rpc_call_many_addr(system.clone(), - &system.config.bootstrap_peers, - &Message::Ping(PingMessage{ - id: system.id, - rpc_port: system.config.rpc_port, - present_hash: [0u8; 32], - config_version: 0, - }), - None, - PING_TIMEOUT).await; - - unimplemented!() //TODO -} + pub async fn bootstrap(self: Arc) { + let ping_msg = self.make_ping().await; + let ping_resps = join_all( + self.config.bootstrap_peers.iter().cloned() + .map(|to| { + let sys = self.clone(); + let ping_msg_ref = &ping_msg; + async move { + (to.clone(), rpc_call_addr(sys, &to, ping_msg_ref, PING_TIMEOUT).await) + } + })).await; + + let mut members = self.members.write().await; + for (addr, ping_resp) in ping_resps { + if let Ok(Message::Ping(info)) = ping_resp { + members.handle_ping(addr.ip(), &info); + + } + } + members.recalculate_status_hash(); + drop(members); -pub async fn handle_ping(sys: Arc, from: &SocketAddr, ping: &PingMessage) -> Result { - unimplemented!() //TODO -} + let resps = rpc_call_many_addr(self.clone(), + &self.config.bootstrap_peers, + &ping_msg, + None, + PING_TIMEOUT).await; -pub async fn handle_advertise_node(sys: Arc, ping: &AdvertiseNodeMessage) -> Result { - unimplemented!() //TODO + unimplemented!() //TODO + } + + pub async fn handle_ping(self: Arc, + from: &SocketAddr, + ping: &PingMessage) + -> Result + { + let mut members = self.members.write().await; + members.handle_ping(from.ip(), ping); + members.recalculate_status_hash(); + drop(members); + + Ok(self.make_ping().await) + } + + pub async fn handle_advertise_node(self: Arc, + ping: &AdvertiseNodeMessage) + -> Result + { + unimplemented!() //TODO + } } diff --git a/src/rpc.rs b/src/rpc.rs index 4ba32c4c..5e72d0f0 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -14,7 +14,6 @@ use crate::data::*; use crate::error::Error; use crate::proto::Message; use crate::membership::System; -use crate::membership; // ---- CLIENT PART ---- @@ -133,8 +132,8 @@ async fn handler(sys: Arc, req: Request, addr: SocketAddr) -> Resu eprintln!("RPC from {}: {:?}", addr, msg); let resp = err_to_msg(match &msg { - Message::Ping(ping) => membership::handle_ping(sys, &addr, ping).await, - Message::AdvertiseNode(adv) => membership::handle_advertise_node(sys, adv).await, + Message::Ping(ping) => sys.handle_ping(&addr, ping).await, + Message::AdvertiseNode(adv) => sys.handle_advertise_node(adv).await, _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), }); -- cgit v1.2.3