aboutsummaryrefslogtreecommitdiff
path: root/src/peering
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-12-02 13:30:47 +0100
committerAlex Auvolat <alex@adnab.me>2020-12-02 13:30:47 +0100
commitd4de2ffc40fe9d003f12139053ca070eda0b7221 (patch)
treee95476f0b7a6d1c75cc462b3ea7eee74c4faf09f /src/peering
downloadnetapp-d4de2ffc40fe9d003f12139053ca070eda0b7221.tar.gz
netapp-d4de2ffc40fe9d003f12139053ca070eda0b7221.zip
First commit
Diffstat (limited to 'src/peering')
-rw-r--r--src/peering/basalt.rs475
-rw-r--r--src/peering/fullmesh.rs437
-rw-r--r--src/peering/mod.rs2
3 files changed, 914 insertions, 0 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
new file mode 100644
index 0000000..be807a8
--- /dev/null
+++ b/src/peering/basalt.rs
@@ -0,0 +1,475 @@
+use std::collections::HashSet;
+use std::net::SocketAddr;
+use std::sync::{Arc, RwLock};
+use std::time::Duration;
+
+use log::{debug, warn};
+use lru::LruCache;
+use rand::{thread_rng, Rng};
+use serde::{Deserialize, Serialize};
+
+use sodiumoxide::crypto::hash;
+use sodiumoxide::crypto::sign::ed25519;
+
+use crate::conn::*;
+use crate::message::*;
+use crate::netapp::*;
+use crate::proto::*;
+
+// -- Protocol messages --
+
+#[derive(Serialize, Deserialize)]
+struct PullMessage {}
+
+impl Message for PullMessage {
+ const KIND: MessageKind = 0x42001100;
+ type Response = PushMessage;
+}
+
+#[derive(Serialize, Deserialize)]
+struct PushMessage {
+ peers: Vec<Peer>,
+}
+
+impl Message for PushMessage {
+ const KIND: MessageKind = 0x42001101;
+ type Response = ();
+}
+
+// -- Algorithm data structures --
+
+type Seed = [u8; 32];
+
+#[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
+struct Peer {
+ id: ed25519::PublicKey,
+ addr: SocketAddr,
+}
+
+type Cost = [u8; 40];
+const MAX_COST: Cost = [0xffu8; 40];
+
+impl Peer {
+ fn cost(&self, seed: &Seed) -> Cost {
+ let mut hasher = hash::State::new();
+ hasher.update(&seed[..]);
+
+ let mut cost = [0u8; 40];
+ match self.addr {
+ SocketAddr::V4(v4addr) => {
+ let v4ip = v4addr.ip().octets();
+
+ for i in 0..4 {
+ let mut h = hasher.clone();
+ h.update(&v4ip[..i + 1]);
+ cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
+ }
+ }
+ SocketAddr::V6(v6addr) => {
+ let v6ip = v6addr.ip().octets();
+
+ for i in 0..4 {
+ let mut h = hasher.clone();
+ h.update(&v6ip[..i + 2]);
+ cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
+ }
+ }
+ }
+
+ {
+ let mut h5 = hasher.clone();
+ h5.update(&format!("{}", self.addr).into_bytes()[..]);
+ cost[32..40].copy_from_slice(&h5.finalize()[..8]);
+ }
+
+ cost
+ }
+}
+
+struct BasaltSlot {
+ seed: Seed,
+ peer: Option<Peer>,
+}
+
+impl BasaltSlot {
+ fn cost(&self) -> Cost {
+ self.peer.map(|p| p.cost(&self.seed)).unwrap_or(MAX_COST)
+ }
+}
+
+struct BasaltView {
+ i_reset: usize,
+ slots: Vec<BasaltSlot>,
+}
+
+impl BasaltView {
+ fn new(size: usize) -> Self {
+ let slots = (0..size)
+ .map(|_| BasaltSlot {
+ seed: rand_seed(),
+ peer: None,
+ })
+ .collect::<Vec<_>>();
+ Self { i_reset: 0, slots }
+ }
+
+ fn current_peers(&self) -> HashSet<Peer> {
+ self.slots
+ .iter()
+ .filter(|s| s.peer.is_some())
+ .map(|s| s.peer.unwrap().clone())
+ .collect::<HashSet<_>>()
+ }
+ fn current_peers_vec(&self) -> Vec<Peer> {
+ self.current_peers().drain().collect::<Vec<_>>()
+ }
+
+ fn sample(&self, count: usize) -> Vec<Peer> {
+ let possibles = self
+ .slots
+ .iter()
+ .enumerate()
+ .filter(|(_i, s)| s.peer.is_some())
+ .map(|(i, _s)| i)
+ .collect::<Vec<_>>();
+ if possibles.len() == 0 {
+ vec![]
+ } else {
+ let mut ret = vec![];
+ let mut rng = thread_rng();
+ for _i in 0..count {
+ let idx = rng.gen_range(0, possibles.len());
+ ret.push(self.slots[possibles[idx]].peer.unwrap());
+ }
+ ret
+ }
+ }
+
+ fn update_slot(&mut self, i: usize, peers: &[Peer]) {
+ let mut slot_cost = self.slots[i].cost();
+
+ for peer in peers.iter() {
+ let peer_cost = peer.cost(&self.slots[i].seed);
+ if self.slots[i].peer.is_none() || peer_cost < slot_cost {
+ self.slots[i].peer = Some(*peer);
+ slot_cost = peer_cost;
+ }
+ }
+ }
+ fn update_all_slots(&mut self, peers: &[Peer]) {
+ for i in 0..self.slots.len() {
+ self.update_slot(i, peers);
+ }
+ }
+
+ fn disconnected(&mut self, id: ed25519::PublicKey) {
+ let mut cleared_slots = vec![];
+ for i in 0..self.slots.len() {
+ if let Some(p) = self.slots[i].peer {
+ if p.id == id {
+ self.slots[i].peer = None;
+ cleared_slots.push(i);
+ }
+ }
+ }
+
+ let remaining_peers = self.current_peers_vec();
+
+ for i in cleared_slots {
+ self.update_slot(i, &remaining_peers[..]);
+ }
+ }
+
+ fn should_try_list(&self, peers: &[Peer]) -> Vec<Peer> {
+ // Select peers that have lower cost than any of our slots
+ let mut ret = HashSet::new();
+
+ for i in 0..self.slots.len() {
+ if self.slots[i].peer.is_none() {
+ return peers.to_vec();
+ }
+ let mut min_cost = self.slots[i].cost();
+ let mut min_peer = None;
+ for peer in peers.iter() {
+ if ret.contains(peer) {
+ continue;
+ }
+ let peer_cost = peer.cost(&self.slots[i].seed);
+ if peer_cost < min_cost {
+ min_cost = peer_cost;
+ min_peer = Some(*peer);
+ }
+ }
+ if let Some(p) = min_peer {
+ ret.insert(p);
+ if ret.len() == peers.len() {
+ break;
+ }
+ }
+ }
+
+ ret.drain().collect::<Vec<_>>()
+ }
+
+ fn reset_some_slots(&mut self, count: usize) {
+ for _i in 0..count {
+ self.slots[self.i_reset].seed = rand_seed();
+ self.i_reset = (self.i_reset + 1) % self.slots.len();
+ }
+ }
+}
+
+pub struct BasaltParams {
+ pub view_size: usize,
+ pub cache_size: usize,
+ pub exchange_interval: Duration,
+ pub reset_interval: Duration,
+ pub reset_count: usize,
+}
+
+pub struct Basalt {
+ netapp: Arc<NetApp>,
+
+ param: BasaltParams,
+ bootstrap_peers: Vec<Peer>,
+
+ view: RwLock<BasaltView>,
+ current_attempts: RwLock<HashSet<Peer>>,
+ backlog: RwLock<LruCache<Peer, ()>>,
+}
+
+impl Basalt {
+ pub fn new(
+ netapp: Arc<NetApp>,
+ bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>,
+ param: BasaltParams,
+ ) -> Arc<Self> {
+ let bootstrap_peers = bootstrap_list
+ .iter()
+ .map(|(id, addr)| Peer {
+ id: *id,
+ addr: *addr,
+ })
+ .collect::<Vec<_>>();
+
+ let view = BasaltView::new(param.view_size);
+ let backlog = LruCache::new(param.cache_size);
+
+ let basalt = Arc::new(Self {
+ netapp: netapp.clone(),
+ param,
+ bootstrap_peers,
+ view: RwLock::new(view),
+ current_attempts: RwLock::new(HashSet::new()),
+ backlog: RwLock::new(backlog),
+ });
+
+ let basalt2 = basalt.clone();
+ netapp.on_connected.store(Some(Arc::new(Box::new(
+ move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
+ basalt2.on_connected(pk, addr, is_incoming);
+ },
+ ))));
+
+ let basalt2 = basalt.clone();
+ netapp.on_disconnected.store(Some(Arc::new(Box::new(
+ move |pk: ed25519::PublicKey, is_incoming: bool| {
+ basalt2.on_disconnected(pk, is_incoming);
+ },
+ ))));
+
+ let basalt2 = basalt.clone();
+ netapp.add_msg_handler::<PullMessage, _, _>(
+ move |_from: ed25519::PublicKey, _pullmsg: PullMessage| {
+ let push_msg = basalt2.make_push_message();
+ async move { Ok(push_msg) }
+ },
+ );
+
+ let basalt2 = basalt.clone();
+ netapp.add_msg_handler::<PushMessage, _, _>(
+ move |_from: ed25519::PublicKey, push_msg: PushMessage| {
+ basalt2.handle_peer_list(&push_msg.peers[..]);
+ async move { Ok(()) }
+ },
+ );
+
+ basalt
+ }
+
+ pub fn sample(&self, count: usize) -> Vec<ed25519::PublicKey> {
+ self.view
+ .read()
+ .unwrap()
+ .sample(count)
+ .iter()
+ .map(|p| p.id)
+ .collect::<Vec<_>>()
+ }
+
+ pub async fn run(self: Arc<Self>) {
+ for peer in self.bootstrap_peers.iter() {
+ tokio::spawn(self.clone().try_connect(*peer));
+ }
+
+ let pushpull_loop = self.clone().run_pushpull_loop();
+ let reset_loop = self.run_reset_loop();
+ tokio::join!(pushpull_loop, reset_loop);
+ }
+
+ async fn run_pushpull_loop(self: Arc<Self>) {
+ loop {
+ tokio::time::delay_for(self.param.exchange_interval).await;
+
+ let peers = self.view.read().unwrap().sample(2);
+ if peers.len() == 2 {
+ let (c1, c2) = {
+ let client_conns = self.netapp.client_conns.read().unwrap();
+ (
+ client_conns.get(&peers[0].id).cloned(),
+ client_conns.get(&peers[1].id).cloned(),
+ )
+ };
+ if let Some(c) = c1 {
+ tokio::spawn(self.clone().do_pull(c));
+ }
+ if let Some(c) = c2 {
+ tokio::spawn(self.clone().do_push(c));
+ }
+ }
+ }
+ }
+
+ async fn do_pull(self: Arc<Self>, peer: Arc<ClientConn>) {
+ match peer.request(PullMessage {}, prio::NORMAL).await {
+ Ok(resp) => {
+ self.handle_peer_list(&resp.peers[..]);
+ }
+ Err(e) => {
+ warn!("Error during pull exchange: {}", e);
+ }
+ };
+ }
+
+ async fn do_push(self: Arc<Self>, peer: Arc<ClientConn>) {
+ let push_msg = self.make_push_message();
+ if let Err(e) = peer.request(push_msg, prio::NORMAL).await {
+ warn!("Error during push exchange: {}", e);
+ }
+ }
+
+ fn make_push_message(&self) -> PushMessage {
+ let current_peers = self.view.read().unwrap().current_peers_vec();
+ PushMessage {
+ peers: current_peers,
+ }
+ }
+
+ async fn run_reset_loop(self: Arc<Self>) {
+ loop {
+ tokio::time::delay_for(self.param.reset_interval).await;
+
+ {
+ let mut view = self.view.write().unwrap();
+ let prev_peers = view.current_peers();
+ let prev_peers_vec = prev_peers.iter().cloned().collect::<Vec<_>>();
+
+ view.reset_some_slots(self.param.reset_count);
+ view.update_all_slots(&prev_peers_vec[..]);
+
+ let new_peers = view.current_peers();
+ drop(view);
+
+ self.close_all_diff(&prev_peers, &new_peers);
+ }
+
+ let mut to_retry_maybe = self.bootstrap_peers.clone();
+ for (peer, _) in self.backlog.read().unwrap().iter() {
+ if !self.bootstrap_peers.contains(peer) {
+ to_retry_maybe.push(*peer);
+ }
+ }
+ self.handle_peer_list(&to_retry_maybe[..]);
+ }
+ }
+
+ fn handle_peer_list(self: &Arc<Self>, peers: &[Peer]) {
+ let to_connect = self.view.read().unwrap().should_try_list(peers);
+
+ for peer in to_connect.iter() {
+ tokio::spawn(self.clone().try_connect(*peer));
+ }
+ }
+
+ async fn try_connect(self: Arc<Self>, peer: Peer) {
+ {
+ let view = self.view.read().unwrap();
+ let mut attempts = self.current_attempts.write().unwrap();
+
+ if view.slots.iter().any(|x| x.peer == Some(peer)) {
+ return;
+ }
+ if attempts.contains(&peer) {
+ return;
+ }
+
+ attempts.insert(peer);
+ }
+ let res = self.netapp.clone().try_connect(peer.addr, peer.id).await;
+ debug!("Connection attempt to {}: {:?}", peer.addr, res);
+
+ self.current_attempts.write().unwrap().remove(&peer);
+
+ if res.is_err() {
+ self.backlog.write().unwrap().pop(&peer);
+ }
+ }
+
+ fn on_connected(self: &Arc<Self>, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) {
+ if is_incoming {
+ self.handle_peer_list(&[Peer{id: pk, addr}][..]);
+ } else {
+ let peer = Peer { id: pk, addr };
+
+ let mut backlog = self.backlog.write().unwrap();
+ if backlog.get(&peer).is_none() {
+ backlog.put(peer, ());
+ }
+ drop(backlog);
+
+ let mut view = self.view.write().unwrap();
+ let prev_peers = view.current_peers();
+
+ view.update_all_slots(&[peer][..]);
+
+ let new_peers = view.current_peers();
+ drop(view);
+
+ self.close_all_diff(&prev_peers, &new_peers);
+ }
+ }
+
+ fn on_disconnected(&self, pk: ed25519::PublicKey, is_incoming: bool) {
+ if !is_incoming {
+ self.view.write().unwrap().disconnected(pk);
+ }
+ }
+
+ fn close_all_diff(&self, prev_peers: &HashSet<Peer>, new_peers: &HashSet<Peer>) {
+ let client_conns = self.netapp.client_conns.read().unwrap();
+ for peer in prev_peers.iter() {
+ if !new_peers.contains(peer) {
+ if let Some(c) = client_conns.get(&peer.id) {
+ debug!("Closing connection to {} ({})", hex::encode(peer.id), peer.addr);
+ c.close();
+ }
+ }
+ }
+ }
+}
+
+fn rand_seed() -> Seed {
+ let mut seed = [0u8; 32];
+ sodiumoxide::randombytes::randombytes_into(&mut seed[..]);
+ seed
+}
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
new file mode 100644
index 0000000..e04beb6
--- /dev/null
+++ b/src/peering/fullmesh.rs
@@ -0,0 +1,437 @@
+use std::collections::{HashMap, VecDeque};
+use std::net::SocketAddr;
+use std::sync::atomic::{self, AtomicU64};
+use std::sync::{Arc, RwLock};
+use std::time::{Duration, Instant};
+
+use log::{debug, info, trace, warn};
+use serde::{Deserialize, Serialize};
+
+use sodiumoxide::crypto::hash;
+use sodiumoxide::crypto::sign::ed25519;
+
+use crate::conn::*;
+use crate::message::*;
+use crate::netapp::*;
+use crate::proto::*;
+
+const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
+const CONN_MAX_RETRIES: usize = 10;
+const PING_INTERVAL: Duration = Duration::from_secs(10);
+const LOOP_DELAY: Duration = Duration::from_secs(1);
+
+// -- Protocol messages --
+
+#[derive(Serialize, Deserialize)]
+struct PingMessage {
+ pub id: u64,
+ pub peer_list_hash: hash::Digest,
+}
+
+impl Message for PingMessage {
+ const KIND: MessageKind = 0x42001000;
+ type Response = PingMessage;
+}
+
+#[derive(Serialize, Deserialize)]
+struct PeerListMessage {
+ pub list: Vec<(ed25519::PublicKey, SocketAddr)>,
+}
+
+impl Message for PeerListMessage {
+ const KIND: MessageKind = 0x42001001;
+ type Response = PeerListMessage;
+}
+
+// -- Algorithm data structures --
+
+#[derive(Debug)]
+struct PeerInfo {
+ addr: SocketAddr,
+ state: PeerConnState,
+ last_seen: Option<Instant>,
+ ping: VecDeque<Duration>,
+}
+
+#[derive(Copy, Clone, Debug)]
+pub struct PeerInfoPub {
+ pub id: ed25519::PublicKey,
+ pub addr: SocketAddr,
+ pub state: PeerConnState,
+ pub last_seen: Option<Instant>,
+ pub avg_ping: Option<Duration>,
+ pub max_ping: Option<Duration>,
+ pub med_ping: Option<Duration>,
+}
+
+// PeerConnState: possible states for our tentative connections to given peer
+// This module is only interested in recording connection info for outgoing
+// TCP connections
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub enum PeerConnState {
+ // This entry represents ourself
+ Ourself,
+
+ // We currently have a connection to this peer
+ Connected,
+
+ // Our next connection tentative (the nth, where n is the first value)
+ // will be at given Instant
+ Waiting(usize, Instant),
+
+ // A connection tentative is in progress
+ Trying(usize),
+
+ // We abandonned trying to connect to this peer (too many failed attempts)
+ Abandonned,
+}
+
+struct KnownHosts {
+ list: HashMap<ed25519::PublicKey, PeerInfo>,
+ hash: hash::Digest,
+}
+
+impl KnownHosts {
+ fn new() -> Self {
+ let list = HashMap::new();
+ let hash = Self::calculate_hash(&list);
+ Self { list, hash }
+ }
+ fn update_hash(&mut self) {
+ self.hash = Self::calculate_hash(&self.list);
+ }
+ fn map_into_vec(
+ input: &HashMap<ed25519::PublicKey, PeerInfo>,
+ ) -> Vec<(ed25519::PublicKey, SocketAddr)> {
+ let mut list = Vec::with_capacity(input.len());
+ for (id, peer) in input.iter() {
+ if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
+ list.push((id.clone(), peer.addr));
+ }
+ }
+ list
+ }
+ fn calculate_hash(input: &HashMap<ed25519::PublicKey, PeerInfo>) -> hash::Digest {
+ let mut list = Self::map_into_vec(input);
+ list.sort();
+ let mut hash_state = hash::State::new();
+ for (id, addr) in list {
+ hash_state.update(&id[..]);
+ hash_state.update(&format!("{}", addr).into_bytes()[..]);
+ }
+ hash_state.finalize()
+ }
+}
+
+pub struct FullMeshPeeringStrategy {
+ netapp: Arc<NetApp>,
+ known_hosts: RwLock<KnownHosts>,
+ next_ping_id: AtomicU64,
+}
+
+impl FullMeshPeeringStrategy {
+ pub fn new(
+ netapp: Arc<NetApp>,
+ bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>,
+ ) -> Arc<Self> {
+ let mut known_hosts = KnownHosts::new();
+ for (pk, addr) in bootstrap_list {
+ if pk != netapp.pubkey {
+ known_hosts.list.insert(
+ pk,
+ PeerInfo {
+ addr: addr,
+ state: PeerConnState::Waiting(0, Instant::now()),
+ last_seen: None,
+ ping: VecDeque::new(),
+ },
+ );
+ }
+ }
+
+ let strat = Arc::new(Self {
+ netapp: netapp.clone(),
+ known_hosts: RwLock::new(known_hosts),
+ next_ping_id: AtomicU64::new(42),
+ });
+
+ let strat2 = strat.clone();
+ netapp.add_msg_handler::<PingMessage, _, _>(
+ move |from: ed25519::PublicKey, ping: PingMessage| {
+ let ping_resp = PingMessage {
+ id: ping.id,
+ peer_list_hash: strat2.known_hosts.read().unwrap().hash,
+ };
+ async move {
+ debug!("Ping from {}", hex::encode(&from));
+ Ok(ping_resp)
+ }
+ },
+ );
+
+ let strat2 = strat.clone();
+ netapp.add_msg_handler::<PeerListMessage, _, _>(
+ move |_from: ed25519::PublicKey, peer_list: PeerListMessage| {
+ strat2.handle_peer_list(&peer_list.list[..]);
+ let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list);
+ let resp = PeerListMessage { list: peer_list };
+ async move { Ok(resp) }
+ },
+ );
+
+ let strat2 = strat.clone();
+ netapp.on_connected.store(Some(Arc::new(Box::new(
+ move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
+ let strat2 = strat2.clone();
+ tokio::spawn(strat2.on_connected(pk, addr, is_incoming));
+ },
+ ))));
+
+ let strat2 = strat.clone();
+ netapp.on_disconnected.store(Some(Arc::new(Box::new(
+ move |pk: ed25519::PublicKey, is_incoming: bool| {
+ let strat2 = strat2.clone();
+ tokio::spawn(strat2.on_disconnected(pk, is_incoming));
+ },
+ ))));
+
+ strat
+ }
+
+ pub async fn run(self: Arc<Self>) {
+ loop {
+ // 1. Read current state: get list of connected peers (ping them)
+ let known_hosts = self.known_hosts.read().unwrap();
+ debug!("known_hosts: {} peers", known_hosts.list.len());
+
+ let mut to_ping = vec![];
+ let mut to_retry = vec![];
+ for (id, info) in known_hosts.list.iter() {
+ debug!("{}, {:?}", hex::encode(id), info);
+ match info.state {
+ PeerConnState::Connected => {
+ let must_ping = match info.last_seen {
+ None => true,
+ Some(t) => Instant::now() - t > PING_INTERVAL,
+ };
+ if must_ping {
+ to_ping.push(id.clone());
+ }
+ }
+ PeerConnState::Waiting(_, t) => {
+ if Instant::now() >= t {
+ to_retry.push(id.clone());
+ }
+ }
+ _ => (),
+ }
+ }
+ drop(known_hosts);
+
+ // 2. Dispatch ping to hosts
+ trace!("to_ping: {} peers", to_retry.len());
+ for id in to_ping {
+ tokio::spawn(self.clone().ping(id));
+ }
+
+ // 3. Try reconnects
+ trace!("to_retry: {} peers", to_retry.len());
+ if !to_retry.is_empty() {
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ for id in to_retry {
+ if let Some(h) = known_hosts.list.get_mut(&id) {
+ if let PeerConnState::Waiting(i, _) = h.state {
+ info!(
+ "Retrying connection to {} at {} ({})",
+ hex::encode(&id),
+ h.addr,
+ i + 1
+ );
+ h.state = PeerConnState::Trying(i);
+ tokio::spawn(self.clone().try_connect(id, h.addr.clone()));
+ }
+ }
+ }
+ }
+
+ // 4. Sleep before next loop iteration
+ tokio::time::delay_for(LOOP_DELAY).await;
+ }
+ }
+
+ async fn ping(self: Arc<Self>, id: ed25519::PublicKey) {
+ let peer = {
+ match self.netapp.client_conns.read().unwrap().get(&id) {
+ None => {
+ warn!("Should ping {}, but no connection", hex::encode(id));
+ return;
+ }
+ Some(peer) => peer.clone(),
+ }
+ };
+
+ let peer_list_hash = self.known_hosts.read().unwrap().hash;
+ let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
+ let ping_time = Instant::now();
+ let ping_msg = PingMessage {
+ id: ping_id,
+ peer_list_hash,
+ };
+
+ debug!(
+ "Sending ping {} to {} at {:?}",
+ ping_id,
+ hex::encode(id),
+ ping_time
+ );
+ match peer.clone().request(ping_msg, prio::HIGH).await {
+ Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
+ Ok(ping_resp) => {
+ let resp_time = Instant::now();
+ debug!(
+ "Got ping response from {} at {:?}",
+ hex::encode(id),
+ resp_time
+ );
+ {
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ if let Some(host) = known_hosts.list.get_mut(&id) {
+ host.last_seen = Some(resp_time);
+ host.ping.push_back(resp_time - ping_time);
+ while host.ping.len() > 10 {
+ host.ping.pop_front();
+ }
+ }
+ }
+ if ping_resp.peer_list_hash != peer_list_hash {
+ self.exchange_peers(peer).await;
+ }
+ }
+ }
+ }
+
+ async fn exchange_peers(self: Arc<Self>, peer: Arc<ClientConn>) {
+ let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
+ let pex_message = PeerListMessage { list: peer_list };
+ match peer.request(pex_message, prio::BACKGROUND).await {
+ Err(e) => warn!("Error doing peer exchange: {}", e),
+ Ok(resp) => {
+ self.handle_peer_list(&resp.list[..]);
+ }
+ }
+ }
+
+ fn handle_peer_list(&self, list: &[(ed25519::PublicKey, SocketAddr)]) {
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ for (id, addr) in list.iter() {
+ if !known_hosts.list.contains_key(id) {
+ known_hosts.list.insert(*id, self.new_peer(id, *addr));
+ }
+ }
+ }
+
+ async fn try_connect(self: Arc<Self>, id: ed25519::PublicKey, addr: SocketAddr) {
+ let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await;
+ if let Err(e) = conn_result {
+ warn!("Error connecting to {}: {}", hex::encode(id), e);
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ if let Some(host) = known_hosts.list.get_mut(&id) {
+ host.state = match host.state {
+ PeerConnState::Trying(i) => {
+ if i >= CONN_MAX_RETRIES {
+ PeerConnState::Abandonned
+ } else {
+ PeerConnState::Waiting(i + 1, Instant::now() + CONN_RETRY_INTERVAL)
+ }
+ }
+ _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL),
+ };
+ }
+ }
+ }
+
+ async fn on_connected(
+ self: Arc<Self>,
+ pk: ed25519::PublicKey,
+ addr: SocketAddr,
+ is_incoming: bool,
+ ) {
+ if is_incoming {
+ if !self.known_hosts.read().unwrap().list.contains_key(&pk) {
+ self.known_hosts
+ .write()
+ .unwrap()
+ .list
+ .insert(pk, self.new_peer(&pk, addr));
+ }
+ } else {
+ info!("Successfully connected to {} at {}", hex::encode(&pk), addr);
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ if let Some(host) = known_hosts.list.get_mut(&pk) {
+ host.state = PeerConnState::Connected;
+ known_hosts.update_hash();
+ }
+ }
+ }
+
+ async fn on_disconnected(self: Arc<Self>, pk: ed25519::PublicKey, is_incoming: bool) {
+ if !is_incoming {
+ info!("Connection to {} was closed", hex::encode(pk));
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ if let Some(host) = known_hosts.list.get_mut(&pk) {
+ host.state = PeerConnState::Waiting(0, Instant::now());
+ known_hosts.update_hash();
+ }
+ }
+ }
+
+ pub fn get_peer_list(&self) -> Vec<PeerInfoPub> {
+ let known_hosts = self.known_hosts.read().unwrap();
+ let mut ret = Vec::with_capacity(known_hosts.list.len());
+ for (id, info) in known_hosts.list.iter() {
+ let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
+ pings.sort();
+ if pings.len() > 0 {
+ ret.push(PeerInfoPub {
+ id: id.clone(),
+ addr: info.addr,
+ state: info.state,
+ last_seen: info.last_seen,
+ avg_ping: Some(
+ pings
+ .iter()
+ .fold(Duration::from_secs(0), |x, y| x + *y)
+ .div_f64(pings.len() as f64),
+ ),
+ max_ping: pings.last().cloned(),
+ med_ping: Some(pings[pings.len() / 2]),
+ });
+ } else {
+ ret.push(PeerInfoPub {
+ id: id.clone(),
+ addr: info.addr,
+ state: info.state,
+ last_seen: info.last_seen,
+ avg_ping: None,
+ max_ping: None,
+ med_ping: None,
+ });
+ }
+ }
+ ret
+ }
+
+ fn new_peer(&self, id: &ed25519::PublicKey, addr: SocketAddr) -> PeerInfo {
+ let state = if *id == self.netapp.pubkey {
+ PeerConnState::Ourself
+ } else {
+ PeerConnState::Waiting(0, Instant::now())
+ };
+ PeerInfo {
+ addr,
+ state,
+ last_seen: None,
+ ping: VecDeque::new(),
+ }
+ }
+}
diff --git a/src/peering/mod.rs b/src/peering/mod.rs
new file mode 100644
index 0000000..beb2e18
--- /dev/null
+++ b/src/peering/mod.rs
@@ -0,0 +1,2 @@
+pub mod basalt;
+pub mod fullmesh;