From 7eea46dcf3f93377163a51288a1e9a807584256c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 13 Oct 2021 17:30:41 +0200 Subject: Properly implement watches for Basalt --- src/peering/basalt.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'src/peering/basalt.rs') diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index efbf6e6..cdb0605 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -11,6 +11,8 @@ use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::hash; +use tokio::sync::watch; + use crate::endpoint::*; use crate::netapp::*; use crate::proto::*; @@ -303,18 +305,19 @@ impl Basalt { .collect::>() } - pub async fn run(self: Arc) { + pub async fn run(self: Arc, must_exit: watch::Receiver) { for peer in self.bootstrap_peers.iter() { tokio::spawn(self.clone().try_connect(*peer)); } - let pushpull_loop = self.clone().run_pushpull_loop(); - let reset_loop = self.run_reset_loop(); - tokio::join!(pushpull_loop, reset_loop); + tokio::join!( + self.clone().run_pushpull_loop(must_exit.clone()), + self.clone().run_reset_loop(must_exit.clone()), + ); } - async fn run_pushpull_loop(self: Arc) { - loop { + async fn run_pushpull_loop(self: Arc, must_exit: watch::Receiver) { + while !*must_exit.borrow() { tokio::time::sleep(self.param.exchange_interval).await; let peers = self.view.read().unwrap().sample(2); @@ -360,8 +363,8 @@ impl Basalt { } } - async fn run_reset_loop(self: Arc) { - loop { + async fn run_reset_loop(self: Arc, must_exit: watch::Receiver) { + while !*must_exit.borrow() { tokio::time::sleep(self.param.reset_interval).await; { -- cgit v1.2.3