From 1edf0b15ecaa73d55bb72c6f3c6e25d4f231f322 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 8 Mar 2024 08:43:28 +0100 Subject: Re-enable collections --- aero-collections/src/mail/uidindex.rs | 474 ++++++++++++++++++++++++++++++++++ 1 file changed, 474 insertions(+) create mode 100644 aero-collections/src/mail/uidindex.rs (limited to 'aero-collections/src/mail/uidindex.rs') diff --git a/aero-collections/src/mail/uidindex.rs b/aero-collections/src/mail/uidindex.rs new file mode 100644 index 0000000..637a1ac --- /dev/null +++ b/aero-collections/src/mail/uidindex.rs @@ -0,0 +1,474 @@ +use std::num::{NonZeroU32, NonZeroU64}; + +use im::{HashMap, OrdMap, OrdSet}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use aero_bayou::*; +use crate::mail::unique_ident::UniqueIdent; + +pub type ModSeq = NonZeroU64; +pub type ImapUid = NonZeroU32; +pub type ImapUidvalidity = NonZeroU32; +pub type Flag = String; +pub type IndexEntry = (ImapUid, ModSeq, Vec); + +/// A UidIndex handles the mutable part of a mailbox +/// It is built by running the event log on it +/// Each applied log generates a new UidIndex by cloning the previous one +/// and applying the event. This is why we use immutable datastructures: +/// they are cheap to clone. +#[derive(Clone)] +pub struct UidIndex { + // Source of trust + pub table: OrdMap, + + // Indexes optimized for queries + pub idx_by_uid: OrdMap, + pub idx_by_modseq: OrdMap, + pub idx_by_flag: FlagIndex, + + // "Public" Counters + pub uidvalidity: ImapUidvalidity, + pub uidnext: ImapUid, + pub highestmodseq: ModSeq, + + // "Internal" Counters + pub internalseq: ImapUid, + pub internalmodseq: ModSeq, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub enum UidIndexOp { + MailAdd(UniqueIdent, ImapUid, ModSeq, Vec), + MailDel(UniqueIdent), + FlagAdd(UniqueIdent, ModSeq, Vec), + FlagDel(UniqueIdent, ModSeq, Vec), + FlagSet(UniqueIdent, ModSeq, Vec), + BumpUidvalidity(u32), +} + +impl UidIndex { + #[must_use] + pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_mail_del(&self, ident: UniqueIdent) -> UidIndexOp { + UidIndexOp::MailDel(ident) + } + + #[must_use] + pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagAdd(ident, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagDel(ident, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagSet(ident, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_bump_uidvalidity(&self, count: u32) -> UidIndexOp { + UidIndexOp::BumpUidvalidity(count) + } + + // INTERNAL functions to keep state consistent + + fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) { + // Insert the email in our table + self.table.insert(ident, (uid, modseq, flags.to_owned())); + + // Update the indexes/caches + self.idx_by_uid.insert(uid, ident); + self.idx_by_flag.insert(uid, flags); + self.idx_by_modseq.insert(modseq, ident); + } + + fn unreg_email(&mut self, ident: &UniqueIdent) { + // We do nothing if the mail does not exist + let (uid, modseq, flags) = match self.table.get(ident) { + Some(v) => v, + None => return, + }; + + // Delete all cache entries + self.idx_by_uid.remove(uid); + self.idx_by_flag.remove(*uid, flags); + self.idx_by_modseq.remove(modseq); + + // Remove from source of trust + self.table.remove(ident); + } +} + +impl Default for UidIndex { + fn default() -> Self { + Self { + table: OrdMap::new(), + + idx_by_uid: OrdMap::new(), + idx_by_modseq: OrdMap::new(), + idx_by_flag: FlagIndex::new(), + + uidvalidity: NonZeroU32::new(1).unwrap(), + uidnext: NonZeroU32::new(1).unwrap(), + highestmodseq: NonZeroU64::new(1).unwrap(), + + internalseq: NonZeroU32::new(1).unwrap(), + internalmodseq: NonZeroU64::new(1).unwrap(), + } + } +} + +impl BayouState for UidIndex { + type Op = UidIndexOp; + + fn apply(&self, op: &UidIndexOp) -> Self { + let mut new = self.clone(); + match op { + UidIndexOp::MailAdd(ident, uid, modseq, flags) => { + // Change UIDValidity if there is a UID conflict or a MODSEQ conflict + // @FIXME Need to prove that summing work + // The intuition: we increase the UIDValidity by the number of possible conflicts + if *uid < new.internalseq || *modseq < new.internalmodseq { + let bump_uid = new.internalseq.get() - uid.get(); + let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap(); + } + + // Assign the real uid of the email + let new_uid = new.internalseq; + + // Assign the real modseq of the email and its new flags + let new_modseq = new.internalmodseq; + + // Delete the previous entry if any. + // Our proof has no assumption on `ident` uniqueness, + // so we must handle this case even it is very unlikely + // In this case, we overwrite the email. + // Note: assigning a new UID is mandatory. + new.unreg_email(ident); + + // We record our email and update ou caches + new.reg_email(*ident, new_uid, new_modseq, flags); + + // Update counters + new.highestmodseq = new.internalmodseq; + + new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + + new.uidnext = new.internalseq; + } + UidIndexOp::MailDel(ident) => { + // If the email is known locally, we remove its references in all our indexes + new.unreg_email(ident); + + // We update the counter + new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); + } + UidIndexOp::FlagAdd(ident, candidate_modseq, new_flags) => { + if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *candidate_modseq < new.internalmodseq { + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); + } + + // Add flags to the source of trust and the cache + let mut to_add: Vec = new_flags + .iter() + .filter(|f| !existing_flags.contains(f)) + .cloned() + .collect(); + new.idx_by_flag.insert(*uid, &to_add); + *email_modseq = new.internalmodseq; + new.idx_by_modseq.insert(new.internalmodseq, *ident); + existing_flags.append(&mut to_add); + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + } + } + UidIndexOp::FlagDel(ident, candidate_modseq, rm_flags) => { + if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *candidate_modseq < new.internalmodseq { + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); + } + + // Remove flags from the source of trust and the cache + existing_flags.retain(|x| !rm_flags.contains(x)); + new.idx_by_flag.remove(*uid, rm_flags); + + // Register that email has been modified + new.idx_by_modseq.insert(new.internalmodseq, *ident); + *email_modseq = new.internalmodseq; + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + } + } + UidIndexOp::FlagSet(ident, candidate_modseq, new_flags) => { + if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *candidate_modseq < new.internalmodseq { + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); + } + + // Remove flags from the source of trust and the cache + let (keep_flags, rm_flags): (Vec, Vec) = existing_flags + .iter() + .cloned() + .partition(|x| new_flags.contains(x)); + *existing_flags = keep_flags; + let mut to_add: Vec = new_flags + .iter() + .filter(|f| !existing_flags.contains(f)) + .cloned() + .collect(); + existing_flags.append(&mut to_add); + new.idx_by_flag.remove(*uid, &rm_flags); + new.idx_by_flag.insert(*uid, &to_add); + + // Register that email has been modified + new.idx_by_modseq.insert(new.internalmodseq, *ident); + *email_modseq = new.internalmodseq; + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + } + } + UidIndexOp::BumpUidvalidity(count) => { + new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count) + .unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap()); + } + } + new + } +} + +// ---- FlagIndex implementation ---- + +#[derive(Clone)] +pub struct FlagIndex(HashMap>); +pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet>; + +impl FlagIndex { + fn new() -> Self { + Self(HashMap::new()) + } + fn insert(&mut self, uid: ImapUid, flags: &[Flag]) { + flags.iter().for_each(|flag| { + self.0 + .entry(flag.clone()) + .or_insert(OrdSet::new()) + .insert(uid); + }); + } + fn remove(&mut self, uid: ImapUid, flags: &[Flag]) { + for flag in flags.iter() { + if let Some(set) = self.0.get_mut(flag) { + set.remove(&uid); + if set.is_empty() { + self.0.remove(flag); + } + } + } + } + + pub fn get(&self, f: &Flag) -> Option<&OrdSet> { + self.0.get(f) + } + + pub fn flags(&self) -> FlagIter { + self.0.keys() + } +} + +// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ---- + +#[derive(Serialize, Deserialize)] +struct UidIndexSerializedRepr { + mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec)>, + + uidvalidity: ImapUidvalidity, + uidnext: ImapUid, + highestmodseq: ModSeq, + + internalseq: ImapUid, + internalmodseq: ModSeq, +} + +impl<'de> Deserialize<'de> for UidIndex { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?; + + let mut uidindex = UidIndex { + table: OrdMap::new(), + + idx_by_uid: OrdMap::new(), + idx_by_modseq: OrdMap::new(), + idx_by_flag: FlagIndex::new(), + + uidvalidity: val.uidvalidity, + uidnext: val.uidnext, + highestmodseq: val.highestmodseq, + + internalseq: val.internalseq, + internalmodseq: val.internalmodseq, + }; + + val.mails + .iter() + .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags)); + + Ok(uidindex) + } +} + +impl Serialize for UidIndex { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut mails = vec![]; + for (ident, (uid, modseq, flags)) in self.table.iter() { + mails.push((*uid, *modseq, *ident, flags.clone())); + } + + let val = UidIndexSerializedRepr { + mails, + uidvalidity: self.uidvalidity, + uidnext: self.uidnext, + highestmodseq: self.highestmodseq, + internalseq: self.internalseq, + internalmodseq: self.internalmodseq, + }; + + val.serialize(serializer) + } +} + +// ---- TESTS ---- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_uidindex() { + let mut state = UidIndex::default(); + + // Add message 1 + { + let m = UniqueIdent([0x01; 24]); + let f = vec!["\\Recent".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + // Early checks + assert_eq!(state.table.len(), 1); + let (uid, modseq, flags) = state.table.get(&m).unwrap(); + assert_eq!(*uid, NonZeroU32::new(1).unwrap()); + assert_eq!(*modseq, NonZeroU64::new(1).unwrap()); + assert_eq!(flags.len(), 2); + let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap(); + assert_eq!(&m, ident); + let recent = state.idx_by_flag.0.get("\\Recent").unwrap(); + assert_eq!(recent.len(), 1); + assert_eq!(recent.iter().next().unwrap(), &NonZeroU32::new(1).unwrap()); + assert_eq!(state.uidnext, NonZeroU32::new(2).unwrap()); + assert_eq!(state.uidvalidity, NonZeroU32::new(1).unwrap()); + } + + // Add message 2 + { + let m = UniqueIdent([0x02; 24]); + let f = vec!["\\Seen".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Add flags to message 1 + { + let m = UniqueIdent([0x01; 24]); + let f = vec!["Important".to_string(), "$cl_1".to_string()]; + let ev = state.op_flag_add(m, f); + state = state.apply(&ev); + } + + // Delete flags from message 1 + { + let m = UniqueIdent([0x01; 24]); + let f = vec!["\\Recent".to_string()]; + let ev = state.op_flag_del(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Delete message 2 + { + let m = UniqueIdent([0x02; 24]); + let ev = state.op_mail_del(m); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 1); + } + + // Add a message 3 concurrent to message 1 (trigger a uid validity change) + { + let m = UniqueIdent([0x03; 24]); + let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; + let ev = UidIndexOp::MailAdd( + m, + NonZeroU32::new(1).unwrap(), + NonZeroU64::new(1).unwrap(), + f, + ); + state = state.apply(&ev); + } + + // Checks + { + assert_eq!(state.table.len(), 2); + assert!(state.uidvalidity > NonZeroU32::new(1).unwrap()); + + let (last_uid, ident) = state.idx_by_uid.get_max().unwrap(); + assert_eq!(ident, &UniqueIdent([0x03; 24])); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + let mut iter = archive.iter(); + assert_eq!(iter.next().unwrap(), &NonZeroU32::new(1).unwrap()); + assert_eq!(iter.next().unwrap(), last_uid); + } + } +} -- cgit v1.2.3