aboutsummaryrefslogtreecommitdiff
path: root/src/mail
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2022-06-27 16:56:20 +0200
committerQuentin Dufour <quentin@deuxfleurs.fr>2022-06-27 16:56:20 +0200
commit390bad0ec451a571e119903054b581a9d9a00cbe (patch)
tree467fdf3ade0325b8c69ea8350a2f15fa181d8981 /src/mail
parentd3f8a6627ca13a020bac9936d1f40a18239b6d6d (diff)
downloadaerogramme-390bad0ec451a571e119903054b581a9d9a00cbe.tar.gz
aerogramme-390bad0ec451a571e119903054b581a9d9a00cbe.zip
Refactor files in a "mail" crate
Diffstat (limited to 'src/mail')
-rw-r--r--src/mail/mail_ident.rs95
-rw-r--r--src/mail/mod.rs130
-rw-r--r--src/mail/uidindex.rs345
3 files changed, 570 insertions, 0 deletions
diff --git a/src/mail/mail_ident.rs b/src/mail/mail_ident.rs
new file mode 100644
index 0000000..07e053a
--- /dev/null
+++ b/src/mail/mail_ident.rs
@@ -0,0 +1,95 @@
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use lazy_static::lazy_static;
+use rand::prelude::*;
+use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
+
+use crate::time::now_msec;
+
+/// An internal Mail Identifier is composed of two components:
+/// - a process identifier, 128 bits, itself composed of:
+/// - the timestamp of when the process started, 64 bits
+/// - a 64-bit random number
+/// - a sequence number, 64 bits
+/// They are not part of the protocol but an internal representation
+/// required by Mailrage/Aerogramme.
+/// Their main property is to be unique without having to rely
+/// on synchronization between IMAP processes.
+#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)]
+pub struct MailIdent(pub [u8; 24]);
+
+struct IdentGenerator {
+ pid: u128,
+ sn: AtomicU64,
+}
+
+impl IdentGenerator {
+ fn new() -> Self {
+ let time = now_msec() as u128;
+ let rand = thread_rng().gen::<u64>() as u128;
+ Self {
+ pid: (time << 64) | rand,
+ sn: AtomicU64::new(0),
+ }
+ }
+
+ fn gen(&self) -> MailIdent {
+ let sn = self.sn.fetch_add(1, Ordering::Relaxed);
+ let mut res = [0u8; 24];
+ res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid));
+ res[16..24].copy_from_slice(&u64::to_be_bytes(sn));
+ MailIdent(res)
+ }
+}
+
+lazy_static! {
+ static ref GENERATOR: IdentGenerator = IdentGenerator::new();
+}
+
+pub fn gen_ident() -> MailIdent {
+ GENERATOR.gen()
+}
+
+// -- serde --
+
+impl<'de> Deserialize<'de> for MailIdent {
+ fn deserialize<D>(d: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let v = String::deserialize(d)?;
+ MailIdent::from_str(&v).map_err(D::Error::custom)
+ }
+}
+
+impl Serialize for MailIdent {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ serializer.serialize_str(&self.to_string())
+ }
+}
+
+impl ToString for MailIdent {
+ fn to_string(&self) -> String {
+ hex::encode(self.0)
+ }
+}
+
+impl FromStr for MailIdent {
+ type Err = &'static str;
+
+ fn from_str(s: &str) -> Result<MailIdent, &'static str> {
+ let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
+
+ if bytes.len() != 24 {
+ return Err("bad length");
+ }
+
+ let mut tmp = [0u8; 24];
+ tmp[..].copy_from_slice(&bytes);
+ Ok(MailIdent(tmp))
+ }
+}
diff --git a/src/mail/mod.rs b/src/mail/mod.rs
new file mode 100644
index 0000000..2edcaa7
--- /dev/null
+++ b/src/mail/mod.rs
@@ -0,0 +1,130 @@
+pub mod mail_ident;
+mod uidindex;
+
+use std::convert::TryFrom;
+
+use anyhow::Result;
+use k2v_client::K2vClient;
+use rusoto_s3::S3Client;
+
+use crate::bayou::Bayou;
+use crate::cryptoblob::Key;
+use crate::login::Credentials;
+use crate::mail::mail_ident::*;
+use crate::mail::uidindex::*;
+
+pub struct Summary<'a> {
+ pub validity: ImapUidvalidity,
+ pub next: ImapUid,
+ pub exists: u32,
+ pub recent: u32,
+ pub flags: FlagIter<'a>,
+ pub unseen: Option<&'a ImapUid>,
+}
+impl std::fmt::Display for Summary<'_> {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(
+ f,
+ "uidvalidity: {}, uidnext: {}, exists: {}",
+ self.validity, self.next, self.exists
+ )
+ }
+}
+
+
+// Non standard but common flags:
+// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml
+pub struct Mailbox {
+ bucket: String,
+ pub name: String,
+ key: Key,
+
+ k2v: K2vClient,
+ s3: S3Client,
+
+ uid_index: Bayou<UidIndex>,
+}
+
+// IDEA: We store a specific flag named $unseen.
+// If it is not present, we add the virtual flag \Seen
+impl Mailbox {
+ pub fn new(creds: &Credentials, name: String) -> Result<Self> {
+ let uid_index = Bayou::<UidIndex>::new(creds, name.clone())?;
+
+ Ok(Self {
+ bucket: creds.bucket().to_string(),
+ name,
+ key: creds.keys.master.clone(),
+ k2v: creds.k2v_client()?,
+ s3: creds.s3_client()?,
+ uid_index,
+ })
+ }
+
+ pub async fn summary(&mut self) -> Result<Summary> {
+ self.uid_index.sync().await?;
+ let state = self.uid_index.state();
+
+ let unseen = state.idx_by_flag.get(&"$unseen".to_string()).and_then(|os| os.get_min());
+ let recent = state.idx_by_flag.get(&"\\Recent".to_string()).map(|os| os.len()).unwrap_or(0);
+
+ return Ok(Summary {
+ validity: state.uidvalidity,
+ next: state.uidnext,
+ exists: u32::try_from(state.idx_by_uid.len())?,
+ recent: u32::try_from(recent)?,
+ flags: state.idx_by_flag.flags(),
+ unseen,
+ });
+ }
+
+ pub async fn test(&mut self) -> Result<()> {
+ self.uid_index.sync().await?;
+
+ dump(&self.uid_index);
+
+ let add_mail_op = self
+ .uid_index
+ .state()
+ .op_mail_add(gen_ident(), vec!["\\Unseen".into()]);
+ self.uid_index.push(add_mail_op).await?;
+
+ dump(&self.uid_index);
+
+ if self.uid_index.state().idx_by_uid.len() > 6 {
+ for i in 0..2 {
+ let (_, ident) = self
+ .uid_index
+ .state()
+ .idx_by_uid
+ .iter()
+ .skip(3 + i)
+ .next()
+ .unwrap();
+ let del_mail_op = self.uid_index.state().op_mail_del(*ident);
+ self.uid_index.push(del_mail_op).await?;
+
+ dump(&self.uid_index);
+ }
+ }
+
+ Ok(())
+ }
+}
+
+fn dump(uid_index: &Bayou<UidIndex>) {
+ let s = uid_index.state();
+ println!("---- MAILBOX STATE ----");
+ println!("UIDVALIDITY {}", s.uidvalidity);
+ println!("UIDNEXT {}", s.uidnext);
+ println!("INTERNALSEQ {}", s.internalseq);
+ for (uid, ident) in s.idx_by_uid.iter() {
+ println!(
+ "{} {} {}",
+ uid,
+ hex::encode(ident.0),
+ s.table.get(ident).cloned().unwrap().1.join(", ")
+ );
+ }
+ println!("");
+}
diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs
new file mode 100644
index 0000000..49dbba5
--- /dev/null
+++ b/src/mail/uidindex.rs
@@ -0,0 +1,345 @@
+use std::num::NonZeroU32;
+
+use im::{HashMap, HashSet, OrdMap, OrdSet};
+use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
+
+use crate::bayou::*;
+use crate::mail::mail_ident::MailIdent;
+
+pub type ImapUid = NonZeroU32;
+pub type ImapUidvalidity = NonZeroU32;
+pub type Flag = String;
+
+#[derive(Clone)]
+/// A UidIndex handles the mutable part of a mailbox
+/// It is built by running the event log on it
+/// Each applied log generates a new UidIndex by cloning the previous one
+/// and applying the event. This is why we use immutable datastructures:
+/// they are cheap to clone.
+pub struct UidIndex {
+ // Source of trust
+ pub table: OrdMap<MailIdent, (ImapUid, Vec<Flag>)>,
+
+ // Indexes optimized for queries
+ pub idx_by_uid: OrdMap<ImapUid, MailIdent>,
+ pub idx_by_flag: FlagIndex,
+
+ // Counters
+ pub uidvalidity: ImapUidvalidity,
+ pub uidnext: ImapUid,
+ pub internalseq: ImapUid,
+}
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub enum UidIndexOp {
+ MailAdd(MailIdent, ImapUid, Vec<Flag>),
+ MailDel(MailIdent),
+ FlagAdd(MailIdent, Vec<Flag>),
+ FlagDel(MailIdent, Vec<Flag>),
+}
+
+impl UidIndex {
+ #[must_use]
+ pub fn op_mail_add(&self, ident: MailIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::MailAdd(ident, self.internalseq, flags)
+ }
+
+ #[must_use]
+ pub fn op_mail_del(&self, ident: MailIdent) -> UidIndexOp {
+ UidIndexOp::MailDel(ident)
+ }
+
+ #[must_use]
+ pub fn op_flag_add(&self, ident: MailIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::FlagAdd(ident, flags)
+ }
+
+ #[must_use]
+ pub fn op_flag_del(&self, ident: MailIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::FlagDel(ident, flags)
+ }
+
+ // INTERNAL functions to keep state consistent
+
+ fn reg_email(&mut self, ident: MailIdent, uid: ImapUid, flags: &Vec<Flag>) {
+ // Insert the email in our table
+ self.table.insert(ident, (uid, flags.clone()));
+
+ // Update the indexes/caches
+ self.idx_by_uid.insert(uid, ident);
+ self.idx_by_flag.insert(uid, flags);
+ }
+
+ fn unreg_email(&mut self, ident: &MailIdent) {
+ // We do nothing if the mail does not exist
+ let (uid, flags) = match self.table.get(ident) {
+ Some(v) => v,
+ None => return,
+ };
+
+ // Delete all cache entries
+ self.idx_by_uid.remove(uid);
+ self.idx_by_flag.remove(*uid, flags);
+
+ // Remove from source of trust
+ self.table.remove(ident);
+ }
+}
+
+impl Default for UidIndex {
+ fn default() -> Self {
+ Self {
+ table: OrdMap::new(),
+ idx_by_uid: OrdMap::new(),
+ idx_by_flag: FlagIndex::new(),
+ uidvalidity: NonZeroU32::new(1).unwrap(),
+ uidnext: NonZeroU32::new(1).unwrap(),
+ internalseq: NonZeroU32::new(1).unwrap(),
+ }
+ }
+}
+
+impl BayouState for UidIndex {
+ type Op = UidIndexOp;
+
+ fn apply(&self, op: &UidIndexOp) -> Self {
+ let mut new = self.clone();
+ match op {
+ UidIndexOp::MailAdd(ident, uid, flags) => {
+ // Change UIDValidity if there is a conflict
+ if *uid < new.internalseq {
+ new.uidvalidity =
+ NonZeroU32::new(new.uidvalidity.get() + new.internalseq.get() - uid.get())
+ .unwrap();
+ }
+
+ // Assign the real uid of the email
+ let new_uid = new.internalseq;
+
+ // Delete the previous entry if any.
+ // Our proof has no assumption on `ident` uniqueness,
+ // so we must handle this case even it is very unlikely
+ // In this case, we overwrite the email.
+ // Note: assigning a new UID is mandatory.
+ new.unreg_email(ident);
+
+ // We record our email and update ou caches
+ new.reg_email(*ident, new_uid, flags);
+
+ // Update counters
+ new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
+ new.uidnext = new.internalseq;
+ }
+ UidIndexOp::MailDel(ident) => {
+ // If the email is known locally, we remove its references in all our indexes
+ new.unreg_email(ident);
+
+ // We update the counter
+ new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
+ }
+ UidIndexOp::FlagAdd(ident, new_flags) => {
+ if let Some((uid, existing_flags)) = new.table.get_mut(ident) {
+ // Add flags to the source of trust and the cache
+ let mut to_add: Vec<Flag> = new_flags
+ .iter()
+ .filter(|f| !existing_flags.contains(f))
+ .cloned()
+ .collect();
+ new.idx_by_flag.insert(*uid, &to_add);
+ existing_flags.append(&mut to_add);
+ }
+ }
+ UidIndexOp::FlagDel(ident, rm_flags) => {
+ if let Some((uid, existing_flags)) = new.table.get_mut(ident) {
+ // Remove flags from the source of trust and the cache
+ existing_flags.retain(|x| !rm_flags.contains(x));
+ new.idx_by_flag.remove(*uid, rm_flags);
+ }
+ }
+ }
+ new
+ }
+}
+
+// ---- FlagIndex implementation ----
+#[derive(Clone)]
+pub struct FlagIndex(HashMap<Flag, OrdSet<ImapUid>>);
+pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet<ImapUid>>;
+
+impl FlagIndex {
+ fn new() -> Self {
+ Self(HashMap::new())
+ }
+ fn insert(&mut self, uid: ImapUid, flags: &Vec<Flag>) {
+ flags.iter().for_each(|flag| {
+ self.0
+ .entry(flag.clone())
+ .or_insert(OrdSet::new())
+ .insert(uid);
+ });
+ }
+ fn remove(&mut self, uid: ImapUid, flags: &Vec<Flag>) -> () {
+ flags.iter().for_each(|flag| {
+ self.0.get_mut(flag).and_then(|set| set.remove(&uid));
+ });
+ }
+
+ pub fn get(&self, f: &Flag) -> Option<&OrdSet<ImapUid>> {
+ self.0.get(f)
+ }
+
+ pub fn flags(&self) -> FlagIter {
+ self.0.keys()
+ }
+}
+
+// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ----
+
+#[derive(Serialize, Deserialize)]
+struct UidIndexSerializedRepr {
+ mails: Vec<(ImapUid, MailIdent, Vec<Flag>)>,
+ uidvalidity: ImapUidvalidity,
+ uidnext: ImapUid,
+ internalseq: ImapUid,
+}
+
+impl<'de> Deserialize<'de> for UidIndex {
+ fn deserialize<D>(d: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?;
+
+ let mut uidindex = UidIndex {
+ table: OrdMap::new(),
+ idx_by_uid: OrdMap::new(),
+ idx_by_flag: FlagIndex::new(),
+ uidvalidity: val.uidvalidity,
+ uidnext: val.uidnext,
+ internalseq: val.internalseq,
+ };
+
+ val.mails
+ .iter()
+ .for_each(|(u, i, f)| uidindex.reg_email(*i, *u, f));
+
+ Ok(uidindex)
+ }
+}
+
+impl Serialize for UidIndex {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let mut mails = vec![];
+ for (ident, (uid, flags)) in self.table.iter() {
+ mails.push((*uid, *ident, flags.clone()));
+ }
+
+ let val = UidIndexSerializedRepr {
+ mails,
+ uidvalidity: self.uidvalidity,
+ uidnext: self.uidnext,
+ internalseq: self.internalseq,
+ };
+
+ val.serialize(serializer)
+ }
+}
+
+// ---- TESTS ----
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_uidindex() {
+ let mut state = UidIndex::default();
+
+ // Add message 1
+ {
+ let m = MailIdent([0x01; 24]);
+ let f = vec!["\\Recent".to_string(), "\\Archive".to_string()];
+ let ev = state.op_mail_add(m, f);
+ state = state.apply(&ev);
+
+ // Early checks
+ assert_eq!(state.table.len(), 1);
+ let (uid, flags) = state.table.get(&m).unwrap();
+ assert_eq!(*uid, 1);
+ assert_eq!(flags.len(), 2);
+ let ident = state.idx_by_uid.get(&1).unwrap();
+ assert_eq!(&m, ident);
+ let recent = state.idx_by_flag.0.get("\\Recent").unwrap();
+ assert_eq!(recent.len(), 1);
+ assert_eq!(recent.iter().next().unwrap(), &1);
+ assert_eq!(state.uidnext, 2);
+ assert_eq!(state.uidvalidity, 1);
+ }
+
+ // Add message 2
+ {
+ let m = MailIdent([0x02; 24]);
+ let f = vec!["\\Seen".to_string(), "\\Archive".to_string()];
+ let ev = state.op_mail_add(m, f);
+ state = state.apply(&ev);
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 2);
+ }
+
+ // Add flags to message 1
+ {
+ let m = MailIdent([0x01; 24]);
+ let f = vec!["Important".to_string(), "$cl_1".to_string()];
+ let ev = state.op_flag_add(m, f);
+ state = state.apply(&ev);
+ }
+
+ // Delete flags from message 1
+ {
+ let m = MailIdent([0x01; 24]);
+ let f = vec!["\\Recent".to_string()];
+ let ev = state.op_flag_del(m, f);
+ state = state.apply(&ev);
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 2);
+ }
+
+ // Delete message 2
+ {
+ let m = MailIdent([0x02; 24]);
+ let ev = state.op_mail_del(m);
+ state = state.apply(&ev);
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 1);
+ }
+
+ // Add a message 3 concurrent to message 1 (trigger a uid validity change)
+ {
+ let m = MailIdent([0x03; 24]);
+ let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
+ let ev = UidIndexOp::MailAdd(m, 1, f);
+ state = state.apply(&ev);
+ }
+
+ // Checks
+ {
+ assert_eq!(state.table.len(), 2);
+ assert!(state.uidvalidity > 1);
+
+ let (last_uid, ident) = state.idx_by_uid.get_max().unwrap();
+ assert_eq!(ident, &MailIdent([0x03; 24]));
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 2);
+ let mut iter = archive.iter();
+ assert_eq!(iter.next().unwrap(), &1);
+ assert_eq!(iter.next().unwrap(), last_uid);
+ }
+ }
+}