aboutsummaryrefslogtreecommitdiff
path: root/src/mail/incoming.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/mail/incoming.rs')
-rw-r--r--src/mail/incoming.rs96
1 files changed, 89 insertions, 7 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index d87456f..86cbc07 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -1,4 +1,5 @@
use std::collections::HashMap;
+use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;
@@ -6,7 +7,10 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Result};
use futures::{future::BoxFuture, Future, FutureExt};
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
-use rusoto_s3::{PutObjectRequest, S3Client, S3};
+use rusoto_s3::{
+ GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
+};
+use tokio::io::AsyncReadExt;
use tokio::sync::watch;
use tracing::{error, info, warn};
@@ -16,12 +20,15 @@ use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::*;
use crate::mail::user::User;
+use crate::mail::IMF;
use crate::time::now_msec;
const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock";
const INCOMING_WATCH_SK: &str = "watch";
+const MESSAGE_KEY: &str = "Message-Key";
+
// When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
// It is renewed every LOCK_DURATION/3
// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we
@@ -113,10 +120,10 @@ async fn incoming_mail_watch_process_internal(
}
}
- // If we were able to open INBOX, and we have mail (implies lock is held),
+ // If we were able to open INBOX, and we have mail,
// fetch new mail
if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) {
- match handle_incoming_mail(&user, &s3, inbox).await {
+ match handle_incoming_mail(&user, &s3, inbox, &lock_held).await {
Ok(()) => {
prev_ct = Some(new_ct);
}
@@ -131,8 +138,84 @@ async fn incoming_mail_watch_process_internal(
Ok(())
}
-async fn handle_incoming_mail(user: &Arc<User>, s3: &S3Client, inbox: &Arc<Mailbox>) -> Result<()> {
- unimplemented!()
+async fn handle_incoming_mail(
+ user: &Arc<User>,
+ s3: &S3Client,
+ inbox: &Arc<Mailbox>,
+ lock_held: &watch::Receiver<bool>,
+) -> Result<()> {
+ let mut lor = ListObjectsV2Request::default();
+ lor.bucket = user.creds.storage.bucket.clone();
+ lor.max_keys = Some(1000);
+ lor.prefix = Some("incoming/".into());
+ let mails_res = s3.list_objects_v2(lor).await?;
+
+ for object in mails_res.contents.unwrap_or_default() {
+ if !*lock_held.borrow() {
+ break;
+ }
+ if let Some(key) = object.key {
+ if let Some(mail_id) = key.strip_prefix("incoming/") {
+ if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
+ move_incoming_message(user, s3, inbox, mail_id).await?;
+ }
+ }
+ }
+ }
+
+ Ok(())
+}
+
+async fn move_incoming_message(
+ user: &Arc<User>,
+ s3: &S3Client,
+ inbox: &Arc<Mailbox>,
+ id: UniqueIdent,
+) -> Result<()> {
+ info!("Moving incoming message: {}", id);
+
+ let object_key = format!("incoming/{}", id);
+
+ // 1. Fetch message from S3
+ let mut gor = GetObjectRequest::default();
+ gor.bucket = user.creds.storage.bucket.clone();
+ gor.key = object_key.clone();
+ let get_result = s3.get_object(gor).await?;
+
+ // 1.a decrypt message key from headers
+ let key_encrypted_b64 = get_result
+ .metadata
+ .as_ref()
+ .ok_or(anyhow!("Missing key in metadata"))?
+ .get(MESSAGE_KEY)
+ .ok_or(anyhow!("Missing key in metadata"))?;
+ let key_encrypted = base64::decode(key_encrypted_b64)?;
+ let message_key = sodiumoxide::crypto::sealedbox::open(
+ &key_encrypted,
+ &user.creds.keys.public,
+ &user.creds.keys.secret,
+ )
+ .map_err(|_| anyhow!("Cannot decrypt message key"))?;
+ let message_key =
+ cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
+
+ // 1.b retrieve message body
+ let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?;
+ let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize);
+ obj_body
+ .into_async_read()
+ .read_to_end(&mut mail_buf)
+ .await?;
+ let plain_mail = cryptoblob::open(&mail_buf, &message_key)
+ .map_err(|_| anyhow!("Cannot decrypt email content"))?;
+
+ // 2 parse mail and add to inbox
+ let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
+ inbox
+ .append_from_s3(msg, id, &object_key, message_key)
+ .await?;
+
+ Ok(())
}
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
@@ -307,7 +390,6 @@ async fn k2v_lock_loop_internal(
if let Some(ct) = release {
let _ = k2v.delete_item(pk, sk, ct.clone()).await;
}
-
}
// ---- UTIL: function to wait for a value to have changed in K2V ----
@@ -372,7 +454,7 @@ impl EncryptedMessage {
por.bucket = creds.storage.bucket.clone();
por.key = format!("incoming/{}", gen_ident().to_string());
por.metadata = Some(
- [("Message-Key".to_string(), key_header)]
+ [(MESSAGE_KEY.to_string(), key_header)]
.into_iter()
.collect::<HashMap<_, _>>(),
);