aboutsummaryrefslogtreecommitdiff
path: root/src/mail/mailbox.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-29 19:24:21 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-29 19:24:21 +0200
commit509e7e4bed3b011b5e0b276db3900edb9fee7255 (patch)
tree8bf760b1c367160ce4364f1485bc5598b78ec36b /src/mail/mailbox.rs
parenta8d0e4a994daca39f9619cddf2847c1a7820c040 (diff)
downloadaerogramme-509e7e4bed3b011b5e0b276db3900edb9fee7255.tar.gz
aerogramme-509e7e4bed3b011b5e0b276db3900edb9fee7255.zip
Implement beginning of a FETCH command
Diffstat (limited to 'src/mail/mailbox.rs')
-rw-r--r--src/mail/mailbox.rs189
1 files changed, 165 insertions, 24 deletions
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index 6801ab7..3ebaa0b 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -1,14 +1,20 @@
-use anyhow::Result;
+use anyhow::{anyhow, bail, Result};
use k2v_client::K2vClient;
-use rusoto_s3::S3Client;
+use k2v_client::{BatchReadOp, Filter, K2vValue};
+use rusoto_s3::{
+ DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
+};
+use serde::{Deserialize, Serialize};
+use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use crate::bayou::Bayou;
-use crate::cryptoblob::Key;
+use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
use crate::login::Credentials;
use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
use crate::mail::IMF;
+use crate::time::now_msec;
pub struct Mailbox {
id: UniqueIdent,
@@ -48,8 +54,8 @@ impl Mailbox {
}
/// Insert an email in the mailbox
- pub async fn append<'a>(&self, _msg: IMF<'a>) -> Result<()> {
- unimplemented!()
+ pub async fn append<'a>(&self, msg: IMF<'a>) -> Result<()> {
+ self.mbox.write().await.append(msg, None).await
}
/// Copy an email from an other Mailbox to this mailbox
@@ -58,21 +64,15 @@ impl Mailbox {
unimplemented!()
}
- /// Delete all emails with the \Delete flag in the mailbox
- /// Can be called by CLOSE and EXPUNGE
- /// @FIXME do we want to implement this feature or a simpler "delete" command
- /// The controller could then "fetch \Delete" and call delete on each email?
- pub async fn expunge(&self) -> Result<()> {
- unimplemented!()
- }
-
- /// Update flags of a range of emails
- pub async fn store(&self) -> Result<()> {
- unimplemented!()
+ /// Fetch the metadata (headers + some more info) of the specified
+ /// mail IDs
+ pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+ self.mbox.read().await.fetch_meta(ids).await
}
- pub async fn fetch(&self) -> Result<()> {
- unimplemented!()
+ /// Fetch an entire e-mail
+ pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+ self.mbox.read().await.fetch_full(id, message_key).await
}
/// Test procedure TODO WILL REMOVE THIS
@@ -98,17 +98,142 @@ struct MailboxInternal {
}
impl MailboxInternal {
- pub async fn test(&mut self) -> Result<()> {
- self.uid_index.sync().await?;
+ async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+ let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
+ let ops = ids
+ .iter()
+ .map(|id| BatchReadOp {
+ partition_key: &self.mail_path,
+ filter: Filter {
+ start: Some(id),
+ end: None,
+ prefix: None,
+ limit: None,
+ reverse: false,
+ },
+ single_item: true,
+ conflicts_only: false,
+ tombstones: false,
+ })
+ .collect::<Vec<_>>();
+ let res_vec = self.k2v.read_batch(&ops).await?;
- dump(&self.uid_index);
+ let mut meta_vec = vec![];
+ for res in res_vec {
+ if res.items.len() != 1 {
+ bail!("Expected 1 item, got {}", res.items.len());
+ }
+ let (_, cv) = res.items.iter().next().unwrap();
+ if cv.value.len() != 1 {
+ bail!("Expected 1 value, got {}", cv.value.len());
+ }
+ match &cv.value[0] {
+ K2vValue::Tombstone => bail!("Expected value, got tombstone"),
+ K2vValue::Value(v) => {
+ let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
+ meta_vec.push(meta);
+ }
+ }
+ }
+
+ Ok(meta_vec)
+ }
+
+ async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+ let mut gor = GetObjectRequest::default();
+ gor.bucket = self.bucket.clone();
+ gor.key = format!("{}/{}", self.mail_path, id);
+
+ let obj_res = self.s3.get_object(gor).await?;
+
+ let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
+ let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
+ obj_body.into_async_read().read_to_end(&mut buf).await?;
+
+ Ok(cryptoblob::open(&buf, &message_key)?)
+ }
+
+ async fn append(&mut self, mail: IMF<'_>, ident: Option<UniqueIdent>) -> Result<()> {
+ let ident = ident.unwrap_or_else(|| gen_ident());
+ let message_key = gen_key();
+
+ futures::try_join!(
+ async {
+ // Encrypt and save mail body
+ let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
+ let mut por = PutObjectRequest::default();
+ por.bucket = self.bucket.clone();
+ por.key = format!("{}/{}", self.mail_path, ident);
+ por.body = Some(message_blob.into());
+ self.s3.put_object(por).await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Save mail meta
+ let meta = MailMeta {
+ internaldate: now_msec(),
+ headers: mail.raw[..mail.parsed.offset_body].to_vec(),
+ message_key: message_key.clone(),
+ rfc822_size: mail.raw.len(),
+ };
+ let meta_blob = cryptoblob::seal_serialize(&meta, &self.encryption_key)?;
+ self.k2v
+ .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ }
+ )?;
+ // Add mail to Bayou mail index
let add_mail_op = self
.uid_index
.state()
- .op_mail_add(gen_ident(), vec!["\\Unseen".into()]);
+ .op_mail_add(ident, vec!["\\Unseen".into()]);
self.uid_index.push(add_mail_op).await?;
+ Ok(())
+ }
+
+ async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
+ let del_mail_op = self.uid_index.state().op_mail_del(ident);
+ self.uid_index.push(del_mail_op).await?;
+
+ futures::try_join!(
+ async {
+ // Delete mail body from S3
+ let mut dor = DeleteObjectRequest::default();
+ dor.bucket = self.bucket.clone();
+ dor.key = format!("{}/{}", self.mail_path, ident);
+ self.s3.delete_object(dor).await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Delete mail meta from K2V
+ let sk = ident.to_string();
+ let v = self.k2v.read_item(&self.mail_path, &sk).await?;
+ self.k2v
+ .delete_item(&self.mail_path, &sk, v.causality)
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ }
+ )?;
+ Ok(())
+ }
+
+ // ----
+
+ async fn test(&mut self) -> Result<()> {
+ self.uid_index.sync().await?;
+
+ dump(&self.uid_index);
+
+ let mail = br#"From: Garage team <garagehq@deuxfleurs.fr>
+Subject: Welcome to Aerogramme!!
+
+This is just a test email, feel free to ignore."#;
+ let mail = IMF::try_from(&mail[..]).unwrap();
+ self.append(mail, None).await?;
+
dump(&self.uid_index);
if self.uid_index.state().idx_by_uid.len() > 6 {
@@ -121,8 +246,8 @@ impl MailboxInternal {
.skip(3 + i)
.next()
.unwrap();
- let del_mail_op = self.uid_index.state().op_mail_del(*ident);
- self.uid_index.push(del_mail_op).await?;
+
+ self.delete(*ident).await?;
dump(&self.uid_index);
}
@@ -148,3 +273,19 @@ fn dump(uid_index: &Bayou<UidIndex>) {
}
println!("");
}
+
+// ----
+
+/// The metadata of a message that is stored in K2V
+/// at pk = mail/<mailbox uuid>, sk = <message uuid>
+#[derive(Serialize, Deserialize)]
+pub struct MailMeta {
+ /// INTERNALDATE field (milliseconds since epoch)
+ pub internaldate: u64,
+ /// Headers of the message
+ pub headers: Vec<u8>,
+ /// Secret key for decrypting entire message
+ pub message_key: Key,
+ /// RFC822 size
+ pub rfc822_size: usize,
+}