From 3c7186ab5ac5a66f782a038d937b6679780df458 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 11 Jan 2024 23:02:03 +0100 Subject: Finalize implementation of CONDSTORE --- src/imap/attributes.rs | 11 +++++- src/imap/command/examined.rs | 15 ++++++-- src/imap/command/selected.rs | 28 +++++++++++--- src/imap/index.rs | 32 +++++++++++++++- src/imap/mailbox_view.rs | 91 +++++++++++++++++++++++++++++++++++--------- 5 files changed, 147 insertions(+), 30 deletions(-) (limited to 'src/imap') diff --git a/src/imap/attributes.rs b/src/imap/attributes.rs index 6cf52b5..d094f1a 100644 --- a/src/imap/attributes.rs +++ b/src/imap/attributes.rs @@ -1,4 +1,5 @@ use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section}; +use imap_codec::imap_types::command::FetchModifier; /// Internal decisions based on fetched attributes /// passed by the client @@ -7,7 +8,7 @@ pub struct AttributesProxy { pub attrs: Vec>, } impl AttributesProxy { - pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, is_uid_fetch: bool) -> Self { + pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, modifiers: &[FetchModifier], is_uid_fetch: bool) -> Self { // Expand macros let mut fetch_attrs = match attrs { MacroOrMessageDataItemNames::Macro(m) => { @@ -31,6 +32,14 @@ impl AttributesProxy { fetch_attrs.push(MessageDataItemName::Uid); } + // Handle inferred MODSEQ tag + let is_changed_since = modifiers + .iter() + .any(|m| matches!(m, FetchModifier::ChangedSince(..))); + if is_changed_since && !fetch_attrs.contains(&MessageDataItemName::ModSeq) { + fetch_attrs.push(MessageDataItemName::ModSeq); + } + Self { attrs: fetch_attrs } } diff --git a/src/imap/command/examined.rs b/src/imap/command/examined.rs index cdebc6d..9fc0990 100644 --- a/src/imap/command/examined.rs +++ b/src/imap/command/examined.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::num::NonZeroU64; use anyhow::Result; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier}; @@ -11,7 +12,7 @@ use crate::imap::attributes::AttributesProxy; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anystate, authenticated}; use crate::imap::flow; -use crate::imap::mailbox_view::MailboxView; +use crate::imap::mailbox_view::{MailboxView, UpdateParameters}; use crate::imap::response::Response; use crate::mail::user::User; @@ -93,9 +94,15 @@ impl<'a> ExaminedContext<'a> { modifiers: &[FetchModifier], uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { - let ap = AttributesProxy::new(attributes, *uid); + let ap = AttributesProxy::new(attributes, modifiers, *uid); + let mut changed_since: Option = None; + modifiers.iter().for_each(|m| match m { + FetchModifier::ChangedSince(val) => { + changed_since = Some(*val); + }, + }); - match self.mailbox.fetch(sequence_set, &ap, uid).await { + match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await { Ok(resp) => { // Capabilities enabling logic only on successful command // (according to my understanding of the spec) @@ -144,7 +151,7 @@ impl<'a> ExaminedContext<'a> { pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { self.mailbox.internal.mailbox.force_sync().await?; - let updates = self.mailbox.update().await?; + let updates = self.mailbox.update(UpdateParameters::default()).await?; Ok(( Response::build() .to_req(self.req) diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index d7aa94f..d694fd1 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::num::NonZeroU64; use anyhow::Result; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; @@ -13,7 +14,7 @@ use imap_codec::imap_types::sequence::SequenceSet; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anystate, authenticated, MailboxName}; use crate::imap::flow; -use crate::imap::mailbox_view::MailboxView; +use crate::imap::mailbox_view::{MailboxView, UpdateParameters}; use crate::imap::response::Response; use crate::imap::attributes::AttributesProxy; use crate::mail::user::User; @@ -118,9 +119,15 @@ impl<'a> SelectedContext<'a> { modifiers: &[FetchModifier], uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { - let ap = AttributesProxy::new(attributes, *uid); + let ap = AttributesProxy::new(attributes, modifiers, *uid); + let mut changed_since: Option = None; + modifiers.iter().for_each(|m| match m { + FetchModifier::ChangedSince(val) => { + changed_since = Some(*val); + }, + }); - match self.mailbox.fetch(sequence_set, &ap, uid).await { + match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await { Ok(resp) => { // Capabilities enabling logic only on successful command // (according to my understanding of the spec) @@ -170,7 +177,7 @@ impl<'a> SelectedContext<'a> { pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { self.mailbox.internal.mailbox.force_sync().await?; - let updates = self.mailbox.update().await?; + let updates = self.mailbox.update(UpdateParameters::default()).await?; Ok(( Response::build() .to_req(self.req) @@ -204,10 +211,18 @@ impl<'a> SelectedContext<'a> { modifiers: &[StoreModifier], uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { - let data = self + let mut unchanged_since: Option = None; + modifiers.iter().for_each(|m| match m { + StoreModifier::UnchangedSince(val) => { + unchanged_since = Some(*val); + }, + }); + + let (data, modified) = self .mailbox - .store(sequence_set, kind, response, flags, uid) + .store(sequence_set, kind, response, flags, unchanged_since, uid) .await?; + let modified_str = format!("MODIFIED {}", modified.into_iter().map(|x| x.to_string()).collect::>().join(",")); self.client_capabilities.store_modifiers_enable(modifiers); @@ -215,6 +230,7 @@ impl<'a> SelectedContext<'a> { Response::build() .to_req(self.req) .message("STORE completed") + .code(Code::Other(CodeOther::unvalidated(modified_str.into_bytes()))) .set_body(data) .ok()?, flow::Transition::None, diff --git a/src/imap/index.rs b/src/imap/index.rs index 44109d5..9b794b8 100644 --- a/src/imap/index.rs +++ b/src/imap/index.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroU32; +use std::num::{NonZeroU32, NonZeroU64}; use anyhow::{anyhow, Result}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; @@ -126,6 +126,36 @@ impl<'a> Index<'a> { _ => self.fetch_on_id(sequence_set), } } + + pub fn fetch_changed_since( + self: &'a Index<'a>, + sequence_set: &SequenceSet, + maybe_modseq: Option, + by_uid: bool, + ) -> Result>> { + let raw = self.fetch(sequence_set, by_uid)?; + let res = match maybe_modseq { + Some(pit) => raw.into_iter().filter(|midx| midx.modseq > pit).collect(), + None => raw, + }; + + Ok(res) + } + + pub fn fetch_unchanged_since( + self: &'a Index<'a>, + sequence_set: &SequenceSet, + maybe_modseq: Option, + by_uid: bool, + ) -> Result<(Vec<&'a MailIndex<'a>>, Vec<&'a MailIndex<'a>>)> { + let raw = self.fetch(sequence_set, by_uid)?; + let res = match maybe_modseq { + Some(pit) => raw.into_iter().partition(|midx| midx.modseq <= pit), + None => (raw, vec![]), + }; + + Ok(res) + } } #[derive(Clone, Debug)] diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index c3900cf..683e209 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -1,5 +1,6 @@ use std::num::{NonZeroU32, NonZeroU64}; use std::sync::Arc; +use std::collections::HashSet; use anyhow::{anyhow, Error, Result}; @@ -12,6 +13,7 @@ use imap_codec::imap_types::response::{Code, CodeOther, Data, Status}; use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::sequence::SequenceSet; +use crate::mail::unique_ident::UniqueIdent; use crate::mail::mailbox::Mailbox; use crate::mail::query::QueryScope; use crate::mail::snapshot::FrozenMailbox; @@ -32,6 +34,21 @@ const DEFAULT_FLAGS: [Flag; 5] = [ Flag::Draft, ]; +pub struct UpdateParameters { + pub silence: HashSet, + pub with_modseq: bool, + pub with_uid: bool, +} +impl Default for UpdateParameters { + fn default() -> Self { + Self { + silence: HashSet::new(), + with_modseq: false, + with_uid: false, + } + } +} + /// A MailboxView is responsible for giving the client the information /// it needs about a mailbox, such as an initial summary of the mailbox's /// content and continuous updates indicating when the content @@ -59,7 +76,7 @@ impl MailboxView { /// what the client knows and what is actually in the mailbox. /// This does NOT trigger a sync, it bases itself on what is currently /// loaded in RAM by Bayou. - pub async fn update(&mut self) -> Result>> { + pub async fn update(&mut self, params: UpdateParameters) -> Result>> { let old_snapshot = self.internal.update().await; let new_snapshot = &self.internal.snapshot; @@ -105,19 +122,31 @@ impl MailboxView { } else { // - if flags changed for existing mails, tell client for (i, (_uid, uuid)) in new_snapshot.idx_by_uid.iter().enumerate() { + if params.silence.contains(uuid) { + continue; + } + let old_mail = old_snapshot.table.get(uuid); let new_mail = new_snapshot.table.get(uuid); if old_mail.is_some() && old_mail != new_mail { - if let Some((uid, _modseq, flags)) = new_mail { + if let Some((uid, modseq, flags)) = new_mail { + let mut items = vec![ + MessageDataItem::Flags( + flags.iter().filter_map(|f| flags::from_str(f)).collect(), + ), + ]; + + if params.with_uid { + items.push(MessageDataItem::Uid(*uid)); + } + + if params.with_modseq { + items.push(MessageDataItem::ModSeq(*modseq)); + } + data.push(Body::Data(Data::Fetch { seq: NonZeroU32::try_from((i + 1) as u32).unwrap(), - items: vec![ - MessageDataItem::Uid(*uid), - MessageDataItem::Flags( - flags.iter().filter_map(|f| flags::from_str(f)).collect(), - ), - ] - .try_into()?, + items: items.try_into()?, })); } } @@ -149,17 +178,20 @@ impl MailboxView { &mut self, sequence_set: &SequenceSet, kind: &StoreType, - _response: &StoreResponse, + response: &StoreResponse, flags: &[Flag<'a>], + unchanged_since: Option, is_uid_store: &bool, - ) -> Result>> { + ) -> Result<(Vec>, Vec)> { self.internal.sync().await?; let flags = flags.iter().map(|x| x.to_string()).collect::>(); let idx = self.index()?; - let mails = idx.fetch(sequence_set, *is_uid_store)?; - for mi in mails.iter() { + let (editable, in_conflict) = idx + .fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?; + + for mi in editable.iter() { match kind { StoreType::Add => { self.internal.mailbox.add_flags(mi.uuid, &flags[..]).await?; @@ -173,8 +205,23 @@ impl MailboxView { } } - // @TODO: handle _response - self.update().await + let silence = match response { + StoreResponse::Answer => HashSet::new(), + StoreResponse::Silent => editable.iter().map(|midx| midx.uuid).collect(), + }; + + let conflict_id_or_uid = match is_uid_store { + true => in_conflict.into_iter().map(|midx| midx.uid).collect(), + _ => in_conflict.into_iter().map(|midx| midx.i).collect(), + }; + + let summary = self.update(UpdateParameters { + with_uid: *is_uid_store, + with_modseq: unchanged_since.is_some(), + silence, + }).await?; + + Ok((summary, conflict_id_or_uid)) } pub async fn expunge(&mut self) -> Result>> { @@ -192,7 +239,7 @@ impl MailboxView { self.internal.mailbox.delete(msg).await?; } - self.update().await + self.update(UpdateParameters::default()).await } pub async fn copy( @@ -247,7 +294,10 @@ impl MailboxView { ret.push((mi.uid, dest_uid)); } - let update = self.update().await?; + let update = self.update(UpdateParameters { + with_uid: *is_uid_copy, + ..UpdateParameters::default() + }).await?; Ok((to_state.uidvalidity, ret, update)) } @@ -258,6 +308,7 @@ impl MailboxView { &self, sequence_set: &SequenceSet, ap: &AttributesProxy, + changed_since: Option, is_uid_fetch: &bool, ) -> Result>> { // [1/6] Pre-compute data @@ -270,7 +321,11 @@ impl MailboxView { }; tracing::debug!("Query scope {:?}", query_scope); let idx = self.index()?; - let mail_idx_list = idx.fetch(sequence_set, *is_uid_fetch)?; + let mail_idx_list = idx.fetch_changed_since( + sequence_set, + changed_since, + *is_uid_fetch + )?; // [2/6] Fetch the emails let uuids = mail_idx_list -- cgit v1.2.3