diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-29 19:24:21 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-29 19:24:21 +0200 |
commit | 509e7e4bed3b011b5e0b276db3900edb9fee7255 (patch) | |
tree | 8bf760b1c367160ce4364f1485bc5598b78ec36b /src/mail/mailbox.rs | |
parent | a8d0e4a994daca39f9619cddf2847c1a7820c040 (diff) | |
download | aerogramme-509e7e4bed3b011b5e0b276db3900edb9fee7255.tar.gz aerogramme-509e7e4bed3b011b5e0b276db3900edb9fee7255.zip |
Implement beginning of a FETCH command
Diffstat (limited to 'src/mail/mailbox.rs')
-rw-r--r-- | src/mail/mailbox.rs | 189 |
1 files changed, 165 insertions, 24 deletions
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 6801ab7..3ebaa0b 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -1,14 +1,20 @@ -use anyhow::Result; +use anyhow::{anyhow, bail, Result}; use k2v_client::K2vClient; -use rusoto_s3::S3Client; +use k2v_client::{BatchReadOp, Filter, K2vValue}; +use rusoto_s3::{ + DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, +}; +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncReadExt; use tokio::sync::RwLock; use crate::bayou::Bayou; -use crate::cryptoblob::Key; +use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; +use crate::time::now_msec; pub struct Mailbox { id: UniqueIdent, @@ -48,8 +54,8 @@ impl Mailbox { } /// Insert an email in the mailbox - pub async fn append<'a>(&self, _msg: IMF<'a>) -> Result<()> { - unimplemented!() + pub async fn append<'a>(&self, msg: IMF<'a>) -> Result<()> { + self.mbox.write().await.append(msg, None).await } /// Copy an email from an other Mailbox to this mailbox @@ -58,21 +64,15 @@ impl Mailbox { unimplemented!() } - /// Delete all emails with the \Delete flag in the mailbox - /// Can be called by CLOSE and EXPUNGE - /// @FIXME do we want to implement this feature or a simpler "delete" command - /// The controller could then "fetch \Delete" and call delete on each email? - pub async fn expunge(&self) -> Result<()> { - unimplemented!() - } - - /// Update flags of a range of emails - pub async fn store(&self) -> Result<()> { - unimplemented!() + /// Fetch the metadata (headers + some more info) of the specified + /// mail IDs + pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> { + self.mbox.read().await.fetch_meta(ids).await } - pub async fn fetch(&self) -> Result<()> { - unimplemented!() + /// Fetch an entire e-mail + pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> { + self.mbox.read().await.fetch_full(id, message_key).await } /// Test procedure TODO WILL REMOVE THIS @@ -98,17 +98,142 @@ struct MailboxInternal { } impl MailboxInternal { - pub async fn test(&mut self) -> Result<()> { - self.uid_index.sync().await?; + async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> { + let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>(); + let ops = ids + .iter() + .map(|id| BatchReadOp { + partition_key: &self.mail_path, + filter: Filter { + start: Some(id), + end: None, + prefix: None, + limit: None, + reverse: false, + }, + single_item: true, + conflicts_only: false, + tombstones: false, + }) + .collect::<Vec<_>>(); + let res_vec = self.k2v.read_batch(&ops).await?; - dump(&self.uid_index); + let mut meta_vec = vec![]; + for res in res_vec { + if res.items.len() != 1 { + bail!("Expected 1 item, got {}", res.items.len()); + } + let (_, cv) = res.items.iter().next().unwrap(); + if cv.value.len() != 1 { + bail!("Expected 1 value, got {}", cv.value.len()); + } + match &cv.value[0] { + K2vValue::Tombstone => bail!("Expected value, got tombstone"), + K2vValue::Value(v) => { + let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?; + meta_vec.push(meta); + } + } + } + + Ok(meta_vec) + } + + async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> { + let mut gor = GetObjectRequest::default(); + gor.bucket = self.bucket.clone(); + gor.key = format!("{}/{}", self.mail_path, id); + + let obj_res = self.s3.get_object(gor).await?; + + let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; + let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); + obj_body.into_async_read().read_to_end(&mut buf).await?; + + Ok(cryptoblob::open(&buf, &message_key)?) + } + + async fn append(&mut self, mail: IMF<'_>, ident: Option<UniqueIdent>) -> Result<()> { + let ident = ident.unwrap_or_else(|| gen_ident()); + let message_key = gen_key(); + + futures::try_join!( + async { + // Encrypt and save mail body + let message_blob = cryptoblob::seal(mail.raw, &message_key)?; + let mut por = PutObjectRequest::default(); + por.bucket = self.bucket.clone(); + por.key = format!("{}/{}", self.mail_path, ident); + por.body = Some(message_blob.into()); + self.s3.put_object(por).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Save mail meta + let meta = MailMeta { + internaldate: now_msec(), + headers: mail.raw[..mail.parsed.offset_body].to_vec(), + message_key: message_key.clone(), + rfc822_size: mail.raw.len(), + }; + let meta_blob = cryptoblob::seal_serialize(&meta, &self.encryption_key)?; + self.k2v + .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) + .await?; + Ok::<_, anyhow::Error>(()) + } + )?; + // Add mail to Bayou mail index let add_mail_op = self .uid_index .state() - .op_mail_add(gen_ident(), vec!["\\Unseen".into()]); + .op_mail_add(ident, vec!["\\Unseen".into()]); self.uid_index.push(add_mail_op).await?; + Ok(()) + } + + async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { + let del_mail_op = self.uid_index.state().op_mail_del(ident); + self.uid_index.push(del_mail_op).await?; + + futures::try_join!( + async { + // Delete mail body from S3 + let mut dor = DeleteObjectRequest::default(); + dor.bucket = self.bucket.clone(); + dor.key = format!("{}/{}", self.mail_path, ident); + self.s3.delete_object(dor).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Delete mail meta from K2V + let sk = ident.to_string(); + let v = self.k2v.read_item(&self.mail_path, &sk).await?; + self.k2v + .delete_item(&self.mail_path, &sk, v.causality) + .await?; + Ok::<_, anyhow::Error>(()) + } + )?; + Ok(()) + } + + // ---- + + async fn test(&mut self) -> Result<()> { + self.uid_index.sync().await?; + + dump(&self.uid_index); + + let mail = br#"From: Garage team <garagehq@deuxfleurs.fr> +Subject: Welcome to Aerogramme!! + +This is just a test email, feel free to ignore."#; + let mail = IMF::try_from(&mail[..]).unwrap(); + self.append(mail, None).await?; + dump(&self.uid_index); if self.uid_index.state().idx_by_uid.len() > 6 { @@ -121,8 +246,8 @@ impl MailboxInternal { .skip(3 + i) .next() .unwrap(); - let del_mail_op = self.uid_index.state().op_mail_del(*ident); - self.uid_index.push(del_mail_op).await?; + + self.delete(*ident).await?; dump(&self.uid_index); } @@ -148,3 +273,19 @@ fn dump(uid_index: &Bayou<UidIndex>) { } println!(""); } + +// ---- + +/// The metadata of a message that is stored in K2V +/// at pk = mail/<mailbox uuid>, sk = <message uuid> +#[derive(Serialize, Deserialize)] +pub struct MailMeta { + /// INTERNALDATE field (milliseconds since epoch) + pub internaldate: u64, + /// Headers of the message + pub headers: Vec<u8>, + /// Secret key for decrypting entire message + pub message_key: Key, + /// RFC822 size + pub rfc822_size: usize, +} |