aboutsummaryrefslogtreecommitdiff
path: root/aero-collections/src/mail/incoming.rs
diff options
context:
space:
mode:
Diffstat (limited to 'aero-collections/src/mail/incoming.rs')
-rw-r--r--aero-collections/src/mail/incoming.rs443
1 files changed, 443 insertions, 0 deletions
diff --git a/aero-collections/src/mail/incoming.rs b/aero-collections/src/mail/incoming.rs
new file mode 100644
index 0000000..55c2515
--- /dev/null
+++ b/aero-collections/src/mail/incoming.rs
@@ -0,0 +1,443 @@
+use std::sync::{Arc, Weak};
+use std::time::Duration;
+
+use anyhow::{anyhow, bail, Result};
+use base64::Engine;
+use futures::{future::BoxFuture, FutureExt};
+//use tokio::io::AsyncReadExt;
+use tokio::sync::watch;
+use tracing::{debug, error, info, warn};
+
+use aero_bayou::timestamp::now_msec;
+use aero_user::cryptoblob;
+use aero_user::login::{Credentials, PublicCredentials};
+use aero_user::storage;
+
+use crate::mail::mailbox::Mailbox;
+use crate::mail::uidindex::ImapUidvalidity;
+use crate::mail::IMF;
+use crate::unique_ident::*;
+use crate::user::User;
+
+const INCOMING_PK: &str = "incoming";
+const INCOMING_LOCK_SK: &str = "lock";
+const INCOMING_WATCH_SK: &str = "watch";
+
+const MESSAGE_KEY: &str = "message-key";
+
+// When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
+// It is renewed every LOCK_DURATION/3
+// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we
+// lost the lock.
+const LOCK_DURATION: Duration = Duration::from_secs(300);
+
+// In addition to checking when notified, also check for new mail every 10 minutes
+const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600);
+
+pub async fn incoming_mail_watch_process(
+ user: Weak<User>,
+ creds: Credentials,
+ rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
+) {
+ if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await {
+ error!("Error in incoming mail watch process: {}", e);
+ }
+}
+
+async fn incoming_mail_watch_process_internal(
+ user: Weak<User>,
+ creds: Credentials,
+ mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
+) -> Result<()> {
+ let mut lock_held = k2v_lock_loop(
+ creds.storage.build().await?,
+ storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK),
+ );
+ let storage = creds.storage.build().await?;
+
+ let mut inbox: Option<Arc<Mailbox>> = None;
+ let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
+
+ loop {
+ let maybe_updated_incoming_key = if *lock_held.borrow() {
+ debug!("incoming lock held");
+
+ let wait_new_mail = async {
+ loop {
+ match storage.row_poll(&incoming_key).await {
+ Ok(row_val) => break row_val.row_ref,
+ Err(e) => {
+ error!("Error in wait_new_mail: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ }
+ }
+ };
+
+ tokio::select! {
+ inc_k = wait_new_mail => Some(inc_k),
+ _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
+ _ = lock_held.changed() => None,
+ _ = rx_inbox_id.changed() => None,
+ }
+ } else {
+ debug!("incoming lock not held");
+ tokio::select! {
+ _ = lock_held.changed() => None,
+ _ = rx_inbox_id.changed() => None,
+ }
+ };
+
+ let user = match Weak::upgrade(&user) {
+ Some(user) => user,
+ None => {
+ debug!("User no longer available, exiting incoming loop.");
+ break;
+ }
+ };
+ debug!("User still available");
+
+ // If INBOX no longer is same mailbox, open new mailbox
+ let inbox_id = *rx_inbox_id.borrow();
+ if let Some((id, uidvalidity)) = inbox_id {
+ if Some(id) != inbox.as_ref().map(|b| b.id) {
+ match user.open_mailbox_by_id(id, uidvalidity).await {
+ Ok(mb) => {
+ inbox = Some(mb);
+ }
+ Err(e) => {
+ inbox = None;
+ error!("Error when opening inbox ({}): {}", id, e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ continue;
+ }
+ }
+ }
+ }
+
+ // If we were able to open INBOX, and we have mail,
+ // fetch new mail
+ if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
+ match handle_incoming_mail(&user, &storage, inbox, &lock_held).await {
+ Ok(()) => {
+ incoming_key = updated_incoming_key;
+ }
+ Err(e) => {
+ error!("Could not fetch incoming mail: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ }
+ }
+ }
+ drop(rx_inbox_id);
+ Ok(())
+}
+
+async fn handle_incoming_mail(
+ user: &Arc<User>,
+ storage: &storage::Store,
+ inbox: &Arc<Mailbox>,
+ lock_held: &watch::Receiver<bool>,
+) -> Result<()> {
+ let mails_res = storage.blob_list("incoming/").await?;
+
+ for object in mails_res {
+ if !*lock_held.borrow() {
+ break;
+ }
+ let key = object.0;
+ if let Some(mail_id) = key.strip_prefix("incoming/") {
+ if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
+ move_incoming_message(user, storage, inbox, mail_id).await?;
+ }
+ }
+ }
+
+ Ok(())
+}
+
+async fn move_incoming_message(
+ user: &Arc<User>,
+ storage: &storage::Store,
+ inbox: &Arc<Mailbox>,
+ id: UniqueIdent,
+) -> Result<()> {
+ info!("Moving incoming message: {}", id);
+
+ let object_key = format!("incoming/{}", id);
+
+ // 1. Fetch message from S3
+ let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?;
+
+ // 1.a decrypt message key from headers
+ //info!("Object metadata: {:?}", get_result.metadata);
+ let key_encrypted_b64 = object
+ .meta
+ .get(MESSAGE_KEY)
+ .ok_or(anyhow!("Missing key in metadata"))?;
+ let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
+ let message_key = sodiumoxide::crypto::sealedbox::open(
+ &key_encrypted,
+ &user.creds.keys.public,
+ &user.creds.keys.secret,
+ )
+ .map_err(|_| anyhow!("Cannot decrypt message key"))?;
+ let message_key =
+ cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
+
+ // 1.b retrieve message body
+ let obj_body = object.value;
+ let plain_mail = cryptoblob::open(&obj_body, &message_key)
+ .map_err(|_| anyhow!("Cannot decrypt email content"))?;
+
+ // 2 parse mail and add to inbox
+ let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
+ inbox
+ .append_from_s3(msg, id, object.blob_ref.clone(), message_key)
+ .await?;
+
+ // 3 delete from incoming
+ storage.blob_rm(&object.blob_ref).await?;
+
+ Ok(())
+}
+
+// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
+
+fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> {
+ let (held_tx, held_rx) = watch::channel(false);
+
+ tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx));
+
+ held_rx
+}
+
+#[derive(Clone, Debug)]
+enum LockState {
+ Unknown,
+ Empty,
+ Held(UniqueIdent, u64, storage::RowRef),
+}
+
+async fn k2v_lock_loop_internal(
+ storage: storage::Store,
+ row_ref: storage::RowRef,
+ held_tx: watch::Sender<bool>,
+) {
+ let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
+ let mut state_rx_2 = state_rx.clone();
+
+ let our_pid = gen_ident();
+
+ // Loop 1: watch state of lock in K2V, save that in corresponding watch channel
+ let watch_lock_loop: BoxFuture<Result<()>> = async {
+ let mut ct = row_ref.clone();
+ loop {
+ debug!("k2v watch lock loop iter: ct = {:?}", ct);
+ match storage.row_poll(&ct).await {
+ Err(e) => {
+ error!(
+ "Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
+ e
+ );
+ state_tx.send(LockState::Unknown)?;
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ Ok(cv) => {
+ let mut lock_state = None;
+ for v in cv.value.iter() {
+ if let storage::Alternative::Value(vbytes) = v {
+ if vbytes.len() == 32 {
+ let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
+ let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
+ if lock_state
+ .map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2))
+ .unwrap_or(true)
+ {
+ lock_state = Some((pid, ts));
+ }
+ }
+ }
+ }
+ let new_ct = cv.row_ref;
+
+ debug!(
+ "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
+ ct, new_ct, lock_state
+ );
+ state_tx.send(
+ lock_state
+ .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone()))
+ .unwrap_or(LockState::Empty),
+ )?;
+ ct = new_ct;
+ }
+ }
+ }
+ }
+ .boxed();
+
+ // Loop 2: notify user whether we are holding the lock or not
+ let lock_notify_loop: BoxFuture<Result<()>> = async {
+ loop {
+ let now = now_msec();
+ let held_with_expiration_time = match &*state_rx.borrow_and_update() {
+ LockState::Held(pid, ts, _ct) if *pid == our_pid => {
+ let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64;
+ if now < expiration_time {
+ Some(expiration_time)
+ } else {
+ None
+ }
+ }
+ _ => None,
+ };
+ let held = held_with_expiration_time.is_some();
+ if held != *held_tx.borrow() {
+ held_tx.send(held)?;
+ }
+
+ let await_expired = async {
+ match held_with_expiration_time {
+ None => futures::future::pending().await,
+ Some(expiration_time) => {
+ tokio::time::sleep(Duration::from_millis(expiration_time - now)).await
+ }
+ };
+ };
+
+ tokio::select!(
+ r = state_rx.changed() => {
+ r?;
+ }
+ _ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"),
+ _ = await_expired => continue,
+ );
+ }
+ }
+ .boxed();
+
+ // Loop 3: acquire lock when relevant
+ let take_lock_loop: BoxFuture<Result<()>> = async {
+ loop {
+ let now = now_msec();
+ let state: LockState = state_rx_2.borrow_and_update().clone();
+ let (acquire_at, ct) = match state {
+ LockState::Unknown => {
+ // If state of the lock is unknown, don't try to acquire
+ state_rx_2.changed().await?;
+ continue;
+ }
+ LockState::Empty => (now, None),
+ LockState::Held(pid, ts, ct) => {
+ if pid == our_pid {
+ (ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct))
+ } else {
+ (ts, Some(ct))
+ }
+ }
+ };
+
+ // Wait until it is time to acquire lock
+ if acquire_at > now {
+ tokio::select!(
+ r = state_rx_2.changed() => {
+ // If lock state changed in the meantime, don't acquire and loop around
+ r?;
+ continue;
+ }
+ _ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => ()
+ );
+ }
+
+ // Acquire lock
+ let mut lock = vec![0u8; 32];
+ lock[..8].copy_from_slice(&u64::to_be_bytes(
+ now_msec() + LOCK_DURATION.as_millis() as u64,
+ ));
+ lock[8..].copy_from_slice(&our_pid.0);
+ let row = match ct {
+ Some(existing) => existing,
+ None => row_ref.clone(),
+ };
+ if let Err(e) = storage
+ .row_insert(vec![storage::RowVal::new(row, lock)])
+ .await
+ {
+ error!("Could not take lock: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+
+ // Wait for new information to loop back
+ state_rx_2.changed().await?;
+ }
+ }
+ .boxed();
+
+ let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop);
+
+ debug!("lock loop exited, releasing");
+
+ if !held_tx.is_closed() {
+ warn!("weird...");
+ let _ = held_tx.send(false);
+ }
+
+ // If lock is ours, release it
+ let release = match &*state_rx.borrow() {
+ LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()),
+ _ => None,
+ };
+ if let Some(ct) = release {
+ match storage.row_rm(&storage::Selector::Single(&ct)).await {
+ Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
+ Ok(_) => (),
+ };
+ }
+}
+
+// ---- LMTP SIDE: storing messages encrypted with user's pubkey ----
+
+pub struct EncryptedMessage {
+ key: cryptoblob::Key,
+ encrypted_body: Vec<u8>,
+}
+
+impl EncryptedMessage {
+ pub fn new(body: Vec<u8>) -> Result<Self> {
+ let key = cryptoblob::gen_key();
+ let encrypted_body = cryptoblob::seal(&body, &key)?;
+ Ok(Self {
+ key,
+ encrypted_body,
+ })
+ }
+
+ pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
+ let storage = creds.storage.build().await?;
+
+ // Get causality token of previous watch key
+ let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
+ let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await {
+ Err(_) => query,
+ Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query),
+ };
+
+ // Write mail to encrypted storage
+ let encrypted_key =
+ sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
+ let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key);
+
+ let blob_val = storage::BlobVal::new(
+ storage::BlobRef(format!("incoming/{}", gen_ident())),
+ self.encrypted_body.clone().into(),
+ )
+ .with_meta(MESSAGE_KEY.to_string(), key_header);
+ storage.blob_insert(blob_val).await?;
+
+ // Update watch key to signal new mail
+ let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec());
+ storage.row_insert(vec![watch_val]).await?;
+ Ok(())
+ }
+}