diff options
Diffstat (limited to 'src/mail/incoming.rs')
-rw-r--r-- | src/mail/incoming.rs | 103 |
1 files changed, 53 insertions, 50 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index e3c729f..f6b831d 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::{anyhow, bail, Result}; +use base64::Engine; use futures::{future::BoxFuture, FutureExt}; //use tokio::io::AsyncReadExt; use tokio::sync::watch; @@ -50,13 +51,11 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.row_client()?, INCOMING_PK, INCOMING_LOCK_SK); - - let k2v = creds.row_client()?; - let s3 = creds.blob_client()?; + let mut lock_held = k2v_lock_loop(creds.storage.build()?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); + let storage = creds.storage.build()?; let mut inbox: Option<Arc<Mailbox>> = None; - let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK); + let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); loop { let maybe_updated_incoming_key = if *lock_held.borrow() { @@ -64,9 +63,9 @@ async fn incoming_mail_watch_process_internal( let wait_new_mail = async { loop { - match incoming_key.poll().await + match storage.row_poll(&incoming_key).await { - Ok(row_val) => break row_val.to_ref(), + Ok(row_val) => break row_val.row_ref, Err(e) => { error!("Error in wait_new_mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; @@ -77,7 +76,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan()).expect("Incompatible source & target storage")), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } @@ -119,7 +118,7 @@ async fn incoming_mail_watch_process_internal( // If we were able to open INBOX, and we have mail, // fetch new mail if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { - match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { + match handle_incoming_mail(&user, &storage, inbox, &lock_held).await { Ok(()) => { incoming_key = updated_incoming_key; } @@ -136,20 +135,20 @@ async fn incoming_mail_watch_process_internal( async fn handle_incoming_mail( user: &Arc<User>, - blobs: &storage::BlobStore, + storage: &storage::Store, inbox: &Arc<Mailbox>, lock_held: &watch::Receiver<bool>, ) -> Result<()> { - let mails_res = blobs.list("incoming/").await?; + let mails_res = storage.blob_list("incoming/").await?; for object in mails_res { if !*lock_held.borrow() { break; } - let key = object.key(); + let key = object.0; if let Some(mail_id) = key.strip_prefix("incoming/") { if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() { - move_incoming_message(user, blobs, inbox, mail_id).await?; + move_incoming_message(user, storage, inbox, mail_id).await?; } } } @@ -159,7 +158,7 @@ async fn handle_incoming_mail( async fn move_incoming_message( user: &Arc<User>, - s3: &storage::BlobStore, + storage: &storage::Store, inbox: &Arc<Mailbox>, id: UniqueIdent, ) -> Result<()> { @@ -168,14 +167,15 @@ async fn move_incoming_message( let object_key = format!("incoming/{}", id); // 1. Fetch message from S3 - let object = s3.blob(&object_key).fetch().await?; + let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?; // 1.a decrypt message key from headers //info!("Object metadata: {:?}", get_result.metadata); let key_encrypted_b64 = object - .get_meta(MESSAGE_KEY) + .meta + .get(MESSAGE_KEY) .ok_or(anyhow!("Missing key in metadata"))?; - let key_encrypted = base64::decode(key_encrypted_b64)?; + let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; let message_key = sodiumoxide::crypto::sealedbox::open( &key_encrypted, &user.creds.keys.public, @@ -186,28 +186,28 @@ async fn move_incoming_message( cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; // 1.b retrieve message body - let obj_body = object.content().ok_or(anyhow!("Missing object body"))?; + let obj_body = object.value; let plain_mail = cryptoblob::open(&obj_body, &message_key) .map_err(|_| anyhow!("Cannot decrypt email content"))?; // 2 parse mail and add to inbox let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; inbox - .append_from_s3(msg, id, object.to_ref(), message_key) + .append_from_s3(msg, id, object.blob_ref.clone(), message_key) .await?; // 3 delete from incoming - object.to_ref().rm().await?; + storage.blob_rm(&object.blob_ref).await?; Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- -fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> { +fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> { let (held_tx, held_rx) = watch::channel(false); - tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); + tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx)); held_rx } @@ -216,13 +216,12 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> enum LockState { Unknown, Empty, - Held(UniqueIdent, u64, storage::OrphanRowRef), + Held(UniqueIdent, u64, storage::RowRef), } async fn k2v_lock_loop_internal( - k2v: storage::RowStore, - pk: &'static str, - sk: &'static str, + storage: storage::Store, + row_ref: storage::RowRef, held_tx: watch::Sender<bool>, ) { let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown); @@ -232,10 +231,10 @@ async fn k2v_lock_loop_internal( // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture<Result<()>> = async { - let mut ct = k2v.row(pk, sk); + let mut ct = row_ref.clone(); loop { info!("k2v watch lock loop iter: ct = {:?}", ct); - match ct.poll().await { + match storage.row_poll(&ct).await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", @@ -246,7 +245,7 @@ async fn k2v_lock_loop_internal( } Ok(cv) => { let mut lock_state = None; - for v in cv.content().iter() { + for v in cv.value.iter() { if let storage::Alternative::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); @@ -260,7 +259,7 @@ async fn k2v_lock_loop_internal( } } } - let new_ct = cv.to_ref(); + let new_ct = cv.row_ref; info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", @@ -268,7 +267,7 @@ async fn k2v_lock_loop_internal( ); state_tx.send( lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan())) + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone())) .unwrap_or(LockState::Empty), )?; ct = new_ct; @@ -358,10 +357,10 @@ async fn k2v_lock_loop_internal( )); lock[8..].copy_from_slice(&our_pid.0); let row = match ct { - Some(orphan) => k2v.from_orphan(orphan).expect("Source & target must be storage compatible"), - None => k2v.row(pk, sk), + Some(existing) => existing, + None => row_ref.clone(), }; - if let Err(e) = row.set_value(&lock).push().await { + if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -377,7 +376,7 @@ async fn k2v_lock_loop_internal( info!("lock loop exited, releasing"); if !held_tx.is_closed() { - warn!("wierd..."); + warn!("weird..."); let _ = held_tx.send(false); } @@ -387,8 +386,10 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let row = k2v.from_orphan(ct).expect("Incompatible source & target storage"); - let _ = row.rm().await; + match storage.row_rm(&storage::Selector::Single(&ct)).await { + Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), + Ok(_) => (), + }; } } @@ -410,30 +411,32 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.blob_store()?; - let k2v_client = creds.storage.row_store()?; + let storage = creds.storage.build()?; // Get causality token of previous watch key - let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK); - let watch_ct = match query.fetch().await { + let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await { Err(_) => query, - Ok(cv) => cv.to_ref(), + Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query), }; // Write mail to encrypted storage let encrypted_key = sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); - let key_header = base64::encode(&encrypted_key); + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key); - let mut send = s3_client - .blob(&format!("incoming/{}", gen_ident())) - .set_value(self.encrypted_body.clone().into()); - send.set_meta(MESSAGE_KEY, &key_header); - send.push().await?; + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("incoming/{}", gen_ident())), + self.encrypted_body.clone().into(), + ).with_meta(MESSAGE_KEY.to_string(), key_header); + storage.blob_insert(&blob_val).await?; // Update watch key to signal new mail - watch_ct.set_value(gen_ident().0.as_ref()).push().await?; - + let watch_val = storage::RowVal::new( + watch_ct.clone(), + gen_ident().0.to_vec(), + ); + storage.row_insert(vec![watch_val]).await?; Ok(()) } } |