diff options
Diffstat (limited to 'src/mail')
-rw-r--r-- | src/mail/incoming.rs | 21 | ||||
-rw-r--r-- | src/mail/mailbox.rs | 64 | ||||
-rw-r--r-- | src/mail/user.rs | 6 |
3 files changed, 60 insertions, 31 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 7e33a9a..04d2ef1 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -51,7 +51,10 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.storage.build().await?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); + let mut lock_held = k2v_lock_loop( + creds.storage.build().await?, + storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK), + ); let storage = creds.storage.build().await?; let mut inbox: Option<Arc<Mailbox>> = None; @@ -63,8 +66,7 @@ async fn incoming_mail_watch_process_internal( let wait_new_mail = async { loop { - match storage.row_poll(&incoming_key).await - { + match storage.row_poll(&incoming_key).await { Ok(row_val) => break row_val.row_ref, Err(e) => { error!("Error in wait_new_mail: {}", e); @@ -360,7 +362,10 @@ async fn k2v_lock_loop_internal( Some(existing) => existing, None => row_ref.clone(), }; - if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await { + if let Err(e) = storage + .row_insert(vec![storage::RowVal::new(row, lock)]) + .await + { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -428,14 +433,12 @@ impl EncryptedMessage { let blob_val = storage::BlobVal::new( storage::BlobRef(format!("incoming/{}", gen_ident())), self.encrypted_body.clone().into(), - ).with_meta(MESSAGE_KEY.to_string(), key_header); + ) + .with_meta(MESSAGE_KEY.to_string(), key_header); storage.blob_insert(blob_val).await?; // Update watch key to signal new mail - let watch_val = storage::RowVal::new( - watch_ct.clone(), - gen_ident().0.to_vec(), - ); + let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec()); storage.row_insert(vec![watch_val]).await?; Ok(()) } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 6fb7dea..e424ba3 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -8,7 +8,7 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self}; +use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; use crate::timestamp::now_msec; pub struct Mailbox { @@ -196,7 +196,10 @@ impl MailboxInternal { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> { let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>(); - let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>(); + let ops = ids + .iter() + .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())) + .collect::<Vec<_>>(); let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; @@ -231,7 +234,10 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> { - let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?; + let obj_res = self + .storage + .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))) + .await?; let body = obj_res.value; cryptoblob::open(&body, message_key) } @@ -266,10 +272,12 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.storage.blob_insert(BlobVal::new( - BlobRef(format!("{}/{}", self.mail_path, ident)), - message_blob, - )).await?; + self.storage + .blob_insert(BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )) + .await?; Ok::<_, anyhow::Error>(()) }, async { @@ -281,10 +289,12 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.storage.row_insert(vec![RowVal::new( - RowRef::new(&self.mail_path, &ident.to_string()), - meta_blob, - )]).await?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) + .await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -328,10 +338,12 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.storage.row_insert(vec![RowVal::new( + self.storage + .row_insert(vec![RowVal::new( RowRef::new(&self.mail_path, &ident.to_string()), meta_blob, - )]).await?; + )]) + .await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -355,17 +367,25 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?; + self.storage + .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))) + .await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - let res = self.storage - .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk))) + let res = self + .storage + .row_fetch(&storage::Selector::Single(&RowRef::new( + &self.mail_path, + &sk, + ))) .await?; if let Some(row_val) = res.into_iter().next() { - self.storage.row_rm(&storage::Selector::Single(&row_val.row_ref)).await?; + self.storage + .row_rm(&storage::Selector::Single(&row_val.row_ref)) + .await?; } Ok::<_, anyhow::Error>(()) } @@ -421,10 +441,12 @@ impl MailboxInternal { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.storage.row_insert(vec![RowVal::new( - RowRef::new(&self.mail_path, &new_id.to_string()), - meta_blob, - )]).await?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]) + .await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), diff --git a/src/mail/user.rs b/src/mail/user.rs index 8d12c58..da0d509 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -226,7 +226,11 @@ impl User { async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> { let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); - let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await { + let (mut list, row) = match self + .storage + .row_fetch(&storage::Selector::Single(&row_ref)) + .await + { Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), Ok(rv) => { |