diff options
Diffstat (limited to 'src/mail/incoming.rs')
-rw-r--r-- | src/mail/incoming.rs | 136 |
1 files changed, 85 insertions, 51 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 24c7a2a..398f555 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -3,10 +3,10 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::Result; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; +use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue}; use rusoto_s3::{PutObjectRequest, S3Client, S3}; use tokio::sync::watch; -use tracing::error; +use tracing::{error, info}; use crate::cryptoblob; use crate::login::{Credentials, PublicCredentials}; @@ -45,13 +45,30 @@ async fn incoming_mail_watch_process_internal( loop { let new_mail = if *lock_held.borrow() { + info!("incoming lock held"); + + let wait_new_mail = async { + loop { + match k2v_wait_value_changed(&k2v, &INCOMING_PK, &INCOMING_WATCH_SK, &prev_ct) + .await + { + Ok(cv) => break cv, + Err(e) => { + error!("Error in wait_new_mail: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + } + }; + tokio::select! { - ct = wait_new_mail(&k2v, &prev_ct) => Some(ct), + cv = wait_new_mail => Some(cv.causality), _ = tokio::time::sleep(Duration::from_secs(300)) => prev_ct.take(), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } } else { + info!("incoming lock not held"); tokio::select! { _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, @@ -59,7 +76,7 @@ async fn incoming_mail_watch_process_internal( }; if let Some(user) = Weak::upgrade(&user) { - eprintln!("User still available"); + info!("User still available"); // If INBOX no longer is same mailbox, open new mailbox let inbox_id = rx_inbox_id.borrow().clone(); @@ -92,7 +109,7 @@ async fn incoming_mail_watch_process_internal( } } } else { - eprintln!("User no longer available, exiting incoming loop."); + info!("User no longer available, exiting incoming loop."); break; } } @@ -100,45 +117,6 @@ async fn incoming_mail_watch_process_internal( Ok(()) } -async fn wait_new_mail(k2v: &K2vClient, prev_ct: &Option<CausalityToken>) -> CausalityToken { - loop { - if let Some(ct) = &prev_ct { - match k2v - .poll_item(INCOMING_PK, INCOMING_WATCH_SK, ct.clone(), None) - .await - { - Err(e) => { - error!("Error when waiting for incoming watch: {}, sleeping", e); - tokio::time::sleep(Duration::from_secs(30)).await; - } - Ok(None) => continue, - Ok(Some(cv)) => { - return cv.causality; - } - } - } else { - match k2v.read_item(INCOMING_PK, INCOMING_WATCH_SK).await { - Err(k2v_client::Error::NotFound) => { - if let Err(e) = k2v - .insert_item(INCOMING_PK, INCOMING_WATCH_SK, vec![0u8], None) - .await - { - error!("Error when waiting for incoming watch: {}, sleeping", e); - tokio::time::sleep(Duration::from_secs(30)).await; - } - } - Err(e) => { - error!("Error when waiting for incoming watch: {}, sleeping", e); - tokio::time::sleep(Duration::from_secs(30)).await; - } - Ok(cv) => { - return cv.causality; - } - } - } - } -} - async fn handle_incoming_mail(user: &Arc<User>, s3: &S3Client, inbox: &Arc<Mailbox>) -> Result<()> { unimplemented!() } @@ -147,10 +125,7 @@ fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::R let (held_tx, held_rx) = watch::channel(false); tokio::spawn(async move { - if let Err(e) = k2v_lock_loop_internal(k2v, pk, sk, &held_tx).await { - error!("Error in k2v locking loop: {}", e); - } - let _ = held_tx.send(false); + let _ = k2v_lock_loop_internal(k2v, pk, sk, held_tx).await; }); held_rx @@ -160,9 +135,68 @@ async fn k2v_lock_loop_internal( k2v: K2vClient, pk: &'static str, sk: &'static str, - held_tx: &watch::Sender<bool>, -) -> Result<()> { - unimplemented!() + held_tx: watch::Sender<bool>, +) -> std::result::Result<(), watch::error::SendError<bool>> { + let pid = gen_ident(); + + let mut state: Option<(UniqueIdent, u64, CausalityToken)> = None; + loop { + let held_until = match &state { + None => None, + Some((_holder, expiration_time, _ct)) => Some(expiration_time), + }; + + let now = now_msec(); + let wait_half_held_time = async { + match held_until { + None => futures::future::pending().await, + Some(t) => tokio::time::sleep(Duration::from_millis((now_msec() - t) / 2)).await, + } + }; + + unimplemented!(); + + /* + tokio::select! { + ret = k2v_wait_value_changed(&k2v, pk, sk, &state.as_ref().map(|(_, _, ct)| ct.clone())) => { + match ret { + Err(e) => { + held_tx.send(false)?; + tokio::time::sleep(Duration::from_secs(30)).await; + continue; + } + Ok(cv) => { + unimplemented!(); + } + } + } + } + */ + } +} + +async fn k2v_wait_value_changed<'a>( + k2v: &'a K2vClient, + pk: &'static str, + sk: &'static str, + prev_ct: &'a Option<CausalityToken>, +) -> Result<CausalValue> { + loop { + if let Some(ct) = prev_ct { + match k2v.poll_item(pk, sk, ct.clone(), None).await? { + None => continue, + Some(cv) => return Ok(cv), + } + } else { + match k2v.read_item(pk, sk).await { + Err(k2v_client::Error::NotFound) => { + k2v.insert_item(pk, sk, vec![0u8], None).await?; + } + Err(e) => return Err(e.into()), + Ok(cv) => return Ok(cv), + } + } + } } // ---- |