aboutsummaryrefslogtreecommitdiff
path: root/src/peering
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-13 17:30:41 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-13 17:30:41 +0200
commit7eea46dcf3f93377163a51288a1e9a807584256c (patch)
tree8b462c6d6c8f8d6af90cebc49eed11cfbfa165e9 /src/peering
parentbc86bd3986429c4dac00a2cdb5cf8e30a092b6f5 (diff)
downloadnetapp-7eea46dcf3f93377163a51288a1e9a807584256c.tar.gz
netapp-7eea46dcf3f93377163a51288a1e9a807584256c.zip
Properly implement watches for Basalt
Diffstat (limited to 'src/peering')
-rw-r--r--src/peering/basalt.rs19
-rw-r--r--src/peering/fullmesh.rs4
2 files changed, 13 insertions, 10 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
index efbf6e6..cdb0605 100644
--- a/src/peering/basalt.rs
+++ b/src/peering/basalt.rs
@@ -11,6 +11,8 @@ use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::hash;
+use tokio::sync::watch;
+
use crate::endpoint::*;
use crate::netapp::*;
use crate::proto::*;
@@ -303,18 +305,19 @@ impl Basalt {
.collect::<Vec<_>>()
}
- pub async fn run(self: Arc<Self>) {
+ pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
for peer in self.bootstrap_peers.iter() {
tokio::spawn(self.clone().try_connect(*peer));
}
- let pushpull_loop = self.clone().run_pushpull_loop();
- let reset_loop = self.run_reset_loop();
- tokio::join!(pushpull_loop, reset_loop);
+ tokio::join!(
+ self.clone().run_pushpull_loop(must_exit.clone()),
+ self.clone().run_reset_loop(must_exit.clone()),
+ );
}
- async fn run_pushpull_loop(self: Arc<Self>) {
- loop {
+ async fn run_pushpull_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
+ while !*must_exit.borrow() {
tokio::time::sleep(self.param.exchange_interval).await;
let peers = self.view.read().unwrap().sample(2);
@@ -360,8 +363,8 @@ impl Basalt {
}
}
- async fn run_reset_loop(self: Arc<Self>) {
- loop {
+ async fn run_reset_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
+ while !*must_exit.borrow() {
tokio::time::sleep(self.param.reset_interval).await;
{
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index 793eeb2..1162048 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -178,12 +178,12 @@ impl FullMeshPeeringStrategy {
// 1. Read current state: get list of connected peers (ping them)
let (to_ping, to_retry) = {
let known_hosts = self.known_hosts.read().unwrap();
- debug!("known_hosts: {} peers", known_hosts.list.len());
+ trace!("known_hosts: {} peers", known_hosts.list.len());
let mut to_ping = vec![];
let mut to_retry = vec![];
for (id, info) in known_hosts.list.iter() {
- debug!("{}, {:?}", hex::encode(id), info);
+ trace!("{}, {:?}", hex::encode(id), info);
match info.state {
PeerConnState::Connected => {
let must_ping = match info.last_seen {