use std::collections::HashMap; use std::convert::TryFrom; use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::{anyhow, bail, Result}; use futures::{future::BoxFuture, FutureExt}; use k2v_client::{CausalityToken, K2vClient, K2vValue}; use rusoto_s3::{ DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, }; use tokio::io::AsyncReadExt; use tokio::sync::watch; use tracing::{error, info, warn}; use crate::cryptoblob; use crate::k2v_util::k2v_wait_value_changed; use crate::login::{Credentials, PublicCredentials}; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::*; use crate::mail::user::User; use crate::mail::IMF; use crate::time::now_msec; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; const INCOMING_WATCH_SK: &str = "watch"; const MESSAGE_KEY: &str = "message-key"; // When a lock is held, it is held for LOCK_DURATION (here 5 minutes) // It is renewed every LOCK_DURATION/3 // If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we // lost the lock. const LOCK_DURATION: Duration = Duration::from_secs(300); // In addition to checking when notified, also check for new mail every 10 minutes const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600); pub async fn incoming_mail_watch_process( user: Weak<User>, creds: Credentials, rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>, ) { if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await { error!("Error in incoming mail watch process: {}", e); } } async fn incoming_mail_watch_process_internal( user: Weak<User>, creds: Credentials, mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>, ) -> Result<()> { let mut lock_held = k2v_lock_loop(creds.k2v_client()?, INCOMING_PK, INCOMING_LOCK_SK); let k2v = creds.k2v_client()?; let s3 = creds.s3_client()?; let mut inbox: Option<Arc<Mailbox>> = None; let mut prev_ct: Option<CausalityToken> = None; loop { let new_mail = if *lock_held.borrow() { info!("incoming lock held"); let wait_new_mail = async { loop { match k2v_wait_value_changed(&k2v, &INCOMING_PK, &INCOMING_WATCH_SK, &prev_ct) .await { Ok(cv) => break cv, Err(e) => { error!("Error in wait_new_mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } } } }; tokio::select! { cv = wait_new_mail => Some(cv.causality), _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => prev_ct.clone(), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } } else { info!("incoming lock not held"); tokio::select! { _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } }; let user = match Weak::upgrade(&user) { Some(user) => user, None => { info!("User no longer available, exiting incoming loop."); break; } }; info!("User still available"); // If INBOX no longer is same mailbox, open new mailbox let inbox_id = rx_inbox_id.borrow().clone(); if let Some((id, uidvalidity)) = inbox_id { if Some(id) != inbox.as_ref().map(|b| b.id) { match user.open_mailbox_by_id(id, uidvalidity).await { Ok(mb) => { inbox = Some(mb); } Err(e) => { inbox = None; error!("Error when opening inbox ({}): {}", id, e); tokio::time::sleep(Duration::from_secs(30)).await; continue; } } } } // If we were able to open INBOX, and we have mail, // fetch new mail if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { Ok(()) => { prev_ct = Some(new_ct); } Err(e) => { error!("Could not fetch incoming mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } } } } drop(rx_inbox_id); Ok(()) } async fn handle_incoming_mail( user: &Arc<User>, s3: &S3Client, inbox: &Arc<Mailbox>, lock_held: &watch::Receiver<bool>, ) -> Result<()> { let mut lor = ListObjectsV2Request::default(); lor.bucket = user.creds.storage.bucket.clone(); lor.max_keys = Some(1000); lor.prefix = Some("incoming/".into()); let mails_res = s3.list_objects_v2(lor).await?; for object in mails_res.contents.unwrap_or_default() { if !*lock_held.borrow() { break; } if let Some(key) = object.key { if let Some(mail_id) = key.strip_prefix("incoming/") { if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() { move_incoming_message(user, s3, inbox, mail_id).await?; } } } } Ok(()) } async fn move_incoming_message( user: &Arc<User>, s3: &S3Client, inbox: &Arc<Mailbox>, id: UniqueIdent, ) -> Result<()> { info!("Moving incoming message: {}", id); let object_key = format!("incoming/{}", id); // 1. Fetch message from S3 let mut gor = GetObjectRequest::default(); gor.bucket = user.creds.storage.bucket.clone(); gor.key = object_key.clone(); let get_result = s3.get_object(gor).await?; // 1.a decrypt message key from headers info!("Object metadata: {:?}", get_result.metadata); let key_encrypted_b64 = get_result .metadata .as_ref() .ok_or(anyhow!("Missing key in metadata"))? .get(MESSAGE_KEY) .ok_or(anyhow!("Missing key in metadata"))?; let key_encrypted = base64::decode(key_encrypted_b64)?; let message_key = sodiumoxide::crypto::sealedbox::open( &key_encrypted, &user.creds.keys.public, &user.creds.keys.secret, ) .map_err(|_| anyhow!("Cannot decrypt message key"))?; let message_key = cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; // 1.b retrieve message body let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?; let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize); obj_body .into_async_read() .read_to_end(&mut mail_buf) .await?; let plain_mail = cryptoblob::open(&mail_buf, &message_key) .map_err(|_| anyhow!("Cannot decrypt email content"))?; // 2 parse mail and add to inbox let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; inbox .append_from_s3(msg, id, &object_key, message_key) .await?; // 3 delete from incoming let mut dor = DeleteObjectRequest::default(); dor.bucket = user.creds.storage.bucket.clone(); dor.key = object_key.clone(); s3.delete_object(dor).await?; Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> { let (held_tx, held_rx) = watch::channel(false); tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); held_rx } #[derive(Clone, Debug)] enum LockState { Unknown, Empty, Held(UniqueIdent, u64, CausalityToken), } async fn k2v_lock_loop_internal( k2v: K2vClient, pk: &'static str, sk: &'static str, held_tx: watch::Sender<bool>, ) { let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown); let mut state_rx_2 = state_rx.clone(); let our_pid = gen_ident(); // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture<Result<()>> = async { let mut ct = None; loop { info!("k2v watch lock loop iter: ct = {:?}", ct); match k2v_wait_value_changed(&k2v, pk, sk, &ct).await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", e ); state_tx.send(LockState::Unknown)?; tokio::time::sleep(Duration::from_secs(30)).await; } Ok(cv) => { let mut lock_state = None; for v in cv.value.iter() { if let K2vValue::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); let pid = UniqueIdent(vbytes[8..].try_into().unwrap()); if lock_state .map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2)) .unwrap_or(true) { lock_state = Some((pid, ts)); } } } } info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", ct, cv.causality, lock_state ); state_tx.send( lock_state .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone())) .unwrap_or(LockState::Empty), )?; ct = Some(cv.causality); } } } } .boxed(); // Loop 2: notify user whether we are holding the lock or not let lock_notify_loop: BoxFuture<Result<()>> = async { loop { let now = now_msec(); let held_with_expiration_time = match &*state_rx.borrow_and_update() { LockState::Held(pid, ts, _ct) if *pid == our_pid => { let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64; if now < expiration_time { Some(expiration_time) } else { None } } _ => None, }; let held = held_with_expiration_time.is_some(); if held != *held_tx.borrow() { held_tx.send(held)?; } let await_expired = async { match held_with_expiration_time { None => futures::future::pending().await, Some(expiration_time) => { tokio::time::sleep(Duration::from_millis(expiration_time - now)).await } }; }; tokio::select!( r = state_rx.changed() => { r?; } _ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"), _ = await_expired => continue, ); } } .boxed(); // Loop 3: acquire lock when relevant let take_lock_loop: BoxFuture<Result<()>> = async { loop { let now = now_msec(); let state: LockState = state_rx_2.borrow_and_update().clone(); let (acquire_at, ct) = match state { LockState::Unknown => { // If state of the lock is unknown, don't try to acquire state_rx_2.changed().await?; continue; } LockState::Empty => (now, None), LockState::Held(pid, ts, ct) => { if pid == our_pid { (ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct)) } else { (ts, Some(ct)) } } }; // Wait until it is time to acquire lock if acquire_at > now { tokio::select!( r = state_rx_2.changed() => { // If lock state changed in the meantime, don't acquire and loop around r?; continue; } _ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => () ); } // Acquire lock let mut lock = vec![0u8; 32]; lock[..8].copy_from_slice(&u64::to_be_bytes( now_msec() + LOCK_DURATION.as_millis() as u64, )); lock[8..].copy_from_slice(&our_pid.0); if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } // Wait for new information to loop back state_rx_2.changed().await?; } } .boxed(); let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop); info!("lock loop exited, releasing"); if !held_tx.is_closed() { warn!("wierd..."); let _ = held_tx.send(false); } // If lock is ours, release it let release = match &*state_rx.borrow() { LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()), _ => None, }; if let Some(ct) = release { let _ = k2v.delete_item(pk, sk, ct.clone()).await; } } // ---- LMTP SIDE: storing messages encrypted with user's pubkey ---- pub struct EncryptedMessage { key: cryptoblob::Key, encrypted_body: Vec<u8>, } impl EncryptedMessage { pub fn new(body: Vec<u8>) -> Result<Self> { let key = cryptoblob::gen_key(); let encrypted_body = cryptoblob::seal(&body, &key)?; Ok(Self { key, encrypted_body, }) } pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> { let s3_client = creds.storage.s3_client()?; let k2v_client = creds.storage.k2v_client()?; // Get causality token of previous watch key let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await { Err(_) => None, Ok(cv) => Some(cv.causality), }; // Write mail to encrypted storage let encrypted_key = sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); let key_header = base64::encode(&encrypted_key); let mut por = PutObjectRequest::default(); por.bucket = creds.storage.bucket.clone(); por.key = format!("incoming/{}", gen_ident().to_string()); por.metadata = Some( [(MESSAGE_KEY.to_string(), key_header)] .into_iter() .collect::<HashMap<_, _>>(), ); por.body = Some(self.encrypted_body.clone().into()); s3_client.put_object(por).await?; // Update watch key to signal new mail k2v_client .insert_item( INCOMING_PK, INCOMING_WATCH_SK, gen_ident().0.to_vec(), watch_ct, ) .await?; Ok(()) } }