diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-13 17:30:41 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-13 17:30:41 +0200 |
commit | 7eea46dcf3f93377163a51288a1e9a807584256c (patch) | |
tree | 8b462c6d6c8f8d6af90cebc49eed11cfbfa165e9 | |
parent | bc86bd3986429c4dac00a2cdb5cf8e30a092b6f5 (diff) | |
download | netapp-7eea46dcf3f93377163a51288a1e9a807584256c.tar.gz netapp-7eea46dcf3f93377163a51288a1e9a807584256c.zip |
Properly implement watches for Basalt
-rw-r--r-- | examples/basalt.rs | 12 | ||||
-rw-r--r-- | src/peering/basalt.rs | 19 | ||||
-rw-r--r-- | src/peering/fullmesh.rs | 4 |
3 files changed, 20 insertions, 15 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs index 63b4b4c..85fbb58 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -12,6 +12,8 @@ use structopt::StructOpt; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; +use tokio::sync::watch; + use netapp::endpoint::*; use netapp::peering::basalt::*; use netapp::proto::*; @@ -126,18 +128,18 @@ async fn main() { let watch_cancel = netapp::util::watch_ctrl_c(); tokio::join!( - example.clone().sampling_loop(), + example.clone().sampling_loop(watch_cancel.clone()), example .netapp .clone() - .listen(listen_addr, public_addr, watch_cancel), - example.basalt.clone().run(), + .listen(listen_addr, public_addr, watch_cancel.clone()), + example.basalt.clone().run(watch_cancel.clone()), ); } impl Example { - async fn sampling_loop(self: Arc<Self>) { - loop { + async fn sampling_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) { + while !*must_exit.borrow() { tokio::time::sleep(Duration::from_secs(10)).await; let peers = self.basalt.sample(10); diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index efbf6e6..cdb0605 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -11,6 +11,8 @@ use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::hash; +use tokio::sync::watch; + use crate::endpoint::*; use crate::netapp::*; use crate::proto::*; @@ -303,18 +305,19 @@ impl Basalt { .collect::<Vec<_>>() } - pub async fn run(self: Arc<Self>) { + pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) { for peer in self.bootstrap_peers.iter() { tokio::spawn(self.clone().try_connect(*peer)); } - let pushpull_loop = self.clone().run_pushpull_loop(); - let reset_loop = self.run_reset_loop(); - tokio::join!(pushpull_loop, reset_loop); + tokio::join!( + self.clone().run_pushpull_loop(must_exit.clone()), + self.clone().run_reset_loop(must_exit.clone()), + ); } - async fn run_pushpull_loop(self: Arc<Self>) { - loop { + async fn run_pushpull_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) { + while !*must_exit.borrow() { tokio::time::sleep(self.param.exchange_interval).await; let peers = self.view.read().unwrap().sample(2); @@ -360,8 +363,8 @@ impl Basalt { } } - async fn run_reset_loop(self: Arc<Self>) { - loop { + async fn run_reset_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) { + while !*must_exit.borrow() { tokio::time::sleep(self.param.reset_interval).await; { diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 793eeb2..1162048 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -178,12 +178,12 @@ impl FullMeshPeeringStrategy { // 1. Read current state: get list of connected peers (ping them) let (to_ping, to_retry) = { let known_hosts = self.known_hosts.read().unwrap(); - debug!("known_hosts: {} peers", known_hosts.list.len()); + trace!("known_hosts: {} peers", known_hosts.list.len()); let mut to_ping = vec![]; let mut to_retry = vec![]; for (id, info) in known_hosts.list.iter() { - debug!("{}, {:?}", hex::encode(id), info); + trace!("{}, {:?}", hex::encode(id), info); match info.state { PeerConnState::Connected => { let must_ping = match info.last_seen { |