From b95028f89e4db7c3158fab3b71ea56a742daba21 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 29 Jun 2022 15:39:54 +0200 Subject: Some refactoring on mailbox structures and views --- src/imap/command/authenticated.rs | 68 ++++------------- src/imap/command/selected.rs | 3 +- src/imap/flow.rs | 5 +- src/imap/mailbox_view.rs | 154 ++++++++++++++++++++++++++++++++++++++ src/imap/mod.rs | 1 + src/login/ldap_provider.rs | 1 - src/login/mod.rs | 25 ++++++- src/login/static_provider.rs | 1 - src/mail/mailbox.rs | 88 +++++++++------------- src/mail/mod.rs | 7 +- src/mail/user.rs | 44 ++++++++++- src/main.rs | 6 +- src/server.rs | 7 +- 13 files changed, 282 insertions(+), 128 deletions(-) create mode 100644 src/imap/mailbox_view.rs (limited to 'src') diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index 47df5be..443edda 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -8,18 +8,11 @@ use imap_codec::types::response::{Code, Data, Status}; use crate::imap::command::anonymous; use crate::imap::flow; +use crate::imap::mailbox_view::MailboxView; use crate::mail::mailbox::Mailbox; use crate::mail::user::User; -const DEFAULT_FLAGS: [Flag; 5] = [ - Flag::Seen, - Flag::Answered, - Flag::Flagged, - Flag::Deleted, - Flag::Draft, -]; - pub struct AuthenticatedContext<'a> { pub req: &'a Request, pub user: &'a User, @@ -96,59 +89,24 @@ impl<'a> AuthenticatedContext<'a> { async fn select(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> { let name = String::try_from(mailbox.clone())?; - let mut mb = self.user.open_mailbox(name)?; + let mb_opt = self.user.open_mailbox(&name).await?; + let mb = match mb_opt { + Some(mb) => mb, + None => { + return Ok(( + Response::no("Mailbox does not exist")?, + flow::Transition::None, + )) + } + }; tracing::info!(username=%self.user.username, mailbox=%name, "mailbox.selected"); - let sum = mb.summary().await?; - tracing::trace!(summary=%sum, "mailbox.summary"); - - let mut res = Vec::::new(); - - res.push(Body::Data(Data::Exists(sum.exists))); - - res.push(Body::Data(Data::Recent(sum.recent))); - - let mut flags: Vec = sum.flags.map(|f| match f.chars().next() { - Some('\\') => None, - Some('$') if f == "$unseen" => None, - Some(_) => match Atom::try_from(f.clone()) { - Err(_) => { - tracing::error!(username=%self.user.username, mailbox=%name, flag=%f, "Unable to encode flag as IMAP atom"); - None - }, - Ok(a) => Some(Flag::Keyword(a)), - }, - None => None, - }).flatten().collect(); - flags.extend_from_slice(&DEFAULT_FLAGS); - - res.push(Body::Data(Data::Flags(flags.clone()))); - - let uid_validity = Status::ok(None, Some(Code::UidValidity(sum.validity)), "UIDs valid") - .map_err(Error::msg)?; - res.push(Body::Status(uid_validity)); - - let next_uid = Status::ok(None, Some(Code::UidNext(sum.next)), "Predict next UID") - .map_err(Error::msg)?; - res.push(Body::Status(next_uid)); - - if let Some(unseen) = sum.unseen { - let status_unseen = - Status::ok(None, Some(Code::Unseen(unseen.clone())), "First unseen UID") - .map_err(Error::msg)?; - res.push(Body::Status(status_unseen)); - } - - flags.push(Flag::Permanent); - let permanent_flags = - Status::ok(None, Some(Code::PermanentFlags(flags)), "Flags permitted") - .map_err(Error::msg)?; - res.push(Body::Status(permanent_flags)); + let (mb, data) = MailboxView::new(mb).await?; Ok(( Response::ok("Select completed")? .with_extra_code(Code::ReadWrite) - .with_body(res), + .with_body(data), flow::Transition::Select(mb), )) } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index b1bba23..4e3ff2f 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -9,6 +9,7 @@ use imap_codec::types::sequence::SequenceSet; use crate::imap::command::authenticated; use crate::imap::flow; +use crate::imap::mailbox_view::MailboxView; use crate::mail::mailbox::Mailbox; use crate::mail::user::User; @@ -16,7 +17,7 @@ use crate::mail::user::User; pub struct SelectedContext<'a> { pub req: &'a Request, pub user: &'a User, - pub mailbox: &'a mut Mailbox, + pub mailbox: &'a mut MailboxView, } pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response, flow::Transition)> { diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 0fe6f92..c9d7e40 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -1,6 +1,7 @@ use std::error::Error as StdError; use std::fmt; +use crate::imap::mailbox_view::MailboxView; use crate::mail::mailbox::Mailbox; use crate::mail::user::User; @@ -18,14 +19,14 @@ impl StdError for Error {} pub enum State { NotAuthenticated, Authenticated(User), - Selected(User, Mailbox), + Selected(User, MailboxView), Logout, } pub enum Transition { None, Authenticate(User), - Select(Mailbox), + Select(MailboxView), Unselect, Logout, } diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs new file mode 100644 index 0000000..ec5580d --- /dev/null +++ b/src/imap/mailbox_view.rs @@ -0,0 +1,154 @@ +use std::sync::Arc; + +use anyhow::{Error, Result}; +use boitalettres::proto::{res::body::Data as Body, Request, Response}; +use imap_codec::types::command::CommandBody; +use imap_codec::types::core::Atom; +use imap_codec::types::flag::Flag; +use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; +use imap_codec::types::response::{Code, Data, Status}; + +use crate::mail::mailbox::{Mailbox, Summary}; +use crate::mail::uidindex::UidIndex; + +const DEFAULT_FLAGS: [Flag; 5] = [ + Flag::Seen, + Flag::Answered, + Flag::Flagged, + Flag::Deleted, + Flag::Draft, +]; + +/// A MailboxView is responsible for giving the client the information +/// it needs about a mailbox, such as an initial summary of the mailbox's +/// content and continuous updates indicating when the content +/// of the mailbox has been changed. +/// To do this, it keeps a variable `known_state` that corresponds to +/// what the client knows, and produces IMAP messages to be sent to the +/// client that go along updates to `known_state`. +pub struct MailboxView { + mailbox: Arc, + known_state: UidIndex, +} + +impl MailboxView { + /// Creates a new IMAP view into a mailbox. + /// Generates the necessary IMAP messages so that the client + /// has a satisfactory summary of the current mailbox's state. + pub async fn new(mailbox: Arc) -> Result<(Self, Vec)> { + let state = mailbox.current_uid_index().await; + + let new_view = Self { + mailbox, + known_state: state, + }; + + let mut data = Vec::::new(); + data.push(new_view.exists()?); + data.push(new_view.recent()?); + data.extend(new_view.flags()?.into_iter()); + data.push(new_view.uidvalidity()?); + data.push(new_view.uidnext()?); + if let Some(unseen) = new_view.unseen()? { + data.push(unseen); + } + + Ok((new_view, data)) + } + + // ---- + + /// Produce an OK [UIDVALIDITY _] message corresponding to `known_state` + fn uidvalidity(&self) -> Result { + let uid_validity = Status::ok( + None, + Some(Code::UidValidity(self.known_state.uidvalidity)), + "UIDs valid", + ) + .map_err(Error::msg)?; + Ok(Body::Status(uid_validity)) + } + + /// Produce an OK [UIDNEXT _] message corresponding to `known_state` + fn uidnext(&self) -> Result { + let next_uid = Status::ok( + None, + Some(Code::UidNext(self.known_state.uidnext)), + "Predict next UID", + ) + .map_err(Error::msg)?; + Ok(Body::Status(next_uid)) + } + + /// Produces an UNSEEN message (if relevant) corresponding to the + /// first unseen message id in `known_state` + fn unseen(&self) -> Result> { + let unseen = self + .known_state + .idx_by_flag + .get(&"$unseen".to_string()) + .and_then(|os| os.get_min()) + .cloned(); + if let Some(unseen) = unseen { + let status_unseen = + Status::ok(None, Some(Code::Unseen(unseen.clone())), "First unseen UID") + .map_err(Error::msg)?; + Ok(Some(Body::Status(status_unseen))) + } else { + Ok(None) + } + } + + /// Produce an EXISTS message corresponding to the number of mails + /// in `known_state` + fn exists(&self) -> Result { + let exists = u32::try_from(self.known_state.idx_by_uid.len())?; + Ok(Body::Data(Data::Exists(exists))) + } + + /// Produce a RECENT message corresponding to the number of + /// recent mails in `known_state` + fn recent(&self) -> Result { + let recent = self + .known_state + .idx_by_flag + .get(&"\\Recent".to_string()) + .map(|os| os.len()) + .unwrap_or(0); + let recent = u32::try_from(recent)?; + Ok(Body::Data(Data::Recent(recent))) + } + + /// Produce a FLAGS and a PERMANENTFLAGS message that indicates + /// the flags that are in `known_state` + default flags + fn flags(&self) -> Result> { + let mut flags: Vec = self + .known_state + .idx_by_flag + .flags() + .map(|f| match f.chars().next() { + Some('\\') => None, + Some('$') if f == "$unseen" => None, + Some(_) => match Atom::try_from(f.clone()) { + Err(_) => { + tracing::error!(flag=%f, "Unable to encode flag as IMAP atom"); + None + } + Ok(a) => Some(Flag::Keyword(a)), + }, + None => None, + }) + .flatten() + .collect(); + flags.extend_from_slice(&DEFAULT_FLAGS); + let mut ret = vec![Body::Data(Data::Flags(flags.clone()))]; + + flags.push(Flag::Permanent); + let permanent_flags = + Status::ok(None, Some(Code::PermanentFlags(flags)), "Flags permitted") + .map_err(Error::msg)?; + ret.push(Body::Status(permanent_flags)); + + Ok(ret) + } +} diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 0e9f49a..f85bcc6 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -1,5 +1,6 @@ mod command; mod flow; +mod mailbox_view; mod session; use std::task::{Context, Poll}; diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index 9310e55..2eeb6d9 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -2,7 +2,6 @@ use anyhow::Result; use async_trait::async_trait; use ldap3::{LdapConnAsync, Scope, SearchEntry}; use log::debug; -use rusoto_signature::Region; use crate::config::*; use crate::login::*; diff --git a/src/login/mod.rs b/src/login/mod.rs index 1d5d634..0605d4e 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -13,7 +13,6 @@ use rand::prelude::*; use rusoto_core::HttpClient; use rusoto_credential::{AwsCredentials, StaticProvider}; use rusoto_s3::S3Client; -use rusoto_signature::Region; use crate::cryptoblob::*; @@ -52,7 +51,7 @@ pub struct PublicCredentials { } /// The struct StorageCredentials contains access key to an S3 and K2V bucket -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct StorageCredentials { pub s3_region: Region, pub k2v_region: Region, @@ -87,6 +86,24 @@ pub struct CryptoKeys { pub public: PublicKey, } +/// A custom S3 region, composed of a region name and endpoint. +/// We use this instead of rusoto_signature::Region so that we can +/// derive Hash and Eq +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct Region { + pub name: String, + pub endpoint: String, +} + +impl Region { + pub fn as_rusoto_region(&self) -> rusoto_signature::Region { + rusoto_signature::Region::Custom { + name: self.name.clone(), + endpoint: self.endpoint.clone(), + } + } +} + // ---- impl Credentials { @@ -111,7 +128,7 @@ impl StorageCredentials { ); Ok(K2vClient::new( - self.k2v_region.clone(), + self.k2v_region.as_rusoto_region(), self.bucket.clone(), aws_creds, None, @@ -127,7 +144,7 @@ impl StorageCredentials { Ok(S3Client::new_with( HttpClient::new()?, aws_creds_provider, - self.s3_region.clone(), + self.s3_region.as_rusoto_region(), )) } } diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index 6bbc717..5ea765f 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; -use rusoto_signature::Region; use crate::config::*; use crate::cryptoblob::{Key, SecretKey}; diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index a2d28fb..9e8f0db 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -3,6 +3,7 @@ use std::convert::TryFrom; use anyhow::Result; use k2v_client::K2vClient; use rusoto_s3::S3Client; +use tokio::sync::RwLock; use crate::bayou::Bayou; use crate::cryptoblob::Key; @@ -11,16 +12,16 @@ use crate::mail::mail_ident::*; use crate::mail::uidindex::*; use crate::mail::IMF; -pub struct Summary<'a> { +pub struct Summary { pub validity: ImapUidvalidity, pub next: ImapUid, pub exists: u32, pub recent: u32, - pub flags: FlagIter<'a>, - pub unseen: Option<&'a ImapUid>, + pub flags: Vec, + pub unseen: Option, } -impl std::fmt::Display for Summary<'_> { +impl std::fmt::Display for Summary { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( f, @@ -30,70 +31,40 @@ impl std::fmt::Display for Summary<'_> { } } -// Non standard but common flags: -// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml -pub struct Mailbox { - bucket: String, - pub name: String, - key: Key, - - k2v: K2vClient, - s3: S3Client, - - uid_index: Bayou, - mail_path: String, -} +pub struct Mailbox(RwLock); impl Mailbox { - pub(super) fn new(creds: &Credentials, name: &str) -> Result { + pub(super) async fn open(creds: &Credentials, name: &str) -> Result { let index_path = format!("index/{}", name); let mail_path = format!("mail/{}", name); - let uid_index = Bayou::::new(creds, index_path)?; - Ok(Self { + let mut uid_index = Bayou::::new(creds, index_path)?; + uid_index.sync().await?; + + Ok(Self(RwLock::new(MailboxInternal { bucket: creds.bucket().to_string(), - name: name.to_string(), // TODO: don't use name field if possible, use mail_path instead key: creds.keys.master.clone(), k2v: creds.k2v_client()?, s3: creds.s3_client()?, uid_index, mail_path, - }) + }))) } - /// Get a summary of the mailbox, useful for the SELECT command for example - pub async fn summary(&mut self) -> Result { - self.uid_index.sync().await?; - let state = self.uid_index.state(); - - let unseen = state - .idx_by_flag - .get(&"$unseen".to_string()) - .and_then(|os| os.get_min()); - let recent = state - .idx_by_flag - .get(&"\\Recent".to_string()) - .map(|os| os.len()) - .unwrap_or(0); - - return Ok(Summary { - validity: state.uidvalidity, - next: state.uidnext, - exists: u32::try_from(state.idx_by_uid.len())?, - recent: u32::try_from(recent)?, - flags: state.idx_by_flag.flags(), - unseen, - }); + /// Get a clone of the current UID Index of this mailbox + /// (cloning is cheap so don't hesitate to use this) + pub async fn current_uid_index(&self) -> UidIndex { + self.0.read().await.uid_index.state().clone() } /// Insert an email in the mailbox - pub async fn append(&mut self, _msg: IMF) -> Result<()> { + pub async fn append<'a>(&self, _msg: IMF<'a>) -> Result<()> { unimplemented!() } /// Copy an email from an other Mailbox to this mailbox /// (use this when possible, as it allows for a certain number of storage optimizations) - pub async fn copy(&mut self, _from: &Mailbox, _uid: ImapUid) -> Result<()> { + pub async fn copy(&self, _from: &Mailbox, _uid: ImapUid) -> Result<()> { unimplemented!() } @@ -101,21 +72,36 @@ impl Mailbox { /// Can be called by CLOSE and EXPUNGE /// @FIXME do we want to implement this feature or a simpler "delete" command /// The controller could then "fetch \Delete" and call delete on each email? - pub async fn expunge(&mut self) -> Result<()> { + pub async fn expunge(&self) -> Result<()> { unimplemented!() } /// Update flags of a range of emails - pub async fn store(&mut self) -> Result<()> { + pub async fn store(&self) -> Result<()> { unimplemented!() } - pub async fn fetch(&mut self) -> Result<()> { + pub async fn fetch(&self) -> Result<()> { unimplemented!() } +} + +// ---- + +// Non standard but common flags: +// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml +struct MailboxInternal { + bucket: String, + key: Key, - // ---- + k2v: K2vClient, + s3: S3Client, + + uid_index: Bayou, + mail_path: String, +} +impl MailboxInternal { pub async fn test(&mut self) -> Result<()> { self.uid_index.sync().await?; diff --git a/src/mail/mod.rs b/src/mail/mod.rs index 4339038..70182a9 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -1,6 +1,6 @@ pub mod mail_ident; pub mod mailbox; -mod uidindex; +pub mod uidindex; pub mod user; use std::convert::TryFrom; @@ -17,4 +17,7 @@ use crate::mail::uidindex::*; // Internet Message Format // aka RFC 822 - RFC 2822 - RFC 5322 -pub struct IMF(Vec); +pub struct IMF<'a> { + raw: &'a [u8], + parsed: mail_parser::Message<'a>, +} diff --git a/src/mail/user.rs b/src/mail/user.rs index 4864509..e2b33e2 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -1,9 +1,13 @@ +use std::collections::HashMap; +use std::sync::{Arc, Weak}; + use anyhow::Result; +use lazy_static::lazy_static; use k2v_client::K2vClient; use rusoto_s3::S3Client; -use crate::login::Credentials; +use crate::login::{Credentials, StorageCredentials}; use crate::mail::mailbox::Mailbox; pub struct User { @@ -31,8 +35,24 @@ impl User { } /// Opens an existing mailbox given its IMAP name. - pub fn open_mailbox(&self, name: &str) -> Result> { - Mailbox::new(&self.creds, name).map(Some) + pub async fn open_mailbox(&self, name: &str) -> Result>> { + { + let cache = MAILBOX_CACHE.cache.lock().unwrap(); + if let Some(mb) = cache.get(&self.creds.storage).and_then(Weak::upgrade) { + return Ok(Some(mb)); + } + } + + let mb = Arc::new(Mailbox::open(&self.creds, name).await?); + + let mut cache = MAILBOX_CACHE.cache.lock().unwrap(); + if let Some(concurrent_mb) = cache.get(&self.creds.storage).and_then(Weak::upgrade) { + drop(mb); // we worked for nothing but at least we didn't starve someone else + Ok(Some(concurrent_mb)) + } else { + cache.insert(self.creds.storage.clone(), Arc::downgrade(&mb)); + Ok(Some(mb)) + } } /// Creates a new mailbox in the user's IMAP namespace. @@ -50,3 +70,21 @@ impl User { unimplemented!() } } + +// ---- Mailbox cache ---- + +struct MailboxCache { + cache: std::sync::Mutex>>, +} + +impl MailboxCache { + fn new() -> Self { + Self { + cache: std::sync::Mutex::new(HashMap::new()), + } + } +} + +lazy_static! { + static ref MAILBOX_CACHE: MailboxCache = MailboxCache::new(); +} diff --git a/src/main.rs b/src/main.rs index 401ed53..5d139b6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,8 +14,6 @@ use anyhow::{bail, Result}; use clap::{Parser, Subcommand}; use rand::prelude::*; -use rusoto_signature::Region; - use config::*; use cryptoblob::*; use login::{static_provider::*, *}; @@ -264,11 +262,11 @@ async fn main() -> Result<()> { } fn make_storage_creds(c: StorageCredsArgs) -> StorageCredentials { - let s3_region = Region::Custom { + let s3_region = Region { name: c.region.clone(), endpoint: c.s3_endpoint, }; - let k2v_region = Region::Custom { + let k2v_region = Region { name: c.region, endpoint: c.k2v_endpoint, }; diff --git a/src/server.rs b/src/server.rs index 2cc481c..55fa5ba 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,14 +3,13 @@ use std::sync::Arc; use anyhow::{bail, Result}; use futures::{try_join, StreamExt}; use log::*; -use rusoto_signature::Region; use tokio::sync::watch; use crate::config::*; use crate::imap; use crate::lmtp::*; use crate::login::ArcLoginProvider; -use crate::login::{ldap_provider::*, static_provider::*}; +use crate::login::{ldap_provider::*, static_provider::*, Region}; pub struct Server { lmtp_server: Option>, @@ -62,11 +61,11 @@ impl Server { } fn build(config: Config) -> Result<(ArcLoginProvider, Option, Option)> { - let s3_region = Region::Custom { + let s3_region = Region { name: config.aws_region.clone(), endpoint: config.s3_endpoint, }; - let k2v_region = Region::Custom { + let k2v_region = Region { name: config.aws_region, endpoint: config.k2v_endpoint, }; -- cgit v1.2.3