aboutsummaryrefslogtreecommitdiff
path: root/src/mail/incoming.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/mail/incoming.rs')
-rw-r--r--src/mail/incoming.rs81
1 files changed, 28 insertions, 53 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index 3ea7d6a..4e3fc8c 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -22,6 +22,7 @@ use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::*;
use crate::mail::user::User;
use crate::mail::IMF;
+use crate::storage;
use crate::time::now_msec;
const INCOMING_PK: &str = "incoming";
@@ -60,18 +61,17 @@ async fn incoming_mail_watch_process_internal(
let s3 = creds.blob_client()?;
let mut inbox: Option<Arc<Mailbox>> = None;
- let mut prev_ct: Option<CausalityToken> = None;
+ let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK);
loop {
- let new_mail = if *lock_held.borrow() {
+ let maybe_updated_incoming_key = if *lock_held.borrow() {
info!("incoming lock held");
let wait_new_mail = async {
loop {
- match k2v_wait_value_changed(&k2v, INCOMING_PK, INCOMING_WATCH_SK, &prev_ct)
- .await
+ match incoming_key.poll().await
{
- Ok(cv) => break cv,
+ Ok(row_val) => break row_val.to_ref(),
Err(e) => {
error!("Error in wait_new_mail: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
@@ -81,10 +81,10 @@ async fn incoming_mail_watch_process_internal(
};
tokio::select! {
- cv = wait_new_mail => Some(cv.causality),
- _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => prev_ct.clone(),
- _ = lock_held.changed() => None,
- _ = rx_inbox_id.changed() => None,
+ inc_k = wait_new_mail => Some(inc_k),
+ _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
+ _ = lock_held.changed() => None,
+ _ = rx_inbox_id.changed() => None,
}
} else {
info!("incoming lock not held");
@@ -123,10 +123,10 @@ async fn incoming_mail_watch_process_internal(
// If we were able to open INBOX, and we have mail,
// fetch new mail
- if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) {
+ if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
match handle_incoming_mail(&user, &s3, inbox, &lock_held).await {
Ok(()) => {
- prev_ct = Some(new_ct);
+ incoming_key = updated_incoming_key;
}
Err(e) => {
error!("Could not fetch incoming mail: {}", e);
@@ -141,27 +141,20 @@ async fn incoming_mail_watch_process_internal(
async fn handle_incoming_mail(
user: &Arc<User>,
- s3: &S3Client,
+ blobs: &storage::BlobStore,
inbox: &Arc<Mailbox>,
lock_held: &watch::Receiver<bool>,
) -> Result<()> {
- let lor = ListObjectsV2Request {
- bucket: user.creds.storage.bucket.clone(),
- max_keys: Some(1000),
- prefix: Some("incoming/".into()),
- ..Default::default()
- };
- let mails_res = s3.list_objects_v2(lor).await?;
+ let mails_res = blobs.list("incoming/").await?;
- for object in mails_res.contents.unwrap_or_default() {
+ for object in mails_res {
if !*lock_held.borrow() {
break;
}
- if let Some(key) = object.key {
- if let Some(mail_id) = key.strip_prefix("incoming/") {
- if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
- move_incoming_message(user, s3, inbox, mail_id).await?;
- }
+ let key = object.key();
+ if let Some(mail_id) = key.strip_prefix("incoming/") {
+ if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
+ move_incoming_message(user, blobs, inbox, mail_id).await?;
}
}
}
@@ -171,7 +164,7 @@ async fn handle_incoming_mail(
async fn move_incoming_message(
user: &Arc<User>,
- s3: &S3Client,
+ s3: &storage::BlobStore,
inbox: &Arc<Mailbox>,
id: UniqueIdent,
) -> Result<()> {
@@ -180,20 +173,12 @@ async fn move_incoming_message(
let object_key = format!("incoming/{}", id);
// 1. Fetch message from S3
- let gor = GetObjectRequest {
- bucket: user.creds.storage.bucket.clone(),
- key: object_key.clone(),
- ..Default::default()
- };
- let get_result = s3.get_object(gor).await?;
+ let object = s3.blob(&object_key).fetch().await?;
// 1.a decrypt message key from headers
- info!("Object metadata: {:?}", get_result.metadata);
- let key_encrypted_b64 = get_result
- .metadata
- .as_ref()
- .ok_or(anyhow!("Missing key in metadata"))?
- .get(MESSAGE_KEY)
+ //info!("Object metadata: {:?}", get_result.metadata);
+ let key_encrypted_b64 = object
+ .get_meta(MESSAGE_KEY)
.ok_or(anyhow!("Missing key in metadata"))?;
let key_encrypted = base64::decode(key_encrypted_b64)?;
let message_key = sodiumoxide::crypto::sealedbox::open(
@@ -206,13 +191,8 @@ async fn move_incoming_message(
cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
// 1.b retrieve message body
- let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?;
- let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize);
- obj_body
- .into_async_read()
- .read_to_end(&mut mail_buf)
- .await?;
- let plain_mail = cryptoblob::open(&mail_buf, &message_key)
+ let obj_body = object.content().ok_or(anyhow!("Missing object body"))?;
+ let plain_mail = cryptoblob::open(&obj_body, &message_key)
.map_err(|_| anyhow!("Cannot decrypt email content"))?;
// 2 parse mail and add to inbox
@@ -222,19 +202,14 @@ async fn move_incoming_message(
.await?;
// 3 delete from incoming
- let dor = DeleteObjectRequest {
- bucket: user.creds.storage.bucket.clone(),
- key: object_key.clone(),
- ..Default::default()
- };
- s3.delete_object(dor).await?;
+ object.to_ref().rm().await?;
Ok(())
}
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
-fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> {
+fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> {
let (held_tx, held_rx) = watch::channel(false);
tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx));
@@ -250,7 +225,7 @@ enum LockState {
}
async fn k2v_lock_loop_internal(
- k2v: K2vClient,
+ k2v: storage::RowStore,
pk: &'static str,
sk: &'static str,
held_tx: watch::Sender<bool>,