diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-13 16:14:10 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-13 16:14:10 +0200 |
commit | 2a0aa0d42c51f54e52a2a69e9806daeb01fc17f1 (patch) | |
tree | 863a24e8847e96f1a4a63d24a42c3e8c5ae85658 | |
parent | 3b256de3dc1347bf449905de714ac5486935264a (diff) | |
download | aerogramme-2a0aa0d42c51f54e52a2a69e9806daeb01fc17f1.tar.gz aerogramme-2a0aa0d42c51f54e52a2a69e9806daeb01fc17f1.zip |
Refactor stuff to indeed release resources
-rw-r--r-- | src/bayou.rs | 57 |
1 files changed, 23 insertions, 34 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index 4d33a8e..2d83ce3 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -471,61 +471,50 @@ impl K2vWatch { let watch = Arc::new(K2vWatch { pk, sk, rx, notify }); - tokio::spawn(Self::watcher_task( + tokio::spawn(Self::background_task( Arc::downgrade(&watch), creds.k2v_client()?, tx, )); - tokio::spawn(Self::updater_task( - Arc::downgrade(&watch), - creds.k2v_client()?, - )); Ok(watch) } - async fn watcher_task( + async fn background_task( self_weak: Weak<Self>, k2v: K2vClient, tx: watch::Sender<Option<CausalityToken>>, ) { let mut ct = None; while let Some(this) = Weak::upgrade(&self_weak) { - info!("bayou k2v watch loop iter: ct = {:?}", ct); - let update = tokio::select!( - _ = tokio::time::sleep(Duration::from_secs(60)) => continue, - r = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => r, + debug!( + "bayou k2v watch bg loop iter ({}, {}): ct = {:?}", + this.pk, this.sk, ct ); - match update { - Err(e) => { - error!("Error in bayou k2v wait value changed: {}", e); - tokio::time::sleep(Duration::from_secs(30)).await; - } - Ok(cv) => { - if tx.send(Some(cv.causality.clone())).is_err() { - break; + tokio::select!( + _ = tokio::time::sleep(Duration::from_secs(60)) => continue, + update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => { + match update { + Err(e) => { + error!("Error in bayou k2v wait value changed: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + Ok(cv) => { + if tx.send(Some(cv.causality.clone())).is_err() { + break; + } + ct = Some(cv.causality); + } } - ct = Some(cv.causality); } - } - } - info!("bayou k2v watch loop exiting"); - } - - async fn updater_task(self_weak: Weak<Self>, k2v: K2vClient) { - while let Some(this) = Weak::upgrade(&self_weak) { - let ct: Option<CausalityToken> = this.rx.borrow().clone(); - let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); - - tokio::select!( - _ = tokio::time::sleep(Duration::from_secs(60)) => (), _ = this.notify.notified() => { + let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); if let Err(e) = k2v .insert_item( &this.pk, &this.sk, rand, - ct, + ct.clone(), ) .await { @@ -533,9 +522,9 @@ impl K2vWatch { tokio::time::sleep(Duration::from_secs(30)).await; } } - ) + ); } - info!("bayou k2v watch updater loop exiting"); + info!("bayou k2v watch bg loop exiting"); } } |