aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorQuentin <quentin@dufour.io>2024-02-23 17:32:38 +0000
committerQuentin <quentin@dufour.io>2024-02-23 17:32:38 +0000
commitd92ae5220cdaddf941da5a216fbd2c3549ccdec3 (patch)
tree464814294b6e50811e0c667cd1430c6855006f8a /src
parent0bb7cdf696190200d1885ec822518ac45b685a9b (diff)
parent1ea3de30995b434c4e59123c1ab634d89a0274b5 (diff)
downloadaerogramme-0.2.2.tar.gz
aerogramme-0.2.2.zip
Merge pull request 'Perf measurement & bottleneck fix' (#102) from perf/cpu-ram-bottleneck into main0.2.2
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/aerogramme/pulls/102
Diffstat (limited to 'src')
-rw-r--r--src/imap/capability.rs4
-rw-r--r--src/imap/command/authenticated.rs22
-rw-r--r--src/imap/command/selected.rs17
-rw-r--r--src/imap/flow.rs11
-rw-r--r--src/imap/mailbox_view.rs131
-rw-r--r--src/imap/mime_view.rs12
-rw-r--r--src/imap/mod.rs189
-rw-r--r--src/imap/request.rs4
-rw-r--r--src/imap/response.rs3
-rw-r--r--src/imap/search.rs25
-rw-r--r--src/imap/session.rs52
-rw-r--r--src/login/ldap_provider.rs7
-rw-r--r--src/login/static_provider.rs6
-rw-r--r--src/mail/mailbox.rs2
-rw-r--r--src/mail/query.rs91
-rw-r--r--src/storage/garage.rs74
16 files changed, 374 insertions, 276 deletions
diff --git a/src/imap/capability.rs b/src/imap/capability.rs
index 256d820..c76b51c 100644
--- a/src/imap/capability.rs
+++ b/src/imap/capability.rs
@@ -1,5 +1,5 @@
use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier};
-use imap_codec::imap_types::core::NonEmptyVec;
+use imap_codec::imap_types::core::Vec1;
use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind};
use imap_codec::imap_types::response::Capability;
use std::collections::HashSet;
@@ -49,7 +49,7 @@ impl Default for ServerCapability {
}
impl ServerCapability {
- pub fn to_vec(&self) -> NonEmptyVec<Capability<'static>> {
+ pub fn to_vec(&self) -> Vec1<Capability<'static>> {
self.0
.iter()
.map(|v| v.clone())
diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs
index 26b1946..eb8833d 100644
--- a/src/imap/command/authenticated.rs
+++ b/src/imap/command/authenticated.rs
@@ -6,7 +6,7 @@ use anyhow::{anyhow, bail, Result};
use imap_codec::imap_types::command::{
Command, CommandBody, ListReturnItem, SelectExamineModifier,
};
-use imap_codec::imap_types::core::{Atom, Literal, NonEmptyVec, QuotedChar};
+use imap_codec::imap_types::core::{Atom, Literal, QuotedChar, Vec1};
use imap_codec::imap_types::datetime::DateTime;
use imap_codec::imap_types::extensions::enable::CapabilityEnable;
use imap_codec::imap_types::flag::{Flag, FlagNameAttribute};
@@ -17,10 +17,10 @@ use imap_codec::imap_types::status::{StatusDataItem, StatusDataItemName};
use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, MailboxName};
use crate::imap::flow;
-use crate::imap::mailbox_view::MailboxView;
+use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response;
+use crate::imap::Body;
-use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::*;
use crate::mail::user::{User, MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW};
use crate::mail::IMF;
@@ -549,6 +549,8 @@ impl<'a> AuthenticatedContext<'a> {
))
}
+ //@FIXME we should write a specific version for the "selected" state
+ //that returns some unsollicited responses
async fn append(
self,
mailbox: &MailboxCodec<'a>,
@@ -558,7 +560,7 @@ impl<'a> AuthenticatedContext<'a> {
) -> Result<(Response<'static>, flow::Transition)> {
let append_tag = self.req.tag.clone();
match self.append_internal(mailbox, flags, date, message).await {
- Ok((_mb, uidvalidity, uid, _modseq)) => Ok((
+ Ok((_mb_view, uidvalidity, uid, _modseq)) => Ok((
Response::build()
.tag(append_tag)
.message("APPEND completed")
@@ -580,7 +582,7 @@ impl<'a> AuthenticatedContext<'a> {
fn enable(
self,
- cap_enable: &NonEmptyVec<CapabilityEnable<'static>>,
+ cap_enable: &Vec1<CapabilityEnable<'static>>,
) -> Result<(Response<'static>, flow::Transition)> {
let mut response_builder = Response::build().to_req(self.req);
let capabilities = self.client_capabilities.try_enable(cap_enable.as_ref());
@@ -593,13 +595,14 @@ impl<'a> AuthenticatedContext<'a> {
))
}
+ //@FIXME should be refactored and integrated to the mailbox view
pub(crate) async fn append_internal(
self,
mailbox: &MailboxCodec<'a>,
flags: &[Flag<'a>],
date: &Option<DateTime>,
message: &Literal<'a>,
- ) -> Result<(Arc<Mailbox>, ImapUidvalidity, ImapUid, ModSeq)> {
+ ) -> Result<(MailboxView, ImapUidvalidity, ImapUid, ModSeq)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
@@ -607,6 +610,7 @@ impl<'a> AuthenticatedContext<'a> {
Some(mb) => mb,
None => bail!("Mailbox does not exist"),
};
+ let mut view = MailboxView::new(mb, self.client_capabilities.condstore.is_enabled()).await;
if date.is_some() {
tracing::warn!("Cannot set date when appending message");
@@ -617,9 +621,11 @@ impl<'a> AuthenticatedContext<'a> {
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
// TODO: filter allowed flags? ping @Quentin
- let (uidvalidity, uid, modseq) = mb.append(msg, None, &flags[..]).await?;
+ let (uidvalidity, uid, modseq) =
+ view.internal.mailbox.append(msg, None, &flags[..]).await?;
+ //let unsollicited = view.update(UpdateParameters::default()).await?;
- Ok((mb, uidvalidity, uid, modseq))
+ Ok((view, uidvalidity, uid, modseq))
}
}
diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs
index 73f8aec..d000905 100644
--- a/src/imap/command/selected.rs
+++ b/src/imap/command/selected.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
-use imap_codec::imap_types::core::Charset;
+use imap_codec::imap_types::core::{Charset, Vec1};
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType};
use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec;
@@ -54,11 +54,15 @@ pub async fn dispatch<'a>(
ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid)
.await
}
+ //@FIXME SearchKey::And is a legacy hack, should be refactored
CommandBody::Search {
charset,
criteria,
uid,
- } => ctx.search(charset, criteria, uid).await,
+ } => {
+ ctx.search(charset, &SearchKey::And(criteria.clone()), uid)
+ .await
+ }
CommandBody::Expunge {
// UIDPLUS (rfc4315)
uid_sequence_set,
@@ -88,15 +92,6 @@ pub async fn dispatch<'a>(
// UNSELECT extension (rfc3691)
CommandBody::Unselect => ctx.unselect().await,
- // IDLE extension (rfc2177)
- CommandBody::Idle => Ok((
- Response::build()
- .to_req(ctx.req)
- .message("DUMMY command due to anti-pattern in the code")
- .ok()?,
- flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()),
- )),
-
// In selected mode, we fallback to authenticated when needed
_ => {
authenticated::dispatch(authenticated::AuthenticatedContext {
diff --git a/src/imap/flow.rs b/src/imap/flow.rs
index 6ddd092..e372d69 100644
--- a/src/imap/flow.rs
+++ b/src/imap/flow.rs
@@ -1,11 +1,12 @@
use std::error::Error as StdError;
use std::fmt;
use std::sync::Arc;
+
+use imap_codec::imap_types::core::Tag;
use tokio::sync::Notify;
use crate::imap::mailbox_view::MailboxView;
use crate::mail::user::User;
-use imap_codec::imap_types::core::Tag;
#[derive(Debug)]
pub enum Error {
@@ -31,6 +32,14 @@ pub enum State {
),
Logout,
}
+impl State {
+ pub fn notify(&self) -> Option<Arc<Notify>> {
+ match self {
+ Self::Idle(_, _, _, _, anotif) => Some(anotif.clone()),
+ _ => None,
+ }
+ }
+}
impl fmt::Display for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use State::*;
diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs
index d57e9a3..1c53b93 100644
--- a/src/imap/mailbox_view.rs
+++ b/src/imap/mailbox_view.rs
@@ -4,9 +4,9 @@ use std::sync::Arc;
use anyhow::{anyhow, Error, Result};
-use futures::stream::{FuturesOrdered, StreamExt};
+use futures::stream::{StreamExt, TryStreamExt};
-use imap_codec::imap_types::core::Charset;
+use imap_codec::imap_types::core::{Charset, Vec1};
use imap_codec::imap_types::fetch::MessageDataItem;
use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType};
use imap_codec::imap_types::response::{Code, CodeOther, Data, Status};
@@ -362,46 +362,36 @@ impl MailboxView {
.iter()
.map(|midx| midx.uuid)
.collect::<Vec<_>>();
- let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
- // [3/6] Derive an IMAP-specific view from the results, apply the filters
- let views = query_result
- .iter()
- .zip(mail_idx_list.into_iter())
- .map(|(qr, midx)| MailView::new(qr, midx))
- .collect::<Result<Vec<_>, _>>()?;
-
- // [4/6] Apply the IMAP transformation, bubble up any error
- // We get 2 results:
- // - The one we send to the client
- // - The \Seen flags we must set internally
- let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views
- .iter()
- .map(|mv| mv.filter(&ap).map(|(body, seen)| ((mv, seen), body)))
- .collect::<Result<Vec<_>, _>>()?
- .into_iter()
- .unzip();
+ let query = self.internal.query(&uuids, query_scope);
+ //let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
- // [5/6] Register the \Seen flags
- flag_mgmt
- .iter()
- .filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd))
- .map(|(mv, _seen)| async move {
- let seen_flag = Flag::Seen.to_string();
- self.internal
- .mailbox
- .add_flags(*mv.query_result.uuid(), &[seen_flag])
- .await?;
- Ok::<_, anyhow::Error>(())
+ let query_stream = query
+ .fetch()
+ .zip(futures::stream::iter(mail_idx_list))
+ // [3/6] Derive an IMAP-specific view from the results, apply the filters
+ .map(|(maybe_qr, midx)| match maybe_qr {
+ Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)),
+ Err(e) => Err(e),
})
- .collect::<FuturesOrdered<_>>()
- .collect::<Vec<_>>()
- .await
- .into_iter()
- .collect::<Result<_, _>>()?;
+ // [4/6] Apply the IMAP transformation
+ .then(|maybe_ret| async move {
+ let ((body, seen), midx) = maybe_ret?;
+
+ // [5/6] Register the \Seen flags
+ if matches!(seen, SeenFlag::MustAdd) {
+ let seen_flag = Flag::Seen.to_string();
+ self.internal
+ .mailbox
+ .add_flags(midx.uuid, &[seen_flag])
+ .await?;
+ }
+
+ Ok::<_, anyhow::Error>(body)
+ });
// [6/6] Build the final result that will be sent to the client.
- Ok(imap_ret)
+ query_stream.try_collect().await
}
/// A naive search implementation...
@@ -423,39 +413,54 @@ impl MailboxView {
// 3. Filter the selection based on the ID / UID / Flags
let (kept_idx, to_fetch) = crit.filter_on_idx(&selection);
- // 4. Fetch additional info about the emails
+ // 4.a Fetch additional info about the emails
let query_scope = crit.query_scope();
let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::<Vec<_>>();
- let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
+ let query = self.internal.query(&uuids, query_scope);
+
+ // 4.b We don't want to keep all data in memory, so we do the computing in a stream
+ let query_stream = query
+ .fetch()
+ .zip(futures::stream::iter(&to_fetch))
+ // 5.a Build a mailview with the body, might fail with an error
+ // 5.b If needed, filter the selection based on the body, but keep the errors
+ // 6. Drop the query+mailbox, keep only the mail index
+ // Here we release a lot of memory, this is the most important part ^^
+ .filter_map(|(maybe_qr, midx)| {
+ let r = match maybe_qr {
+ Ok(qr) => match MailView::new(&qr, midx).map(|mv| crit.is_keep_on_query(&mv)) {
+ Ok(true) => Some(Ok(*midx)),
+ Ok(_) => None,
+ Err(e) => Some(Err(e)),
+ },
+ Err(e) => Some(Err(e)),
+ };
+ futures::future::ready(r)
+ });
- // 5. If needed, filter the selection based on the body
- let kept_query = crit.filter_on_query(&to_fetch, &query_result)?;
+ // 7. Chain both streams (part resolved from index, part resolved from metadata+body)
+ let main_stream = futures::stream::iter(kept_idx)
+ .map(Ok)
+ .chain(query_stream)
+ .map_ok(|idx| match uid {
+ true => (idx.uid, idx.modseq),
+ _ => (idx.i, idx.modseq),
+ });
- // 6. Format the result according to the client's taste:
- // either return UID or ID.
- let final_selection = kept_idx.iter().chain(kept_query.iter());
- let selection_fmt = match uid {
- true => final_selection.map(|in_idx| in_idx.uid).collect(),
- _ => final_selection.map(|in_idx| in_idx.i).collect(),
- };
+ // 8. Do the actual computation
+ let internal_result: Vec<_> = main_stream.try_collect().await?;
+ let (selection, modseqs): (Vec<_>, Vec<_>) = internal_result.into_iter().unzip();
- // 7. Add the modseq entry if needed
- let is_modseq = crit.is_modseq();
- let maybe_modseq = match is_modseq {
- true => {
- let final_selection = kept_idx.iter().chain(kept_query.iter());
- final_selection
- .map(|in_idx| in_idx.modseq)
- .max()
- .map(|r| NonZeroU64::try_from(r))
- .transpose()?
- }
+ // 9. Aggregate the maximum modseq value
+ let maybe_modseq = match crit.is_modseq() {
+ true => modseqs.into_iter().max(),
_ => None,
};
+ // 10. Return the final result
Ok((
- vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))],
- is_modseq,
+ vec![Body::Data(Data::Search(selection, maybe_modseq))],
+ maybe_modseq.is_some(),
))
}
@@ -626,7 +631,7 @@ impl MailboxView {
mod tests {
use super::*;
use imap_codec::encode::Encoder;
- use imap_codec::imap_types::core::NonEmptyVec;
+ use imap_codec::imap_types::core::Vec1;
use imap_codec::imap_types::fetch::Section;
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName};
use imap_codec::imap_types::response::Response;
@@ -746,7 +751,7 @@ mod tests {
let test_repr = Response::Data(Data::Fetch {
seq: NonZeroU32::new(1).unwrap(),
- items: NonEmptyVec::from(MessageDataItem::Body(mime_view::bodystructure(
+ items: Vec1::from(MessageDataItem::Body(mime_view::bodystructure(
&message.child,
false,
)?)),
diff --git a/src/imap/mime_view.rs b/src/imap/mime_view.rs
index 8fc043b..8bbbd2d 100644
--- a/src/imap/mime_view.rs
+++ b/src/imap/mime_view.rs
@@ -8,7 +8,7 @@ use imap_codec::imap_types::body::{
BasicFields, Body as FetchBody, BodyStructure, MultiPartExtensionData, SinglePartExtensionData,
SpecificFields,
};
-use imap_codec::imap_types::core::{AString, IString, NString, NonEmptyVec};
+use imap_codec::imap_types::core::{AString, IString, NString, Vec1};
use imap_codec::imap_types::fetch::{Part as FetchPart, Section as FetchSection};
use eml_codec::{
@@ -141,8 +141,8 @@ impl<'a> NodeMime<'a> {
enum SubsettedSection<'a> {
Part,
Header,
- HeaderFields(&'a NonEmptyVec<AString<'a>>),
- HeaderFieldsNot(&'a NonEmptyVec<AString<'a>>),
+ HeaderFields(&'a Vec1<AString<'a>>),
+ HeaderFieldsNot(&'a Vec1<AString<'a>>),
Text,
Mime,
}
@@ -238,7 +238,7 @@ impl<'a> SelectedMime<'a> {
/// case-insensitive but otherwise exact.
fn header_fields(
&self,
- fields: &'a NonEmptyVec<AString<'a>>,
+ fields: &'a Vec1<AString<'a>>,
invert: bool,
) -> Result<ExtractedFull<'a>> {
// Build a lowercase ascii hashset with the fields to fetch
@@ -398,8 +398,8 @@ impl<'a> NodeMult<'a> {
.filter_map(|inner| NodeMime(&inner).structure(is_ext).ok())
.collect::<Vec<_>>();
- NonEmptyVec::validate(&inner_bodies)?;
- let bodies = NonEmptyVec::unvalidated(inner_bodies);
+ Vec1::validate(&inner_bodies)?;
+ let bodies = Vec1::unvalidated(inner_bodies);
Ok(BodyStructure::Multi {
bodies,
diff --git a/src/imap/mod.rs b/src/imap/mod.rs
index 58c4dc0..02ab9ce 100644
--- a/src/imap/mod.rs
+++ b/src/imap/mod.rs
@@ -15,7 +15,7 @@ mod session;
use std::net::SocketAddr;
-use anyhow::{bail, Result};
+use anyhow::{anyhow, bail, Context, Result};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
@@ -144,13 +144,6 @@ use tokio_util::bytes::BytesMut;
const PIPELINABLE_COMMANDS: usize = 64;
-#[derive(Debug)]
-enum LoopMode {
- Quit,
- Interactive,
- Idle(BytesMut, Arc<Notify>),
-}
-
// @FIXME a full refactor of this part of the code will be needed sooner or later
struct NetLoop {
ctx: ClientContext,
@@ -163,7 +156,7 @@ impl NetLoop {
async fn handler(ctx: ClientContext, sock: AnyStream) {
let addr = ctx.addr.clone();
- let nl = match Self::new(ctx, sock).await {
+ let mut nl = match Self::new(ctx, sock).await {
Ok(nl) => {
tracing::debug!(addr=?addr, "netloop successfully initialized");
nl
@@ -185,15 +178,15 @@ impl NetLoop {
}
async fn new(ctx: ClientContext, sock: AnyStream) -> Result<Self> {
+ let mut opts = ServerFlowOptions::default();
+ opts.crlf_relaxed = false;
+ opts.literal_accept_text = Text::unvalidated("OK");
+ opts.literal_reject_text = Text::unvalidated("Literal rejected");
+
// Send greeting
let (server, _) = ServerFlow::send_greeting(
sock,
- ServerFlowOptions {
- crlf_relaxed: false,
- literal_accept_text: Text::unvalidated("OK"),
- literal_reject_text: Text::unvalidated("Literal rejected"),
- ..ServerFlowOptions::default()
- },
+ opts,
Greeting::ok(
Some(Code::Capability(ctx.server_capabilities.to_vec())),
"Aerogramme",
@@ -241,85 +234,111 @@ impl NetLoop {
tracing::info!("runner is quitting");
}
- async fn core(mut self) -> Result<()> {
- tracing::trace!("Starting the core loop");
- let mut mode = LoopMode::Interactive;
+ async fn core(&mut self) -> Result<()> {
+ let mut maybe_idle: Option<Arc<Notify>> = None;
loop {
- tracing::trace!(mode=?mode, "Core loop iter");
- mode = match mode {
- LoopMode::Interactive => self.interactive_mode().await?,
- LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?,
- LoopMode::Quit => break,
- }
- }
- Ok(())
- }
-
- async fn interactive_mode(&mut self) -> Result<LoopMode> {
- tokio::select! {
- // Managing imap_flow stuff
- srv_evt = self.server.progress() => match srv_evt? {
- ServerFlowEvent::ResponseSent { handle: _handle, response } => {
- match response {
- Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit),
- _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
- }
- },
- ServerFlowEvent::CommandReceived { command } => {
- match self.cmd_tx.try_send(Request::ImapCommand(command)) {
- Ok(_) => (),
- Err(mpsc::error::TrySendError::Full(_)) => {
- self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
- tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
+ tokio::select! {
+ // Managing imap_flow stuff
+ srv_evt = self.server.progress() => match srv_evt? {
+ ServerFlowEvent::ResponseSent { handle: _handle, response } => {
+ match response {
+ Response::Status(Status::Bye(_)) => return Ok(()),
+ _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
+ }
+ },
+ ServerFlowEvent::CommandReceived { command } => {
+ match self.cmd_tx.try_send(Request::ImapCommand(command)) {
+ Ok(_) => (),
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
+ tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
+ }
+ _ => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ }
}
- _ => {
- self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ },
+ ServerFlowEvent::IdleCommandReceived { tag } => {
+ match self.cmd_tx.try_send(Request::IdleStart(tag)) {
+ Ok(_) => (),
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
+ tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
+ }
+ _ => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ }
}
}
- },
- flow => {
- self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
- tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
- }
- },
-
- // Managing response generated by Aerogramme
- maybe_msg = self.resp_rx.recv() => match maybe_msg {
- Some(ResponseOrIdle::Response(response)) => {
- tracing::trace!("Interactive, server has a response for the client");
- for body_elem in response.body.into_iter() {
- let _handle = match body_elem {
- Body::Data(d) => self.server.enqueue_data(d),
- Body::Status(s) => self.server.enqueue_status(s),
- };
+ ServerFlowEvent::IdleDoneReceived => {
+ tracing::trace!("client sent DONE and want to stop IDLE");
+ maybe_idle.ok_or(anyhow!("Received IDLE done but not idling currently"))?.notify_one();
+ maybe_idle = None;
+ }
+ flow => {
+ self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
+ tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
}
- self.server.enqueue_status(response.completion);
- },
- Some(ResponseOrIdle::StartIdle(stop)) => {
- tracing::trace!("Interactive, server agreed to switch in idle mode");
- let cr = CommandContinuationRequest::basic(None, "Idling")?;
- self.server.enqueue_continuation(cr);
- self.cmd_tx.try_send(Request::Idle)?;
- return Ok(LoopMode::Idle(BytesMut::new(), stop))
- },
- None => {
- self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
},
- Some(_) => unreachable!(),
- },
+ // Managing response generated by Aerogramme
+ maybe_msg = self.resp_rx.recv() => match maybe_msg {
+ Some(ResponseOrIdle::Response(response)) => {
+ tracing::trace!("Interactive, server has a response for the client");
+ for body_elem in response.body.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.server.enqueue_status(response.completion);
+ },
+ Some(ResponseOrIdle::IdleAccept(stop)) => {
+ tracing::trace!("Interactive, server agreed to switch in idle mode");
+ let cr = CommandContinuationRequest::basic(None, "Idling")?;
+ self.server.idle_accept(cr).or(Err(anyhow!("refused continuation for idle accept")))?;
+ self.cmd_tx.try_send(Request::IdlePoll)?;
+ if maybe_idle.is_some() {
+ bail!("Can't start IDLE if already idling");
+ }
+ maybe_idle = Some(stop);
+ },
+ Some(ResponseOrIdle::IdleEvent(elems)) => {
+ tracing::trace!("server imap session has some change to communicate to the client");
+ for body_elem in elems.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.cmd_tx.try_send(Request::IdlePoll)?;
+ },
+ Some(ResponseOrIdle::IdleReject(response)) => {
+ tracing::trace!("inform client that session rejected idle");
+ self.server
+ .idle_reject(response.completion)
+ .or(Err(anyhow!("wrong reject command")))?;
+ },
+ None => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ },
+ Some(_) => unreachable!(),
- // When receiving a CTRL+C
- _ = self.ctx.must_exit.changed() => {
- tracing::trace!("Interactive, CTRL+C, exiting");
- self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
- },
- };
- Ok(LoopMode::Interactive)
+ },
+
+ // When receiving a CTRL+C
+ _ = self.ctx.must_exit.changed() => {
+ tracing::trace!("Interactive, CTRL+C, exiting");
+ self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ },
+ };
+ }
}
+ /*
async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> {
// Flush send
loop {
@@ -398,5 +417,5 @@ impl NetLoop {
return Ok(LoopMode::Interactive)
},
};
- }
+ }*/
}
diff --git a/src/imap/request.rs b/src/imap/request.rs
index 49b4992..cff18a3 100644
--- a/src/imap/request.rs
+++ b/src/imap/request.rs
@@ -1,7 +1,9 @@
use imap_codec::imap_types::command::Command;
+use imap_codec::imap_types::core::Tag;
#[derive(Debug)]
pub enum Request {
ImapCommand(Command<'static>),
- Idle,
+ IdleStart(Tag<'static>),
+ IdlePoll,
}
diff --git a/src/imap/response.rs b/src/imap/response.rs
index 40e6927..b6a0e98 100644
--- a/src/imap/response.rs
+++ b/src/imap/response.rs
@@ -118,6 +118,7 @@ impl<'a> Response<'a> {
#[derive(Debug)]
pub enum ResponseOrIdle {
Response(Response<'static>),
- StartIdle(Arc<Notify>),
+ IdleAccept(Arc<Notify>),
+ IdleReject(Response<'static>),
IdleEvent(Vec<Body<'static>>),
}
diff --git a/src/imap/search.rs b/src/imap/search.rs
index d06c3bd..37a7e9e 100644
--- a/src/imap/search.rs
+++ b/src/imap/search.rs
@@ -1,13 +1,12 @@
use std::num::{NonZeroU32, NonZeroU64};
-use anyhow::Result;
-use imap_codec::imap_types::core::NonEmptyVec;
+use imap_codec::imap_types::core::Vec1;
use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
use crate::imap::index::MailIndex;
use crate::imap::mail_view::MailView;
-use crate::mail::query::{QueryResult, QueryScope};
+use crate::mail::query::QueryScope;
pub enum SeqType {
Undefined,
@@ -49,7 +48,7 @@ impl<'a> Criteria<'a> {
let mut new_vec = base.0.into_inner();
new_vec.extend_from_slice(ext.0.as_ref());
let seq = SequenceSet(
- NonEmptyVec::try_from(new_vec)
+ Vec1::try_from(new_vec)
.expect("merging non empty vec lead to non empty vec"),
);
(seq, x)
@@ -145,22 +144,6 @@ impl<'a> Criteria<'a> {
(to_keep, to_fetch)
}
- pub fn filter_on_query<'b>(
- &self,
- midx_list: &[&'b MailIndex<'b>],
- query_result: &'b Vec<QueryResult>,
- ) -> Result<Vec<&'b MailIndex<'b>>> {
- Ok(midx_list
- .iter()
- .zip(query_result.iter())
- .map(|(midx, qr)| MailView::new(qr, midx))
- .collect::<Result<Vec<_>, _>>()?
- .into_iter()
- .filter(|mail_view| self.is_keep_on_query(mail_view))
- .map(|mail_view| mail_view.in_idx)
- .collect())
- }
-
// ----
/// Here we are doing a partial filtering: we do not have access
@@ -213,7 +196,7 @@ impl<'a> Criteria<'a> {
/// the email, as body(x) might be false. So we need to check it. But as seqid(x) is true,
/// we could simplify the request to just body(x) and truncate the first OR. Today, we are
/// not doing that, and thus we reevaluate everything.
- fn is_keep_on_query(&self, mail_view: &MailView) -> bool {
+ pub fn is_keep_on_query(&self, mail_view: &MailView) -> bool {
use SearchKey::*;
match self.0 {
// Combinator logic
diff --git a/src/imap/session.rs b/src/imap/session.rs
index 12bbfee..fa3232a 100644
--- a/src/imap/session.rs
+++ b/src/imap/session.rs
@@ -4,8 +4,8 @@ use crate::imap::flow;
use crate::imap::request::Request;
use crate::imap::response::{Response, ResponseOrIdle};
use crate::login::ArcLoginProvider;
-use anyhow::{anyhow, bail, Result};
-use imap_codec::imap_types::command::Command;
+use anyhow::{anyhow, bail, Context, Result};
+use imap_codec::imap_types::{command::Command, core::Tag};
//-----
pub struct Instance {
@@ -27,13 +27,48 @@ impl Instance {
pub async fn request(&mut self, req: Request) -> ResponseOrIdle {
match req {
- Request::Idle => self.idle().await,
+ Request::IdleStart(tag) => self.idle_init(tag),
+ Request::IdlePoll => self.idle_poll().await,
Request::ImapCommand(cmd) => self.command(cmd).await,
}
}
- pub async fn idle(&mut self) -> ResponseOrIdle {
- match self.idle_happy().await {
+ pub fn idle_init(&mut self, tag: Tag<'static>) -> ResponseOrIdle {
+ // Build transition
+ //@FIXME the notifier should be hidden inside the state and thus not part of the transition!
+ let transition = flow::Transition::Idle(tag.clone(), tokio::sync::Notify::new());
+
+ // Try to apply the transition and get the stop notifier
+ let maybe_stop = self
+ .state
+ .apply(transition)
+ .context("IDLE transition failed")
+ .and_then(|_| {
+ self.state
+ .notify()
+ .ok_or(anyhow!("IDLE state has no Notify object"))
+ });
+
+ // Build an appropriate response
+ match maybe_stop {
+ Ok(stop) => ResponseOrIdle::IdleAccept(stop),
+ Err(e) => {
+ tracing::error!(err=?e, "unable to init idle due to a transition error");
+ //ResponseOrIdle::IdleReject(tag)
+ let no = Response::build()
+ .tag(tag)
+ .message(
+ "Internal error, processing command triggered an illegal IMAP state transition",
+ )
+ .no()
+ .unwrap();
+ ResponseOrIdle::IdleReject(no)
+ }
+ }
+ }
+
+ pub async fn idle_poll(&mut self) -> ResponseOrIdle {
+ match self.idle_poll_happy().await {
Ok(r) => r,
Err(e) => {
tracing::error!(err=?e, "something bad happened in idle");
@@ -42,7 +77,7 @@ impl Instance {
}
}
- pub async fn idle_happy(&mut self) -> Result<ResponseOrIdle> {
+ pub async fn idle_poll_happy(&mut self) -> Result<ResponseOrIdle> {
let (mbx, tag, stop) = match &mut self.state {
flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()),
_ => bail!("Invalid session state, can't idle"),
@@ -128,10 +163,11 @@ impl Instance {
.bad()
.unwrap());
}
+ ResponseOrIdle::Response(resp)
- match &self.state {
+ /*match &self.state {
flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()),
_ => ResponseOrIdle::Response(resp),
- }
+ }*/
}
}
diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs
index e73e1dc..0af5676 100644
--- a/src/login/ldap_provider.rs
+++ b/src/login/ldap_provider.rs
@@ -21,6 +21,7 @@ pub struct LdapLoginProvider {
storage_specific: StorageSpecific,
in_memory_store: storage::in_memory::MemDb,
+ garage_store: storage::garage::GarageRoot,
}
enum BucketSource {
@@ -91,7 +92,11 @@ impl LdapLoginProvider {
mail_attr: config.mail_attr,
crypto_root_attr: config.crypto_root_attr,
storage_specific: specific,
+ //@FIXME should be created outside of the login provider
+ //Login provider should return only a cryptoroot + a storage URI
+ //storage URI that should be resolved outside...
in_memory_store: storage::in_memory::MemDb::new(),
+ garage_store: storage::garage::GarageRoot::new()?,
})
}
@@ -114,7 +119,7 @@ impl LdapLoginProvider {
BucketSource::Attr(a) => get_attr(user, &a)?,
};
- storage::garage::GarageBuilder::new(storage::garage::GarageConf {
+ self.garage_store.user(storage::garage::GarageConf {
region: from_config.aws_region.clone(),
s3_endpoint: from_config.s3_endpoint.clone(),
k2v_endpoint: from_config.k2v_endpoint.clone(),
diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs
index 1e1ecbf..79626df 100644
--- a/src/login/static_provider.rs
+++ b/src/login/static_provider.rs
@@ -25,6 +25,7 @@ pub struct UserDatabase {
pub struct StaticLoginProvider {
user_db: watch::Receiver<UserDatabase>,
in_memory_store: storage::in_memory::MemDb,
+ garage_store: storage::garage::GarageRoot,
}
pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) -> Result<()> {
@@ -84,6 +85,7 @@ impl StaticLoginProvider {
Ok(Self {
user_db: rx,
in_memory_store: storage::in_memory::MemDb::new(),
+ garage_store: storage::garage::GarageRoot::new()?,
})
}
}
@@ -109,7 +111,7 @@ impl LoginProvider for StaticLoginProvider {
let storage: storage::Builder = match &user.config.storage {
StaticStorage::InMemory => self.in_memory_store.builder(username).await,
StaticStorage::Garage(grgconf) => {
- storage::garage::GarageBuilder::new(storage::garage::GarageConf {
+ self.garage_store.user(storage::garage::GarageConf {
region: grgconf.aws_region.clone(),
k2v_endpoint: grgconf.k2v_endpoint.clone(),
s3_endpoint: grgconf.s3_endpoint.clone(),
@@ -140,7 +142,7 @@ impl LoginProvider for StaticLoginProvider {
let storage: storage::Builder = match &user.config.storage {
StaticStorage::InMemory => self.in_memory_store.builder(&user.username).await,
StaticStorage::Garage(grgconf) => {
- storage::garage::GarageBuilder::new(storage::garage::GarageConf {
+ self.garage_store.user(storage::garage::GarageConf {
region: grgconf.aws_region.clone(),
k2v_endpoint: grgconf.k2v_endpoint.clone(),
s3_endpoint: grgconf.s3_endpoint.clone(),
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index c20d815..9190883 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -498,7 +498,7 @@ fn dump(uid_index: &Bayou<UidIndex>) {
/// The metadata of a message that is stored in K2V
/// at pk = mail/<mailbox uuid>, sk = <message uuid>
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MailMeta {
/// INTERNALDATE field (milliseconds since epoch)
pub internaldate: u64,
diff --git a/src/mail/query.rs b/src/mail/query.rs
index a183c5a..3e6fe99 100644
--- a/src/mail/query.rs
+++ b/src/mail/query.rs
@@ -2,7 +2,8 @@ use super::mailbox::MailMeta;
use super::snapshot::FrozenMailbox;
use super::unique_ident::UniqueIdent;
use anyhow::Result;
-use futures::stream::{FuturesOrdered, StreamExt};
+use futures::future::FutureExt;
+use futures::stream::{BoxStream, Stream, StreamExt};
/// Query is in charge of fetching efficiently
/// requested data for a list of emails
@@ -28,64 +29,62 @@ impl QueryScope {
}
}
+//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>;
+
impl<'a, 'b> Query<'a, 'b> {
- pub async fn fetch(&self) -> Result<Vec<QueryResult>> {
+ pub fn fetch(&self) -> BoxStream<Result<QueryResult>> {
match self.scope {
- QueryScope::Index => Ok(self
- .emails
- .iter()
- .map(|&uuid| QueryResult::IndexResult { uuid })
- .collect()),
- QueryScope::Partial => self.partial().await,
- QueryScope::Full => self.full().await,
+ QueryScope::Index => Box::pin(
+ futures::stream::iter(self.emails)
+ .map(|&uuid| Ok(QueryResult::IndexResult { uuid })),
+ ),
+ QueryScope::Partial => Box::pin(self.partial()),
+ QueryScope::Full => Box::pin(self.full()),
}
}
// --- functions below are private *for reasons*
+ fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
+ async move {
+ let maybe_meta_list: Result<Vec<MailMeta>> =
+ self.frozen.mailbox.fetch_meta(self.emails).await;
+ let list_res = maybe_meta_list
+ .map(|meta_list| {
+ meta_list
+ .into_iter()
+ .zip(self.emails)
+ .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata }))
+ .collect()
+ })
+ .unwrap_or_else(|e| vec![Err(e)]);
- async fn partial(&self) -> Result<Vec<QueryResult>> {
- let meta = self.frozen.mailbox.fetch_meta(self.emails).await?;
- let result = meta
- .into_iter()
- .zip(self.emails.iter())
- .map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata })
- .collect::<Vec<_>>();
-
- Ok(result)
+ futures::stream::iter(list_res)
+ }
+ .flatten_stream()
}
- /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY
- /// AND GENERATE SO MUCH NETWORK TRAFFIC.
- /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH
- /// SOMETHING LIKE AN ITERATOR
- async fn full(&self) -> Result<Vec<QueryResult>> {
- let meta_list = self.partial().await?;
- meta_list
- .into_iter()
- .map(|meta| async move {
- let content = self
- .frozen
- .mailbox
- .fetch_full(
- *meta.uuid(),
- &meta
- .metadata()
- .expect("meta to be PartialResult")
- .message_key,
- )
- .await?;
+ fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
+ self.partial().then(move |maybe_meta| async move {
+ let meta = maybe_meta?;
- Ok(meta.into_full(content).expect("meta to be PartialResult"))
- })
- .collect::<FuturesOrdered<_>>()
- .collect::<Vec<_>>()
- .await
- .into_iter()
- .collect::<Result<Vec<_>, _>>()
+ let content = self
+ .frozen
+ .mailbox
+ .fetch_full(
+ *meta.uuid(),
+ &meta
+ .metadata()
+ .expect("meta to be PartialResult")
+ .message_key,
+ )
+ .await?;
+
+ Ok(meta.into_full(content).expect("meta to be PartialResult"))
+ })
}
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum QueryResult {
IndexResult {
uuid: UniqueIdent,
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index 709e729..7152764 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -1,7 +1,45 @@
-use crate::storage::*;
use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError};
+use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
+use aws_smithy_runtime_api::client::http::SharedHttpClient;
+use hyper_rustls::HttpsConnector;
+use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient};
+use hyper_util::rt::TokioExecutor;
use serde::Serialize;
+use crate::storage::*;
+
+pub struct GarageRoot {
+ k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>,
+ aws_http: SharedHttpClient,
+}
+
+impl GarageRoot {
+ pub fn new() -> anyhow::Result<Self> {
+ let connector = hyper_rustls::HttpsConnectorBuilder::new()
+ .with_native_roots()?
+ .https_or_http()
+ .enable_http1()
+ .enable_http2()
+ .build();
+ let k2v_http = HttpClient::builder(TokioExecutor::new()).build(connector);
+ let aws_http = HyperClientBuilder::new().build_https();
+ Ok(Self { k2v_http, aws_http })
+ }
+
+ pub fn user(&self, conf: GarageConf) -> anyhow::Result<Arc<GarageUser>> {
+ let mut unicity: Vec<u8> = vec![];
+ unicity.extend_from_slice(file!().as_bytes());
+ unicity.append(&mut rmp_serde::to_vec(&conf)?);
+
+ Ok(Arc::new(GarageUser {
+ conf,
+ aws_http: self.aws_http.clone(),
+ k2v_http: self.k2v_http.clone(),
+ unicity,
+ }))
+ }
+}
+
#[derive(Clone, Debug, Serialize)]
pub struct GarageConf {
pub region: String,
@@ -12,23 +50,19 @@ pub struct GarageConf {
pub bucket: String,
}
+//@FIXME we should get rid of this builder
+//and allocate a S3 + K2V client only once per user
+//(and using a shared HTTP client)
#[derive(Clone, Debug)]
-pub struct GarageBuilder {
+pub struct GarageUser {
conf: GarageConf,
+ aws_http: SharedHttpClient,
+ k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>,
unicity: Vec<u8>,
}
-impl GarageBuilder {
- pub fn new(conf: GarageConf) -> anyhow::Result<Arc<Self>> {
- let mut unicity: Vec<u8> = vec![];
- unicity.extend_from_slice(file!().as_bytes());
- unicity.append(&mut rmp_serde::to_vec(&conf)?);
- Ok(Arc::new(Self { conf, unicity }))
- }
-}
-
#[async_trait]
-impl IBuilder for GarageBuilder {
+impl IBuilder for GarageUser {
async fn build(&self) -> Result<Store, StorageError> {
let s3_creds = s3::config::Credentials::new(
self.conf.aws_access_key_id.clone(),
@@ -41,6 +75,7 @@ impl IBuilder for GarageBuilder {
let sdk_config = aws_config::from_env()
.region(aws_config::Region::new(self.conf.region.clone()))
.credentials_provider(s3_creds)
+ .http_client(self.aws_http.clone())
.endpoint_url(self.conf.s3_endpoint.clone())
.load()
.await;
@@ -60,13 +95,14 @@ impl IBuilder for GarageBuilder {
user_agent: None,
};
- let k2v_client = match k2v_client::K2vClient::new(k2v_config) {
- Err(e) => {
- tracing::error!("unable to build k2v client: {}", e);
- return Err(StorageError::Internal);
- }
- Ok(v) => v,
- };
+ let k2v_client =
+ match k2v_client::K2vClient::new_with_client(k2v_config, self.k2v_http.clone()) {
+ Err(e) => {
+ tracing::error!("unable to build k2v client: {}", e);
+ return Err(StorageError::Internal);
+ }
+ Ok(v) => v,
+ };
Ok(Box::new(GarageStore {
bucket: self.conf.bucket.clone(),