aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/imap/command/selected.rs15
-rw-r--r--src/imap/mailbox_view.rs122
-rw-r--r--src/mail/mailbox.rs189
-rw-r--r--src/mail/mod.rs11
4 files changed, 306 insertions, 31 deletions
diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs
index 3a44a3f..e10eab6 100644
--- a/src/imap/command/selected.rs
+++ b/src/imap/command/selected.rs
@@ -42,17 +42,22 @@ pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response, flow::T
impl<'a> SelectedContext<'a> {
pub async fn fetch(
self,
- _sequence_set: &SequenceSet,
- _attributes: &MacroOrFetchAttributes,
- _uid: &bool,
+ sequence_set: &SequenceSet,
+ attributes: &MacroOrFetchAttributes,
+ uid: &bool,
) -> Result<(Response, flow::Transition)> {
- Ok((Response::bad("Not implemented")?, flow::Transition::None))
+ let resp = self.mailbox.fetch(sequence_set, attributes, uid).await?;
+
+ Ok((
+ Response::ok("FETCH completed")?.with_body(resp),
+ flow::Transition::None,
+ ))
}
pub async fn noop(self) -> Result<(Response, flow::Transition)> {
let updates = self.mailbox.update().await?;
Ok((
- Response::ok("Noop completed.")?.with_body(updates),
+ Response::ok("NOOP completed.")?.with_body(updates),
flow::Transition::None,
))
}
diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs
index 6066528..df9c3fe 100644
--- a/src/imap/mailbox_view.rs
+++ b/src/imap/mailbox_view.rs
@@ -1,11 +1,14 @@
use std::num::NonZeroU32;
use std::sync::Arc;
-use anyhow::{Error, Result};
+use anyhow::{anyhow, bail, Error, Result};
use boitalettres::proto::res::body::Data as Body;
-use imap_codec::types::core::Atom;
+use futures::stream::{FuturesOrdered, StreamExt};
+use imap_codec::types::core::{Atom, IString, NString};
+use imap_codec::types::fetch_attributes::{FetchAttribute, MacroOrFetchAttributes};
use imap_codec::types::flag::Flag;
use imap_codec::types::response::{Code, Data, MessageAttribute, Status};
+use imap_codec::types::sequence::{self, SequenceSet};
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::UidIndex;
@@ -132,6 +135,121 @@ impl MailboxView {
Ok(data)
}
+ /// Looks up state changes in the mailbox and produces a set of IMAP
+ /// responses describing the new state.
+ pub async fn fetch(
+ &self,
+ sequence_set: &SequenceSet,
+ attributes: &MacroOrFetchAttributes,
+ uid: &bool,
+ ) -> Result<Vec<Body>> {
+ if *uid {
+ bail!("UID FETCH not implemented");
+ }
+
+ let mail_vec = self
+ .known_state
+ .idx_by_uid
+ .iter()
+ .map(|(uid, uuid)| (*uid, *uuid))
+ .collect::<Vec<_>>();
+
+ let mut mails = vec![];
+ let iter_strat = sequence::Strategy::Naive {
+ largest: NonZeroU32::try_from((self.known_state.idx_by_uid.len() + 1) as u32).unwrap(),
+ };
+ for i in sequence_set.iter(iter_strat) {
+ if let Some(mail) = mail_vec.get(i.get() as usize - 1) {
+ mails.push((i, *mail));
+ } else {
+ bail!("No such mail: {}", i);
+ }
+ }
+
+ let mails_uuid = mails
+ .iter()
+ .map(|(_i, (_uid, uuid))| *uuid)
+ .collect::<Vec<_>>();
+ let mails_meta = self.mailbox.fetch_meta(&mails_uuid).await?;
+
+ let fetch_attrs = match attributes {
+ MacroOrFetchAttributes::Macro(m) => m.expand(),
+ MacroOrFetchAttributes::FetchAttributes(a) => a.clone(),
+ };
+ let need_body = fetch_attrs.iter().any(|x| {
+ matches!(
+ x,
+ FetchAttribute::Body
+ | FetchAttribute::BodyExt { .. }
+ | FetchAttribute::Rfc822
+ | FetchAttribute::Rfc822Text
+ | FetchAttribute::BodyStructure
+ )
+ });
+
+ let mails = if need_body {
+ let mut iter = mails
+ .into_iter()
+ .zip(mails_meta.into_iter())
+ .map(|((i, (uid, uuid)), meta)| async move {
+ let body = self.mailbox.fetch_full(uuid, &meta.message_key).await?;
+ Ok::<_, anyhow::Error>((i, uid, uuid, meta, Some(body)))
+ })
+ .collect::<FuturesOrdered<_>>();
+ let mut mails = vec![];
+ while let Some(m) = iter.next().await {
+ mails.push(m?);
+ }
+ mails
+ } else {
+ mails
+ .into_iter()
+ .zip(mails_meta.into_iter())
+ .map(|((i, (uid, uuid)), meta)| (i, uid, uuid, meta, None))
+ .collect::<Vec<_>>()
+ };
+
+ let mut ret = vec![];
+ for (i, uid, uuid, meta, body) in mails {
+ let mut attributes = vec![MessageAttribute::Uid(uid)];
+
+ let (uid2, flags) = self
+ .known_state
+ .table
+ .get(&uuid)
+ .ok_or_else(|| anyhow!("Mail not in uidindex table: {}", uuid))?;
+
+ for attr in fetch_attrs.iter() {
+ match attr {
+ FetchAttribute::Uid => (),
+ FetchAttribute::Flags => {
+ attributes.push(MessageAttribute::Flags(
+ flags.iter().filter_map(|f| string_to_flag(f)).collect(),
+ ));
+ }
+ FetchAttribute::Rfc822Size => {
+ attributes.push(MessageAttribute::Rfc822Size(meta.rfc822_size as u32))
+ }
+ FetchAttribute::Rfc822Header => {
+ attributes.push(MessageAttribute::Rfc822Header(NString(Some(
+ IString::Literal(meta.headers.clone().try_into().unwrap()),
+ ))))
+ }
+
+ // TODO
+ _ => (),
+ }
+ }
+
+ ret.push(Body::Data(Data::Fetch {
+ seq_or_uid: i,
+ attributes,
+ }));
+ }
+
+ Ok(ret)
+ }
+
// ----
/// Produce an OK [UIDVALIDITY _] message corresponding to `known_state`
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index 6801ab7..3ebaa0b 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -1,14 +1,20 @@
-use anyhow::Result;
+use anyhow::{anyhow, bail, Result};
use k2v_client::K2vClient;
-use rusoto_s3::S3Client;
+use k2v_client::{BatchReadOp, Filter, K2vValue};
+use rusoto_s3::{
+ DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
+};
+use serde::{Deserialize, Serialize};
+use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use crate::bayou::Bayou;
-use crate::cryptoblob::Key;
+use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
use crate::login::Credentials;
use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
use crate::mail::IMF;
+use crate::time::now_msec;
pub struct Mailbox {
id: UniqueIdent,
@@ -48,8 +54,8 @@ impl Mailbox {
}
/// Insert an email in the mailbox
- pub async fn append<'a>(&self, _msg: IMF<'a>) -> Result<()> {
- unimplemented!()
+ pub async fn append<'a>(&self, msg: IMF<'a>) -> Result<()> {
+ self.mbox.write().await.append(msg, None).await
}
/// Copy an email from an other Mailbox to this mailbox
@@ -58,21 +64,15 @@ impl Mailbox {
unimplemented!()
}
- /// Delete all emails with the \Delete flag in the mailbox
- /// Can be called by CLOSE and EXPUNGE
- /// @FIXME do we want to implement this feature or a simpler "delete" command
- /// The controller could then "fetch \Delete" and call delete on each email?
- pub async fn expunge(&self) -> Result<()> {
- unimplemented!()
- }
-
- /// Update flags of a range of emails
- pub async fn store(&self) -> Result<()> {
- unimplemented!()
+ /// Fetch the metadata (headers + some more info) of the specified
+ /// mail IDs
+ pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+ self.mbox.read().await.fetch_meta(ids).await
}
- pub async fn fetch(&self) -> Result<()> {
- unimplemented!()
+ /// Fetch an entire e-mail
+ pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+ self.mbox.read().await.fetch_full(id, message_key).await
}
/// Test procedure TODO WILL REMOVE THIS
@@ -98,17 +98,142 @@ struct MailboxInternal {
}
impl MailboxInternal {
- pub async fn test(&mut self) -> Result<()> {
- self.uid_index.sync().await?;
+ async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+ let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
+ let ops = ids
+ .iter()
+ .map(|id| BatchReadOp {
+ partition_key: &self.mail_path,
+ filter: Filter {
+ start: Some(id),
+ end: None,
+ prefix: None,
+ limit: None,
+ reverse: false,
+ },
+ single_item: true,
+ conflicts_only: false,
+ tombstones: false,
+ })
+ .collect::<Vec<_>>();
+ let res_vec = self.k2v.read_batch(&ops).await?;
- dump(&self.uid_index);
+ let mut meta_vec = vec![];
+ for res in res_vec {
+ if res.items.len() != 1 {
+ bail!("Expected 1 item, got {}", res.items.len());
+ }
+ let (_, cv) = res.items.iter().next().unwrap();
+ if cv.value.len() != 1 {
+ bail!("Expected 1 value, got {}", cv.value.len());
+ }
+ match &cv.value[0] {
+ K2vValue::Tombstone => bail!("Expected value, got tombstone"),
+ K2vValue::Value(v) => {
+ let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
+ meta_vec.push(meta);
+ }
+ }
+ }
+
+ Ok(meta_vec)
+ }
+
+ async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+ let mut gor = GetObjectRequest::default();
+ gor.bucket = self.bucket.clone();
+ gor.key = format!("{}/{}", self.mail_path, id);
+
+ let obj_res = self.s3.get_object(gor).await?;
+
+ let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
+ let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
+ obj_body.into_async_read().read_to_end(&mut buf).await?;
+
+ Ok(cryptoblob::open(&buf, &message_key)?)
+ }
+
+ async fn append(&mut self, mail: IMF<'_>, ident: Option<UniqueIdent>) -> Result<()> {
+ let ident = ident.unwrap_or_else(|| gen_ident());
+ let message_key = gen_key();
+
+ futures::try_join!(
+ async {
+ // Encrypt and save mail body
+ let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
+ let mut por = PutObjectRequest::default();
+ por.bucket = self.bucket.clone();
+ por.key = format!("{}/{}", self.mail_path, ident);
+ por.body = Some(message_blob.into());
+ self.s3.put_object(por).await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Save mail meta
+ let meta = MailMeta {
+ internaldate: now_msec(),
+ headers: mail.raw[..mail.parsed.offset_body].to_vec(),
+ message_key: message_key.clone(),
+ rfc822_size: mail.raw.len(),
+ };
+ let meta_blob = cryptoblob::seal_serialize(&meta, &self.encryption_key)?;
+ self.k2v
+ .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ }
+ )?;
+ // Add mail to Bayou mail index
let add_mail_op = self
.uid_index
.state()
- .op_mail_add(gen_ident(), vec!["\\Unseen".into()]);
+ .op_mail_add(ident, vec!["\\Unseen".into()]);
self.uid_index.push(add_mail_op).await?;
+ Ok(())
+ }
+
+ async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
+ let del_mail_op = self.uid_index.state().op_mail_del(ident);
+ self.uid_index.push(del_mail_op).await?;
+
+ futures::try_join!(
+ async {
+ // Delete mail body from S3
+ let mut dor = DeleteObjectRequest::default();
+ dor.bucket = self.bucket.clone();
+ dor.key = format!("{}/{}", self.mail_path, ident);
+ self.s3.delete_object(dor).await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Delete mail meta from K2V
+ let sk = ident.to_string();
+ let v = self.k2v.read_item(&self.mail_path, &sk).await?;
+ self.k2v
+ .delete_item(&self.mail_path, &sk, v.causality)
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ }
+ )?;
+ Ok(())
+ }
+
+ // ----
+
+ async fn test(&mut self) -> Result<()> {
+ self.uid_index.sync().await?;
+
+ dump(&self.uid_index);
+
+ let mail = br#"From: Garage team <garagehq@deuxfleurs.fr>
+Subject: Welcome to Aerogramme!!
+
+This is just a test email, feel free to ignore."#;
+ let mail = IMF::try_from(&mail[..]).unwrap();
+ self.append(mail, None).await?;
+
dump(&self.uid_index);
if self.uid_index.state().idx_by_uid.len() > 6 {
@@ -121,8 +246,8 @@ impl MailboxInternal {
.skip(3 + i)
.next()
.unwrap();
- let del_mail_op = self.uid_index.state().op_mail_del(*ident);
- self.uid_index.push(del_mail_op).await?;
+
+ self.delete(*ident).await?;
dump(&self.uid_index);
}
@@ -148,3 +273,19 @@ fn dump(uid_index: &Bayou<UidIndex>) {
}
println!("");
}
+
+// ----
+
+/// The metadata of a message that is stored in K2V
+/// at pk = mail/<mailbox uuid>, sk = <message uuid>
+#[derive(Serialize, Deserialize)]
+pub struct MailMeta {
+ /// INTERNALDATE field (milliseconds since epoch)
+ pub internaldate: u64,
+ /// Headers of the message
+ pub headers: Vec<u8>,
+ /// Secret key for decrypting entire message
+ pub message_key: Key,
+ /// RFC822 size
+ pub rfc822_size: usize,
+}
diff --git a/src/mail/mod.rs b/src/mail/mod.rs
index bbb553a..8dc42c4 100644
--- a/src/mail/mod.rs
+++ b/src/mail/mod.rs
@@ -1,3 +1,5 @@
+use std::convert::TryFrom;
+
pub mod mailbox;
pub mod uidindex;
pub mod unique_ident;
@@ -9,3 +11,12 @@ pub struct IMF<'a> {
raw: &'a [u8],
parsed: mail_parser::Message<'a>,
}
+
+impl<'a> TryFrom<&'a [u8]> for IMF<'a> {
+ type Error = ();
+
+ fn try_from(body: &'a [u8]) -> Result<IMF<'a>, ()> {
+ let parsed = mail_parser::Message::parse(body).ok_or(())?;
+ Ok(Self { raw: body, parsed })
+ }
+}