From 509e7e4bed3b011b5e0b276db3900edb9fee7255 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 29 Jun 2022 19:24:21 +0200 Subject: Implement beginning of a FETCH command --- src/imap/command/selected.rs | 15 ++-- src/imap/mailbox_view.rs | 122 +++++++++++++++++++++++++++- src/mail/mailbox.rs | 189 +++++++++++++++++++++++++++++++++++++------ src/mail/mod.rs | 11 +++ 4 files changed, 306 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index 3a44a3f..e10eab6 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -42,17 +42,22 @@ pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response, flow::T impl<'a> SelectedContext<'a> { pub async fn fetch( self, - _sequence_set: &SequenceSet, - _attributes: &MacroOrFetchAttributes, - _uid: &bool, + sequence_set: &SequenceSet, + attributes: &MacroOrFetchAttributes, + uid: &bool, ) -> Result<(Response, flow::Transition)> { - Ok((Response::bad("Not implemented")?, flow::Transition::None)) + let resp = self.mailbox.fetch(sequence_set, attributes, uid).await?; + + Ok(( + Response::ok("FETCH completed")?.with_body(resp), + flow::Transition::None, + )) } pub async fn noop(self) -> Result<(Response, flow::Transition)> { let updates = self.mailbox.update().await?; Ok(( - Response::ok("Noop completed.")?.with_body(updates), + Response::ok("NOOP completed.")?.with_body(updates), flow::Transition::None, )) } diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 6066528..df9c3fe 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -1,11 +1,14 @@ use std::num::NonZeroU32; use std::sync::Arc; -use anyhow::{Error, Result}; +use anyhow::{anyhow, bail, Error, Result}; use boitalettres::proto::res::body::Data as Body; -use imap_codec::types::core::Atom; +use futures::stream::{FuturesOrdered, StreamExt}; +use imap_codec::types::core::{Atom, IString, NString}; +use imap_codec::types::fetch_attributes::{FetchAttribute, MacroOrFetchAttributes}; use imap_codec::types::flag::Flag; use imap_codec::types::response::{Code, Data, MessageAttribute, Status}; +use imap_codec::types::sequence::{self, SequenceSet}; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::UidIndex; @@ -132,6 +135,121 @@ impl MailboxView { Ok(data) } + /// Looks up state changes in the mailbox and produces a set of IMAP + /// responses describing the new state. + pub async fn fetch( + &self, + sequence_set: &SequenceSet, + attributes: &MacroOrFetchAttributes, + uid: &bool, + ) -> Result> { + if *uid { + bail!("UID FETCH not implemented"); + } + + let mail_vec = self + .known_state + .idx_by_uid + .iter() + .map(|(uid, uuid)| (*uid, *uuid)) + .collect::>(); + + let mut mails = vec![]; + let iter_strat = sequence::Strategy::Naive { + largest: NonZeroU32::try_from((self.known_state.idx_by_uid.len() + 1) as u32).unwrap(), + }; + for i in sequence_set.iter(iter_strat) { + if let Some(mail) = mail_vec.get(i.get() as usize - 1) { + mails.push((i, *mail)); + } else { + bail!("No such mail: {}", i); + } + } + + let mails_uuid = mails + .iter() + .map(|(_i, (_uid, uuid))| *uuid) + .collect::>(); + let mails_meta = self.mailbox.fetch_meta(&mails_uuid).await?; + + let fetch_attrs = match attributes { + MacroOrFetchAttributes::Macro(m) => m.expand(), + MacroOrFetchAttributes::FetchAttributes(a) => a.clone(), + }; + let need_body = fetch_attrs.iter().any(|x| { + matches!( + x, + FetchAttribute::Body + | FetchAttribute::BodyExt { .. } + | FetchAttribute::Rfc822 + | FetchAttribute::Rfc822Text + | FetchAttribute::BodyStructure + ) + }); + + let mails = if need_body { + let mut iter = mails + .into_iter() + .zip(mails_meta.into_iter()) + .map(|((i, (uid, uuid)), meta)| async move { + let body = self.mailbox.fetch_full(uuid, &meta.message_key).await?; + Ok::<_, anyhow::Error>((i, uid, uuid, meta, Some(body))) + }) + .collect::>(); + let mut mails = vec![]; + while let Some(m) = iter.next().await { + mails.push(m?); + } + mails + } else { + mails + .into_iter() + .zip(mails_meta.into_iter()) + .map(|((i, (uid, uuid)), meta)| (i, uid, uuid, meta, None)) + .collect::>() + }; + + let mut ret = vec![]; + for (i, uid, uuid, meta, body) in mails { + let mut attributes = vec![MessageAttribute::Uid(uid)]; + + let (uid2, flags) = self + .known_state + .table + .get(&uuid) + .ok_or_else(|| anyhow!("Mail not in uidindex table: {}", uuid))?; + + for attr in fetch_attrs.iter() { + match attr { + FetchAttribute::Uid => (), + FetchAttribute::Flags => { + attributes.push(MessageAttribute::Flags( + flags.iter().filter_map(|f| string_to_flag(f)).collect(), + )); + } + FetchAttribute::Rfc822Size => { + attributes.push(MessageAttribute::Rfc822Size(meta.rfc822_size as u32)) + } + FetchAttribute::Rfc822Header => { + attributes.push(MessageAttribute::Rfc822Header(NString(Some( + IString::Literal(meta.headers.clone().try_into().unwrap()), + )))) + } + + // TODO + _ => (), + } + } + + ret.push(Body::Data(Data::Fetch { + seq_or_uid: i, + attributes, + })); + } + + Ok(ret) + } + // ---- /// Produce an OK [UIDVALIDITY _] message corresponding to `known_state` diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 6801ab7..3ebaa0b 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -1,14 +1,20 @@ -use anyhow::Result; +use anyhow::{anyhow, bail, Result}; use k2v_client::K2vClient; -use rusoto_s3::S3Client; +use k2v_client::{BatchReadOp, Filter, K2vValue}; +use rusoto_s3::{ + DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, +}; +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncReadExt; use tokio::sync::RwLock; use crate::bayou::Bayou; -use crate::cryptoblob::Key; +use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; +use crate::time::now_msec; pub struct Mailbox { id: UniqueIdent, @@ -48,8 +54,8 @@ impl Mailbox { } /// Insert an email in the mailbox - pub async fn append<'a>(&self, _msg: IMF<'a>) -> Result<()> { - unimplemented!() + pub async fn append<'a>(&self, msg: IMF<'a>) -> Result<()> { + self.mbox.write().await.append(msg, None).await } /// Copy an email from an other Mailbox to this mailbox @@ -58,21 +64,15 @@ impl Mailbox { unimplemented!() } - /// Delete all emails with the \Delete flag in the mailbox - /// Can be called by CLOSE and EXPUNGE - /// @FIXME do we want to implement this feature or a simpler "delete" command - /// The controller could then "fetch \Delete" and call delete on each email? - pub async fn expunge(&self) -> Result<()> { - unimplemented!() - } - - /// Update flags of a range of emails - pub async fn store(&self) -> Result<()> { - unimplemented!() + /// Fetch the metadata (headers + some more info) of the specified + /// mail IDs + pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { + self.mbox.read().await.fetch_meta(ids).await } - pub async fn fetch(&self) -> Result<()> { - unimplemented!() + /// Fetch an entire e-mail + pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { + self.mbox.read().await.fetch_full(id, message_key).await } /// Test procedure TODO WILL REMOVE THIS @@ -98,17 +98,142 @@ struct MailboxInternal { } impl MailboxInternal { - pub async fn test(&mut self) -> Result<()> { - self.uid_index.sync().await?; + async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { + let ids = ids.iter().map(|x| x.to_string()).collect::>(); + let ops = ids + .iter() + .map(|id| BatchReadOp { + partition_key: &self.mail_path, + filter: Filter { + start: Some(id), + end: None, + prefix: None, + limit: None, + reverse: false, + }, + single_item: true, + conflicts_only: false, + tombstones: false, + }) + .collect::>(); + let res_vec = self.k2v.read_batch(&ops).await?; - dump(&self.uid_index); + let mut meta_vec = vec![]; + for res in res_vec { + if res.items.len() != 1 { + bail!("Expected 1 item, got {}", res.items.len()); + } + let (_, cv) = res.items.iter().next().unwrap(); + if cv.value.len() != 1 { + bail!("Expected 1 value, got {}", cv.value.len()); + } + match &cv.value[0] { + K2vValue::Tombstone => bail!("Expected value, got tombstone"), + K2vValue::Value(v) => { + let meta = open_deserialize::(v, &self.encryption_key)?; + meta_vec.push(meta); + } + } + } + + Ok(meta_vec) + } + + async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { + let mut gor = GetObjectRequest::default(); + gor.bucket = self.bucket.clone(); + gor.key = format!("{}/{}", self.mail_path, id); + + let obj_res = self.s3.get_object(gor).await?; + + let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; + let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); + obj_body.into_async_read().read_to_end(&mut buf).await?; + + Ok(cryptoblob::open(&buf, &message_key)?) + } + + async fn append(&mut self, mail: IMF<'_>, ident: Option) -> Result<()> { + let ident = ident.unwrap_or_else(|| gen_ident()); + let message_key = gen_key(); + + futures::try_join!( + async { + // Encrypt and save mail body + let message_blob = cryptoblob::seal(mail.raw, &message_key)?; + let mut por = PutObjectRequest::default(); + por.bucket = self.bucket.clone(); + por.key = format!("{}/{}", self.mail_path, ident); + por.body = Some(message_blob.into()); + self.s3.put_object(por).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Save mail meta + let meta = MailMeta { + internaldate: now_msec(), + headers: mail.raw[..mail.parsed.offset_body].to_vec(), + message_key: message_key.clone(), + rfc822_size: mail.raw.len(), + }; + let meta_blob = cryptoblob::seal_serialize(&meta, &self.encryption_key)?; + self.k2v + .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) + .await?; + Ok::<_, anyhow::Error>(()) + } + )?; + // Add mail to Bayou mail index let add_mail_op = self .uid_index .state() - .op_mail_add(gen_ident(), vec!["\\Unseen".into()]); + .op_mail_add(ident, vec!["\\Unseen".into()]); self.uid_index.push(add_mail_op).await?; + Ok(()) + } + + async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { + let del_mail_op = self.uid_index.state().op_mail_del(ident); + self.uid_index.push(del_mail_op).await?; + + futures::try_join!( + async { + // Delete mail body from S3 + let mut dor = DeleteObjectRequest::default(); + dor.bucket = self.bucket.clone(); + dor.key = format!("{}/{}", self.mail_path, ident); + self.s3.delete_object(dor).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Delete mail meta from K2V + let sk = ident.to_string(); + let v = self.k2v.read_item(&self.mail_path, &sk).await?; + self.k2v + .delete_item(&self.mail_path, &sk, v.causality) + .await?; + Ok::<_, anyhow::Error>(()) + } + )?; + Ok(()) + } + + // ---- + + async fn test(&mut self) -> Result<()> { + self.uid_index.sync().await?; + + dump(&self.uid_index); + + let mail = br#"From: Garage team +Subject: Welcome to Aerogramme!! + +This is just a test email, feel free to ignore."#; + let mail = IMF::try_from(&mail[..]).unwrap(); + self.append(mail, None).await?; + dump(&self.uid_index); if self.uid_index.state().idx_by_uid.len() > 6 { @@ -121,8 +246,8 @@ impl MailboxInternal { .skip(3 + i) .next() .unwrap(); - let del_mail_op = self.uid_index.state().op_mail_del(*ident); - self.uid_index.push(del_mail_op).await?; + + self.delete(*ident).await?; dump(&self.uid_index); } @@ -148,3 +273,19 @@ fn dump(uid_index: &Bayou) { } println!(""); } + +// ---- + +/// The metadata of a message that is stored in K2V +/// at pk = mail/, sk = +#[derive(Serialize, Deserialize)] +pub struct MailMeta { + /// INTERNALDATE field (milliseconds since epoch) + pub internaldate: u64, + /// Headers of the message + pub headers: Vec, + /// Secret key for decrypting entire message + pub message_key: Key, + /// RFC822 size + pub rfc822_size: usize, +} diff --git a/src/mail/mod.rs b/src/mail/mod.rs index bbb553a..8dc42c4 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -1,3 +1,5 @@ +use std::convert::TryFrom; + pub mod mailbox; pub mod uidindex; pub mod unique_ident; @@ -9,3 +11,12 @@ pub struct IMF<'a> { raw: &'a [u8], parsed: mail_parser::Message<'a>, } + +impl<'a> TryFrom<&'a [u8]> for IMF<'a> { + type Error = (); + + fn try_from(body: &'a [u8]) -> Result, ()> { + let parsed = mail_parser::Message::parse(body).ok_or(())?; + Ok(Self { raw: body, parsed }) + } +} -- cgit v1.2.3