diff options
Diffstat (limited to 'src/mail/mailbox.rs')
-rw-r--r-- | src/mail/mailbox.rs | 159 |
1 files changed, 64 insertions, 95 deletions
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index d92140d..e424ba3 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -1,11 +1,5 @@ use anyhow::{anyhow, bail, Result}; -use k2v_client::K2vClient; -use k2v_client::{BatchReadOp, Filter, K2vValue}; -use rusoto_s3::{ - CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3, -}; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncReadExt; use tokio::sync::RwLock; use crate::bayou::Bayou; @@ -14,7 +8,8 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::time::now_msec; +use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; +use crate::timestamp::now_msec; pub struct Mailbox { pub(super) id: UniqueIdent, @@ -30,7 +25,7 @@ impl Mailbox { let index_path = format!("index/{}", id); let mail_path = format!("mail/{}", id); - let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?; + let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?; uid_index.sync().await?; let uidvalidity = uid_index.state().uidvalidity; @@ -48,10 +43,8 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, - bucket: creds.bucket().to_string(), encryption_key: creds.keys.master.clone(), - k2v: creds.k2v_client()?, - s3: creds.s3_client()?, + storage: creds.storage.build().await?, uid_index, mail_path, }); @@ -121,13 +114,13 @@ impl Mailbox { &self, msg: IMF<'a>, ident: UniqueIdent, - s3_key: &str, + blob_ref: storage::BlobRef, message_key: Key, ) -> Result<()> { self.mbox .write() .await - .append_from_s3(msg, ident, s3_key, message_key) + .append_from_s3(msg, ident, blob_ref, message_key) .await } @@ -182,13 +175,9 @@ struct MailboxInternal { // 2023-05-15 will probably be used later. #[allow(dead_code)] id: UniqueIdent, - bucket: String, mail_path: String, encryption_key: Key, - - k2v: K2vClient, - s3: S3Client, - + storage: Store, uid_index: Bayou<UidIndex>, } @@ -209,33 +198,19 @@ impl MailboxInternal { let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>(); let ops = ids .iter() - .map(|id| BatchReadOp { - partition_key: &self.mail_path, - filter: Filter { - start: Some(id), - end: None, - prefix: None, - limit: None, - reverse: false, - }, - single_item: true, - conflicts_only: false, - tombstones: false, - }) + .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())) .collect::<Vec<_>>(); - let res_vec = self.k2v.read_batch(&ops).await?; + let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; - for (op, res) in ops.iter().zip(res_vec.into_iter()) { - if res.items.len() != 1 { - bail!("Expected 1 item, got {}", res.items.len()); - } - let (_, cv) = res.items.iter().next().unwrap(); + for res in res_vec.into_iter() { let mut meta_opt = None; - for v in cv.value.iter() { + + // Resolve conflicts + for v in res.value.iter() { match v { - K2vValue::Tombstone => (), - K2vValue::Value(v) => { + storage::Alternative::Tombstone => (), + storage::Alternative::Value(v) => { let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?; match meta_opt.as_mut() { None => { @@ -251,7 +226,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", op.filter.start); + bail!("No valid meta value in k2v for {:?}", res.row_ref); } } @@ -259,19 +234,12 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> { - let gor = GetObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, id), - ..Default::default() - }; - - let obj_res = self.s3.get_object(gor).await?; - - let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; - let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); - obj_body.into_async_read().read_to_end(&mut buf).await?; - - cryptoblob::open(&buf, message_key) + let obj_res = self + .storage + .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))) + .await?; + let body = obj_res.value; + cryptoblob::open(&body, message_key) } // ---- Functions for changing the mailbox ---- @@ -304,13 +272,12 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - let por = PutObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - body: Some(message_blob.into()), - ..Default::default() - }; - self.s3.put_object(por).await?; + self.storage + .blob_insert(BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )) + .await?; Ok::<_, anyhow::Error>(()) }, async { @@ -322,8 +289,11 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) .await?; Ok::<_, anyhow::Error>(()) }, @@ -349,20 +319,14 @@ impl MailboxInternal { &mut self, mail: IMF<'a>, ident: UniqueIdent, - s3_key: &str, + blob_src: storage::BlobRef, message_key: Key, ) -> Result<()> { futures::try_join!( async { // Copy mail body from previous location - let cor = CopyObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - copy_source: format!("{}/{}", self.bucket, s3_key), - metadata_directive: Some("REPLACE".into()), - ..Default::default() - }; - self.s3.copy_object(cor).await?; + let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); + self.storage.blob_copy(&blob_src, &blob_dst).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -374,8 +338,11 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) .await?; Ok::<_, anyhow::Error>(()) }, @@ -400,21 +367,26 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - let dor = DeleteObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - ..Default::default() - }; - self.s3.delete_object(dor).await?; + self.storage + .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))) + .await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - let v = self.k2v.read_item(&self.mail_path, &sk).await?; - self.k2v - .delete_item(&self.mail_path, &sk, v.causality) + let res = self + .storage + .row_fetch(&storage::Selector::Single(&RowRef::new( + &self.mail_path, + &sk, + ))) .await?; + if let Some(row_val) = res.into_iter().next() { + self.storage + .row_rm(&storage::Selector::Single(&row_val.row_ref)) + .await?; + } Ok::<_, anyhow::Error>(()) } )?; @@ -445,7 +417,7 @@ impl MailboxInternal { source_id: UniqueIdent, new_id: UniqueIdent, ) -> Result<()> { - if self.bucket != from.bucket || self.encryption_key != from.encryption_key { + if self.encryption_key != from.encryption_key { bail!("Message to be copied/moved does not belong to same account."); } @@ -460,23 +432,20 @@ impl MailboxInternal { futures::try_join!( async { - // Copy mail body from S3 - let cor = CopyObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, new_id), - copy_source: format!("{}/{}/{}", from.bucket, from.mail_path, source_id), - ..Default::default() - }; - - self.s3.copy_object(cor).await?; + let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); + let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); + self.storage.blob_copy(&src, &dst).await?; Ok::<_, anyhow::Error>(()) }, async { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None) + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]) .await?; Ok::<_, anyhow::Error>(()) }, |