diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-18 17:09:44 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-18 17:09:44 +0100 |
commit | 3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf (patch) | |
tree | fff5d16e266788b28e812c24669f50118831512b /src/mail/mailbox.rs | |
parent | 684f4de225c44464abcb6a9cb2ef6dcae90537a8 (diff) | |
download | aerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.tar.gz aerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.zip |
Storage trait new implementation
Diffstat (limited to 'src/mail/mailbox.rs')
-rw-r--r-- | src/mail/mailbox.rs | 59 |
1 files changed, 34 insertions, 25 deletions
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 060267a..b4afd5e 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -8,7 +8,7 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::storage::{RowStore, BlobStore, self}; +use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self}; use crate::timestamp::now_msec; pub struct Mailbox { @@ -44,8 +44,7 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, encryption_key: creds.keys.master.clone(), - k2v: creds.storage.row_store()?, - s3: creds.storage.blob_store()?, + storage: creds.storage.build()?, uid_index, mail_path, }); @@ -178,10 +177,7 @@ struct MailboxInternal { id: UniqueIdent, mail_path: String, encryption_key: Key, - - k2v: RowStore, - s3: BlobStore, - + storage: Store, uid_index: Bayou<UidIndex>, } @@ -200,15 +196,15 @@ impl MailboxInternal { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> { let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>(); - let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>(); - let res_vec = self.k2v.select(storage::Selector::List(ops)).await?; + let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>(); + let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; for res in res_vec.into_iter() { let mut meta_opt = None; // Resolve conflicts - for v in res.content().iter() { + for v in res.value.iter() { match v { storage::Alternative::Tombstone => (), storage::Alternative::Value(v) => { @@ -227,7 +223,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", res.to_ref().key()); + bail!("No valid meta value in k2v for {:?}", res.row_ref); } } @@ -235,9 +231,9 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> { - let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?; - let body = obj_res.content().ok_or(anyhow!("missing body"))?; - cryptoblob::open(body, message_key) + let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?; + let body = obj_res.value; + cryptoblob::open(&body, message_key) } // ---- Functions for changing the mailbox ---- @@ -270,7 +266,10 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?; + self.storage.blob_insert(&BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -282,7 +281,10 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -307,14 +309,14 @@ impl MailboxInternal { &mut self, mail: IMF<'a>, ident: UniqueIdent, - blob_ref: storage::BlobRef, + blob_src: storage::BlobRef, message_key: Key, ) -> Result<()> { futures::try_join!( async { // Copy mail body from previous location - let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident)); - blob_ref.copy(&dst).await?; + let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); + self.storage.blob_copy(&blob_src, &blob_dst).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -326,7 +328,10 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -350,13 +355,13 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?; + self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?; + self.storage.row_rm(&Selector::Single(&RowRef::new(&self.mail_path, &sk))).await?; Ok::<_, anyhow::Error>(()) } )?; @@ -402,15 +407,19 @@ impl MailboxInternal { futures::try_join!( async { - let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id)); - self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?; + let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); + let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); + self.storage.blob_copy(&src, &dst).await?; Ok::<_, anyhow::Error>(()) }, async { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), |