diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/imap/capability.rs | 4 | ||||
-rw-r--r-- | src/imap/command/authenticated.rs | 22 | ||||
-rw-r--r-- | src/imap/command/selected.rs | 17 | ||||
-rw-r--r-- | src/imap/flow.rs | 11 | ||||
-rw-r--r-- | src/imap/mailbox_view.rs | 131 | ||||
-rw-r--r-- | src/imap/mime_view.rs | 12 | ||||
-rw-r--r-- | src/imap/mod.rs | 189 | ||||
-rw-r--r-- | src/imap/request.rs | 4 | ||||
-rw-r--r-- | src/imap/response.rs | 3 | ||||
-rw-r--r-- | src/imap/search.rs | 25 | ||||
-rw-r--r-- | src/imap/session.rs | 52 | ||||
-rw-r--r-- | src/login/ldap_provider.rs | 7 | ||||
-rw-r--r-- | src/login/static_provider.rs | 6 | ||||
-rw-r--r-- | src/mail/mailbox.rs | 2 | ||||
-rw-r--r-- | src/mail/query.rs | 91 | ||||
-rw-r--r-- | src/storage/garage.rs | 74 |
16 files changed, 374 insertions, 276 deletions
diff --git a/src/imap/capability.rs b/src/imap/capability.rs index 256d820..c76b51c 100644 --- a/src/imap/capability.rs +++ b/src/imap/capability.rs @@ -1,5 +1,5 @@ use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier}; -use imap_codec::imap_types::core::NonEmptyVec; +use imap_codec::imap_types::core::Vec1; use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind}; use imap_codec::imap_types::response::Capability; use std::collections::HashSet; @@ -49,7 +49,7 @@ impl Default for ServerCapability { } impl ServerCapability { - pub fn to_vec(&self) -> NonEmptyVec<Capability<'static>> { + pub fn to_vec(&self) -> Vec1<Capability<'static>> { self.0 .iter() .map(|v| v.clone()) diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index 26b1946..eb8833d 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -6,7 +6,7 @@ use anyhow::{anyhow, bail, Result}; use imap_codec::imap_types::command::{ Command, CommandBody, ListReturnItem, SelectExamineModifier, }; -use imap_codec::imap_types::core::{Atom, Literal, NonEmptyVec, QuotedChar}; +use imap_codec::imap_types::core::{Atom, Literal, QuotedChar, Vec1}; use imap_codec::imap_types::datetime::DateTime; use imap_codec::imap_types::extensions::enable::CapabilityEnable; use imap_codec::imap_types::flag::{Flag, FlagNameAttribute}; @@ -17,10 +17,10 @@ use imap_codec::imap_types::status::{StatusDataItem, StatusDataItemName}; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anystate, MailboxName}; use crate::imap::flow; -use crate::imap::mailbox_view::MailboxView; +use crate::imap::mailbox_view::{MailboxView, UpdateParameters}; use crate::imap::response::Response; +use crate::imap::Body; -use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::*; use crate::mail::user::{User, MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW}; use crate::mail::IMF; @@ -549,6 +549,8 @@ impl<'a> AuthenticatedContext<'a> { )) } + //@FIXME we should write a specific version for the "selected" state + //that returns some unsollicited responses async fn append( self, mailbox: &MailboxCodec<'a>, @@ -558,7 +560,7 @@ impl<'a> AuthenticatedContext<'a> { ) -> Result<(Response<'static>, flow::Transition)> { let append_tag = self.req.tag.clone(); match self.append_internal(mailbox, flags, date, message).await { - Ok((_mb, uidvalidity, uid, _modseq)) => Ok(( + Ok((_mb_view, uidvalidity, uid, _modseq)) => Ok(( Response::build() .tag(append_tag) .message("APPEND completed") @@ -580,7 +582,7 @@ impl<'a> AuthenticatedContext<'a> { fn enable( self, - cap_enable: &NonEmptyVec<CapabilityEnable<'static>>, + cap_enable: &Vec1<CapabilityEnable<'static>>, ) -> Result<(Response<'static>, flow::Transition)> { let mut response_builder = Response::build().to_req(self.req); let capabilities = self.client_capabilities.try_enable(cap_enable.as_ref()); @@ -593,13 +595,14 @@ impl<'a> AuthenticatedContext<'a> { )) } + //@FIXME should be refactored and integrated to the mailbox view pub(crate) async fn append_internal( self, mailbox: &MailboxCodec<'a>, flags: &[Flag<'a>], date: &Option<DateTime>, message: &Literal<'a>, - ) -> Result<(Arc<Mailbox>, ImapUidvalidity, ImapUid, ModSeq)> { + ) -> Result<(MailboxView, ImapUidvalidity, ImapUid, ModSeq)> { let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; @@ -607,6 +610,7 @@ impl<'a> AuthenticatedContext<'a> { Some(mb) => mb, None => bail!("Mailbox does not exist"), }; + let mut view = MailboxView::new(mb, self.client_capabilities.condstore.is_enabled()).await; if date.is_some() { tracing::warn!("Cannot set date when appending message"); @@ -617,9 +621,11 @@ impl<'a> AuthenticatedContext<'a> { let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>(); // TODO: filter allowed flags? ping @Quentin - let (uidvalidity, uid, modseq) = mb.append(msg, None, &flags[..]).await?; + let (uidvalidity, uid, modseq) = + view.internal.mailbox.append(msg, None, &flags[..]).await?; + //let unsollicited = view.update(UpdateParameters::default()).await?; - Ok((mb, uidvalidity, uid, modseq)) + Ok((view, uidvalidity, uid, modseq)) } } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index 73f8aec..d000905 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; -use imap_codec::imap_types::core::Charset; +use imap_codec::imap_types::core::{Charset, Vec1}; use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType}; use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec; @@ -54,11 +54,15 @@ pub async fn dispatch<'a>( ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid) .await } + //@FIXME SearchKey::And is a legacy hack, should be refactored CommandBody::Search { charset, criteria, uid, - } => ctx.search(charset, criteria, uid).await, + } => { + ctx.search(charset, &SearchKey::And(criteria.clone()), uid) + .await + } CommandBody::Expunge { // UIDPLUS (rfc4315) uid_sequence_set, @@ -88,15 +92,6 @@ pub async fn dispatch<'a>( // UNSELECT extension (rfc3691) CommandBody::Unselect => ctx.unselect().await, - // IDLE extension (rfc2177) - CommandBody::Idle => Ok(( - Response::build() - .to_req(ctx.req) - .message("DUMMY command due to anti-pattern in the code") - .ok()?, - flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()), - )), - // In selected mode, we fallback to authenticated when needed _ => { authenticated::dispatch(authenticated::AuthenticatedContext { diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 6ddd092..e372d69 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -1,11 +1,12 @@ use std::error::Error as StdError; use std::fmt; use std::sync::Arc; + +use imap_codec::imap_types::core::Tag; use tokio::sync::Notify; use crate::imap::mailbox_view::MailboxView; use crate::mail::user::User; -use imap_codec::imap_types::core::Tag; #[derive(Debug)] pub enum Error { @@ -31,6 +32,14 @@ pub enum State { ), Logout, } +impl State { + pub fn notify(&self) -> Option<Arc<Notify>> { + match self { + Self::Idle(_, _, _, _, anotif) => Some(anotif.clone()), + _ => None, + } + } +} impl fmt::Display for State { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use State::*; diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index d57e9a3..1c53b93 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -4,9 +4,9 @@ use std::sync::Arc; use anyhow::{anyhow, Error, Result}; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{StreamExt, TryStreamExt}; -use imap_codec::imap_types::core::Charset; +use imap_codec::imap_types::core::{Charset, Vec1}; use imap_codec::imap_types::fetch::MessageDataItem; use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType}; use imap_codec::imap_types::response::{Code, CodeOther, Data, Status}; @@ -362,46 +362,36 @@ impl MailboxView { .iter() .map(|midx| midx.uuid) .collect::<Vec<_>>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [3/6] Derive an IMAP-specific view from the results, apply the filters - let views = query_result - .iter() - .zip(mail_idx_list.into_iter()) - .map(|(qr, midx)| MailView::new(qr, midx)) - .collect::<Result<Vec<_>, _>>()?; - - // [4/6] Apply the IMAP transformation, bubble up any error - // We get 2 results: - // - The one we send to the client - // - The \Seen flags we must set internally - let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views - .iter() - .map(|mv| mv.filter(&ap).map(|(body, seen)| ((mv, seen), body))) - .collect::<Result<Vec<_>, _>>()? - .into_iter() - .unzip(); + let query = self.internal.query(&uuids, query_scope); + //let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [5/6] Register the \Seen flags - flag_mgmt - .iter() - .filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd)) - .map(|(mv, _seen)| async move { - let seen_flag = Flag::Seen.to_string(); - self.internal - .mailbox - .add_flags(*mv.query_result.uuid(), &[seen_flag]) - .await?; - Ok::<_, anyhow::Error>(()) + let query_stream = query + .fetch() + .zip(futures::stream::iter(mail_idx_list)) + // [3/6] Derive an IMAP-specific view from the results, apply the filters + .map(|(maybe_qr, midx)| match maybe_qr { + Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)), + Err(e) => Err(e), }) - .collect::<FuturesOrdered<_>>() - .collect::<Vec<_>>() - .await - .into_iter() - .collect::<Result<_, _>>()?; + // [4/6] Apply the IMAP transformation + .then(|maybe_ret| async move { + let ((body, seen), midx) = maybe_ret?; + + // [5/6] Register the \Seen flags + if matches!(seen, SeenFlag::MustAdd) { + let seen_flag = Flag::Seen.to_string(); + self.internal + .mailbox + .add_flags(midx.uuid, &[seen_flag]) + .await?; + } + + Ok::<_, anyhow::Error>(body) + }); // [6/6] Build the final result that will be sent to the client. - Ok(imap_ret) + query_stream.try_collect().await } /// A naive search implementation... @@ -423,39 +413,54 @@ impl MailboxView { // 3. Filter the selection based on the ID / UID / Flags let (kept_idx, to_fetch) = crit.filter_on_idx(&selection); - // 4. Fetch additional info about the emails + // 4.a Fetch additional info about the emails let query_scope = crit.query_scope(); let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::<Vec<_>>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; + let query = self.internal.query(&uuids, query_scope); + + // 4.b We don't want to keep all data in memory, so we do the computing in a stream + let query_stream = query + .fetch() + .zip(futures::stream::iter(&to_fetch)) + // 5.a Build a mailview with the body, might fail with an error + // 5.b If needed, filter the selection based on the body, but keep the errors + // 6. Drop the query+mailbox, keep only the mail index + // Here we release a lot of memory, this is the most important part ^^ + .filter_map(|(maybe_qr, midx)| { + let r = match maybe_qr { + Ok(qr) => match MailView::new(&qr, midx).map(|mv| crit.is_keep_on_query(&mv)) { + Ok(true) => Some(Ok(*midx)), + Ok(_) => None, + Err(e) => Some(Err(e)), + }, + Err(e) => Some(Err(e)), + }; + futures::future::ready(r) + }); - // 5. If needed, filter the selection based on the body - let kept_query = crit.filter_on_query(&to_fetch, &query_result)?; + // 7. Chain both streams (part resolved from index, part resolved from metadata+body) + let main_stream = futures::stream::iter(kept_idx) + .map(Ok) + .chain(query_stream) + .map_ok(|idx| match uid { + true => (idx.uid, idx.modseq), + _ => (idx.i, idx.modseq), + }); - // 6. Format the result according to the client's taste: - // either return UID or ID. - let final_selection = kept_idx.iter().chain(kept_query.iter()); - let selection_fmt = match uid { - true => final_selection.map(|in_idx| in_idx.uid).collect(), - _ => final_selection.map(|in_idx| in_idx.i).collect(), - }; + // 8. Do the actual computation + let internal_result: Vec<_> = main_stream.try_collect().await?; + let (selection, modseqs): (Vec<_>, Vec<_>) = internal_result.into_iter().unzip(); - // 7. Add the modseq entry if needed - let is_modseq = crit.is_modseq(); - let maybe_modseq = match is_modseq { - true => { - let final_selection = kept_idx.iter().chain(kept_query.iter()); - final_selection - .map(|in_idx| in_idx.modseq) - .max() - .map(|r| NonZeroU64::try_from(r)) - .transpose()? - } + // 9. Aggregate the maximum modseq value + let maybe_modseq = match crit.is_modseq() { + true => modseqs.into_iter().max(), _ => None, }; + // 10. Return the final result Ok(( - vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], - is_modseq, + vec![Body::Data(Data::Search(selection, maybe_modseq))], + maybe_modseq.is_some(), )) } @@ -626,7 +631,7 @@ impl MailboxView { mod tests { use super::*; use imap_codec::encode::Encoder; - use imap_codec::imap_types::core::NonEmptyVec; + use imap_codec::imap_types::core::Vec1; use imap_codec::imap_types::fetch::Section; use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName}; use imap_codec::imap_types::response::Response; @@ -746,7 +751,7 @@ mod tests { let test_repr = Response::Data(Data::Fetch { seq: NonZeroU32::new(1).unwrap(), - items: NonEmptyVec::from(MessageDataItem::Body(mime_view::bodystructure( + items: Vec1::from(MessageDataItem::Body(mime_view::bodystructure( &message.child, false, )?)), diff --git a/src/imap/mime_view.rs b/src/imap/mime_view.rs index 8fc043b..8bbbd2d 100644 --- a/src/imap/mime_view.rs +++ b/src/imap/mime_view.rs @@ -8,7 +8,7 @@ use imap_codec::imap_types::body::{ BasicFields, Body as FetchBody, BodyStructure, MultiPartExtensionData, SinglePartExtensionData, SpecificFields, }; -use imap_codec::imap_types::core::{AString, IString, NString, NonEmptyVec}; +use imap_codec::imap_types::core::{AString, IString, NString, Vec1}; use imap_codec::imap_types::fetch::{Part as FetchPart, Section as FetchSection}; use eml_codec::{ @@ -141,8 +141,8 @@ impl<'a> NodeMime<'a> { enum SubsettedSection<'a> { Part, Header, - HeaderFields(&'a NonEmptyVec<AString<'a>>), - HeaderFieldsNot(&'a NonEmptyVec<AString<'a>>), + HeaderFields(&'a Vec1<AString<'a>>), + HeaderFieldsNot(&'a Vec1<AString<'a>>), Text, Mime, } @@ -238,7 +238,7 @@ impl<'a> SelectedMime<'a> { /// case-insensitive but otherwise exact. fn header_fields( &self, - fields: &'a NonEmptyVec<AString<'a>>, + fields: &'a Vec1<AString<'a>>, invert: bool, ) -> Result<ExtractedFull<'a>> { // Build a lowercase ascii hashset with the fields to fetch @@ -398,8 +398,8 @@ impl<'a> NodeMult<'a> { .filter_map(|inner| NodeMime(&inner).structure(is_ext).ok()) .collect::<Vec<_>>(); - NonEmptyVec::validate(&inner_bodies)?; - let bodies = NonEmptyVec::unvalidated(inner_bodies); + Vec1::validate(&inner_bodies)?; + let bodies = Vec1::unvalidated(inner_bodies); Ok(BodyStructure::Multi { bodies, diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 58c4dc0..02ab9ce 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -15,7 +15,7 @@ mod session; use std::net::SocketAddr; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Context, Result}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; @@ -144,13 +144,6 @@ use tokio_util::bytes::BytesMut; const PIPELINABLE_COMMANDS: usize = 64; -#[derive(Debug)] -enum LoopMode { - Quit, - Interactive, - Idle(BytesMut, Arc<Notify>), -} - // @FIXME a full refactor of this part of the code will be needed sooner or later struct NetLoop { ctx: ClientContext, @@ -163,7 +156,7 @@ impl NetLoop { async fn handler(ctx: ClientContext, sock: AnyStream) { let addr = ctx.addr.clone(); - let nl = match Self::new(ctx, sock).await { + let mut nl = match Self::new(ctx, sock).await { Ok(nl) => { tracing::debug!(addr=?addr, "netloop successfully initialized"); nl @@ -185,15 +178,15 @@ impl NetLoop { } async fn new(ctx: ClientContext, sock: AnyStream) -> Result<Self> { + let mut opts = ServerFlowOptions::default(); + opts.crlf_relaxed = false; + opts.literal_accept_text = Text::unvalidated("OK"); + opts.literal_reject_text = Text::unvalidated("Literal rejected"); + // Send greeting let (server, _) = ServerFlow::send_greeting( sock, - ServerFlowOptions { - crlf_relaxed: false, - literal_accept_text: Text::unvalidated("OK"), - literal_reject_text: Text::unvalidated("Literal rejected"), - ..ServerFlowOptions::default() - }, + opts, Greeting::ok( Some(Code::Capability(ctx.server_capabilities.to_vec())), "Aerogramme", @@ -241,85 +234,111 @@ impl NetLoop { tracing::info!("runner is quitting"); } - async fn core(mut self) -> Result<()> { - tracing::trace!("Starting the core loop"); - let mut mode = LoopMode::Interactive; + async fn core(&mut self) -> Result<()> { + let mut maybe_idle: Option<Arc<Notify>> = None; loop { - tracing::trace!(mode=?mode, "Core loop iter"); - mode = match mode { - LoopMode::Interactive => self.interactive_mode().await?, - LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?, - LoopMode::Quit => break, - } - } - Ok(()) - } - - async fn interactive_mode(&mut self) -> Result<LoopMode> { - tokio::select! { - // Managing imap_flow stuff - srv_evt = self.server.progress() => match srv_evt? { - ServerFlowEvent::ResponseSent { handle: _handle, response } => { - match response { - Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit), - _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), - } - }, - ServerFlowEvent::CommandReceived { command } => { - match self.cmd_tx.try_send(Request::ImapCommand(command)) { - Ok(_) => (), - Err(mpsc::error::TrySendError::Full(_)) => { - self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); - tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + tokio::select! { + // Managing imap_flow stuff + srv_evt = self.server.progress() => match srv_evt? { + ServerFlowEvent::ResponseSent { handle: _handle, response } => { + match response { + Response::Status(Status::Bye(_)) => return Ok(()), + _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), + } + }, + ServerFlowEvent::CommandReceived { command } => { + match self.cmd_tx.try_send(Request::ImapCommand(command)) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + } + _ => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + } } - _ => { - self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, + ServerFlowEvent::IdleCommandReceived { tag } => { + match self.cmd_tx.try_send(Request::IdleStart(tag)) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + } + _ => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + } } } - }, - flow => { - self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); - tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); - } - }, - - // Managing response generated by Aerogramme - maybe_msg = self.resp_rx.recv() => match maybe_msg { - Some(ResponseOrIdle::Response(response)) => { - tracing::trace!("Interactive, server has a response for the client"); - for body_elem in response.body.into_iter() { - let _handle = match body_elem { - Body::Data(d) => self.server.enqueue_data(d), - Body::Status(s) => self.server.enqueue_status(s), - }; + ServerFlowEvent::IdleDoneReceived => { + tracing::trace!("client sent DONE and want to stop IDLE"); + maybe_idle.ok_or(anyhow!("Received IDLE done but not idling currently"))?.notify_one(); + maybe_idle = None; + } + flow => { + self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); + tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); } - self.server.enqueue_status(response.completion); - }, - Some(ResponseOrIdle::StartIdle(stop)) => { - tracing::trace!("Interactive, server agreed to switch in idle mode"); - let cr = CommandContinuationRequest::basic(None, "Idling")?; - self.server.enqueue_continuation(cr); - self.cmd_tx.try_send(Request::Idle)?; - return Ok(LoopMode::Idle(BytesMut::new(), stop)) - }, - None => { - self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); }, - Some(_) => unreachable!(), - }, + // Managing response generated by Aerogramme + maybe_msg = self.resp_rx.recv() => match maybe_msg { + Some(ResponseOrIdle::Response(response)) => { + tracing::trace!("Interactive, server has a response for the client"); + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.server.enqueue_status(response.completion); + }, + Some(ResponseOrIdle::IdleAccept(stop)) => { + tracing::trace!("Interactive, server agreed to switch in idle mode"); + let cr = CommandContinuationRequest::basic(None, "Idling")?; + self.server.idle_accept(cr).or(Err(anyhow!("refused continuation for idle accept")))?; + self.cmd_tx.try_send(Request::IdlePoll)?; + if maybe_idle.is_some() { + bail!("Can't start IDLE if already idling"); + } + maybe_idle = Some(stop); + }, + Some(ResponseOrIdle::IdleEvent(elems)) => { + tracing::trace!("server imap session has some change to communicate to the client"); + for body_elem in elems.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.cmd_tx.try_send(Request::IdlePoll)?; + }, + Some(ResponseOrIdle::IdleReject(response)) => { + tracing::trace!("inform client that session rejected idle"); + self.server + .idle_reject(response.completion) + .or(Err(anyhow!("wrong reject command")))?; + }, + None => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, + Some(_) => unreachable!(), - // When receiving a CTRL+C - _ = self.ctx.must_exit.changed() => { - tracing::trace!("Interactive, CTRL+C, exiting"); - self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); - }, - }; - Ok(LoopMode::Interactive) + }, + + // When receiving a CTRL+C + _ = self.ctx.must_exit.changed() => { + tracing::trace!("Interactive, CTRL+C, exiting"); + self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + }, + }; + } } + /* async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> { // Flush send loop { @@ -398,5 +417,5 @@ impl NetLoop { return Ok(LoopMode::Interactive) }, }; - } + }*/ } diff --git a/src/imap/request.rs b/src/imap/request.rs index 49b4992..cff18a3 100644 --- a/src/imap/request.rs +++ b/src/imap/request.rs @@ -1,7 +1,9 @@ use imap_codec::imap_types::command::Command; +use imap_codec::imap_types::core::Tag; #[derive(Debug)] pub enum Request { ImapCommand(Command<'static>), - Idle, + IdleStart(Tag<'static>), + IdlePoll, } diff --git a/src/imap/response.rs b/src/imap/response.rs index 40e6927..b6a0e98 100644 --- a/src/imap/response.rs +++ b/src/imap/response.rs @@ -118,6 +118,7 @@ impl<'a> Response<'a> { #[derive(Debug)] pub enum ResponseOrIdle { Response(Response<'static>), - StartIdle(Arc<Notify>), + IdleAccept(Arc<Notify>), + IdleReject(Response<'static>), IdleEvent(Vec<Body<'static>>), } diff --git a/src/imap/search.rs b/src/imap/search.rs index d06c3bd..37a7e9e 100644 --- a/src/imap/search.rs +++ b/src/imap/search.rs @@ -1,13 +1,12 @@ use std::num::{NonZeroU32, NonZeroU64}; -use anyhow::Result; -use imap_codec::imap_types::core::NonEmptyVec; +use imap_codec::imap_types::core::Vec1; use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use crate::imap::index::MailIndex; use crate::imap::mail_view::MailView; -use crate::mail::query::{QueryResult, QueryScope}; +use crate::mail::query::QueryScope; pub enum SeqType { Undefined, @@ -49,7 +48,7 @@ impl<'a> Criteria<'a> { let mut new_vec = base.0.into_inner(); new_vec.extend_from_slice(ext.0.as_ref()); let seq = SequenceSet( - NonEmptyVec::try_from(new_vec) + Vec1::try_from(new_vec) .expect("merging non empty vec lead to non empty vec"), ); (seq, x) @@ -145,22 +144,6 @@ impl<'a> Criteria<'a> { (to_keep, to_fetch) } - pub fn filter_on_query<'b>( - &self, - midx_list: &[&'b MailIndex<'b>], - query_result: &'b Vec<QueryResult>, - ) -> Result<Vec<&'b MailIndex<'b>>> { - Ok(midx_list - .iter() - .zip(query_result.iter()) - .map(|(midx, qr)| MailView::new(qr, midx)) - .collect::<Result<Vec<_>, _>>()? - .into_iter() - .filter(|mail_view| self.is_keep_on_query(mail_view)) - .map(|mail_view| mail_view.in_idx) - .collect()) - } - // ---- /// Here we are doing a partial filtering: we do not have access @@ -213,7 +196,7 @@ impl<'a> Criteria<'a> { /// the email, as body(x) might be false. So we need to check it. But as seqid(x) is true, /// we could simplify the request to just body(x) and truncate the first OR. Today, we are /// not doing that, and thus we reevaluate everything. - fn is_keep_on_query(&self, mail_view: &MailView) -> bool { + pub fn is_keep_on_query(&self, mail_view: &MailView) -> bool { use SearchKey::*; match self.0 { // Combinator logic diff --git a/src/imap/session.rs b/src/imap/session.rs index 12bbfee..fa3232a 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -4,8 +4,8 @@ use crate::imap::flow; use crate::imap::request::Request; use crate::imap::response::{Response, ResponseOrIdle}; use crate::login::ArcLoginProvider; -use anyhow::{anyhow, bail, Result}; -use imap_codec::imap_types::command::Command; +use anyhow::{anyhow, bail, Context, Result}; +use imap_codec::imap_types::{command::Command, core::Tag}; //----- pub struct Instance { @@ -27,13 +27,48 @@ impl Instance { pub async fn request(&mut self, req: Request) -> ResponseOrIdle { match req { - Request::Idle => self.idle().await, + Request::IdleStart(tag) => self.idle_init(tag), + Request::IdlePoll => self.idle_poll().await, Request::ImapCommand(cmd) => self.command(cmd).await, } } - pub async fn idle(&mut self) -> ResponseOrIdle { - match self.idle_happy().await { + pub fn idle_init(&mut self, tag: Tag<'static>) -> ResponseOrIdle { + // Build transition + //@FIXME the notifier should be hidden inside the state and thus not part of the transition! + let transition = flow::Transition::Idle(tag.clone(), tokio::sync::Notify::new()); + + // Try to apply the transition and get the stop notifier + let maybe_stop = self + .state + .apply(transition) + .context("IDLE transition failed") + .and_then(|_| { + self.state + .notify() + .ok_or(anyhow!("IDLE state has no Notify object")) + }); + + // Build an appropriate response + match maybe_stop { + Ok(stop) => ResponseOrIdle::IdleAccept(stop), + Err(e) => { + tracing::error!(err=?e, "unable to init idle due to a transition error"); + //ResponseOrIdle::IdleReject(tag) + let no = Response::build() + .tag(tag) + .message( + "Internal error, processing command triggered an illegal IMAP state transition", + ) + .no() + .unwrap(); + ResponseOrIdle::IdleReject(no) + } + } + } + + pub async fn idle_poll(&mut self) -> ResponseOrIdle { + match self.idle_poll_happy().await { Ok(r) => r, Err(e) => { tracing::error!(err=?e, "something bad happened in idle"); @@ -42,7 +77,7 @@ impl Instance { } } - pub async fn idle_happy(&mut self) -> Result<ResponseOrIdle> { + pub async fn idle_poll_happy(&mut self) -> Result<ResponseOrIdle> { let (mbx, tag, stop) = match &mut self.state { flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()), _ => bail!("Invalid session state, can't idle"), @@ -128,10 +163,11 @@ impl Instance { .bad() .unwrap()); } + ResponseOrIdle::Response(resp) - match &self.state { + /*match &self.state { flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()), _ => ResponseOrIdle::Response(resp), - } + }*/ } } diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index e73e1dc..0af5676 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -21,6 +21,7 @@ pub struct LdapLoginProvider { storage_specific: StorageSpecific, in_memory_store: storage::in_memory::MemDb, + garage_store: storage::garage::GarageRoot, } enum BucketSource { @@ -91,7 +92,11 @@ impl LdapLoginProvider { mail_attr: config.mail_attr, crypto_root_attr: config.crypto_root_attr, storage_specific: specific, + //@FIXME should be created outside of the login provider + //Login provider should return only a cryptoroot + a storage URI + //storage URI that should be resolved outside... in_memory_store: storage::in_memory::MemDb::new(), + garage_store: storage::garage::GarageRoot::new()?, }) } @@ -114,7 +119,7 @@ impl LdapLoginProvider { BucketSource::Attr(a) => get_attr(user, &a)?, }; - storage::garage::GarageBuilder::new(storage::garage::GarageConf { + self.garage_store.user(storage::garage::GarageConf { region: from_config.aws_region.clone(), s3_endpoint: from_config.s3_endpoint.clone(), k2v_endpoint: from_config.k2v_endpoint.clone(), diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index 1e1ecbf..79626df 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -25,6 +25,7 @@ pub struct UserDatabase { pub struct StaticLoginProvider { user_db: watch::Receiver<UserDatabase>, in_memory_store: storage::in_memory::MemDb, + garage_store: storage::garage::GarageRoot, } pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) -> Result<()> { @@ -84,6 +85,7 @@ impl StaticLoginProvider { Ok(Self { user_db: rx, in_memory_store: storage::in_memory::MemDb::new(), + garage_store: storage::garage::GarageRoot::new()?, }) } } @@ -109,7 +111,7 @@ impl LoginProvider for StaticLoginProvider { let storage: storage::Builder = match &user.config.storage { StaticStorage::InMemory => self.in_memory_store.builder(username).await, StaticStorage::Garage(grgconf) => { - storage::garage::GarageBuilder::new(storage::garage::GarageConf { + self.garage_store.user(storage::garage::GarageConf { region: grgconf.aws_region.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(), s3_endpoint: grgconf.s3_endpoint.clone(), @@ -140,7 +142,7 @@ impl LoginProvider for StaticLoginProvider { let storage: storage::Builder = match &user.config.storage { StaticStorage::InMemory => self.in_memory_store.builder(&user.username).await, StaticStorage::Garage(grgconf) => { - storage::garage::GarageBuilder::new(storage::garage::GarageConf { + self.garage_store.user(storage::garage::GarageConf { region: grgconf.aws_region.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(), s3_endpoint: grgconf.s3_endpoint.clone(), diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index c20d815..9190883 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -498,7 +498,7 @@ fn dump(uid_index: &Bayou<UidIndex>) { /// The metadata of a message that is stored in K2V /// at pk = mail/<mailbox uuid>, sk = <message uuid> -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct MailMeta { /// INTERNALDATE field (milliseconds since epoch) pub internaldate: u64, diff --git a/src/mail/query.rs b/src/mail/query.rs index a183c5a..3e6fe99 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -2,7 +2,8 @@ use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; use super::unique_ident::UniqueIdent; use anyhow::Result; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::future::FutureExt; +use futures::stream::{BoxStream, Stream, StreamExt}; /// Query is in charge of fetching efficiently /// requested data for a list of emails @@ -28,64 +29,62 @@ impl QueryScope { } } +//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>; + impl<'a, 'b> Query<'a, 'b> { - pub async fn fetch(&self) -> Result<Vec<QueryResult>> { + pub fn fetch(&self) -> BoxStream<Result<QueryResult>> { match self.scope { - QueryScope::Index => Ok(self - .emails - .iter() - .map(|&uuid| QueryResult::IndexResult { uuid }) - .collect()), - QueryScope::Partial => self.partial().await, - QueryScope::Full => self.full().await, + QueryScope::Index => Box::pin( + futures::stream::iter(self.emails) + .map(|&uuid| Ok(QueryResult::IndexResult { uuid })), + ), + QueryScope::Partial => Box::pin(self.partial()), + QueryScope::Full => Box::pin(self.full()), } } // --- functions below are private *for reasons* + fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send { + async move { + let maybe_meta_list: Result<Vec<MailMeta>> = + self.frozen.mailbox.fetch_meta(self.emails).await; + let list_res = maybe_meta_list + .map(|meta_list| { + meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + }) + .unwrap_or_else(|e| vec![Err(e)]); - async fn partial(&self) -> Result<Vec<QueryResult>> { - let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; - let result = meta - .into_iter() - .zip(self.emails.iter()) - .map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata }) - .collect::<Vec<_>>(); - - Ok(result) + futures::stream::iter(list_res) + } + .flatten_stream() } - /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY - /// AND GENERATE SO MUCH NETWORK TRAFFIC. - /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH - /// SOMETHING LIKE AN ITERATOR - async fn full(&self) -> Result<Vec<QueryResult>> { - let meta_list = self.partial().await?; - meta_list - .into_iter() - .map(|meta| async move { - let content = self - .frozen - .mailbox - .fetch_full( - *meta.uuid(), - &meta - .metadata() - .expect("meta to be PartialResult") - .message_key, - ) - .await?; + fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send { + self.partial().then(move |maybe_meta| async move { + let meta = maybe_meta?; - Ok(meta.into_full(content).expect("meta to be PartialResult")) - }) - .collect::<FuturesOrdered<_>>() - .collect::<Vec<_>>() - .await - .into_iter() - .collect::<Result<Vec<_>, _>>() + let content = self + .frozen + .mailbox + .fetch_full( + *meta.uuid(), + &meta + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) + .await?; + + Ok(meta.into_full(content).expect("meta to be PartialResult")) + }) } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum QueryResult { IndexResult { uuid: UniqueIdent, diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 709e729..7152764 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -1,7 +1,45 @@ -use crate::storage::*; use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError}; +use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; +use aws_smithy_runtime_api::client::http::SharedHttpClient; +use hyper_rustls::HttpsConnector; +use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient}; +use hyper_util::rt::TokioExecutor; use serde::Serialize; +use crate::storage::*; + +pub struct GarageRoot { + k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>, + aws_http: SharedHttpClient, +} + +impl GarageRoot { + pub fn new() -> anyhow::Result<Self> { + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots()? + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + let k2v_http = HttpClient::builder(TokioExecutor::new()).build(connector); + let aws_http = HyperClientBuilder::new().build_https(); + Ok(Self { k2v_http, aws_http }) + } + + pub fn user(&self, conf: GarageConf) -> anyhow::Result<Arc<GarageUser>> { + let mut unicity: Vec<u8> = vec![]; + unicity.extend_from_slice(file!().as_bytes()); + unicity.append(&mut rmp_serde::to_vec(&conf)?); + + Ok(Arc::new(GarageUser { + conf, + aws_http: self.aws_http.clone(), + k2v_http: self.k2v_http.clone(), + unicity, + })) + } +} + #[derive(Clone, Debug, Serialize)] pub struct GarageConf { pub region: String, @@ -12,23 +50,19 @@ pub struct GarageConf { pub bucket: String, } +//@FIXME we should get rid of this builder +//and allocate a S3 + K2V client only once per user +//(and using a shared HTTP client) #[derive(Clone, Debug)] -pub struct GarageBuilder { +pub struct GarageUser { conf: GarageConf, + aws_http: SharedHttpClient, + k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>, unicity: Vec<u8>, } -impl GarageBuilder { - pub fn new(conf: GarageConf) -> anyhow::Result<Arc<Self>> { - let mut unicity: Vec<u8> = vec![]; - unicity.extend_from_slice(file!().as_bytes()); - unicity.append(&mut rmp_serde::to_vec(&conf)?); - Ok(Arc::new(Self { conf, unicity })) - } -} - #[async_trait] -impl IBuilder for GarageBuilder { +impl IBuilder for GarageUser { async fn build(&self) -> Result<Store, StorageError> { let s3_creds = s3::config::Credentials::new( self.conf.aws_access_key_id.clone(), @@ -41,6 +75,7 @@ impl IBuilder for GarageBuilder { let sdk_config = aws_config::from_env() .region(aws_config::Region::new(self.conf.region.clone())) .credentials_provider(s3_creds) + .http_client(self.aws_http.clone()) .endpoint_url(self.conf.s3_endpoint.clone()) .load() .await; @@ -60,13 +95,14 @@ impl IBuilder for GarageBuilder { user_agent: None, }; - let k2v_client = match k2v_client::K2vClient::new(k2v_config) { - Err(e) => { - tracing::error!("unable to build k2v client: {}", e); - return Err(StorageError::Internal); - } - Ok(v) => v, - }; + let k2v_client = + match k2v_client::K2vClient::new_with_client(k2v_config, self.k2v_http.clone()) { + Err(e) => { + tracing::error!("unable to build k2v client: {}", e); + return Err(StorageError::Internal); + } + Ok(v) => v, + }; Ok(Box::new(GarageStore { bucket: self.conf.bucket.clone(), |