From 390bad0ec451a571e119903054b581a9d9a00cbe Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 27 Jun 2022 16:56:20 +0200 Subject: Refactor files in a "mail" crate --- src/imap/command/authenticated.rs | 2 +- src/imap/command/selected.rs | 2 +- src/imap/flow.rs | 2 +- src/lmtp.rs | 2 +- src/mail/mail_ident.rs | 95 +++++++++++ src/mail/mod.rs | 130 ++++++++++++++ src/mail/uidindex.rs | 345 ++++++++++++++++++++++++++++++++++++++ src/mail_ident.rs | 95 ----------- src/mailbox.rs | 127 -------------- src/main.rs | 4 +- src/uidindex.rs | 345 -------------------------------------- 11 files changed, 575 insertions(+), 574 deletions(-) create mode 100644 src/mail/mail_ident.rs create mode 100644 src/mail/mod.rs create mode 100644 src/mail/uidindex.rs delete mode 100644 src/mail_ident.rs delete mode 100644 src/mailbox.rs delete mode 100644 src/uidindex.rs (limited to 'src') diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index 5924f53..0b1f01e 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -9,7 +9,7 @@ use imap_codec::types::response::{Code, Data, Response as ImapRes, Status}; use crate::imap::command::anonymous; use crate::imap::flow; use crate::imap::session::InnerContext; -use crate::mailbox::Mailbox; +use crate::mail::Mailbox; const DEFAULT_FLAGS: [Flag; 5] = [ Flag::Seen, diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index fb6a75d..e44bf36 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -9,7 +9,7 @@ use imap_codec::types::sequence::SequenceSet; use crate::imap::command::authenticated; use crate::imap::flow; use crate::imap::session::InnerContext; -use crate::mailbox::Mailbox; +use crate::mail::Mailbox; pub async fn dispatch<'a>( inner: InnerContext<'a>, diff --git a/src/imap/flow.rs b/src/imap/flow.rs index f0ec7d1..bd4c484 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -2,7 +2,7 @@ use std::error::Error as StdError; use std::fmt; use crate::login::Credentials; -use crate::mailbox::Mailbox; +use crate::mail::Mailbox; pub struct User { pub name: String, diff --git a/src/lmtp.rs b/src/lmtp.rs index 55da3ee..a0dafa5 100644 --- a/src/lmtp.rs +++ b/src/lmtp.rs @@ -20,7 +20,7 @@ use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata, Pro use crate::config::*; use crate::cryptoblob::*; use crate::login::*; -use crate::mail_ident::*; +use crate::mail::mail_ident::*; pub struct LmtpServer { bind_addr: SocketAddr, diff --git a/src/mail/mail_ident.rs b/src/mail/mail_ident.rs new file mode 100644 index 0000000..07e053a --- /dev/null +++ b/src/mail/mail_ident.rs @@ -0,0 +1,95 @@ +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; + +use lazy_static::lazy_static; +use rand::prelude::*; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; + +use crate::time::now_msec; + +/// An internal Mail Identifier is composed of two components: +/// - a process identifier, 128 bits, itself composed of: +/// - the timestamp of when the process started, 64 bits +/// - a 64-bit random number +/// - a sequence number, 64 bits +/// They are not part of the protocol but an internal representation +/// required by Mailrage/Aerogramme. +/// Their main property is to be unique without having to rely +/// on synchronization between IMAP processes. +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)] +pub struct MailIdent(pub [u8; 24]); + +struct IdentGenerator { + pid: u128, + sn: AtomicU64, +} + +impl IdentGenerator { + fn new() -> Self { + let time = now_msec() as u128; + let rand = thread_rng().gen::() as u128; + Self { + pid: (time << 64) | rand, + sn: AtomicU64::new(0), + } + } + + fn gen(&self) -> MailIdent { + let sn = self.sn.fetch_add(1, Ordering::Relaxed); + let mut res = [0u8; 24]; + res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); + res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); + MailIdent(res) + } +} + +lazy_static! { + static ref GENERATOR: IdentGenerator = IdentGenerator::new(); +} + +pub fn gen_ident() -> MailIdent { + GENERATOR.gen() +} + +// -- serde -- + +impl<'de> Deserialize<'de> for MailIdent { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let v = String::deserialize(d)?; + MailIdent::from_str(&v).map_err(D::Error::custom) + } +} + +impl Serialize for MailIdent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl ToString for MailIdent { + fn to_string(&self) -> String { + hex::encode(self.0) + } +} + +impl FromStr for MailIdent { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; + + if bytes.len() != 24 { + return Err("bad length"); + } + + let mut tmp = [0u8; 24]; + tmp[..].copy_from_slice(&bytes); + Ok(MailIdent(tmp)) + } +} diff --git a/src/mail/mod.rs b/src/mail/mod.rs new file mode 100644 index 0000000..2edcaa7 --- /dev/null +++ b/src/mail/mod.rs @@ -0,0 +1,130 @@ +pub mod mail_ident; +mod uidindex; + +use std::convert::TryFrom; + +use anyhow::Result; +use k2v_client::K2vClient; +use rusoto_s3::S3Client; + +use crate::bayou::Bayou; +use crate::cryptoblob::Key; +use crate::login::Credentials; +use crate::mail::mail_ident::*; +use crate::mail::uidindex::*; + +pub struct Summary<'a> { + pub validity: ImapUidvalidity, + pub next: ImapUid, + pub exists: u32, + pub recent: u32, + pub flags: FlagIter<'a>, + pub unseen: Option<&'a ImapUid>, +} +impl std::fmt::Display for Summary<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "uidvalidity: {}, uidnext: {}, exists: {}", + self.validity, self.next, self.exists + ) + } +} + + +// Non standard but common flags: +// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml +pub struct Mailbox { + bucket: String, + pub name: String, + key: Key, + + k2v: K2vClient, + s3: S3Client, + + uid_index: Bayou, +} + +// IDEA: We store a specific flag named $unseen. +// If it is not present, we add the virtual flag \Seen +impl Mailbox { + pub fn new(creds: &Credentials, name: String) -> Result { + let uid_index = Bayou::::new(creds, name.clone())?; + + Ok(Self { + bucket: creds.bucket().to_string(), + name, + key: creds.keys.master.clone(), + k2v: creds.k2v_client()?, + s3: creds.s3_client()?, + uid_index, + }) + } + + pub async fn summary(&mut self) -> Result { + self.uid_index.sync().await?; + let state = self.uid_index.state(); + + let unseen = state.idx_by_flag.get(&"$unseen".to_string()).and_then(|os| os.get_min()); + let recent = state.idx_by_flag.get(&"\\Recent".to_string()).map(|os| os.len()).unwrap_or(0); + + return Ok(Summary { + validity: state.uidvalidity, + next: state.uidnext, + exists: u32::try_from(state.idx_by_uid.len())?, + recent: u32::try_from(recent)?, + flags: state.idx_by_flag.flags(), + unseen, + }); + } + + pub async fn test(&mut self) -> Result<()> { + self.uid_index.sync().await?; + + dump(&self.uid_index); + + let add_mail_op = self + .uid_index + .state() + .op_mail_add(gen_ident(), vec!["\\Unseen".into()]); + self.uid_index.push(add_mail_op).await?; + + dump(&self.uid_index); + + if self.uid_index.state().idx_by_uid.len() > 6 { + for i in 0..2 { + let (_, ident) = self + .uid_index + .state() + .idx_by_uid + .iter() + .skip(3 + i) + .next() + .unwrap(); + let del_mail_op = self.uid_index.state().op_mail_del(*ident); + self.uid_index.push(del_mail_op).await?; + + dump(&self.uid_index); + } + } + + Ok(()) + } +} + +fn dump(uid_index: &Bayou) { + let s = uid_index.state(); + println!("---- MAILBOX STATE ----"); + println!("UIDVALIDITY {}", s.uidvalidity); + println!("UIDNEXT {}", s.uidnext); + println!("INTERNALSEQ {}", s.internalseq); + for (uid, ident) in s.idx_by_uid.iter() { + println!( + "{} {} {}", + uid, + hex::encode(ident.0), + s.table.get(ident).cloned().unwrap().1.join(", ") + ); + } + println!(""); +} diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs new file mode 100644 index 0000000..49dbba5 --- /dev/null +++ b/src/mail/uidindex.rs @@ -0,0 +1,345 @@ +use std::num::NonZeroU32; + +use im::{HashMap, HashSet, OrdMap, OrdSet}; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; + +use crate::bayou::*; +use crate::mail::mail_ident::MailIdent; + +pub type ImapUid = NonZeroU32; +pub type ImapUidvalidity = NonZeroU32; +pub type Flag = String; + +#[derive(Clone)] +/// A UidIndex handles the mutable part of a mailbox +/// It is built by running the event log on it +/// Each applied log generates a new UidIndex by cloning the previous one +/// and applying the event. This is why we use immutable datastructures: +/// they are cheap to clone. +pub struct UidIndex { + // Source of trust + pub table: OrdMap)>, + + // Indexes optimized for queries + pub idx_by_uid: OrdMap, + pub idx_by_flag: FlagIndex, + + // Counters + pub uidvalidity: ImapUidvalidity, + pub uidnext: ImapUid, + pub internalseq: ImapUid, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub enum UidIndexOp { + MailAdd(MailIdent, ImapUid, Vec), + MailDel(MailIdent), + FlagAdd(MailIdent, Vec), + FlagDel(MailIdent, Vec), +} + +impl UidIndex { + #[must_use] + pub fn op_mail_add(&self, ident: MailIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::MailAdd(ident, self.internalseq, flags) + } + + #[must_use] + pub fn op_mail_del(&self, ident: MailIdent) -> UidIndexOp { + UidIndexOp::MailDel(ident) + } + + #[must_use] + pub fn op_flag_add(&self, ident: MailIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagAdd(ident, flags) + } + + #[must_use] + pub fn op_flag_del(&self, ident: MailIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagDel(ident, flags) + } + + // INTERNAL functions to keep state consistent + + fn reg_email(&mut self, ident: MailIdent, uid: ImapUid, flags: &Vec) { + // Insert the email in our table + self.table.insert(ident, (uid, flags.clone())); + + // Update the indexes/caches + self.idx_by_uid.insert(uid, ident); + self.idx_by_flag.insert(uid, flags); + } + + fn unreg_email(&mut self, ident: &MailIdent) { + // We do nothing if the mail does not exist + let (uid, flags) = match self.table.get(ident) { + Some(v) => v, + None => return, + }; + + // Delete all cache entries + self.idx_by_uid.remove(uid); + self.idx_by_flag.remove(*uid, flags); + + // Remove from source of trust + self.table.remove(ident); + } +} + +impl Default for UidIndex { + fn default() -> Self { + Self { + table: OrdMap::new(), + idx_by_uid: OrdMap::new(), + idx_by_flag: FlagIndex::new(), + uidvalidity: NonZeroU32::new(1).unwrap(), + uidnext: NonZeroU32::new(1).unwrap(), + internalseq: NonZeroU32::new(1).unwrap(), + } + } +} + +impl BayouState for UidIndex { + type Op = UidIndexOp; + + fn apply(&self, op: &UidIndexOp) -> Self { + let mut new = self.clone(); + match op { + UidIndexOp::MailAdd(ident, uid, flags) => { + // Change UIDValidity if there is a conflict + if *uid < new.internalseq { + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + new.internalseq.get() - uid.get()) + .unwrap(); + } + + // Assign the real uid of the email + let new_uid = new.internalseq; + + // Delete the previous entry if any. + // Our proof has no assumption on `ident` uniqueness, + // so we must handle this case even it is very unlikely + // In this case, we overwrite the email. + // Note: assigning a new UID is mandatory. + new.unreg_email(ident); + + // We record our email and update ou caches + new.reg_email(*ident, new_uid, flags); + + // Update counters + new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); + new.uidnext = new.internalseq; + } + UidIndexOp::MailDel(ident) => { + // If the email is known locally, we remove its references in all our indexes + new.unreg_email(ident); + + // We update the counter + new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); + } + UidIndexOp::FlagAdd(ident, new_flags) => { + if let Some((uid, existing_flags)) = new.table.get_mut(ident) { + // Add flags to the source of trust and the cache + let mut to_add: Vec = new_flags + .iter() + .filter(|f| !existing_flags.contains(f)) + .cloned() + .collect(); + new.idx_by_flag.insert(*uid, &to_add); + existing_flags.append(&mut to_add); + } + } + UidIndexOp::FlagDel(ident, rm_flags) => { + if let Some((uid, existing_flags)) = new.table.get_mut(ident) { + // Remove flags from the source of trust and the cache + existing_flags.retain(|x| !rm_flags.contains(x)); + new.idx_by_flag.remove(*uid, rm_flags); + } + } + } + new + } +} + +// ---- FlagIndex implementation ---- +#[derive(Clone)] +pub struct FlagIndex(HashMap>); +pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet>; + +impl FlagIndex { + fn new() -> Self { + Self(HashMap::new()) + } + fn insert(&mut self, uid: ImapUid, flags: &Vec) { + flags.iter().for_each(|flag| { + self.0 + .entry(flag.clone()) + .or_insert(OrdSet::new()) + .insert(uid); + }); + } + fn remove(&mut self, uid: ImapUid, flags: &Vec) -> () { + flags.iter().for_each(|flag| { + self.0.get_mut(flag).and_then(|set| set.remove(&uid)); + }); + } + + pub fn get(&self, f: &Flag) -> Option<&OrdSet> { + self.0.get(f) + } + + pub fn flags(&self) -> FlagIter { + self.0.keys() + } +} + +// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ---- + +#[derive(Serialize, Deserialize)] +struct UidIndexSerializedRepr { + mails: Vec<(ImapUid, MailIdent, Vec)>, + uidvalidity: ImapUidvalidity, + uidnext: ImapUid, + internalseq: ImapUid, +} + +impl<'de> Deserialize<'de> for UidIndex { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?; + + let mut uidindex = UidIndex { + table: OrdMap::new(), + idx_by_uid: OrdMap::new(), + idx_by_flag: FlagIndex::new(), + uidvalidity: val.uidvalidity, + uidnext: val.uidnext, + internalseq: val.internalseq, + }; + + val.mails + .iter() + .for_each(|(u, i, f)| uidindex.reg_email(*i, *u, f)); + + Ok(uidindex) + } +} + +impl Serialize for UidIndex { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut mails = vec![]; + for (ident, (uid, flags)) in self.table.iter() { + mails.push((*uid, *ident, flags.clone())); + } + + let val = UidIndexSerializedRepr { + mails, + uidvalidity: self.uidvalidity, + uidnext: self.uidnext, + internalseq: self.internalseq, + }; + + val.serialize(serializer) + } +} + +// ---- TESTS ---- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_uidindex() { + let mut state = UidIndex::default(); + + // Add message 1 + { + let m = MailIdent([0x01; 24]); + let f = vec!["\\Recent".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + // Early checks + assert_eq!(state.table.len(), 1); + let (uid, flags) = state.table.get(&m).unwrap(); + assert_eq!(*uid, 1); + assert_eq!(flags.len(), 2); + let ident = state.idx_by_uid.get(&1).unwrap(); + assert_eq!(&m, ident); + let recent = state.idx_by_flag.0.get("\\Recent").unwrap(); + assert_eq!(recent.len(), 1); + assert_eq!(recent.iter().next().unwrap(), &1); + assert_eq!(state.uidnext, 2); + assert_eq!(state.uidvalidity, 1); + } + + // Add message 2 + { + let m = MailIdent([0x02; 24]); + let f = vec!["\\Seen".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Add flags to message 1 + { + let m = MailIdent([0x01; 24]); + let f = vec!["Important".to_string(), "$cl_1".to_string()]; + let ev = state.op_flag_add(m, f); + state = state.apply(&ev); + } + + // Delete flags from message 1 + { + let m = MailIdent([0x01; 24]); + let f = vec!["\\Recent".to_string()]; + let ev = state.op_flag_del(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Delete message 2 + { + let m = MailIdent([0x02; 24]); + let ev = state.op_mail_del(m); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 1); + } + + // Add a message 3 concurrent to message 1 (trigger a uid validity change) + { + let m = MailIdent([0x03; 24]); + let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; + let ev = UidIndexOp::MailAdd(m, 1, f); + state = state.apply(&ev); + } + + // Checks + { + assert_eq!(state.table.len(), 2); + assert!(state.uidvalidity > 1); + + let (last_uid, ident) = state.idx_by_uid.get_max().unwrap(); + assert_eq!(ident, &MailIdent([0x03; 24])); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + let mut iter = archive.iter(); + assert_eq!(iter.next().unwrap(), &1); + assert_eq!(iter.next().unwrap(), last_uid); + } + } +} diff --git a/src/mail_ident.rs b/src/mail_ident.rs deleted file mode 100644 index 07e053a..0000000 --- a/src/mail_ident.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; - -use lazy_static::lazy_static; -use rand::prelude::*; -use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; - -use crate::time::now_msec; - -/// An internal Mail Identifier is composed of two components: -/// - a process identifier, 128 bits, itself composed of: -/// - the timestamp of when the process started, 64 bits -/// - a 64-bit random number -/// - a sequence number, 64 bits -/// They are not part of the protocol but an internal representation -/// required by Mailrage/Aerogramme. -/// Their main property is to be unique without having to rely -/// on synchronization between IMAP processes. -#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)] -pub struct MailIdent(pub [u8; 24]); - -struct IdentGenerator { - pid: u128, - sn: AtomicU64, -} - -impl IdentGenerator { - fn new() -> Self { - let time = now_msec() as u128; - let rand = thread_rng().gen::() as u128; - Self { - pid: (time << 64) | rand, - sn: AtomicU64::new(0), - } - } - - fn gen(&self) -> MailIdent { - let sn = self.sn.fetch_add(1, Ordering::Relaxed); - let mut res = [0u8; 24]; - res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); - res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); - MailIdent(res) - } -} - -lazy_static! { - static ref GENERATOR: IdentGenerator = IdentGenerator::new(); -} - -pub fn gen_ident() -> MailIdent { - GENERATOR.gen() -} - -// -- serde -- - -impl<'de> Deserialize<'de> for MailIdent { - fn deserialize(d: D) -> Result - where - D: Deserializer<'de>, - { - let v = String::deserialize(d)?; - MailIdent::from_str(&v).map_err(D::Error::custom) - } -} - -impl Serialize for MailIdent { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&self.to_string()) - } -} - -impl ToString for MailIdent { - fn to_string(&self) -> String { - hex::encode(self.0) - } -} - -impl FromStr for MailIdent { - type Err = &'static str; - - fn from_str(s: &str) -> Result { - let bytes = hex::decode(s).map_err(|_| "invalid hex")?; - - if bytes.len() != 24 { - return Err("bad length"); - } - - let mut tmp = [0u8; 24]; - tmp[..].copy_from_slice(&bytes); - Ok(MailIdent(tmp)) - } -} diff --git a/src/mailbox.rs b/src/mailbox.rs deleted file mode 100644 index 7945cba..0000000 --- a/src/mailbox.rs +++ /dev/null @@ -1,127 +0,0 @@ -use std::convert::TryFrom; - -use anyhow::Result; -use k2v_client::K2vClient; -use rusoto_s3::S3Client; - -use crate::bayou::Bayou; -use crate::cryptoblob::Key; -use crate::login::Credentials; -use crate::mail_ident::*; -use crate::uidindex::*; - -pub struct Summary<'a> { - pub validity: ImapUidvalidity, - pub next: ImapUid, - pub exists: u32, - pub recent: u32, - pub flags: FlagIter<'a>, - pub unseen: Option<&'a ImapUid>, -} -impl std::fmt::Display for Summary<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - f, - "uidvalidity: {}, uidnext: {}, exists: {}", - self.validity, self.next, self.exists - ) - } -} - - -// Non standard but common flags: -// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml -pub struct Mailbox { - bucket: String, - pub name: String, - key: Key, - - k2v: K2vClient, - s3: S3Client, - - uid_index: Bayou, -} - -// IDEA: We store a specific flag named $unseen. -// If it is not present, we add the virtual flag \Seen -impl Mailbox { - pub fn new(creds: &Credentials, name: String) -> Result { - let uid_index = Bayou::::new(creds, name.clone())?; - - Ok(Self { - bucket: creds.bucket().to_string(), - name, - key: creds.keys.master.clone(), - k2v: creds.k2v_client()?, - s3: creds.s3_client()?, - uid_index, - }) - } - - pub async fn summary(&mut self) -> Result { - self.uid_index.sync().await?; - let state = self.uid_index.state(); - - let unseen = state.idx_by_flag.get(&"$unseen".to_string()).and_then(|os| os.get_min()); - let recent = state.idx_by_flag.get(&"\\Recent".to_string()).map(|os| os.len()).unwrap_or(0); - - return Ok(Summary { - validity: state.uidvalidity, - next: state.uidnext, - exists: u32::try_from(state.idx_by_uid.len())?, - recent: u32::try_from(recent)?, - flags: state.idx_by_flag.flags(), - unseen, - }); - } - - pub async fn test(&mut self) -> Result<()> { - self.uid_index.sync().await?; - - dump(&self.uid_index); - - let add_mail_op = self - .uid_index - .state() - .op_mail_add(gen_ident(), vec!["\\Unseen".into()]); - self.uid_index.push(add_mail_op).await?; - - dump(&self.uid_index); - - if self.uid_index.state().idx_by_uid.len() > 6 { - for i in 0..2 { - let (_, ident) = self - .uid_index - .state() - .idx_by_uid - .iter() - .skip(3 + i) - .next() - .unwrap(); - let del_mail_op = self.uid_index.state().op_mail_del(*ident); - self.uid_index.push(del_mail_op).await?; - - dump(&self.uid_index); - } - } - - Ok(()) - } -} - -fn dump(uid_index: &Bayou) { - let s = uid_index.state(); - println!("---- MAILBOX STATE ----"); - println!("UIDVALIDITY {}", s.uidvalidity); - println!("UIDNEXT {}", s.uidnext); - println!("INTERNALSEQ {}", s.internalseq); - for (uid, ident) in s.idx_by_uid.iter() { - println!( - "{} {} {}", - uid, - hex::encode(ident.0), - s.table.get(ident).cloned().unwrap().1.join(", ") - ); - } - println!(""); -} diff --git a/src/main.rs b/src/main.rs index 9270817..e328fb1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,11 +4,9 @@ mod cryptoblob; mod imap; mod lmtp; mod login; -mod mail_ident; -mod mailbox; +mod mail; mod server; mod time; -mod uidindex; use std::path::PathBuf; diff --git a/src/uidindex.rs b/src/uidindex.rs deleted file mode 100644 index d78d8e4..0000000 --- a/src/uidindex.rs +++ /dev/null @@ -1,345 +0,0 @@ -use std::num::NonZeroU32; - -use im::{HashMap, HashSet, OrdMap, OrdSet}; -use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; - -use crate::bayou::*; -use crate::mail_ident::MailIdent; - -pub type ImapUid = NonZeroU32; -pub type ImapUidvalidity = NonZeroU32; -pub type Flag = String; - -#[derive(Clone)] -/// A UidIndex handles the mutable part of a mailbox -/// It is built by running the event log on it -/// Each applied log generates a new UidIndex by cloning the previous one -/// and applying the event. This is why we use immutable datastructures: -/// they are cheap to clone. -pub struct UidIndex { - // Source of trust - pub table: OrdMap)>, - - // Indexes optimized for queries - pub idx_by_uid: OrdMap, - pub idx_by_flag: FlagIndex, - - // Counters - pub uidvalidity: ImapUidvalidity, - pub uidnext: ImapUid, - pub internalseq: ImapUid, -} - -#[derive(Clone, Serialize, Deserialize, Debug)] -pub enum UidIndexOp { - MailAdd(MailIdent, ImapUid, Vec), - MailDel(MailIdent), - FlagAdd(MailIdent, Vec), - FlagDel(MailIdent, Vec), -} - -impl UidIndex { - #[must_use] - pub fn op_mail_add(&self, ident: MailIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::MailAdd(ident, self.internalseq, flags) - } - - #[must_use] - pub fn op_mail_del(&self, ident: MailIdent) -> UidIndexOp { - UidIndexOp::MailDel(ident) - } - - #[must_use] - pub fn op_flag_add(&self, ident: MailIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::FlagAdd(ident, flags) - } - - #[must_use] - pub fn op_flag_del(&self, ident: MailIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::FlagDel(ident, flags) - } - - // INTERNAL functions to keep state consistent - - fn reg_email(&mut self, ident: MailIdent, uid: ImapUid, flags: &Vec) { - // Insert the email in our table - self.table.insert(ident, (uid, flags.clone())); - - // Update the indexes/caches - self.idx_by_uid.insert(uid, ident); - self.idx_by_flag.insert(uid, flags); - } - - fn unreg_email(&mut self, ident: &MailIdent) { - // We do nothing if the mail does not exist - let (uid, flags) = match self.table.get(ident) { - Some(v) => v, - None => return, - }; - - // Delete all cache entries - self.idx_by_uid.remove(uid); - self.idx_by_flag.remove(*uid, flags); - - // Remove from source of trust - self.table.remove(ident); - } -} - -impl Default for UidIndex { - fn default() -> Self { - Self { - table: OrdMap::new(), - idx_by_uid: OrdMap::new(), - idx_by_flag: FlagIndex::new(), - uidvalidity: NonZeroU32::new(1).unwrap(), - uidnext: NonZeroU32::new(1).unwrap(), - internalseq: NonZeroU32::new(1).unwrap(), - } - } -} - -impl BayouState for UidIndex { - type Op = UidIndexOp; - - fn apply(&self, op: &UidIndexOp) -> Self { - let mut new = self.clone(); - match op { - UidIndexOp::MailAdd(ident, uid, flags) => { - // Change UIDValidity if there is a conflict - if *uid < new.internalseq { - new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + new.internalseq.get() - uid.get()) - .unwrap(); - } - - // Assign the real uid of the email - let new_uid = new.internalseq; - - // Delete the previous entry if any. - // Our proof has no assumption on `ident` uniqueness, - // so we must handle this case even it is very unlikely - // In this case, we overwrite the email. - // Note: assigning a new UID is mandatory. - new.unreg_email(ident); - - // We record our email and update ou caches - new.reg_email(*ident, new_uid, flags); - - // Update counters - new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); - new.uidnext = new.internalseq; - } - UidIndexOp::MailDel(ident) => { - // If the email is known locally, we remove its references in all our indexes - new.unreg_email(ident); - - // We update the counter - new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); - } - UidIndexOp::FlagAdd(ident, new_flags) => { - if let Some((uid, existing_flags)) = new.table.get_mut(ident) { - // Add flags to the source of trust and the cache - let mut to_add: Vec = new_flags - .iter() - .filter(|f| !existing_flags.contains(f)) - .cloned() - .collect(); - new.idx_by_flag.insert(*uid, &to_add); - existing_flags.append(&mut to_add); - } - } - UidIndexOp::FlagDel(ident, rm_flags) => { - if let Some((uid, existing_flags)) = new.table.get_mut(ident) { - // Remove flags from the source of trust and the cache - existing_flags.retain(|x| !rm_flags.contains(x)); - new.idx_by_flag.remove(*uid, rm_flags); - } - } - } - new - } -} - -// ---- FlagIndex implementation ---- -#[derive(Clone)] -pub struct FlagIndex(HashMap>); -pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet>; - -impl FlagIndex { - fn new() -> Self { - Self(HashMap::new()) - } - fn insert(&mut self, uid: ImapUid, flags: &Vec) { - flags.iter().for_each(|flag| { - self.0 - .entry(flag.clone()) - .or_insert(OrdSet::new()) - .insert(uid); - }); - } - fn remove(&mut self, uid: ImapUid, flags: &Vec) -> () { - flags.iter().for_each(|flag| { - self.0.get_mut(flag).and_then(|set| set.remove(&uid)); - }); - } - - pub fn get(&self, f: &Flag) -> Option<&OrdSet> { - self.0.get(f) - } - - pub fn flags(&self) -> FlagIter { - self.0.keys() - } -} - -// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ---- - -#[derive(Serialize, Deserialize)] -struct UidIndexSerializedRepr { - mails: Vec<(ImapUid, MailIdent, Vec)>, - uidvalidity: ImapUidvalidity, - uidnext: ImapUid, - internalseq: ImapUid, -} - -impl<'de> Deserialize<'de> for UidIndex { - fn deserialize(d: D) -> Result - where - D: Deserializer<'de>, - { - let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?; - - let mut uidindex = UidIndex { - table: OrdMap::new(), - idx_by_uid: OrdMap::new(), - idx_by_flag: FlagIndex::new(), - uidvalidity: val.uidvalidity, - uidnext: val.uidnext, - internalseq: val.internalseq, - }; - - val.mails - .iter() - .for_each(|(u, i, f)| uidindex.reg_email(*i, *u, f)); - - Ok(uidindex) - } -} - -impl Serialize for UidIndex { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut mails = vec![]; - for (ident, (uid, flags)) in self.table.iter() { - mails.push((*uid, *ident, flags.clone())); - } - - let val = UidIndexSerializedRepr { - mails, - uidvalidity: self.uidvalidity, - uidnext: self.uidnext, - internalseq: self.internalseq, - }; - - val.serialize(serializer) - } -} - -// ---- TESTS ---- - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_uidindex() { - let mut state = UidIndex::default(); - - // Add message 1 - { - let m = MailIdent([0x01; 24]); - let f = vec!["\\Recent".to_string(), "\\Archive".to_string()]; - let ev = state.op_mail_add(m, f); - state = state.apply(&ev); - - // Early checks - assert_eq!(state.table.len(), 1); - let (uid, flags) = state.table.get(&m).unwrap(); - assert_eq!(*uid, 1); - assert_eq!(flags.len(), 2); - let ident = state.idx_by_uid.get(&1).unwrap(); - assert_eq!(&m, ident); - let recent = state.idx_by_flag.0.get("\\Recent").unwrap(); - assert_eq!(recent.len(), 1); - assert_eq!(recent.iter().next().unwrap(), &1); - assert_eq!(state.uidnext, 2); - assert_eq!(state.uidvalidity, 1); - } - - // Add message 2 - { - let m = MailIdent([0x02; 24]); - let f = vec!["\\Seen".to_string(), "\\Archive".to_string()]; - let ev = state.op_mail_add(m, f); - state = state.apply(&ev); - - let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); - assert_eq!(archive.len(), 2); - } - - // Add flags to message 1 - { - let m = MailIdent([0x01; 24]); - let f = vec!["Important".to_string(), "$cl_1".to_string()]; - let ev = state.op_flag_add(m, f); - state = state.apply(&ev); - } - - // Delete flags from message 1 - { - let m = MailIdent([0x01; 24]); - let f = vec!["\\Recent".to_string()]; - let ev = state.op_flag_del(m, f); - state = state.apply(&ev); - - let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); - assert_eq!(archive.len(), 2); - } - - // Delete message 2 - { - let m = MailIdent([0x02; 24]); - let ev = state.op_mail_del(m); - state = state.apply(&ev); - - let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); - assert_eq!(archive.len(), 1); - } - - // Add a message 3 concurrent to message 1 (trigger a uid validity change) - { - let m = MailIdent([0x03; 24]); - let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; - let ev = UidIndexOp::MailAdd(m, 1, f); - state = state.apply(&ev); - } - - // Checks - { - assert_eq!(state.table.len(), 2); - assert!(state.uidvalidity > 1); - - let (last_uid, ident) = state.idx_by_uid.get_max().unwrap(); - assert_eq!(ident, &MailIdent([0x03; 24])); - - let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); - assert_eq!(archive.len(), 2); - let mut iter = archive.iter(); - assert_eq!(iter.next().unwrap(), &1); - assert_eq!(iter.next().unwrap(), last_uid); - } - } -} -- cgit v1.2.3