aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-13 16:14:10 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-13 16:14:10 +0200
commit2a0aa0d42c51f54e52a2a69e9806daeb01fc17f1 (patch)
tree863a24e8847e96f1a4a63d24a42c3e8c5ae85658
parent3b256de3dc1347bf449905de714ac5486935264a (diff)
downloadaerogramme-2a0aa0d42c51f54e52a2a69e9806daeb01fc17f1.tar.gz
aerogramme-2a0aa0d42c51f54e52a2a69e9806daeb01fc17f1.zip
Refactor stuff to indeed release resources
-rw-r--r--src/bayou.rs57
1 files changed, 23 insertions, 34 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 4d33a8e..2d83ce3 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -471,61 +471,50 @@ impl K2vWatch {
let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
- tokio::spawn(Self::watcher_task(
+ tokio::spawn(Self::background_task(
Arc::downgrade(&watch),
creds.k2v_client()?,
tx,
));
- tokio::spawn(Self::updater_task(
- Arc::downgrade(&watch),
- creds.k2v_client()?,
- ));
Ok(watch)
}
- async fn watcher_task(
+ async fn background_task(
self_weak: Weak<Self>,
k2v: K2vClient,
tx: watch::Sender<Option<CausalityToken>>,
) {
let mut ct = None;
while let Some(this) = Weak::upgrade(&self_weak) {
- info!("bayou k2v watch loop iter: ct = {:?}", ct);
- let update = tokio::select!(
- _ = tokio::time::sleep(Duration::from_secs(60)) => continue,
- r = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => r,
+ debug!(
+ "bayou k2v watch bg loop iter ({}, {}): ct = {:?}",
+ this.pk, this.sk, ct
);
- match update {
- Err(e) => {
- error!("Error in bayou k2v wait value changed: {}", e);
- tokio::time::sleep(Duration::from_secs(30)).await;
- }
- Ok(cv) => {
- if tx.send(Some(cv.causality.clone())).is_err() {
- break;
+ tokio::select!(
+ _ = tokio::time::sleep(Duration::from_secs(60)) => continue,
+ update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => {
+ match update {
+ Err(e) => {
+ error!("Error in bayou k2v wait value changed: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ Ok(cv) => {
+ if tx.send(Some(cv.causality.clone())).is_err() {
+ break;
+ }
+ ct = Some(cv.causality);
+ }
}
- ct = Some(cv.causality);
}
- }
- }
- info!("bayou k2v watch loop exiting");
- }
-
- async fn updater_task(self_weak: Weak<Self>, k2v: K2vClient) {
- while let Some(this) = Weak::upgrade(&self_weak) {
- let ct: Option<CausalityToken> = this.rx.borrow().clone();
- let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
-
- tokio::select!(
- _ = tokio::time::sleep(Duration::from_secs(60)) => (),
_ = this.notify.notified() => {
+ let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
if let Err(e) = k2v
.insert_item(
&this.pk,
&this.sk,
rand,
- ct,
+ ct.clone(),
)
.await
{
@@ -533,9 +522,9 @@ impl K2vWatch {
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
- )
+ );
}
- info!("bayou k2v watch updater loop exiting");
+ info!("bayou k2v watch bg loop exiting");
}
}