aboutsummaryrefslogtreecommitdiff
path: root/src/mail/incoming.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-30 20:35:27 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-30 20:35:27 +0200
commit2189f8b64b635b74d92f6027e7c6f45c317c334e (patch)
tree722c17b834d17ed06b350b49ca755ad8f17208da /src/mail/incoming.rs
parent2c61af6684c44e3558cc808c6b319bc9706adb00 (diff)
downloadaerogramme-2189f8b64b635b74d92f6027e7c6f45c317c334e.tar.gz
aerogramme-2189f8b64b635b74d92f6027e7c6f45c317c334e.zip
WIP incoming loop
Diffstat (limited to 'src/mail/incoming.rs')
-rw-r--r--src/mail/incoming.rs136
1 files changed, 85 insertions, 51 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index 24c7a2a..398f555 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -3,10 +3,10 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::Result;
-use k2v_client::{CausalityToken, K2vClient, K2vValue};
+use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
use rusoto_s3::{PutObjectRequest, S3Client, S3};
use tokio::sync::watch;
-use tracing::error;
+use tracing::{error, info};
use crate::cryptoblob;
use crate::login::{Credentials, PublicCredentials};
@@ -45,13 +45,30 @@ async fn incoming_mail_watch_process_internal(
loop {
let new_mail = if *lock_held.borrow() {
+ info!("incoming lock held");
+
+ let wait_new_mail = async {
+ loop {
+ match k2v_wait_value_changed(&k2v, &INCOMING_PK, &INCOMING_WATCH_SK, &prev_ct)
+ .await
+ {
+ Ok(cv) => break cv,
+ Err(e) => {
+ error!("Error in wait_new_mail: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ }
+ }
+ };
+
tokio::select! {
- ct = wait_new_mail(&k2v, &prev_ct) => Some(ct),
+ cv = wait_new_mail => Some(cv.causality),
_ = tokio::time::sleep(Duration::from_secs(300)) => prev_ct.take(),
_ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None,
}
} else {
+ info!("incoming lock not held");
tokio::select! {
_ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None,
@@ -59,7 +76,7 @@ async fn incoming_mail_watch_process_internal(
};
if let Some(user) = Weak::upgrade(&user) {
- eprintln!("User still available");
+ info!("User still available");
// If INBOX no longer is same mailbox, open new mailbox
let inbox_id = rx_inbox_id.borrow().clone();
@@ -92,7 +109,7 @@ async fn incoming_mail_watch_process_internal(
}
}
} else {
- eprintln!("User no longer available, exiting incoming loop.");
+ info!("User no longer available, exiting incoming loop.");
break;
}
}
@@ -100,45 +117,6 @@ async fn incoming_mail_watch_process_internal(
Ok(())
}
-async fn wait_new_mail(k2v: &K2vClient, prev_ct: &Option<CausalityToken>) -> CausalityToken {
- loop {
- if let Some(ct) = &prev_ct {
- match k2v
- .poll_item(INCOMING_PK, INCOMING_WATCH_SK, ct.clone(), None)
- .await
- {
- Err(e) => {
- error!("Error when waiting for incoming watch: {}, sleeping", e);
- tokio::time::sleep(Duration::from_secs(30)).await;
- }
- Ok(None) => continue,
- Ok(Some(cv)) => {
- return cv.causality;
- }
- }
- } else {
- match k2v.read_item(INCOMING_PK, INCOMING_WATCH_SK).await {
- Err(k2v_client::Error::NotFound) => {
- if let Err(e) = k2v
- .insert_item(INCOMING_PK, INCOMING_WATCH_SK, vec![0u8], None)
- .await
- {
- error!("Error when waiting for incoming watch: {}, sleeping", e);
- tokio::time::sleep(Duration::from_secs(30)).await;
- }
- }
- Err(e) => {
- error!("Error when waiting for incoming watch: {}, sleeping", e);
- tokio::time::sleep(Duration::from_secs(30)).await;
- }
- Ok(cv) => {
- return cv.causality;
- }
- }
- }
- }
-}
-
async fn handle_incoming_mail(user: &Arc<User>, s3: &S3Client, inbox: &Arc<Mailbox>) -> Result<()> {
unimplemented!()
}
@@ -147,10 +125,7 @@ fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::R
let (held_tx, held_rx) = watch::channel(false);
tokio::spawn(async move {
- if let Err(e) = k2v_lock_loop_internal(k2v, pk, sk, &held_tx).await {
- error!("Error in k2v locking loop: {}", e);
- }
- let _ = held_tx.send(false);
+ let _ = k2v_lock_loop_internal(k2v, pk, sk, held_tx).await;
});
held_rx
@@ -160,9 +135,68 @@ async fn k2v_lock_loop_internal(
k2v: K2vClient,
pk: &'static str,
sk: &'static str,
- held_tx: &watch::Sender<bool>,
-) -> Result<()> {
- unimplemented!()
+ held_tx: watch::Sender<bool>,
+) -> std::result::Result<(), watch::error::SendError<bool>> {
+ let pid = gen_ident();
+
+ let mut state: Option<(UniqueIdent, u64, CausalityToken)> = None;
+ loop {
+ let held_until = match &state {
+ None => None,
+ Some((_holder, expiration_time, _ct)) => Some(expiration_time),
+ };
+
+ let now = now_msec();
+ let wait_half_held_time = async {
+ match held_until {
+ None => futures::future::pending().await,
+ Some(t) => tokio::time::sleep(Duration::from_millis((now_msec() - t) / 2)).await,
+ }
+ };
+
+ unimplemented!();
+
+ /*
+ tokio::select! {
+ ret = k2v_wait_value_changed(&k2v, pk, sk, &state.as_ref().map(|(_, _, ct)| ct.clone())) => {
+ match ret {
+ Err(e) => {
+ held_tx.send(false)?;
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ continue;
+ }
+ Ok(cv) => {
+ unimplemented!();
+ }
+ }
+ }
+ }
+ */
+ }
+}
+
+async fn k2v_wait_value_changed<'a>(
+ k2v: &'a K2vClient,
+ pk: &'static str,
+ sk: &'static str,
+ prev_ct: &'a Option<CausalityToken>,
+) -> Result<CausalValue> {
+ loop {
+ if let Some(ct) = prev_ct {
+ match k2v.poll_item(pk, sk, ct.clone(), None).await? {
+ None => continue,
+ Some(cv) => return Ok(cv),
+ }
+ } else {
+ match k2v.read_item(pk, sk).await {
+ Err(k2v_client::Error::NotFound) => {
+ k2v.insert_item(pk, sk, vec![0u8], None).await?;
+ }
+ Err(e) => return Err(e.into()),
+ Ok(cv) => return Ok(cv),
+ }
+ }
+ }
}
// ----