Diffstat (limited to 'src/mail')
-rw-r--r--  src/mail/incoming.rs      445
-rw-r--r--  src/mail/mailbox.rs       524
-rw-r--r--  src/mail/mod.rs            27
-rw-r--r--  src/mail/namespace.rs     209
-rw-r--r--  src/mail/query.rs         137
-rw-r--r--  src/mail/snapshot.rs       60
-rw-r--r--  src/mail/uidindex.rs      474
-rw-r--r--  src/mail/unique_ident.rs  101
8 files changed, 0 insertions, 1977 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
deleted file mode 100644
index e2ad97d..0000000
--- a/src/mail/incoming.rs
+++ /dev/null
@@ -1,445 +0,0 @@
-//use std::collections::HashMap;
-use std::convert::TryFrom;
-
-use std::sync::{Arc, Weak};
-use std::time::Duration;
-
-use anyhow::{anyhow, bail, Result};
-use base64::Engine;
-use futures::{future::BoxFuture, FutureExt};
-//use tokio::io::AsyncReadExt;
-use tokio::sync::watch;
-use tracing::{debug, error, info, warn};
-
-use crate::cryptoblob;
-use crate::login::{Credentials, PublicCredentials};
-use crate::mail::mailbox::Mailbox;
-use crate::mail::uidindex::ImapUidvalidity;
-use crate::mail::unique_ident::*;
-use crate::user::User;
-use crate::mail::IMF;
-use crate::storage;
-use crate::timestamp::now_msec;
-
-const INCOMING_PK: &str = "incoming";
-const INCOMING_LOCK_SK: &str = "lock";
-const INCOMING_WATCH_SK: &str = "watch";
-
-const MESSAGE_KEY: &str = "message-key";
-
-// When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
-// It is renewed every LOCK_DURATION/3
-// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we
-// lost the lock.
-const LOCK_DURATION: Duration = Duration::from_secs(300);
-
-// In addition to checking when notified, also check for new mail every 10 minutes
-const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600);
-
-pub async fn incoming_mail_watch_process(
- user: Weak<User>,
- creds: Credentials,
- rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
-) {
- if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await {
- error!("Error in incoming mail watch process: {}", e);
- }
-}
-
-async fn incoming_mail_watch_process_internal(
- user: Weak<User>,
- creds: Credentials,
- mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
-) -> Result<()> {
- let mut lock_held = k2v_lock_loop(
- creds.storage.build().await?,
- storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK),
- );
- let storage = creds.storage.build().await?;
-
- let mut inbox: Option<Arc<Mailbox>> = None;
- let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
-
- loop {
- let maybe_updated_incoming_key = if *lock_held.borrow() {
- debug!("incoming lock held");
-
- let wait_new_mail = async {
- loop {
- match storage.row_poll(&incoming_key).await {
- Ok(row_val) => break row_val.row_ref,
- Err(e) => {
- error!("Error in wait_new_mail: {}", e);
- tokio::time::sleep(Duration::from_secs(30)).await;
- }
- }
- }
- };
-
- tokio::select! {
- inc_k = wait_new_mail => Some(inc_k),
- _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
- _ = lock_held.changed() => None,
- _ = rx_inbox_id.changed() => None,
- }
- } else {
- debug!("incoming lock not held");
- tokio::select! {
- _ = lock_held.changed() => None,
- _ = rx_inbox_id.changed() => None,
- }
- };
-
- let user = match Weak::upgrade(&user) {
- Some(user) => user,
- None => {
- debug!("User no longer available, exiting incoming loop.");
- break;
- }
- };
- debug!("User still available");
-
- // If INBOX is no longer the same mailbox, open the new mailbox
- let inbox_id = *rx_inbox_id.borrow();
- if let Some((id, uidvalidity)) = inbox_id {
- if Some(id) != inbox.as_ref().map(|b| b.id) {
- match user.open_mailbox_by_id(id, uidvalidity).await {
- Ok(mb) => {
- inbox = Some(mb);
- }
- Err(e) => {
- inbox = None;
- error!("Error when opening inbox ({}): {}", id, e);
- tokio::time::sleep(Duration::from_secs(30)).await;
- continue;
- }
- }
- }
- }
-
- // If we were able to open INBOX, and we have mail,
- // fetch new mail
- if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
- match handle_incoming_mail(&user, &storage, inbox, &lock_held).await {
- Ok(()) => {
- incoming_key = updated_incoming_key;
- }
- Err(e) => {
- error!("Could not fetch incoming mail: {}", e);
- tokio::time::sleep(Duration::from_secs(30)).await;
- }
- }
- }
- }
- drop(rx_inbox_id);
- Ok(())
-}
-
-async fn handle_incoming_mail(
- user: &Arc<User>,
- storage: &storage::Store,
- inbox: &Arc<Mailbox>,
- lock_held: &watch::Receiver<bool>,
-) -> Result<()> {
- let mails_res = storage.blob_list("incoming/").await?;
-
- for object in mails_res {
- if !*lock_held.borrow() {
- break;
- }
- let key = object.0;
- if let Some(mail_id) = key.strip_prefix("incoming/") {
- if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
- move_incoming_message(user, storage, inbox, mail_id).await?;
- }
- }
- }
-
- Ok(())
-}
-
-async fn move_incoming_message(
- user: &Arc<User>,
- storage: &storage::Store,
- inbox: &Arc<Mailbox>,
- id: UniqueIdent,
-) -> Result<()> {
- info!("Moving incoming message: {}", id);
-
- let object_key = format!("incoming/{}", id);
-
- // 1. Fetch message from S3
- let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?;
-
- // 1.a decrypt message key from headers
- //info!("Object metadata: {:?}", get_result.metadata);
- let key_encrypted_b64 = object
- .meta
- .get(MESSAGE_KEY)
- .ok_or(anyhow!("Missing key in metadata"))?;
- let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
- let message_key = sodiumoxide::crypto::sealedbox::open(
- &key_encrypted,
- &user.creds.keys.public,
- &user.creds.keys.secret,
- )
- .map_err(|_| anyhow!("Cannot decrypt message key"))?;
- let message_key =
- cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
-
- // 1.b retrieve message body
- let obj_body = object.value;
- let plain_mail = cryptoblob::open(&obj_body, &message_key)
- .map_err(|_| anyhow!("Cannot decrypt email content"))?;
-
- // 2 parse mail and add to inbox
- let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
- inbox
- .append_from_s3(msg, id, object.blob_ref.clone(), message_key)
- .await?;
-
- // 3 delete from incoming
- storage.blob_rm(&object.blob_ref).await?;
-
- Ok(())
-}
-
-// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
-
-fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> {
- let (held_tx, held_rx) = watch::channel(false);
-
- tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx));
-
- held_rx
-}
-
-#[derive(Clone, Debug)]
-enum LockState {
- Unknown,
- Empty,
- Held(UniqueIdent, u64, storage::RowRef),
-}
-
-async fn k2v_lock_loop_internal(
- storage: storage::Store,
- row_ref: storage::RowRef,
- held_tx: watch::Sender<bool>,
-) {
- let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
- let mut state_rx_2 = state_rx.clone();
-
- let our_pid = gen_ident();
-
- // Loop 1: watch state of lock in K2V, save that in corresponding watch channel
- let watch_lock_loop: BoxFuture<Result<()>> = async {
- let mut ct = row_ref.clone();
- loop {
- debug!("k2v watch lock loop iter: ct = {:?}", ct);
- match storage.row_poll(&ct).await {
- Err(e) => {
- error!(
- "Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
- e
- );
- state_tx.send(LockState::Unknown)?;
- tokio::time::sleep(Duration::from_secs(30)).await;
- }
- Ok(cv) => {
- let mut lock_state = None;
- for v in cv.value.iter() {
- if let storage::Alternative::Value(vbytes) = v {
- if vbytes.len() == 32 {
- let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
- let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
- if lock_state
- .map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2))
- .unwrap_or(true)
- {
- lock_state = Some((pid, ts));
- }
- }
- }
- }
- let new_ct = cv.row_ref;
-
- debug!(
- "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
- ct, new_ct, lock_state
- );
- state_tx.send(
- lock_state
- .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone()))
- .unwrap_or(LockState::Empty),
- )?;
- ct = new_ct;
- }
- }
- }
- }
- .boxed();
-
- // Loop 2: notify user whether we are holding the lock or not
- let lock_notify_loop: BoxFuture<Result<()>> = async {
- loop {
- let now = now_msec();
- let held_with_expiration_time = match &*state_rx.borrow_and_update() {
- LockState::Held(pid, ts, _ct) if *pid == our_pid => {
- let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64;
- if now < expiration_time {
- Some(expiration_time)
- } else {
- None
- }
- }
- _ => None,
- };
- let held = held_with_expiration_time.is_some();
- if held != *held_tx.borrow() {
- held_tx.send(held)?;
- }
-
- let await_expired = async {
- match held_with_expiration_time {
- None => futures::future::pending().await,
- Some(expiration_time) => {
- tokio::time::sleep(Duration::from_millis(expiration_time - now)).await
- }
- };
- };
-
- tokio::select!(
- r = state_rx.changed() => {
- r?;
- }
- _ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"),
- _ = await_expired => continue,
- );
- }
- }
- .boxed();
-
- // Loop 3: acquire lock when relevant
- let take_lock_loop: BoxFuture<Result<()>> = async {
- loop {
- let now = now_msec();
- let state: LockState = state_rx_2.borrow_and_update().clone();
- let (acquire_at, ct) = match state {
- LockState::Unknown => {
- // If state of the lock is unknown, don't try to acquire
- state_rx_2.changed().await?;
- continue;
- }
- LockState::Empty => (now, None),
- LockState::Held(pid, ts, ct) => {
- if pid == our_pid {
- (ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct))
- } else {
- (ts, Some(ct))
- }
- }
- };
-
- // Wait until it is time to acquire lock
- if acquire_at > now {
- tokio::select!(
- r = state_rx_2.changed() => {
- // If lock state changed in the meantime, don't acquire and loop around
- r?;
- continue;
- }
- _ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => ()
- );
- }
-
- // Acquire lock
- let mut lock = vec![0u8; 32];
- lock[..8].copy_from_slice(&u64::to_be_bytes(
- now_msec() + LOCK_DURATION.as_millis() as u64,
- ));
- lock[8..].copy_from_slice(&our_pid.0);
- let row = match ct {
- Some(existing) => existing,
- None => row_ref.clone(),
- };
- if let Err(e) = storage
- .row_insert(vec![storage::RowVal::new(row, lock)])
- .await
- {
- error!("Could not take lock: {}", e);
- tokio::time::sleep(Duration::from_secs(30)).await;
- }
-
- // Wait for new information to loop back
- state_rx_2.changed().await?;
- }
- }
- .boxed();
-
- let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop);
-
- debug!("lock loop exited, releasing");
-
- if !held_tx.is_closed() {
- warn!("weird...");
- let _ = held_tx.send(false);
- }
-
- // If lock is ours, release it
- let release = match &*state_rx.borrow() {
- LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()),
- _ => None,
- };
- if let Some(ct) = release {
- match storage.row_rm(&storage::Selector::Single(&ct)).await {
- Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
- Ok(_) => (),
- };
- }
-}
-
-// ---- LMTP SIDE: storing messages encrypted with user's pubkey ----
-
-pub struct EncryptedMessage {
- key: cryptoblob::Key,
- encrypted_body: Vec<u8>,
-}
-
-impl EncryptedMessage {
- pub fn new(body: Vec<u8>) -> Result<Self> {
- let key = cryptoblob::gen_key();
- let encrypted_body = cryptoblob::seal(&body, &key)?;
- Ok(Self {
- key,
- encrypted_body,
- })
- }
-
- pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
- let storage = creds.storage.build().await?;
-
- // Get causality token of previous watch key
- let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
- let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await {
- Err(_) => query,
- Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query),
- };
-
- // Write mail to encrypted storage
- let encrypted_key =
- sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
- let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key);
-
- let blob_val = storage::BlobVal::new(
- storage::BlobRef(format!("incoming/{}", gen_ident())),
- self.encrypted_body.clone().into(),
- )
- .with_meta(MESSAGE_KEY.to_string(), key_header);
- storage.blob_insert(blob_val).await?;
-
- // Update watch key to signal new mail
- let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec());
- storage.row_insert(vec![watch_val]).await?;
- Ok(())
- }
-}
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
deleted file mode 100644
index d1a5473..0000000
--- a/src/mail/mailbox.rs
+++ /dev/null
@@ -1,524 +0,0 @@
-use anyhow::{anyhow, bail, Result};
-use serde::{Deserialize, Serialize};
-use tokio::sync::RwLock;
-
-use crate::bayou::Bayou;
-use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
-use crate::login::Credentials;
-use crate::mail::uidindex::*;
-use crate::mail::unique_ident::*;
-use crate::mail::IMF;
-use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
-use crate::timestamp::now_msec;
-
-pub struct Mailbox {
- pub(super) id: UniqueIdent,
- mbox: RwLock<MailboxInternal>,
-}
-
-impl Mailbox {
- pub(crate) async fn open(
- creds: &Credentials,
- id: UniqueIdent,
- min_uidvalidity: ImapUidvalidity,
- ) -> Result<Self> {
- let index_path = format!("index/{}", id);
- let mail_path = format!("mail/{}", id);
-
- let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?;
- uid_index.sync().await?;
-
- let uidvalidity = uid_index.state().uidvalidity;
- if uidvalidity < min_uidvalidity {
- uid_index
- .push(
- uid_index
- .state()
- .op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()),
- )
- .await?;
- }
-
- // @FIXME reporting through opentelemetry or some logs
- // info on the "shape" of the mailbox would be welcomed
- /*
- dump(&uid_index);
- */
-
- let mbox = RwLock::new(MailboxInternal {
- id,
- encryption_key: creds.keys.master.clone(),
- storage: creds.storage.build().await?,
- uid_index,
- mail_path,
- });
-
- Ok(Self { id, mbox })
- }
-
- /// Sync data with backing store
- pub async fn force_sync(&self) -> Result<()> {
- self.mbox.write().await.force_sync().await
- }
-
- /// Sync data with backing store only if changes are detected
- /// or last sync is too old
- pub async fn opportunistic_sync(&self) -> Result<()> {
- self.mbox.write().await.opportunistic_sync().await
- }
-
- /// Block until a sync has been done (due to changes in the event log)
- pub async fn notify(&self) -> std::sync::Weak<tokio::sync::Notify> {
- self.mbox.read().await.notifier()
- }
-
- // ---- Functions for reading the mailbox ----
-
- /// Get a clone of the current UID Index of this mailbox
- /// (cloning is cheap so don't hesitate to use this)
- pub async fn current_uid_index(&self) -> UidIndex {
- self.mbox.read().await.uid_index.state().clone()
- }
-
- /// Fetch the metadata (headers + some more info) of the specified
- /// mail IDs
- pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
- self.mbox.read().await.fetch_meta(ids).await
- }
-
- /// Fetch an entire e-mail
- pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
- self.mbox.read().await.fetch_full(id, message_key).await
- }
-
- pub async fn frozen(self: &std::sync::Arc<Self>) -> super::snapshot::FrozenMailbox {
- super::snapshot::FrozenMailbox::new(self.clone()).await
- }
-
- // ---- Functions for changing the mailbox ----
-
- /// Add flags to message
- pub async fn add_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
- self.mbox.write().await.add_flags(id, flags).await
- }
-
- /// Delete flags from message
- pub async fn del_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
- self.mbox.write().await.del_flags(id, flags).await
- }
-
- /// Define the new flags for this message
- pub async fn set_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
- self.mbox.write().await.set_flags(id, flags).await
- }
-
- /// Insert an email into the mailbox
- pub async fn append<'a>(
- &self,
- msg: IMF<'a>,
- ident: Option<UniqueIdent>,
- flags: &[Flag],
- ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
- self.mbox.write().await.append(msg, ident, flags).await
- }
-
- /// Insert an email into the mailbox, copying it from an existing S3 object
- pub async fn append_from_s3<'a>(
- &self,
- msg: IMF<'a>,
- ident: UniqueIdent,
- blob_ref: storage::BlobRef,
- message_key: Key,
- ) -> Result<()> {
- self.mbox
- .write()
- .await
- .append_from_s3(msg, ident, blob_ref, message_key)
- .await
- }
-
- /// Delete a message definitively from the mailbox
- pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> {
- self.mbox.write().await.delete(id).await
- }
-
- /// Copy an email from another Mailbox to this mailbox
- /// (use this when possible, as it allows for a certain number of storage optimizations)
- pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<UniqueIdent> {
- if self.id == from.id {
- bail!("Cannot copy into same mailbox");
- }
-
- let (mut selflock, fromlock);
- if self.id < from.id {
- selflock = self.mbox.write().await;
- fromlock = from.mbox.write().await;
- } else {
- fromlock = from.mbox.write().await;
- selflock = self.mbox.write().await;
- };
- selflock.copy_from(&fromlock, uuid).await
- }
-
- /// Move an email from another Mailbox to this mailbox
- /// (use this when possible, as it allows for a certain number of storage optimizations)
- pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> {
- if self.id == from.id {
- bail!("Cannot copy move same mailbox");
- }
-
- let (mut selflock, mut fromlock);
- if self.id < from.id {
- selflock = self.mbox.write().await;
- fromlock = from.mbox.write().await;
- } else {
- fromlock = from.mbox.write().await;
- selflock = self.mbox.write().await;
- };
- selflock.move_from(&mut fromlock, uuid).await
- }
-}
-
-// ----
-
-// Non-standard but common flags:
-// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml
-struct MailboxInternal {
- // 2023-05-15 will probably be used later.
- #[allow(dead_code)]
- id: UniqueIdent,
- mail_path: String,
- encryption_key: Key,
- storage: Store,
- uid_index: Bayou<UidIndex>,
-}
-
-impl MailboxInternal {
- async fn force_sync(&mut self) -> Result<()> {
- self.uid_index.sync().await?;
- Ok(())
- }
-
- async fn opportunistic_sync(&mut self) -> Result<()> {
- self.uid_index.opportunistic_sync().await?;
- Ok(())
- }
-
- fn notifier(&self) -> std::sync::Weak<tokio::sync::Notify> {
- self.uid_index.notifier()
- }
-
- // ---- Functions for reading the mailbox ----
-
- async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
- let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
- let ops = ids
- .iter()
- .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str()))
- .collect::<Vec<_>>();
- let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;
-
- let mut meta_vec = vec![];
- for res in res_vec.into_iter() {
- let mut meta_opt = None;
-
- // Resolve conflicts
- for v in res.value.iter() {
- match v {
- storage::Alternative::Tombstone => (),
- storage::Alternative::Value(v) => {
- let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
- match meta_opt.as_mut() {
- None => {
- meta_opt = Some(meta);
- }
- Some(prevmeta) => {
- prevmeta.try_merge(meta)?;
- }
- }
- }
- }
- }
- if let Some(meta) = meta_opt {
- meta_vec.push(meta);
- } else {
- bail!("No valid meta value in k2v for {:?}", res.row_ref);
- }
- }
-
- Ok(meta_vec)
- }
-
- async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
- let obj_res = self
- .storage
- .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id)))
- .await?;
- let body = obj_res.value;
- cryptoblob::open(&body, message_key)
- }
-
- // ---- Functions for changing the mailbox ----
-
- async fn add_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
- let add_flag_op = self.uid_index.state().op_flag_add(ident, flags.to_vec());
- self.uid_index.push(add_flag_op).await
- }
-
- async fn del_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
- let del_flag_op = self.uid_index.state().op_flag_del(ident, flags.to_vec());
- self.uid_index.push(del_flag_op).await
- }
-
- async fn set_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
- let set_flag_op = self.uid_index.state().op_flag_set(ident, flags.to_vec());
- self.uid_index.push(set_flag_op).await
- }
-
- async fn append(
- &mut self,
- mail: IMF<'_>,
- ident: Option<UniqueIdent>,
- flags: &[Flag],
- ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
- let ident = ident.unwrap_or_else(gen_ident);
- let message_key = gen_key();
-
- futures::try_join!(
- async {
- // Encrypt and save mail body
- let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
- self.storage
- .blob_insert(BlobVal::new(
- BlobRef(format!("{}/{}", self.mail_path, ident)),
- message_blob,
- ))
- .await?;
- Ok::<_, anyhow::Error>(())
- },
- async {
- // Save mail meta
- let meta = MailMeta {
- internaldate: now_msec(),
- headers: mail.parsed.raw_headers.to_vec(),
- message_key: message_key.clone(),
- rfc822_size: mail.raw.len(),
- };
- let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.storage
- .row_insert(vec![RowVal::new(
- RowRef::new(&self.mail_path, &ident.to_string()),
- meta_blob,
- )])
- .await?;
- Ok::<_, anyhow::Error>(())
- },
- self.uid_index.opportunistic_sync()
- )?;
-
- // Add mail to Bayou mail index
- let uid_state = self.uid_index.state();
- let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec());
-
- let uidvalidity = uid_state.uidvalidity;
- let (uid, modseq) = match add_mail_op {
- UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq),
- _ => unreachable!(),
- };
-
- self.uid_index.push(add_mail_op).await?;
-
- Ok((uidvalidity, uid, modseq))
- }
-
- async fn append_from_s3<'a>(
- &mut self,
- mail: IMF<'a>,
- ident: UniqueIdent,
- blob_src: storage::BlobRef,
- message_key: Key,
- ) -> Result<()> {
- futures::try_join!(
- async {
- // Copy mail body from previous location
- let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident));
- self.storage.blob_copy(&blob_src, &blob_dst).await?;
- Ok::<_, anyhow::Error>(())
- },
- async {
- // Save mail meta
- let meta = MailMeta {
- internaldate: now_msec(),
- headers: mail.parsed.raw_headers.to_vec(),
- message_key: message_key.clone(),
- rfc822_size: mail.raw.len(),
- };
- let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.storage
- .row_insert(vec![RowVal::new(
- RowRef::new(&self.mail_path, &ident.to_string()),
- meta_blob,
- )])
- .await?;
- Ok::<_, anyhow::Error>(())
- },
- self.uid_index.opportunistic_sync()
- )?;
-
- // Add mail to Bayou mail index
- let add_mail_op = self.uid_index.state().op_mail_add(ident, vec![]);
- self.uid_index.push(add_mail_op).await?;
-
- Ok(())
- }
-
- async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
- if !self.uid_index.state().table.contains_key(&ident) {
- bail!("Cannot delete mail that doesn't exit");
- }
-
- let del_mail_op = self.uid_index.state().op_mail_del(ident);
- self.uid_index.push(del_mail_op).await?;
-
- futures::try_join!(
- async {
- // Delete mail body from S3
- self.storage
- .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident)))
- .await?;
- Ok::<_, anyhow::Error>(())
- },
- async {
- // Delete mail meta from K2V
- let sk = ident.to_string();
- let res = self
- .storage
- .row_fetch(&storage::Selector::Single(&RowRef::new(
- &self.mail_path,
- &sk,
- )))
- .await?;
- if let Some(row_val) = res.into_iter().next() {
- self.storage
- .row_rm(&storage::Selector::Single(&row_val.row_ref))
- .await?;
- }
- Ok::<_, anyhow::Error>(())
- }
- )?;
- Ok(())
- }
-
- async fn copy_from(
- &mut self,
- from: &MailboxInternal,
- source_id: UniqueIdent,
- ) -> Result<UniqueIdent> {
- let new_id = gen_ident();
- self.copy_internal(from, source_id, new_id).await?;
- Ok(new_id)
- }
-
- async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> {
- self.copy_internal(from, id, id).await?;
- from.delete(id).await?;
- Ok(())
- }
-
- async fn copy_internal(
- &mut self,
- from: &MailboxInternal,
- source_id: UniqueIdent,
- new_id: UniqueIdent,
- ) -> Result<()> {
- if self.encryption_key != from.encryption_key {
- bail!("Message to be copied/moved does not belong to same account.");
- }
-
- let flags = from
- .uid_index
- .state()
- .table
- .get(&source_id)
- .ok_or(anyhow!("Source mail not found"))?
- .2
- .clone();
-
- futures::try_join!(
- async {
- let dst = BlobRef(format!("{}/{}", self.mail_path, new_id));
- let src = BlobRef(format!("{}/{}", from.mail_path, source_id));
- self.storage.blob_copy(&src, &dst).await?;
- Ok::<_, anyhow::Error>(())
- },
- async {
- // Copy mail meta in K2V
- let meta = &from.fetch_meta(&[source_id]).await?[0];
- let meta_blob = seal_serialize(meta, &self.encryption_key)?;
- self.storage
- .row_insert(vec![RowVal::new(
- RowRef::new(&self.mail_path, &new_id.to_string()),
- meta_blob,
- )])
- .await?;
- Ok::<_, anyhow::Error>(())
- },
- self.uid_index.opportunistic_sync(),
- )?;
-
- // Add mail to Bayou mail index
- let add_mail_op = self.uid_index.state().op_mail_add(new_id, flags);
- self.uid_index.push(add_mail_op).await?;
-
- Ok(())
- }
-}
-
-// Can be useful to debug so we want this code
-// to be available to developers
-#[allow(dead_code)]
-fn dump(uid_index: &Bayou<UidIndex>) {
- let s = uid_index.state();
- println!("---- MAILBOX STATE ----");
- println!("UIDVALIDITY {}", s.uidvalidity);
- println!("UIDNEXT {}", s.uidnext);
- println!("INTERNALSEQ {}", s.internalseq);
- for (uid, ident) in s.idx_by_uid.iter() {
- println!(
- "{} {} {}",
- uid,
- hex::encode(ident.0),
- s.table.get(ident).cloned().unwrap().2.join(", ")
- );
- }
- println!();
-}
-
-// ----
-
-/// The metadata of a message that is stored in K2V
-/// at pk = mail/<mailbox uuid>, sk = <message uuid>
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct MailMeta {
- /// INTERNALDATE field (milliseconds since epoch)
- pub internaldate: u64,
- /// Headers of the message
- pub headers: Vec<u8>,
- /// Secret key for decrypting entire message
- pub message_key: Key,
- /// RFC822 size
- pub rfc822_size: usize,
-}
-
-impl MailMeta {
- fn try_merge(&mut self, other: Self) -> Result<()> {
- if self.headers != other.headers
- || self.message_key != other.message_key
- || self.rfc822_size != other.rfc822_size
- {
- bail!("Conflicting MailMeta values.");
- }
- self.internaldate = std::cmp::max(self.internaldate, other.internaldate);
- Ok(())
- }
-}
diff --git a/src/mail/mod.rs b/src/mail/mod.rs
deleted file mode 100644
index 03e85cd..0000000
--- a/src/mail/mod.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-use std::convert::TryFrom;
-
-pub mod incoming;
-pub mod mailbox;
-pub mod query;
-pub mod snapshot;
-pub mod uidindex;
-pub mod unique_ident;
-pub mod namespace;
-
-// Internet Message Format
-// aka RFC 822 - RFC 2822 - RFC 5322
-// 2023-05-15 don't want to refactor this struct now.
-#[allow(clippy::upper_case_acronyms)]
-pub struct IMF<'a> {
- raw: &'a [u8],
- parsed: eml_codec::part::composite::Message<'a>,
-}
-
-impl<'a> TryFrom<&'a [u8]> for IMF<'a> {
- type Error = ();
-
- fn try_from(body: &'a [u8]) -> Result<IMF<'a>, ()> {
- let parsed = eml_codec::parse_message(body).or(Err(()))?.1;
- Ok(Self { raw: body, parsed })
- }
-}
diff --git a/src/mail/namespace.rs b/src/mail/namespace.rs
deleted file mode 100644
index 5e67173..0000000
--- a/src/mail/namespace.rs
+++ /dev/null
@@ -1,209 +0,0 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc, Weak};
-
-use anyhow::{anyhow, bail, Result};
-use lazy_static::lazy_static;
-use serde::{Deserialize, Serialize};
-use tokio::sync::watch;
-
-use crate::cryptoblob::{open_deserialize, seal_serialize};
-use crate::login::Credentials;
-use crate::mail::incoming::incoming_mail_watch_process;
-use crate::mail::mailbox::Mailbox;
-use crate::mail::uidindex::ImapUidvalidity;
-use crate::mail::unique_ident::{gen_ident, UniqueIdent};
-use crate::storage;
-use crate::timestamp::now_msec;
-
-pub const MAILBOX_HIERARCHY_DELIMITER: char = '.';
-
-/// INBOX is the only mailbox that must always exist.
-/// It is created automatically when the account is created.
-/// IMAP allows the user to rename INBOX to something else;
-/// in this case, all messages from INBOX are moved to a mailbox
-/// with the new name, and the INBOX mailbox still exists and is empty.
-/// In our implementation, we indeed move the underlying mailbox
-/// to the new name (i.e. the new name has the same id as the previous
-/// INBOX), and we create a new empty mailbox for INBOX.
-pub const INBOX: &str = "INBOX";
-
-/// For convenience purposes, we also create some special mailboxes
-/// that are described in RFC 6154 SPECIAL-USE
-/// @FIXME maybe it should be a configuration parameter
-/// @FIXME maybe we should have a per-mailbox flag mechanism, either an enum or a string, so we
-/// can track which mailbox is used for what.
-/// @FIXME Junk could be useful but we don't have any antispam solution yet so...
-/// @FIXME IMAP supports virtual mailboxes. \All or \Flagged are intended to be virtual mailboxes.
-/// \Trash might be one, or not one. I don't know what we should do there.
-pub const DRAFTS: &str = "Drafts";
-pub const ARCHIVE: &str = "Archive";
-pub const SENT: &str = "Sent";
-pub const TRASH: &str = "Trash";
-
-pub(crate) const MAILBOX_LIST_PK: &str = "mailboxes";
-pub(crate) const MAILBOX_LIST_SK: &str = "list";
-
-// ---- User's mailbox list (serialized in K2V) ----
-
-#[derive(Serialize, Deserialize)]
-pub(crate) struct MailboxList(BTreeMap<String, MailboxListEntry>);
-
-#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
-pub(crate) struct MailboxListEntry {
- id_lww: (u64, Option<UniqueIdent>),
- uidvalidity: ImapUidvalidity,
-}
-
-impl MailboxListEntry {
- fn merge(&mut self, other: &Self) {
- // Simple CRDT merge rule
- if other.id_lww.0 > self.id_lww.0
- || (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1)
- {
- self.id_lww = other.id_lww;
- }
- self.uidvalidity = std::cmp::max(self.uidvalidity, other.uidvalidity);
- }
-}
-
-impl MailboxList {
- pub(crate) fn new() -> Self {
- Self(BTreeMap::new())
- }
-
- pub(crate) fn merge(&mut self, list2: Self) {
- for (k, v) in list2.0.into_iter() {
- if let Some(e) = self.0.get_mut(&k) {
- e.merge(&v);
- } else {
- self.0.insert(k, v);
- }
- }
- }
-
- pub(crate) fn existing_mailbox_names(&self) -> Vec<String> {
- self.0
- .iter()
- .filter(|(_, v)| v.id_lww.1.is_some())
- .map(|(k, _)| k.to_string())
- .collect()
- }
-
- pub(crate) fn has_mailbox(&self, name: &str) -> bool {
- matches!(
- self.0.get(name),
- Some(MailboxListEntry {
- id_lww: (_, Some(_)),
- ..
- })
- )
- }
-
- pub(crate) fn get_mailbox(&self, name: &str) -> Option<(ImapUidvalidity, Option<UniqueIdent>)> {
- self.0.get(name).map(
- |MailboxListEntry {
- id_lww: (_, mailbox_id),
- uidvalidity,
- }| (*uidvalidity, *mailbox_id),
- )
- }
-
- /// Ensures mailbox `name` maps to id `id`.
- /// If it is already mapped to that id, returns None.
- /// If a change had to be done, returns Some(new uidvalidity in mailbox).
- pub(crate) fn set_mailbox(&mut self, name: &str, id: Option<UniqueIdent>) -> Option<ImapUidvalidity> {
- let (ts, id, uidvalidity) = match self.0.get_mut(name) {
- None => {
- if id.is_none() {
- return None;
- } else {
- (now_msec(), id, ImapUidvalidity::new(1).unwrap())
- }
- }
- Some(MailboxListEntry {
- id_lww,
- uidvalidity,
- }) => {
- if id_lww.1 == id {
- return None;
- } else {
- (
- std::cmp::max(id_lww.0 + 1, now_msec()),
- id,
- ImapUidvalidity::new(uidvalidity.get() + 1).unwrap(),
- )
- }
- }
- };
-
- self.0.insert(
- name.into(),
- MailboxListEntry {
- id_lww: (ts, id),
- uidvalidity,
- },
- );
- Some(uidvalidity)
- }
-
- pub(crate) fn update_uidvalidity(&mut self, name: &str, new_uidvalidity: ImapUidvalidity) {
- match self.0.get_mut(name) {
- None => {
- self.0.insert(
- name.into(),
- MailboxListEntry {
- id_lww: (now_msec(), None),
- uidvalidity: new_uidvalidity,
- },
- );
- }
- Some(MailboxListEntry { uidvalidity, .. }) => {
- *uidvalidity = std::cmp::max(*uidvalidity, new_uidvalidity);
- }
- }
- }
-
- pub(crate) fn create_mailbox(&mut self, name: &str) -> CreatedMailbox {
- if let Some(MailboxListEntry {
- id_lww: (_, Some(id)),
- uidvalidity,
- }) = self.0.get(name)
- {
- return CreatedMailbox::Existed(*id, *uidvalidity);
- }
-
- let id = gen_ident();
- let uidvalidity = self.set_mailbox(name, Some(id)).unwrap();
- CreatedMailbox::Created(id, uidvalidity)
- }
-
- pub(crate) fn rename_mailbox(&mut self, old_name: &str, new_name: &str) -> Result<()> {
- if let Some((uidvalidity, Some(mbid))) = self.get_mailbox(old_name) {
- if self.has_mailbox(new_name) {
- bail!(
- "Cannot rename {} into {}: {} already exists",
- old_name,
- new_name,
- new_name
- );
- }
-
- self.set_mailbox(old_name, None);
- self.set_mailbox(new_name, Some(mbid));
- self.update_uidvalidity(new_name, uidvalidity);
- Ok(())
- } else {
- bail!(
- "Cannot rename {} into {}: {} doesn't exist",
- old_name,
- new_name,
- old_name
- );
- }
- }
-}
-
-pub(crate) enum CreatedMailbox {
- Created(UniqueIdent, ImapUidvalidity),
- Existed(UniqueIdent, ImapUidvalidity),
-}
diff --git a/src/mail/query.rs b/src/mail/query.rs
deleted file mode 100644
index 3e6fe99..0000000
--- a/src/mail/query.rs
+++ /dev/null
@@ -1,137 +0,0 @@
-use super::mailbox::MailMeta;
-use super::snapshot::FrozenMailbox;
-use super::unique_ident::UniqueIdent;
-use anyhow::Result;
-use futures::future::FutureExt;
-use futures::stream::{BoxStream, Stream, StreamExt};
-
-/// Query is in charge of efficiently fetching the
-/// requested data for a list of emails
-pub struct Query<'a, 'b> {
- pub frozen: &'a FrozenMailbox,
- pub emails: &'b [UniqueIdent],
- pub scope: QueryScope,
-}
-
-#[derive(Debug)]
-pub enum QueryScope {
- Index,
- Partial,
- Full,
-}
-impl QueryScope {
- pub fn union(&self, other: &QueryScope) -> QueryScope {
- match (self, other) {
- (QueryScope::Full, _) | (_, QueryScope::Full) => QueryScope::Full,
- (QueryScope::Partial, _) | (_, QueryScope::Partial) => QueryScope::Partial,
- (QueryScope::Index, QueryScope::Index) => QueryScope::Index,
- }
- }
-}
-
-//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>;
-
-impl<'a, 'b> Query<'a, 'b> {
- pub fn fetch(&self) -> BoxStream<Result<QueryResult>> {
- match self.scope {
- QueryScope::Index => Box::pin(
- futures::stream::iter(self.emails)
- .map(|&uuid| Ok(QueryResult::IndexResult { uuid })),
- ),
- QueryScope::Partial => Box::pin(self.partial()),
- QueryScope::Full => Box::pin(self.full()),
- }
- }
-
- // --- functions below are private *for reasons*
- fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
- async move {
- let maybe_meta_list: Result<Vec<MailMeta>> =
- self.frozen.mailbox.fetch_meta(self.emails).await;
- let list_res = maybe_meta_list
- .map(|meta_list| {
- meta_list
- .into_iter()
- .zip(self.emails)
- .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata }))
- .collect()
- })
- .unwrap_or_else(|e| vec![Err(e)]);
-
- futures::stream::iter(list_res)
- }
- .flatten_stream()
- }
-
- fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
- self.partial().then(move |maybe_meta| async move {
- let meta = maybe_meta?;
-
- let content = self
- .frozen
- .mailbox
- .fetch_full(
- *meta.uuid(),
- &meta
- .metadata()
- .expect("meta to be PartialResult")
- .message_key,
- )
- .await?;
-
- Ok(meta.into_full(content).expect("meta to be PartialResult"))
- })
- }
-}
-
-#[derive(Debug, Clone)]
-pub enum QueryResult {
- IndexResult {
- uuid: UniqueIdent,
- },
- PartialResult {
- uuid: UniqueIdent,
- metadata: MailMeta,
- },
- FullResult {
- uuid: UniqueIdent,
- metadata: MailMeta,
- content: Vec<u8>,
- },
-}
-impl QueryResult {
- pub fn uuid(&self) -> &UniqueIdent {
- match self {
- Self::IndexResult { uuid, .. } => uuid,
- Self::PartialResult { uuid, .. } => uuid,
- Self::FullResult { uuid, .. } => uuid,
- }
- }
-
- pub fn metadata(&self) -> Option<&MailMeta> {
- match self {
- Self::IndexResult { .. } => None,
- Self::PartialResult { metadata, .. } => Some(metadata),
- Self::FullResult { metadata, .. } => Some(metadata),
- }
- }
-
- #[allow(dead_code)]
- pub fn content(&self) -> Option<&[u8]> {
- match self {
- Self::FullResult { content, .. } => Some(content),
- _ => None,
- }
- }
-
- fn into_full(self, content: Vec<u8>) -> Option<Self> {
- match self {
- Self::PartialResult { uuid, metadata } => Some(Self::FullResult {
- uuid,
- metadata,
- content,
- }),
- _ => None,
- }
- }
-}
diff --git a/src/mail/snapshot.rs b/src/mail/snapshot.rs
deleted file mode 100644
index ed756b5..0000000
--- a/src/mail/snapshot.rs
+++ /dev/null
@@ -1,60 +0,0 @@
-use std::sync::Arc;
-
-use anyhow::Result;
-
-use super::mailbox::Mailbox;
-use super::query::{Query, QueryScope};
-use super::uidindex::UidIndex;
-use super::unique_ident::UniqueIdent;
-
-/// A Frozen Mailbox holds a snapshot of the current mailbox
-/// state, which may be desynchronized from the real mailbox state.
-/// It's up to the user to choose when their snapshot must be updated
-/// to give useful information to their clients.
-pub struct FrozenMailbox {
- pub mailbox: Arc<Mailbox>,
- pub snapshot: UidIndex,
-}
-
-impl FrozenMailbox {
- /// Create a snapshot from a mailbox; the mailbox + the snapshot
- /// become the "Frozen Mailbox".
- pub async fn new(mailbox: Arc<Mailbox>) -> Self {
- let state = mailbox.current_uid_index().await;
-
- Self {
- mailbox,
- snapshot: state,
- }
- }
-
- /// Force the synchronization of the inner mailbox
- /// but do not update the local snapshot
- pub async fn sync(&self) -> Result<()> {
- self.mailbox.opportunistic_sync().await
- }
-
- /// Peek snapshot without updating the frozen mailbox
- /// Can be useful if you want to plan some writes
- /// while sending a diff to the client later
- pub async fn peek(&self) -> UidIndex {
- self.mailbox.current_uid_index().await
- }
-
- /// Update the FrozenMailbox local snapshot.
- /// Returns the old snapshot, so you can build a diff
- pub async fn update(&mut self) -> UidIndex {
- let old_snapshot = self.snapshot.clone();
- self.snapshot = self.mailbox.current_uid_index().await;
-
- old_snapshot
- }
-
- pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> {
- Query {
- frozen: self,
- emails: uuids,
- scope,
- }
- }
-}
diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs
deleted file mode 100644
index 5a06670..0000000
--- a/src/mail/uidindex.rs
+++ /dev/null
@@ -1,474 +0,0 @@
-use std::num::{NonZeroU32, NonZeroU64};
-
-use im::{HashMap, OrdMap, OrdSet};
-use serde::{Deserialize, Deserializer, Serialize, Serializer};
-
-use crate::bayou::*;
-use crate::mail::unique_ident::UniqueIdent;
-
-pub type ModSeq = NonZeroU64;
-pub type ImapUid = NonZeroU32;
-pub type ImapUidvalidity = NonZeroU32;
-pub type Flag = String;
-pub type IndexEntry = (ImapUid, ModSeq, Vec<Flag>);
-
-/// A UidIndex handles the mutable part of a mailbox.
-/// It is built by running the event log on it.
-/// Each applied log entry generates a new UidIndex by cloning the previous one
-/// and applying the event. This is why we use immutable data structures:
-/// they are cheap to clone.
-#[derive(Clone)]
-pub struct UidIndex {
- // Source of trust
- pub table: OrdMap<UniqueIdent, IndexEntry>,
-
- // Indexes optimized for queries
- pub idx_by_uid: OrdMap<ImapUid, UniqueIdent>,
- pub idx_by_modseq: OrdMap<ModSeq, UniqueIdent>,
- pub idx_by_flag: FlagIndex,
-
- // "Public" Counters
- pub uidvalidity: ImapUidvalidity,
- pub uidnext: ImapUid,
- pub highestmodseq: ModSeq,
-
- // "Internal" Counters
- pub internalseq: ImapUid,
- pub internalmodseq: ModSeq,
-}
-
-#[derive(Clone, Serialize, Deserialize, Debug)]
-pub enum UidIndexOp {
- MailAdd(UniqueIdent, ImapUid, ModSeq, Vec<Flag>),
- MailDel(UniqueIdent),
- FlagAdd(UniqueIdent, ModSeq, Vec<Flag>),
- FlagDel(UniqueIdent, ModSeq, Vec<Flag>),
- FlagSet(UniqueIdent, ModSeq, Vec<Flag>),
- BumpUidvalidity(u32),
-}
-
-impl UidIndex {
- #[must_use]
- pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
- UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags)
- }
-
- #[must_use]
- pub fn op_mail_del(&self, ident: UniqueIdent) -> UidIndexOp {
- UidIndexOp::MailDel(ident)
- }
-
- #[must_use]
- pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
- UidIndexOp::FlagAdd(ident, self.internalmodseq, flags)
- }
-
- #[must_use]
- pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
- UidIndexOp::FlagDel(ident, self.internalmodseq, flags)
- }
-
- #[must_use]
- pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
- UidIndexOp::FlagSet(ident, self.internalmodseq, flags)
- }
-
- #[must_use]
- pub fn op_bump_uidvalidity(&self, count: u32) -> UidIndexOp {
- UidIndexOp::BumpUidvalidity(count)
- }
-
- // INTERNAL functions to keep state consistent
-
- fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) {
- // Insert the email in our table
- self.table.insert(ident, (uid, modseq, flags.to_owned()));
-
- // Update the indexes/caches
- self.idx_by_uid.insert(uid, ident);
- self.idx_by_flag.insert(uid, flags);
- self.idx_by_modseq.insert(modseq, ident);
- }
-
- fn unreg_email(&mut self, ident: &UniqueIdent) {
- // We do nothing if the mail does not exist
- let (uid, modseq, flags) = match self.table.get(ident) {
- Some(v) => v,
- None => return,
- };
-
- // Delete all cache entries
- self.idx_by_uid.remove(uid);
- self.idx_by_flag.remove(*uid, flags);
- self.idx_by_modseq.remove(modseq);
-
- // Remove from source of trust
- self.table.remove(ident);
- }
-}
-
-impl Default for UidIndex {
- fn default() -> Self {
- Self {
- table: OrdMap::new(),
-
- idx_by_uid: OrdMap::new(),
- idx_by_modseq: OrdMap::new(),
- idx_by_flag: FlagIndex::new(),
-
- uidvalidity: NonZeroU32::new(1).unwrap(),
- uidnext: NonZeroU32::new(1).unwrap(),
- highestmodseq: NonZeroU64::new(1).unwrap(),
-
- internalseq: NonZeroU32::new(1).unwrap(),
- internalmodseq: NonZeroU64::new(1).unwrap(),
- }
- }
-}
-
-impl BayouState for UidIndex {
- type Op = UidIndexOp;
-
- fn apply(&self, op: &UidIndexOp) -> Self {
- let mut new = self.clone();
- match op {
- UidIndexOp::MailAdd(ident, uid, modseq, flags) => {
- // Change UIDValidity if there is a UID conflict or a MODSEQ conflict
- // @FIXME Need to prove that summing works
- // The intuition: we increase the UIDValidity by the number of possible conflicts
- if *uid < new.internalseq || *modseq < new.internalmodseq {
- let bump_uid = new.internalseq.get() - uid.get();
- let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32;
- new.uidvalidity =
- NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap();
- }
-
- // Assign the real uid of the email
- let new_uid = new.internalseq;
-
- // Assign the real modseq of the email and its new flags
- let new_modseq = new.internalmodseq;
-
- // Delete the previous entry if any.
- // Our proof has no assumption on `ident` uniqueness,
- // so we must handle this case even if it is very unlikely.
- // In this case, we overwrite the email.
- // Note: assigning a new UID is mandatory.
- new.unreg_email(ident);
-
- // We record our email and update our caches
- new.reg_email(*ident, new_uid, new_modseq, flags);
-
- // Update counters
- new.highestmodseq = new.internalmodseq;
-
- new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
- new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
-
- new.uidnext = new.internalseq;
- }
- UidIndexOp::MailDel(ident) => {
- // If the email is known locally, we remove its references in all our indexes
- new.unreg_email(ident);
-
- // We update the counter
- new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
- }
- UidIndexOp::FlagAdd(ident, candidate_modseq, new_flags) => {
- if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
- // Bump UIDValidity if required
- if *candidate_modseq < new.internalmodseq {
- let bump_modseq =
- (new.internalmodseq.get() - candidate_modseq.get()) as u32;
- new.uidvalidity =
- NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
- }
-
- // Add flags to the source of trust and the cache
- let mut to_add: Vec<Flag> = new_flags
- .iter()
- .filter(|f| !existing_flags.contains(f))
- .cloned()
- .collect();
- new.idx_by_flag.insert(*uid, &to_add);
- *email_modseq = new.internalmodseq;
- new.idx_by_modseq.insert(new.internalmodseq, *ident);
- existing_flags.append(&mut to_add);
-
- // Update counters
- new.highestmodseq = new.internalmodseq;
- new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
- }
- }
- UidIndexOp::FlagDel(ident, candidate_modseq, rm_flags) => {
- if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
- // Bump UIDValidity if required
- if *candidate_modseq < new.internalmodseq {
- let bump_modseq =
- (new.internalmodseq.get() - candidate_modseq.get()) as u32;
- new.uidvalidity =
- NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
- }
-
- // Remove flags from the source of trust and the cache
- existing_flags.retain(|x| !rm_flags.contains(x));
- new.idx_by_flag.remove(*uid, rm_flags);
-
- // Register that email has been modified
- new.idx_by_modseq.insert(new.internalmodseq, *ident);
- *email_modseq = new.internalmodseq;
-
- // Update counters
- new.highestmodseq = new.internalmodseq;
- new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
- }
- }
- UidIndexOp::FlagSet(ident, candidate_modseq, new_flags) => {
- if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
- // Bump UIDValidity if required
- if *candidate_modseq < new.internalmodseq {
- let bump_modseq =
- (new.internalmodseq.get() - candidate_modseq.get()) as u32;
- new.uidvalidity =
- NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
- }
-
- // Remove flags from the source of trust and the cache
- let (keep_flags, rm_flags): (Vec<String>, Vec<String>) = existing_flags
- .iter()
- .cloned()
- .partition(|x| new_flags.contains(x));
- *existing_flags = keep_flags;
- let mut to_add: Vec<Flag> = new_flags
- .iter()
- .filter(|f| !existing_flags.contains(f))
- .cloned()
- .collect();
- existing_flags.append(&mut to_add);
- new.idx_by_flag.remove(*uid, &rm_flags);
- new.idx_by_flag.insert(*uid, &to_add);
-
- // Register that email has been modified
- new.idx_by_modseq.insert(new.internalmodseq, *ident);
- *email_modseq = new.internalmodseq;
-
- // Update counters
- new.highestmodseq = new.internalmodseq;
- new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
- }
- }
- UidIndexOp::BumpUidvalidity(count) => {
- new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count)
- .unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap());
- }
- }
- new
- }
-}
-
-// ---- FlagIndex implementation ----
-
-#[derive(Clone)]
-pub struct FlagIndex(HashMap<Flag, OrdSet<ImapUid>>);
-pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet<ImapUid>>;
-
-impl FlagIndex {
- fn new() -> Self {
- Self(HashMap::new())
- }
- fn insert(&mut self, uid: ImapUid, flags: &[Flag]) {
- flags.iter().for_each(|flag| {
- self.0
- .entry(flag.clone())
- .or_insert(OrdSet::new())
- .insert(uid);
- });
- }
- fn remove(&mut self, uid: ImapUid, flags: &[Flag]) {
- for flag in flags.iter() {
- if let Some(set) = self.0.get_mut(flag) {
- set.remove(&uid);
- if set.is_empty() {
- self.0.remove(flag);
- }
- }
- }
- }
-
- pub fn get(&self, f: &Flag) -> Option<&OrdSet<ImapUid>> {
- self.0.get(f)
- }
-
- pub fn flags(&self) -> FlagIter {
- self.0.keys()
- }
-}
-
-// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ----
-
-#[derive(Serialize, Deserialize)]
-struct UidIndexSerializedRepr {
- mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec<Flag>)>,
-
- uidvalidity: ImapUidvalidity,
- uidnext: ImapUid,
- highestmodseq: ModSeq,
-
- internalseq: ImapUid,
- internalmodseq: ModSeq,
-}
-
-impl<'de> Deserialize<'de> for UidIndex {
- fn deserialize<D>(d: D) -> Result<Self, D::Error>
- where
- D: Deserializer<'de>,
- {
- let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?;
-
- let mut uidindex = UidIndex {
- table: OrdMap::new(),
-
- idx_by_uid: OrdMap::new(),
- idx_by_modseq: OrdMap::new(),
- idx_by_flag: FlagIndex::new(),
-
- uidvalidity: val.uidvalidity,
- uidnext: val.uidnext,
- highestmodseq: val.highestmodseq,
-
- internalseq: val.internalseq,
- internalmodseq: val.internalmodseq,
- };
-
- val.mails
- .iter()
- .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags));
-
- Ok(uidindex)
- }
-}
-
-impl Serialize for UidIndex {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: Serializer,
- {
- let mut mails = vec![];
- for (ident, (uid, modseq, flags)) in self.table.iter() {
- mails.push((*uid, *modseq, *ident, flags.clone()));
- }
-
- let val = UidIndexSerializedRepr {
- mails,
- uidvalidity: self.uidvalidity,
- uidnext: self.uidnext,
- highestmodseq: self.highestmodseq,
- internalseq: self.internalseq,
- internalmodseq: self.internalmodseq,
- };
-
- val.serialize(serializer)
- }
-}
-
-// ---- TESTS ----
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_uidindex() {
- let mut state = UidIndex::default();
-
- // Add message 1
- {
- let m = UniqueIdent([0x01; 24]);
- let f = vec!["\\Recent".to_string(), "\\Archive".to_string()];
- let ev = state.op_mail_add(m, f);
- state = state.apply(&ev);
-
- // Early checks
- assert_eq!(state.table.len(), 1);
- let (uid, modseq, flags) = state.table.get(&m).unwrap();
- assert_eq!(*uid, NonZeroU32::new(1).unwrap());
- assert_eq!(*modseq, NonZeroU64::new(1).unwrap());
- assert_eq!(flags.len(), 2);
- let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap();
- assert_eq!(&m, ident);
- let recent = state.idx_by_flag.0.get("\\Recent").unwrap();
- assert_eq!(recent.len(), 1);
- assert_eq!(recent.iter().next().unwrap(), &NonZeroU32::new(1).unwrap());
- assert_eq!(state.uidnext, NonZeroU32::new(2).unwrap());
- assert_eq!(state.uidvalidity, NonZeroU32::new(1).unwrap());
- }
-
- // Add message 2
- {
- let m = UniqueIdent([0x02; 24]);
- let f = vec!["\\Seen".to_string(), "\\Archive".to_string()];
- let ev = state.op_mail_add(m, f);
- state = state.apply(&ev);
-
- let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
- assert_eq!(archive.len(), 2);
- }
-
- // Add flags to message 1
- {
- let m = UniqueIdent([0x01; 24]);
- let f = vec!["Important".to_string(), "$cl_1".to_string()];
- let ev = state.op_flag_add(m, f);
- state = state.apply(&ev);
- }
-
- // Delete flags from message 1
- {
- let m = UniqueIdent([0x01; 24]);
- let f = vec!["\\Recent".to_string()];
- let ev = state.op_flag_del(m, f);
- state = state.apply(&ev);
-
- let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
- assert_eq!(archive.len(), 2);
- }
-
- // Delete message 2
- {
- let m = UniqueIdent([0x02; 24]);
- let ev = state.op_mail_del(m);
- state = state.apply(&ev);
-
- let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
- assert_eq!(archive.len(), 1);
- }
-
- // Add a message 3 concurrent to message 1 (trigger a uid validity change)
- {
- let m = UniqueIdent([0x03; 24]);
- let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
- let ev = UidIndexOp::MailAdd(
- m,
- NonZeroU32::new(1).unwrap(),
- NonZeroU64::new(1).unwrap(),
- f,
- );
- state = state.apply(&ev);
- }
-
- // Checks
- {
- assert_eq!(state.table.len(), 2);
- assert!(state.uidvalidity > NonZeroU32::new(1).unwrap());
-
- let (last_uid, ident) = state.idx_by_uid.get_max().unwrap();
- assert_eq!(ident, &UniqueIdent([0x03; 24]));
-
- let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
- assert_eq!(archive.len(), 2);
- let mut iter = archive.iter();
- assert_eq!(iter.next().unwrap(), &NonZeroU32::new(1).unwrap());
- assert_eq!(iter.next().unwrap(), last_uid);
- }
- }
-}
diff --git a/src/mail/unique_ident.rs b/src/mail/unique_ident.rs
deleted file mode 100644
index 0e629db..0000000
--- a/src/mail/unique_ident.rs
+++ /dev/null
@@ -1,101 +0,0 @@
-use std::str::FromStr;
-use std::sync::atomic::{AtomicU64, Ordering};
-
-use lazy_static::lazy_static;
-use rand::prelude::*;
-use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
-
-use crate::timestamp::now_msec;
-
-/// An internal Mail Identifier is composed of two components:
-/// - a process identifier, 128 bits, itself composed of:
-/// - the timestamp of when the process started, 64 bits
-/// - a 64-bit random number
-/// - a sequence number, 64 bits
-/// They are not part of the protocol but an internal representation
-/// required by Aerogramme.
-/// Their main property is to be unique without having to rely
-/// on synchronization between IMAP processes.
-#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
-pub struct UniqueIdent(pub [u8; 24]);
-
-struct IdentGenerator {
- pid: u128,
- sn: AtomicU64,
-}
-
-impl IdentGenerator {
- fn new() -> Self {
- let time = now_msec() as u128;
- let rand = thread_rng().gen::<u64>() as u128;
- Self {
- pid: (time << 64) | rand,
- sn: AtomicU64::new(0),
- }
- }
-
- fn gen(&self) -> UniqueIdent {
- let sn = self.sn.fetch_add(1, Ordering::Relaxed);
- let mut res = [0u8; 24];
- res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid));
- res[16..24].copy_from_slice(&u64::to_be_bytes(sn));
- UniqueIdent(res)
- }
-}
-
-lazy_static! {
- static ref GENERATOR: IdentGenerator = IdentGenerator::new();
-}
-
-pub fn gen_ident() -> UniqueIdent {
- GENERATOR.gen()
-}
-
-// -- serde --
-
-impl<'de> Deserialize<'de> for UniqueIdent {
- fn deserialize<D>(d: D) -> Result<Self, D::Error>
- where
- D: Deserializer<'de>,
- {
- let v = String::deserialize(d)?;
- UniqueIdent::from_str(&v).map_err(D::Error::custom)
- }
-}
-
-impl Serialize for UniqueIdent {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: Serializer,
- {
- serializer.serialize_str(&self.to_string())
- }
-}
-
-impl std::fmt::Display for UniqueIdent {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{}", hex::encode(self.0))
- }
-}
-
-impl std::fmt::Debug for UniqueIdent {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{}", hex::encode(self.0))
- }
-}
-
-impl FromStr for UniqueIdent {
- type Err = &'static str;
-
- fn from_str(s: &str) -> Result<UniqueIdent, &'static str> {
- let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
-
- if bytes.len() != 24 {
- return Err("bad length");
- }
-
- let mut tmp = [0u8; 24];
- tmp[..].copy_from_slice(&bytes);
- Ok(UniqueIdent(tmp))
- }
-}