aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lmtp.rs39
-rw-r--r--src/mail/incoming.rs219
-rw-r--r--src/mail/mailbox.rs2
-rw-r--r--src/mail/mod.rs2
-rw-r--r--src/mail/uidindex.rs3
-rw-r--r--src/mail/user.rs18
6 files changed, 229 insertions, 54 deletions
diff --git a/src/lmtp.rs b/src/lmtp.rs
index 5ab7429..78fe28d 100644
--- a/src/lmtp.rs
+++ b/src/lmtp.rs
@@ -21,6 +21,7 @@ use crate::config::*;
use crate::cryptoblob::*;
use crate::login::*;
use crate::mail::unique_ident::*;
+use crate::mail::incoming::EncryptedMessage;
pub struct LmtpServer {
bind_addr: SocketAddr,
@@ -210,41 +211,3 @@ impl Config for LmtpServer {
}
}
-// ----
-
-struct EncryptedMessage {
- key: Key,
- encrypted_body: Vec<u8>,
-}
-
-impl EncryptedMessage {
- fn new(body: Vec<u8>) -> Result<Self> {
- let key = gen_key();
- let encrypted_body = seal(&body, &key)?;
- Ok(Self {
- key,
- encrypted_body,
- })
- }
-
- async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
- let s3_client = creds.storage.s3_client()?;
-
- let encrypted_key =
- sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
- let key_header = base64::encode(&encrypted_key);
-
- let mut por = PutObjectRequest::default();
- por.bucket = creds.storage.bucket.clone();
- por.key = format!("incoming/{}", gen_ident().to_string());
- por.metadata = Some(
- [("Message-Key".to_string(), key_header)]
- .into_iter()
- .collect::<HashMap<_, _>>(),
- );
- por.body = Some(self.encrypted_body.clone().into());
- s3_client.put_object(por).await?;
-
- Ok(())
- }
-}
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index 4455c91..24c7a2a 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -1,16 +1,223 @@
+use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::Duration;
+use anyhow::Result;
+use k2v_client::{CausalityToken, K2vClient, K2vValue};
+use rusoto_s3::{PutObjectRequest, S3Client, S3};
use tokio::sync::watch;
+use tracing::error;
-use crate::mail::unique_ident::UniqueIdent;
-use crate::mail::user::User;
+use crate::cryptoblob;
+use crate::login::{Credentials, PublicCredentials};
+use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
+use crate::mail::unique_ident::*;
+use crate::mail::user::User;
+use crate::time::now_msec;
+
+const INCOMING_PK: &str = "incoming";
+const INCOMING_LOCK_SK: &str = "lock";
+const INCOMING_WATCH_SK: &str = "watch";
+
+pub async fn incoming_mail_watch_process(
+ user: Weak<User>,
+ creds: Credentials,
+ rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
+) {
+ if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await {
+ error!("Error in incoming mail watch process: {}", e);
+ }
+}
+
+async fn incoming_mail_watch_process_internal(
+ user: Weak<User>,
+ creds: Credentials,
+ mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
+) -> Result<()> {
+ let mut lock_held = k2v_lock_loop(creds.k2v_client()?, INCOMING_PK, INCOMING_LOCK_SK);
+
+ let k2v = creds.k2v_client()?;
+ let s3 = creds.s3_client()?;
+
+ let mut inbox: Option<Arc<Mailbox>> = None;
+ let mut prev_ct: Option<CausalityToken> = None;
-pub async fn incoming_mail_watch_process(user: Weak<User>, rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>) {
- while Weak::upgrade(&user).is_some() {
- eprintln!("User still available");
- tokio::time::sleep(Duration::from_secs(10)).await;
+ loop {
+ let new_mail = if *lock_held.borrow() {
+ tokio::select! {
+ ct = wait_new_mail(&k2v, &prev_ct) => Some(ct),
+ _ = tokio::time::sleep(Duration::from_secs(300)) => prev_ct.take(),
+ _ = lock_held.changed() => None,
+ _ = rx_inbox_id.changed() => None,
+ }
+ } else {
+ tokio::select! {
+ _ = lock_held.changed() => None,
+ _ = rx_inbox_id.changed() => None,
+ }
+ };
+
+ if let Some(user) = Weak::upgrade(&user) {
+ eprintln!("User still available");
+
+ // If INBOX no longer is same mailbox, open new mailbox
+ let inbox_id = rx_inbox_id.borrow().clone();
+ if let Some((id, uidvalidity)) = inbox_id {
+ if Some(id) != inbox.as_ref().map(|b| b.id) {
+ match user.open_mailbox_by_id(id, uidvalidity).await {
+ Ok(mb) => {
+ inbox = mb;
+ }
+ Err(e) => {
+ inbox = None;
+ error!("Error when opening inbox ({}): {}", id, e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ }
+ }
+ }
+
+ // If we were able to open INBOX, and we have mail (implies lock is held),
+ // fetch new mail
+ if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) {
+ match handle_incoming_mail(&user, &s3, inbox).await {
+ Ok(()) => {
+ prev_ct = Some(new_ct);
+ }
+ Err(e) => {
+ error!("Could not fetch incoming mail: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ }
+ }
+ } else {
+ eprintln!("User no longer available, exiting incoming loop.");
+ break;
+ }
}
drop(rx_inbox_id);
+ Ok(())
+}
+
+async fn wait_new_mail(k2v: &K2vClient, prev_ct: &Option<CausalityToken>) -> CausalityToken {
+ loop {
+ if let Some(ct) = &prev_ct {
+ match k2v
+ .poll_item(INCOMING_PK, INCOMING_WATCH_SK, ct.clone(), None)
+ .await
+ {
+ Err(e) => {
+ error!("Error when waiting for incoming watch: {}, sleeping", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ Ok(None) => continue,
+ Ok(Some(cv)) => {
+ return cv.causality;
+ }
+ }
+ } else {
+ match k2v.read_item(INCOMING_PK, INCOMING_WATCH_SK).await {
+ Err(k2v_client::Error::NotFound) => {
+ if let Err(e) = k2v
+ .insert_item(INCOMING_PK, INCOMING_WATCH_SK, vec![0u8], None)
+ .await
+ {
+ error!("Error when waiting for incoming watch: {}, sleeping", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ }
+ Err(e) => {
+ error!("Error when waiting for incoming watch: {}, sleeping", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ Ok(cv) => {
+ return cv.causality;
+ }
+ }
+ }
+ }
+}
+
+async fn handle_incoming_mail(user: &Arc<User>, s3: &S3Client, inbox: &Arc<Mailbox>) -> Result<()> {
+ unimplemented!()
+}
+
+fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> {
+ let (held_tx, held_rx) = watch::channel(false);
+
+ tokio::spawn(async move {
+ if let Err(e) = k2v_lock_loop_internal(k2v, pk, sk, &held_tx).await {
+ error!("Error in k2v locking loop: {}", e);
+ }
+ let _ = held_tx.send(false);
+ });
+
+ held_rx
+}
+
+async fn k2v_lock_loop_internal(
+ k2v: K2vClient,
+ pk: &'static str,
+ sk: &'static str,
+ held_tx: &watch::Sender<bool>,
+) -> Result<()> {
+ unimplemented!()
+}
+
+// ----
+
+pub struct EncryptedMessage {
+ key: cryptoblob::Key,
+ encrypted_body: Vec<u8>,
+}
+
+impl EncryptedMessage {
+ pub fn new(body: Vec<u8>) -> Result<Self> {
+ let key = cryptoblob::gen_key();
+ let encrypted_body = cryptoblob::seal(&body, &key)?;
+ Ok(Self {
+ key,
+ encrypted_body,
+ })
+ }
+
+ pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
+ let s3_client = creds.storage.s3_client()?;
+ let k2v_client = creds.storage.k2v_client()?;
+
+ // Get causality token of previous watch key
+ let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await {
+ Err(_) => None,
+ Ok(cv) => Some(cv.causality),
+ };
+
+ // Write mail to encrypted storage
+ let encrypted_key =
+ sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
+ let key_header = base64::encode(&encrypted_key);
+
+ let mut por = PutObjectRequest::default();
+ por.bucket = creds.storage.bucket.clone();
+ por.key = format!("incoming/{}", gen_ident().to_string());
+ por.metadata = Some(
+ [("Message-Key".to_string(), key_header)]
+ .into_iter()
+ .collect::<HashMap<_, _>>(),
+ );
+ por.body = Some(self.encrypted_body.clone().into());
+ s3_client.put_object(por).await?;
+
+ // Update watch key to signal new mail
+ k2v_client
+ .insert_item(
+ INCOMING_PK,
+ INCOMING_WATCH_SK,
+ gen_ident().0.to_vec(),
+ watch_ct,
+ )
+ .await?;
+
+ Ok(())
+ }
}
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index c4fa435..35b681a 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -17,7 +17,7 @@ use crate::mail::IMF;
use crate::time::now_msec;
pub struct Mailbox {
- id: UniqueIdent,
+ pub(super) id: UniqueIdent,
mbox: RwLock<MailboxInternal>,
}
diff --git a/src/mail/mod.rs b/src/mail/mod.rs
index 94a1712..b6054b0 100644
--- a/src/mail/mod.rs
+++ b/src/mail/mod.rs
@@ -1,10 +1,10 @@
use std::convert::TryFrom;
+pub mod incoming;
pub mod mailbox;
pub mod uidindex;
pub mod unique_ident;
pub mod user;
-pub mod incoming;
// Internet Message Format
// aka RFC 822 - RFC 2822 - RFC 5322
diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs
index 736c763..7166ea7 100644
--- a/src/mail/uidindex.rs
+++ b/src/mail/uidindex.rs
@@ -163,7 +163,8 @@ impl BayouState for UidIndex {
}
}
UidIndexOp::BumpUidvalidity(count) => {
- new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count).unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap());
+ new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count)
+ .unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap());
}
}
new
diff --git a/src/mail/user.rs b/src/mail/user.rs
index fefb084..c2d1d85 100644
--- a/src/mail/user.rs
+++ b/src/mail/user.rs
@@ -9,10 +9,10 @@ use tokio::sync::watch;
use crate::cryptoblob::{open_deserialize, seal_serialize};
use crate::login::{Credentials, StorageCredentials};
+use crate::mail::incoming::incoming_mail_watch_process;
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::{gen_ident, UniqueIdent};
-use crate::mail::incoming::incoming_mail_watch_process;
use crate::time::now_msec;
const MAILBOX_HIERARCHY_DELIMITER: &str = "/";
@@ -119,7 +119,7 @@ impl User {
let user = Arc::new(Self {
username,
- creds,
+ creds: creds.clone(),
k2v,
tx_inbox_id,
mailboxes: std::sync::Mutex::new(HashMap::new()),
@@ -128,12 +128,16 @@ impl User {
// Ensure INBOX exists (done inside load_mailbox_list)
user.load_mailbox_list().await?;
- tokio::spawn(incoming_mail_watch_process(Arc::downgrade(&user), rx_inbox_id));
+ tokio::spawn(incoming_mail_watch_process(
+ Arc::downgrade(&user),
+ user.creds.clone(),
+ rx_inbox_id,
+ ));
Ok(user)
}
- async fn open_mailbox_by_id(
+ pub(super) async fn open_mailbox_by_id(
&self,
id: UniqueIdent,
min_uidvalidity: ImapUidvalidity,
@@ -167,15 +171,15 @@ impl User {
let mut list = BTreeMap::new();
for v in cv.value {
if let K2vValue::Value(vbytes) = v {
- let list2 = open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
+ let list2 =
+ open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
list = merge_mailbox_lists(list, list2);
}
}
(list, Some(cv.causality))
- },
+ }
};
-
// If INBOX doesn't exist, create a new mailbox with that name
// and save new mailbox list.
// Also, ensure that the mpsc::watch that keeps track of the