From 1a43ce5ac7033c148f64a033f2b1d335e95e11d5 Mon Sep 17 00:00:00 2001
From: Quentin Dufour <quentin@deuxfleurs.fr>
Date: Fri, 8 Mar 2024 08:17:03 +0100
Subject: WIP refactor

---
 aero-collections/mail/incoming.rs     | 445 +++++++++++++++++++++++++++++
 aero-collections/mail/mailbox.rs      | 524 ++++++++++++++++++++++++++++++++++
 aero-collections/mail/mod.rs          |  27 ++
 aero-collections/mail/namespace.rs    | 209 ++++++++++++++
 aero-collections/mail/query.rs        | 137 +++++++++
 aero-collections/mail/snapshot.rs     |  60 ++++
 aero-collections/mail/uidindex.rs     | 474 ++++++++++++++++++++++++++++++
 aero-collections/mail/unique_ident.rs | 101 +++++++
 aero-collections/user.rs              | 313 ++++++++++++++++++++
 9 files changed, 2290 insertions(+)
 create mode 100644 aero-collections/mail/incoming.rs
 create mode 100644 aero-collections/mail/mailbox.rs
 create mode 100644 aero-collections/mail/mod.rs
 create mode 100644 aero-collections/mail/namespace.rs
 create mode 100644 aero-collections/mail/query.rs
 create mode 100644 aero-collections/mail/snapshot.rs
 create mode 100644 aero-collections/mail/uidindex.rs
 create mode 100644 aero-collections/mail/unique_ident.rs
 create mode 100644 aero-collections/user.rs

(limited to 'aero-collections')

diff --git a/aero-collections/mail/incoming.rs b/aero-collections/mail/incoming.rs
new file mode 100644
index 0000000..e2ad97d
--- /dev/null
+++ b/aero-collections/mail/incoming.rs
@@ -0,0 +1,445 @@
+//use std::collections::HashMap;
+use std::convert::TryFrom;
+
+use std::sync::{Arc, Weak};
+use std::time::Duration;
+
+use anyhow::{anyhow, bail, Result};
+use base64::Engine;
+use futures::{future::BoxFuture, FutureExt};
+//use tokio::io::AsyncReadExt;
+use tokio::sync::watch;
+use tracing::{debug, error, info, warn};
+
+use crate::cryptoblob;
+use crate::login::{Credentials, PublicCredentials};
+use crate::mail::mailbox::Mailbox;
+use crate::mail::uidindex::ImapUidvalidity;
+use crate::mail::unique_ident::*;
+use crate::user::User;
+use crate::mail::IMF;
+use crate::storage;
+use crate::timestamp::now_msec;
+
+const INCOMING_PK: &str = "incoming";
+const INCOMING_LOCK_SK: &str = "lock";
+const INCOMING_WATCH_SK: &str = "watch";
+
+const MESSAGE_KEY: &str = "message-key";
+
+// When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
+// It is renewed every LOCK_DURATION/3
+// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we
+// lost the lock.
+const LOCK_DURATION: Duration = Duration::from_secs(300);
+
+// In addition to checking when notified, also check for new mail every 10 minutes
+const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600);
+
+pub async fn incoming_mail_watch_process(
+    user: Weak<User>,
+    creds: Credentials,
+    rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
+) {
+    if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await {
+        error!("Error in incoming mail watch process: {}", e);
+    }
+}
+
+async fn incoming_mail_watch_process_internal(
+    user: Weak<User>,
+    creds: Credentials,
+    mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
+) -> Result<()> {
+    let mut lock_held = k2v_lock_loop(
+        creds.storage.build().await?,
+        storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK),
+    );
+    let storage = creds.storage.build().await?;
+
+    let mut inbox: Option<Arc<Mailbox>> = None;
+    let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
+
+    loop {
+        let maybe_updated_incoming_key = if *lock_held.borrow() {
+            debug!("incoming lock held");
+
+            let wait_new_mail = async {
+                loop {
+                    match storage.row_poll(&incoming_key).await {
+                        Ok(row_val) => break row_val.row_ref,
+                        Err(e) => {
+                            error!("Error in wait_new_mail: {}", e);
+                            tokio::time::sleep(Duration::from_secs(30)).await;
+                        }
+                    }
+                }
+            };
+
+            tokio::select! {
+                inc_k = wait_new_mail => Some(inc_k),
+                _     = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
+                _     = lock_held.changed() => None,
+                _     = rx_inbox_id.changed() => None,
+            }
+        } else {
+            debug!("incoming lock not held");
+            tokio::select! {
+                _ = lock_held.changed() => None,
+                _ = rx_inbox_id.changed() => None,
+            }
+        };
+
+        let user = match Weak::upgrade(&user) {
+            Some(user) => user,
+            None => {
+                debug!("User no longer available, exiting incoming loop.");
+                break;
+            }
+        };
+        debug!("User still available");
+
+        // If INBOX no longer is same mailbox, open new mailbox
+        let inbox_id = *rx_inbox_id.borrow();
+        if let Some((id, uidvalidity)) = inbox_id {
+            if Some(id) != inbox.as_ref().map(|b| b.id) {
+                match user.open_mailbox_by_id(id, uidvalidity).await {
+                    Ok(mb) => {
+                        inbox = Some(mb);
+                    }
+                    Err(e) => {
+                        inbox = None;
+                        error!("Error when opening inbox ({}): {}", id, e);
+                        tokio::time::sleep(Duration::from_secs(30)).await;
+                        continue;
+                    }
+                }
+            }
+        }
+
+        // If we were able to open INBOX, and we have mail,
+        // fetch new mail
+        if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
+            match handle_incoming_mail(&user, &storage, inbox, &lock_held).await {
+                Ok(()) => {
+                    incoming_key = updated_incoming_key;
+                }
+                Err(e) => {
+                    error!("Could not fetch incoming mail: {}", e);
+                    tokio::time::sleep(Duration::from_secs(30)).await;
+                }
+            }
+        }
+    }
+    drop(rx_inbox_id);
+    Ok(())
+}
+
+async fn handle_incoming_mail(
+    user: &Arc<User>,
+    storage: &storage::Store,
+    inbox: &Arc<Mailbox>,
+    lock_held: &watch::Receiver<bool>,
+) -> Result<()> {
+    let mails_res = storage.blob_list("incoming/").await?;
+
+    for object in mails_res {
+        if !*lock_held.borrow() {
+            break;
+        }
+        let key = object.0;
+        if let Some(mail_id) = key.strip_prefix("incoming/") {
+            if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
+                move_incoming_message(user, storage, inbox, mail_id).await?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+async fn move_incoming_message(
+    user: &Arc<User>,
+    storage: &storage::Store,
+    inbox: &Arc<Mailbox>,
+    id: UniqueIdent,
+) -> Result<()> {
+    info!("Moving incoming message: {}", id);
+
+    let object_key = format!("incoming/{}", id);
+
+    // 1. Fetch message from S3
+    let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?;
+
+    // 1.a decrypt message key from headers
+    //info!("Object metadata: {:?}", get_result.metadata);
+    let key_encrypted_b64 = object
+        .meta
+        .get(MESSAGE_KEY)
+        .ok_or(anyhow!("Missing key in metadata"))?;
+    let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
+    let message_key = sodiumoxide::crypto::sealedbox::open(
+        &key_encrypted,
+        &user.creds.keys.public,
+        &user.creds.keys.secret,
+    )
+    .map_err(|_| anyhow!("Cannot decrypt message key"))?;
+    let message_key =
+        cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
+
+    // 1.b retrieve message body
+    let obj_body = object.value;
+    let plain_mail = cryptoblob::open(&obj_body, &message_key)
+        .map_err(|_| anyhow!("Cannot decrypt email content"))?;
+
+    // 2 parse mail and add to inbox
+    let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
+    inbox
+        .append_from_s3(msg, id, object.blob_ref.clone(), message_key)
+        .await?;
+
+    // 3 delete from incoming
+    storage.blob_rm(&object.blob_ref).await?;
+
+    Ok(())
+}
+
+// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
+
+fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> {
+    let (held_tx, held_rx) = watch::channel(false);
+
+    tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx));
+
+    held_rx
+}
+
+#[derive(Clone, Debug)]
+enum LockState {
+    Unknown,
+    Empty,
+    Held(UniqueIdent, u64, storage::RowRef),
+}
+
+async fn k2v_lock_loop_internal(
+    storage: storage::Store,
+    row_ref: storage::RowRef,
+    held_tx: watch::Sender<bool>,
+) {
+    let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
+    let mut state_rx_2 = state_rx.clone();
+
+    let our_pid = gen_ident();
+
+    // Loop 1: watch state of lock in K2V, save that in corresponding watch channel
+    let watch_lock_loop: BoxFuture<Result<()>> = async {
+        let mut ct = row_ref.clone();
+        loop {
+            debug!("k2v watch lock loop iter: ct = {:?}", ct);
+            match storage.row_poll(&ct).await {
+                Err(e) => {
+                    error!(
+                        "Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
+                        e
+                    );
+                    state_tx.send(LockState::Unknown)?;
+                    tokio::time::sleep(Duration::from_secs(30)).await;
+                }
+                Ok(cv) => {
+                    let mut lock_state = None;
+                    for v in cv.value.iter() {
+                        if let storage::Alternative::Value(vbytes) = v {
+                            if vbytes.len() == 32 {
+                                let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
+                                let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
+                                if lock_state
+                                    .map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2))
+                                    .unwrap_or(true)
+                                {
+                                    lock_state = Some((pid, ts));
+                                }
+                            }
+                        }
+                    }
+                    let new_ct = cv.row_ref;
+
+                    debug!(
+                        "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
+                        ct, new_ct, lock_state
+                    );
+                    state_tx.send(
+                        lock_state
+                            .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone()))
+                            .unwrap_or(LockState::Empty),
+                    )?;
+                    ct = new_ct;
+                }
+            }
+        }
+    }
+    .boxed();
+
+    // Loop 2: notify user whether we are holding the lock or not
+    let lock_notify_loop: BoxFuture<Result<()>> = async {
+        loop {
+            let now = now_msec();
+            let held_with_expiration_time = match &*state_rx.borrow_and_update() {
+                LockState::Held(pid, ts, _ct) if *pid == our_pid => {
+                    let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64;
+                    if now < expiration_time {
+                        Some(expiration_time)
+                    } else {
+                        None
+                    }
+                }
+                _ => None,
+            };
+            let held = held_with_expiration_time.is_some();
+            if held != *held_tx.borrow() {
+                held_tx.send(held)?;
+            }
+
+            let await_expired = async {
+                match held_with_expiration_time {
+                    None => futures::future::pending().await,
+                    Some(expiration_time) => {
+                        tokio::time::sleep(Duration::from_millis(expiration_time - now)).await
+                    }
+                };
+            };
+
+            tokio::select!(
+                r = state_rx.changed() => {
+                    r?;
+                }
+                _ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"),
+                _ = await_expired => continue,
+            );
+        }
+    }
+    .boxed();
+
+    // Loop 3: acquire lock when relevant
+    let take_lock_loop: BoxFuture<Result<()>> = async {
+        loop {
+            let now = now_msec();
+            let state: LockState = state_rx_2.borrow_and_update().clone();
+            let (acquire_at, ct) = match state {
+                LockState::Unknown => {
+                    // If state of the lock is unknown, don't try to acquire
+                    state_rx_2.changed().await?;
+                    continue;
+                }
+                LockState::Empty => (now, None),
+                LockState::Held(pid, ts, ct) => {
+                    if pid == our_pid {
+                        (ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct))
+                    } else {
+                        (ts, Some(ct))
+                    }
+                }
+            };
+
+            // Wait until it is time to acquire lock
+            if acquire_at > now {
+                tokio::select!(
+                    r = state_rx_2.changed() => {
+                        // If lock state changed in the meantime, don't acquire and loop around
+                        r?;
+                        continue;
+                    }
+                    _ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => ()
+                );
+            }
+
+            // Acquire lock
+            let mut lock = vec![0u8; 32];
+            lock[..8].copy_from_slice(&u64::to_be_bytes(
+                now_msec() + LOCK_DURATION.as_millis() as u64,
+            ));
+            lock[8..].copy_from_slice(&our_pid.0);
+            let row = match ct {
+                Some(existing) => existing,
+                None => row_ref.clone(),
+            };
+            if let Err(e) = storage
+                .row_insert(vec![storage::RowVal::new(row, lock)])
+                .await
+            {
+                error!("Could not take lock: {}", e);
+                tokio::time::sleep(Duration::from_secs(30)).await;
+            }
+
+            // Wait for new information to loop back
+            state_rx_2.changed().await?;
+        }
+    }
+    .boxed();
+
+    let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop);
+
+    debug!("lock loop exited, releasing");
+
+    if !held_tx.is_closed() {
+        warn!("weird...");
+        let _ = held_tx.send(false);
+    }
+
+    // If lock is ours, release it
+    let release = match &*state_rx.borrow() {
+        LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()),
+        _ => None,
+    };
+    if let Some(ct) = release {
+        match storage.row_rm(&storage::Selector::Single(&ct)).await {
+            Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
+            Ok(_) => (),
+        };
+    }
+}
+
+// ---- LMTP SIDE: storing messages encrypted with user's pubkey ----
+
+pub struct EncryptedMessage {
+    key: cryptoblob::Key,
+    encrypted_body: Vec<u8>,
+}
+
+impl EncryptedMessage {
+    pub fn new(body: Vec<u8>) -> Result<Self> {
+        let key = cryptoblob::gen_key();
+        let encrypted_body = cryptoblob::seal(&body, &key)?;
+        Ok(Self {
+            key,
+            encrypted_body,
+        })
+    }
+
+    pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
+        let storage = creds.storage.build().await?;
+
+        // Get causality token of previous watch key
+        let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
+        let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await {
+            Err(_) => query,
+            Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query),
+        };
+
+        // Write mail to encrypted storage
+        let encrypted_key =
+            sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
+        let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key);
+
+        let blob_val = storage::BlobVal::new(
+            storage::BlobRef(format!("incoming/{}", gen_ident())),
+            self.encrypted_body.clone().into(),
+        )
+        .with_meta(MESSAGE_KEY.to_string(), key_header);
+        storage.blob_insert(blob_val).await?;
+
+        // Update watch key to signal new mail
+        let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec());
+        storage.row_insert(vec![watch_val]).await?;
+        Ok(())
+    }
+}
diff --git a/aero-collections/mail/mailbox.rs b/aero-collections/mail/mailbox.rs
new file mode 100644
index 0000000..d1a5473
--- /dev/null
+++ b/aero-collections/mail/mailbox.rs
@@ -0,0 +1,524 @@
+use anyhow::{anyhow, bail, Result};
+use serde::{Deserialize, Serialize};
+use tokio::sync::RwLock;
+
+use crate::bayou::Bayou;
+use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
+use crate::login::Credentials;
+use crate::mail::uidindex::*;
+use crate::mail::unique_ident::*;
+use crate::mail::IMF;
+use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
+use crate::timestamp::now_msec;
+
+pub struct Mailbox {
+    pub(super) id: UniqueIdent,
+    mbox: RwLock<MailboxInternal>,
+}
+
+impl Mailbox {
+    pub(crate) async fn open(
+        creds: &Credentials,
+        id: UniqueIdent,
+        min_uidvalidity: ImapUidvalidity,
+    ) -> Result<Self> {
+        let index_path = format!("index/{}", id);
+        let mail_path = format!("mail/{}", id);
+
+        let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?;
+        uid_index.sync().await?;
+
+        let uidvalidity = uid_index.state().uidvalidity;
+        if uidvalidity < min_uidvalidity {
+            uid_index
+                .push(
+                    uid_index
+                        .state()
+                        .op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()),
+                )
+                .await?;
+        }
+
+        // @FIXME reporting through opentelemetry or some logs
+        // info on the "shape" of the mailbox would be welcomed
+        /*
+        dump(&uid_index);
+        */
+
+        let mbox = RwLock::new(MailboxInternal {
+            id,
+            encryption_key: creds.keys.master.clone(),
+            storage: creds.storage.build().await?,
+            uid_index,
+            mail_path,
+        });
+
+        Ok(Self { id, mbox })
+    }
+
+    /// Sync data with backing store
+    pub async fn force_sync(&self) -> Result<()> {
+        self.mbox.write().await.force_sync().await
+    }
+
+    /// Sync data with backing store only if changes are detected
+    /// or last sync is too old
+    pub async fn opportunistic_sync(&self) -> Result<()> {
+        self.mbox.write().await.opportunistic_sync().await
+    }
+
+    /// Block until a sync has been done (due to changes in the event log)
+    pub async fn notify(&self) -> std::sync::Weak<tokio::sync::Notify> {
+        self.mbox.read().await.notifier()
+    }
+
+    // ---- Functions for reading the mailbox ----
+
+    /// Get a clone of the current UID Index of this mailbox
+    /// (cloning is cheap so don't hesitate to use this)
+    pub async fn current_uid_index(&self) -> UidIndex {
+        self.mbox.read().await.uid_index.state().clone()
+    }
+
+    /// Fetch the metadata (headers + some more info) of the specified
+    /// mail IDs
+    pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+        self.mbox.read().await.fetch_meta(ids).await
+    }
+
+    /// Fetch an entire e-mail
+    pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+        self.mbox.read().await.fetch_full(id, message_key).await
+    }
+
+    pub async fn frozen(self: &std::sync::Arc<Self>) -> super::snapshot::FrozenMailbox {
+        super::snapshot::FrozenMailbox::new(self.clone()).await
+    }
+
+    // ---- Functions for changing the mailbox ----
+
+    /// Add flags to message
+    pub async fn add_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
+        self.mbox.write().await.add_flags(id, flags).await
+    }
+
+    /// Delete flags from message
+    pub async fn del_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
+        self.mbox.write().await.del_flags(id, flags).await
+    }
+
+    /// Define the new flags for this message
+    pub async fn set_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
+        self.mbox.write().await.set_flags(id, flags).await
+    }
+
+    /// Insert an email into the mailbox
+    pub async fn append<'a>(
+        &self,
+        msg: IMF<'a>,
+        ident: Option<UniqueIdent>,
+        flags: &[Flag],
+    ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
+        self.mbox.write().await.append(msg, ident, flags).await
+    }
+
+    /// Insert an email into the mailbox, copying it from an existing S3 object
+    pub async fn append_from_s3<'a>(
+        &self,
+        msg: IMF<'a>,
+        ident: UniqueIdent,
+        blob_ref: storage::BlobRef,
+        message_key: Key,
+    ) -> Result<()> {
+        self.mbox
+            .write()
+            .await
+            .append_from_s3(msg, ident, blob_ref, message_key)
+            .await
+    }
+
+    /// Delete a message definitively from the mailbox
+    pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> {
+        self.mbox.write().await.delete(id).await
+    }
+
+    /// Copy an email from an other Mailbox to this mailbox
+    /// (use this when possible, as it allows for a certain number of storage optimizations)
+    pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<UniqueIdent> {
+        if self.id == from.id {
+            bail!("Cannot copy into same mailbox");
+        }
+
+        let (mut selflock, fromlock);
+        if self.id < from.id {
+            selflock = self.mbox.write().await;
+            fromlock = from.mbox.write().await;
+        } else {
+            fromlock = from.mbox.write().await;
+            selflock = self.mbox.write().await;
+        };
+        selflock.copy_from(&fromlock, uuid).await
+    }
+
+    /// Move an email from an other Mailbox to this mailbox
+    /// (use this when possible, as it allows for a certain number of storage optimizations)
+    pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> {
+        if self.id == from.id {
+            bail!("Cannot copy move same mailbox");
+        }
+
+        let (mut selflock, mut fromlock);
+        if self.id < from.id {
+            selflock = self.mbox.write().await;
+            fromlock = from.mbox.write().await;
+        } else {
+            fromlock = from.mbox.write().await;
+            selflock = self.mbox.write().await;
+        };
+        selflock.move_from(&mut fromlock, uuid).await
+    }
+}
+
+// ----
+
+// Non standard but common flags:
+// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml
+struct MailboxInternal {
+    // 2023-05-15 will probably be used later.
+    #[allow(dead_code)]
+    id: UniqueIdent,
+    mail_path: String,
+    encryption_key: Key,
+    storage: Store,
+    uid_index: Bayou<UidIndex>,
+}
+
+impl MailboxInternal {
+    async fn force_sync(&mut self) -> Result<()> {
+        self.uid_index.sync().await?;
+        Ok(())
+    }
+
+    async fn opportunistic_sync(&mut self) -> Result<()> {
+        self.uid_index.opportunistic_sync().await?;
+        Ok(())
+    }
+
+    fn notifier(&self) -> std::sync::Weak<tokio::sync::Notify> {
+        self.uid_index.notifier()
+    }
+
+    // ---- Functions for reading the mailbox ----
+
+    async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+        let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
+        let ops = ids
+            .iter()
+            .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str()))
+            .collect::<Vec<_>>();
+        let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;
+
+        let mut meta_vec = vec![];
+        for res in res_vec.into_iter() {
+            let mut meta_opt = None;
+
+            // Resolve conflicts
+            for v in res.value.iter() {
+                match v {
+                    storage::Alternative::Tombstone => (),
+                    storage::Alternative::Value(v) => {
+                        let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
+                        match meta_opt.as_mut() {
+                            None => {
+                                meta_opt = Some(meta);
+                            }
+                            Some(prevmeta) => {
+                                prevmeta.try_merge(meta)?;
+                            }
+                        }
+                    }
+                }
+            }
+            if let Some(meta) = meta_opt {
+                meta_vec.push(meta);
+            } else {
+                bail!("No valid meta value in k2v for {:?}", res.row_ref);
+            }
+        }
+
+        Ok(meta_vec)
+    }
+
+    async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+        let obj_res = self
+            .storage
+            .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id)))
+            .await?;
+        let body = obj_res.value;
+        cryptoblob::open(&body, message_key)
+    }
+
+    // ---- Functions for changing the mailbox ----
+
+    async fn add_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
+        let add_flag_op = self.uid_index.state().op_flag_add(ident, flags.to_vec());
+        self.uid_index.push(add_flag_op).await
+    }
+
+    async fn del_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
+        let del_flag_op = self.uid_index.state().op_flag_del(ident, flags.to_vec());
+        self.uid_index.push(del_flag_op).await
+    }
+
+    async fn set_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
+        let set_flag_op = self.uid_index.state().op_flag_set(ident, flags.to_vec());
+        self.uid_index.push(set_flag_op).await
+    }
+
+    async fn append(
+        &mut self,
+        mail: IMF<'_>,
+        ident: Option<UniqueIdent>,
+        flags: &[Flag],
+    ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
+        let ident = ident.unwrap_or_else(gen_ident);
+        let message_key = gen_key();
+
+        futures::try_join!(
+            async {
+                // Encrypt and save mail body
+                let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
+                self.storage
+                    .blob_insert(BlobVal::new(
+                        BlobRef(format!("{}/{}", self.mail_path, ident)),
+                        message_blob,
+                    ))
+                    .await?;
+                Ok::<_, anyhow::Error>(())
+            },
+            async {
+                // Save mail meta
+                let meta = MailMeta {
+                    internaldate: now_msec(),
+                    headers: mail.parsed.raw_headers.to_vec(),
+                    message_key: message_key.clone(),
+                    rfc822_size: mail.raw.len(),
+                };
+                let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
+                self.storage
+                    .row_insert(vec![RowVal::new(
+                        RowRef::new(&self.mail_path, &ident.to_string()),
+                        meta_blob,
+                    )])
+                    .await?;
+                Ok::<_, anyhow::Error>(())
+            },
+            self.uid_index.opportunistic_sync()
+        )?;
+
+        // Add mail to Bayou mail index
+        let uid_state = self.uid_index.state();
+        let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec());
+
+        let uidvalidity = uid_state.uidvalidity;
+        let (uid, modseq) = match add_mail_op {
+            UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq),
+            _ => unreachable!(),
+        };
+
+        self.uid_index.push(add_mail_op).await?;
+
+        Ok((uidvalidity, uid, modseq))
+    }
+
+    async fn append_from_s3<'a>(
+        &mut self,
+        mail: IMF<'a>,
+        ident: UniqueIdent,
+        blob_src: storage::BlobRef,
+        message_key: Key,
+    ) -> Result<()> {
+        futures::try_join!(
+            async {
+                // Copy mail body from previous location
+                let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident));
+                self.storage.blob_copy(&blob_src, &blob_dst).await?;
+                Ok::<_, anyhow::Error>(())
+            },
+            async {
+                // Save mail meta
+                let meta = MailMeta {
+                    internaldate: now_msec(),
+                    headers: mail.parsed.raw_headers.to_vec(),
+                    message_key: message_key.clone(),
+                    rfc822_size: mail.raw.len(),
+                };
+                let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
+                self.storage
+                    .row_insert(vec![RowVal::new(
+                        RowRef::new(&self.mail_path, &ident.to_string()),
+                        meta_blob,
+                    )])
+                    .await?;
+                Ok::<_, anyhow::Error>(())
+            },
+            self.uid_index.opportunistic_sync()
+        )?;
+
+        // Add mail to Bayou mail index
+        let add_mail_op = self.uid_index.state().op_mail_add(ident, vec![]);
+        self.uid_index.push(add_mail_op).await?;
+
+        Ok(())
+    }
+
+    async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
+        if !self.uid_index.state().table.contains_key(&ident) {
+            bail!("Cannot delete mail that doesn't exit");
+        }
+
+        let del_mail_op = self.uid_index.state().op_mail_del(ident);
+        self.uid_index.push(del_mail_op).await?;
+
+        futures::try_join!(
+            async {
+                // Delete mail body from S3
+                self.storage
+                    .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident)))
+                    .await?;
+                Ok::<_, anyhow::Error>(())
+            },
+            async {
+                // Delete mail meta from K2V
+                let sk = ident.to_string();
+                let res = self
+                    .storage
+                    .row_fetch(&storage::Selector::Single(&RowRef::new(
+                        &self.mail_path,
+                        &sk,
+                    )))
+                    .await?;
+                if let Some(row_val) = res.into_iter().next() {
+                    self.storage
+                        .row_rm(&storage::Selector::Single(&row_val.row_ref))
+                        .await?;
+                }
+                Ok::<_, anyhow::Error>(())
+            }
+        )?;
+        Ok(())
+    }
+
+    async fn copy_from(
+        &mut self,
+        from: &MailboxInternal,
+        source_id: UniqueIdent,
+    ) -> Result<UniqueIdent> {
+        let new_id = gen_ident();
+        self.copy_internal(from, source_id, new_id).await?;
+        Ok(new_id)
+    }
+
+    async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> {
+        self.copy_internal(from, id, id).await?;
+        from.delete(id).await?;
+        Ok(())
+    }
+
+    async fn copy_internal(
+        &mut self,
+        from: &MailboxInternal,
+        source_id: UniqueIdent,
+        new_id: UniqueIdent,
+    ) -> Result<()> {
+        if self.encryption_key != from.encryption_key {
+            bail!("Message to be copied/moved does not belong to same account.");
+        }
+
+        let flags = from
+            .uid_index
+            .state()
+            .table
+            .get(&source_id)
+            .ok_or(anyhow!("Source mail not found"))?
+            .2
+            .clone();
+
+        futures::try_join!(
+            async {
+                let dst = BlobRef(format!("{}/{}", self.mail_path, new_id));
+                let src = BlobRef(format!("{}/{}", from.mail_path, source_id));
+                self.storage.blob_copy(&src, &dst).await?;
+                Ok::<_, anyhow::Error>(())
+            },
+            async {
+                // Copy mail meta in K2V
+                let meta = &from.fetch_meta(&[source_id]).await?[0];
+                let meta_blob = seal_serialize(meta, &self.encryption_key)?;
+                self.storage
+                    .row_insert(vec![RowVal::new(
+                        RowRef::new(&self.mail_path, &new_id.to_string()),
+                        meta_blob,
+                    )])
+                    .await?;
+                Ok::<_, anyhow::Error>(())
+            },
+            self.uid_index.opportunistic_sync(),
+        )?;
+
+        // Add mail to Bayou mail index
+        let add_mail_op = self.uid_index.state().op_mail_add(new_id, flags);
+        self.uid_index.push(add_mail_op).await?;
+
+        Ok(())
+    }
+}
+
+// Can be useful to debug so we want this code
+// to be available to developers
+#[allow(dead_code)]
+fn dump(uid_index: &Bayou<UidIndex>) {
+    let s = uid_index.state();
+    println!("---- MAILBOX STATE ----");
+    println!("UIDVALIDITY {}", s.uidvalidity);
+    println!("UIDNEXT {}", s.uidnext);
+    println!("INTERNALSEQ {}", s.internalseq);
+    for (uid, ident) in s.idx_by_uid.iter() {
+        println!(
+            "{} {} {}",
+            uid,
+            hex::encode(ident.0),
+            s.table.get(ident).cloned().unwrap().2.join(", ")
+        );
+    }
+    println!();
+}
+
+// ----
+
+/// The metadata of a message that is stored in K2V
+/// at pk = mail/<mailbox uuid>, sk = <message uuid>
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MailMeta {
+    /// INTERNALDATE field (milliseconds since epoch)
+    pub internaldate: u64,
+    /// Headers of the message
+    pub headers: Vec<u8>,
+    /// Secret key for decrypting entire message
+    pub message_key: Key,
+    /// RFC822 size
+    pub rfc822_size: usize,
+}
+
+impl MailMeta {
+    fn try_merge(&mut self, other: Self) -> Result<()> {
+        if self.headers != other.headers
+            || self.message_key != other.message_key
+            || self.rfc822_size != other.rfc822_size
+        {
+            bail!("Conflicting MailMeta values.");
+        }
+        self.internaldate = std::cmp::max(self.internaldate, other.internaldate);
+        Ok(())
+    }
+}
diff --git a/aero-collections/mail/mod.rs b/aero-collections/mail/mod.rs
new file mode 100644
index 0000000..03e85cd
--- /dev/null
+++ b/aero-collections/mail/mod.rs
@@ -0,0 +1,27 @@
+use std::convert::TryFrom;
+
+pub mod incoming;
+pub mod mailbox;
+pub mod query;
+pub mod snapshot;
+pub mod uidindex;
+pub mod unique_ident;
+pub mod namespace;
+
+// Internet Message Format
+// aka RFC 822 - RFC 2822 - RFC 5322
+// 2023-05-15 don't want to refactor this struct now.
+#[allow(clippy::upper_case_acronyms)]
+pub struct IMF<'a> {
+    raw: &'a [u8],
+    parsed: eml_codec::part::composite::Message<'a>,
+}
+
+impl<'a> TryFrom<&'a [u8]> for IMF<'a> {
+    type Error = ();
+
+    fn try_from(body: &'a [u8]) -> Result<IMF<'a>, ()> {
+        let parsed = eml_codec::parse_message(body).or(Err(()))?.1;
+        Ok(Self { raw: body, parsed })
+    }
+}
diff --git a/aero-collections/mail/namespace.rs b/aero-collections/mail/namespace.rs
new file mode 100644
index 0000000..5e67173
--- /dev/null
+++ b/aero-collections/mail/namespace.rs
@@ -0,0 +1,209 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, Weak};
+
+use anyhow::{anyhow, bail, Result};
+use lazy_static::lazy_static;
+use serde::{Deserialize, Serialize};
+use tokio::sync::watch;
+
+use crate::cryptoblob::{open_deserialize, seal_serialize};
+use crate::login::Credentials;
+use crate::mail::incoming::incoming_mail_watch_process;
+use crate::mail::mailbox::Mailbox;
+use crate::mail::uidindex::ImapUidvalidity;
+use crate::mail::unique_ident::{gen_ident, UniqueIdent};
+use crate::storage;
+use crate::timestamp::now_msec;
+
+pub const MAILBOX_HIERARCHY_DELIMITER: char = '.';
+
+/// INBOX is the only mailbox that must always exist.
+/// It is created automatically when the account is created.
+/// IMAP allows the user to rename INBOX to something else,
+/// in this case all messages from INBOX are moved to a mailbox
+/// with the new name and the INBOX mailbox still exists and is empty.
+/// In our implementation, we indeed move the underlying mailbox
+/// to the new name (i.e. the new name has the same id as the previous
+/// INBOX), and we create a new empty mailbox for INBOX.
+pub const INBOX: &str = "INBOX";
+
+/// For convenience purpose, we also create some special mailbox
+/// that are described in RFC6154 SPECIAL-USE
+/// @FIXME maybe it should be a configuration parameter
+/// @FIXME maybe we should have a per-mailbox flag mechanism, either an enum or a string, so we
+/// track which mailbox is used for what.
+/// @FIXME Junk could be useful but we don't have any antispam solution yet so...
+/// @FIXME IMAP supports virtual mailbox. \All or \Flagged are intended to be virtual mailboxes.
+/// \Trash might be one, or not one. I don't know what we should do there.
+pub const DRAFTS: &str = "Drafts";
+pub const ARCHIVE: &str = "Archive";
+pub const SENT: &str = "Sent";
+pub const TRASH: &str = "Trash";
+
+pub(crate) const MAILBOX_LIST_PK: &str = "mailboxes";
+pub(crate) const MAILBOX_LIST_SK: &str = "list";
+
+// ---- User's mailbox list (serialized in K2V) ----
+
+#[derive(Serialize, Deserialize)]
+pub(crate) struct MailboxList(BTreeMap<String, MailboxListEntry>);
+
+#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
+pub(crate) struct MailboxListEntry {
+    id_lww: (u64, Option<UniqueIdent>),
+    uidvalidity: ImapUidvalidity,
+}
+
+impl MailboxListEntry {
+    fn merge(&mut self, other: &Self) {
+        // Simple CRDT merge rule
+        if other.id_lww.0 > self.id_lww.0
+            || (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1)
+        {
+            self.id_lww = other.id_lww;
+        }
+        self.uidvalidity = std::cmp::max(self.uidvalidity, other.uidvalidity);
+    }
+}
+
+impl MailboxList {
+    pub(crate) fn new() -> Self {
+        Self(BTreeMap::new())
+    }
+
+    pub(crate) fn merge(&mut self, list2: Self) {
+        for (k, v) in list2.0.into_iter() {
+            if let Some(e) = self.0.get_mut(&k) {
+                e.merge(&v);
+            } else {
+                self.0.insert(k, v);
+            }
+        }
+    }
+
+    pub(crate) fn existing_mailbox_names(&self) -> Vec<String> {
+        self.0
+            .iter()
+            .filter(|(_, v)| v.id_lww.1.is_some())
+            .map(|(k, _)| k.to_string())
+            .collect()
+    }
+
+    pub(crate) fn has_mailbox(&self, name: &str) -> bool {
+        matches!(
+            self.0.get(name),
+            Some(MailboxListEntry {
+                id_lww: (_, Some(_)),
+                ..
+            })
+        )
+    }
+
+    pub(crate) fn get_mailbox(&self, name: &str) -> Option<(ImapUidvalidity, Option<UniqueIdent>)> {
+        self.0.get(name).map(
+            |MailboxListEntry {
+                 id_lww: (_, mailbox_id),
+                 uidvalidity,
+             }| (*uidvalidity, *mailbox_id),
+        )
+    }
+
+    /// Ensures mailbox `name` maps to id `id`.
+    /// If it already mapped to that, returns None.
+    /// If a change had to be done, returns Some(new uidvalidity in mailbox).
+    pub(crate) fn set_mailbox(&mut self, name: &str, id: Option<UniqueIdent>) -> Option<ImapUidvalidity> {
+        let (ts, id, uidvalidity) = match self.0.get_mut(name) {
+            None => {
+                if id.is_none() {
+                    return None;
+                } else {
+                    (now_msec(), id, ImapUidvalidity::new(1).unwrap())
+                }
+            }
+            Some(MailboxListEntry {
+                id_lww,
+                uidvalidity,
+            }) => {
+                if id_lww.1 == id {
+                    return None;
+                } else {
+                    (
+                        std::cmp::max(id_lww.0 + 1, now_msec()),
+                        id,
+                        ImapUidvalidity::new(uidvalidity.get() + 1).unwrap(),
+                    )
+                }
+            }
+        };
+
+        self.0.insert(
+            name.into(),
+            MailboxListEntry {
+                id_lww: (ts, id),
+                uidvalidity,
+            },
+        );
+        Some(uidvalidity)
+    }
+
+    pub(crate) fn update_uidvalidity(&mut self, name: &str, new_uidvalidity: ImapUidvalidity) {
+        match self.0.get_mut(name) {
+            None => {
+                self.0.insert(
+                    name.into(),
+                    MailboxListEntry {
+                        id_lww: (now_msec(), None),
+                        uidvalidity: new_uidvalidity,
+                    },
+                );
+            }
+            Some(MailboxListEntry { uidvalidity, .. }) => {
+                *uidvalidity = std::cmp::max(*uidvalidity, new_uidvalidity);
+            }
+        }
+    }
+
+    pub(crate) fn create_mailbox(&mut self, name: &str) -> CreatedMailbox {
+        if let Some(MailboxListEntry {
+            id_lww: (_, Some(id)),
+            uidvalidity,
+        }) = self.0.get(name)
+        {
+            return CreatedMailbox::Existed(*id, *uidvalidity);
+        }
+
+        let id = gen_ident();
+        let uidvalidity = self.set_mailbox(name, Some(id)).unwrap();
+        CreatedMailbox::Created(id, uidvalidity)
+    }
+
+    pub(crate) fn rename_mailbox(&mut self, old_name: &str, new_name: &str) -> Result<()> {
+        if let Some((uidvalidity, Some(mbid))) = self.get_mailbox(old_name) {
+            if self.has_mailbox(new_name) {
+                bail!(
+                    "Cannot rename {} into {}: {} already exists",
+                    old_name,
+                    new_name,
+                    new_name
+                );
+            }
+
+            self.set_mailbox(old_name, None);
+            self.set_mailbox(new_name, Some(mbid));
+            self.update_uidvalidity(new_name, uidvalidity);
+            Ok(())
+        } else {
+            bail!(
+                "Cannot rename {} into {}: {} doesn't exist",
+                old_name,
+                new_name,
+                old_name
+            );
+        }
+    }
+}
+
+pub(crate) enum CreatedMailbox {
+    Created(UniqueIdent, ImapUidvalidity),
+    Existed(UniqueIdent, ImapUidvalidity),
+}
diff --git a/aero-collections/mail/query.rs b/aero-collections/mail/query.rs
new file mode 100644
index 0000000..3e6fe99
--- /dev/null
+++ b/aero-collections/mail/query.rs
@@ -0,0 +1,137 @@
+use super::mailbox::MailMeta;
+use super::snapshot::FrozenMailbox;
+use super::unique_ident::UniqueIdent;
+use anyhow::Result;
+use futures::future::FutureExt;
+use futures::stream::{BoxStream, Stream, StreamExt};
+
+/// Query is in charge of fetching efficiently
+/// requested data for a list of emails
+pub struct Query<'a, 'b> {
+    pub frozen: &'a FrozenMailbox,
+    pub emails: &'b [UniqueIdent],
+    pub scope: QueryScope,
+}
+
+#[derive(Debug)]
+pub enum QueryScope {
+    Index,
+    Partial,
+    Full,
+}
+impl QueryScope {
+    pub fn union(&self, other: &QueryScope) -> QueryScope {
+        match (self, other) {
+            (QueryScope::Full, _) | (_, QueryScope::Full) => QueryScope::Full,
+            (QueryScope::Partial, _) | (_, QueryScope::Partial) => QueryScope::Partial,
+            (QueryScope::Index, QueryScope::Index) => QueryScope::Index,
+        }
+    }
+}
+
+//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>;
+
+impl<'a, 'b> Query<'a, 'b> {
+    pub fn fetch(&self) -> BoxStream<Result<QueryResult>> {
+        match self.scope {
+            QueryScope::Index => Box::pin(
+                futures::stream::iter(self.emails)
+                    .map(|&uuid| Ok(QueryResult::IndexResult { uuid })),
+            ),
+            QueryScope::Partial => Box::pin(self.partial()),
+            QueryScope::Full => Box::pin(self.full()),
+        }
+    }
+
+    // --- functions below are private *for reasons*
+    fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
+        async move {
+            let maybe_meta_list: Result<Vec<MailMeta>> =
+                self.frozen.mailbox.fetch_meta(self.emails).await;
+            let list_res = maybe_meta_list
+                .map(|meta_list| {
+                    meta_list
+                        .into_iter()
+                        .zip(self.emails)
+                        .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata }))
+                        .collect()
+                })
+                .unwrap_or_else(|e| vec![Err(e)]);
+
+            futures::stream::iter(list_res)
+        }
+        .flatten_stream()
+    }
+
+    fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
+        self.partial().then(move |maybe_meta| async move {
+            let meta = maybe_meta?;
+
+            let content = self
+                .frozen
+                .mailbox
+                .fetch_full(
+                    *meta.uuid(),
+                    &meta
+                        .metadata()
+                        .expect("meta to be PartialResult")
+                        .message_key,
+                )
+                .await?;
+
+            Ok(meta.into_full(content).expect("meta to be PartialResult"))
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+pub enum QueryResult {
+    IndexResult {
+        uuid: UniqueIdent,
+    },
+    PartialResult {
+        uuid: UniqueIdent,
+        metadata: MailMeta,
+    },
+    FullResult {
+        uuid: UniqueIdent,
+        metadata: MailMeta,
+        content: Vec<u8>,
+    },
+}
+impl QueryResult {
+    pub fn uuid(&self) -> &UniqueIdent {
+        match self {
+            Self::IndexResult { uuid, .. } => uuid,
+            Self::PartialResult { uuid, .. } => uuid,
+            Self::FullResult { uuid, .. } => uuid,
+        }
+    }
+
+    pub fn metadata(&self) -> Option<&MailMeta> {
+        match self {
+            Self::IndexResult { .. } => None,
+            Self::PartialResult { metadata, .. } => Some(metadata),
+            Self::FullResult { metadata, .. } => Some(metadata),
+        }
+    }
+
+    #[allow(dead_code)]
+    pub fn content(&self) -> Option<&[u8]> {
+        match self {
+            Self::FullResult { content, .. } => Some(content),
+            _ => None,
+        }
+    }
+
+    fn into_full(self, content: Vec<u8>) -> Option<Self> {
+        match self {
+            Self::PartialResult { uuid, metadata } => Some(Self::FullResult {
+                uuid,
+                metadata,
+                content,
+            }),
+            _ => None,
+        }
+    }
+}
diff --git a/aero-collections/mail/snapshot.rs b/aero-collections/mail/snapshot.rs
new file mode 100644
index 0000000..ed756b5
--- /dev/null
+++ b/aero-collections/mail/snapshot.rs
@@ -0,0 +1,60 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+
+use super::mailbox::Mailbox;
+use super::query::{Query, QueryScope};
+use super::uidindex::UidIndex;
+use super::unique_ident::UniqueIdent;
+
+/// A Frozen Mailbox has a snapshot of the current mailbox
+/// state that is desynchronized with the real mailbox state.
+/// It's up to the user to choose when their snapshot must be updated
+/// to give useful information to their clients
+pub struct FrozenMailbox {
+    pub mailbox: Arc<Mailbox>,
+    pub snapshot: UidIndex,
+}
+
+impl FrozenMailbox {
+    /// Create a snapshot from a mailbox, the mailbox + the snapshot
+    /// becomes the "Frozen Mailbox".
+    pub async fn new(mailbox: Arc<Mailbox>) -> Self {
+        let state = mailbox.current_uid_index().await;
+
+        Self {
+            mailbox,
+            snapshot: state,
+        }
+    }
+
+    /// Force the synchronization of the inner mailbox
+    /// but do not update the local snapshot
+    pub async fn sync(&self) -> Result<()> {
+        self.mailbox.opportunistic_sync().await
+    }
+
+    /// Peek snapshot without updating the frozen mailbox
+    /// Can be useful if you want to plan some writes
+    /// while sending a diff to the client later
+    pub async fn peek(&self) -> UidIndex {
+        self.mailbox.current_uid_index().await
+    }
+
+    /// Update the FrozenMailbox local snapshot.
+    /// Returns the old snapshot, so you can build a diff
+    pub async fn update(&mut self) -> UidIndex {
+        let old_snapshot = self.snapshot.clone();
+        self.snapshot = self.mailbox.current_uid_index().await;
+
+        old_snapshot
+    }
+
+    pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> {
+        Query {
+            frozen: self,
+            emails: uuids,
+            scope,
+        }
+    }
+}
diff --git a/aero-collections/mail/uidindex.rs b/aero-collections/mail/uidindex.rs
new file mode 100644
index 0000000..5a06670
--- /dev/null
+++ b/aero-collections/mail/uidindex.rs
@@ -0,0 +1,474 @@
+use std::num::{NonZeroU32, NonZeroU64};
+
+use im::{HashMap, OrdMap, OrdSet};
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
+
+use crate::bayou::*;
+use crate::mail::unique_ident::UniqueIdent;
+
+pub type ModSeq = NonZeroU64;
+pub type ImapUid = NonZeroU32;
+pub type ImapUidvalidity = NonZeroU32;
+pub type Flag = String;
+pub type IndexEntry = (ImapUid, ModSeq, Vec<Flag>);
+
+/// A UidIndex handles the mutable part of a mailbox
+/// It is built by running the event log on it
+/// Each applied log generates a new UidIndex by cloning the previous one
+/// and applying the event. This is why we use immutable datastructures:
+/// they are cheap to clone.
+#[derive(Clone)]
+pub struct UidIndex {
+    // Source of trust
+    pub table: OrdMap<UniqueIdent, IndexEntry>,
+
+    // Indexes optimized for queries
+    pub idx_by_uid: OrdMap<ImapUid, UniqueIdent>,
+    pub idx_by_modseq: OrdMap<ModSeq, UniqueIdent>,
+    pub idx_by_flag: FlagIndex,
+
+    // "Public" Counters
+    pub uidvalidity: ImapUidvalidity,
+    pub uidnext: ImapUid,
+    pub highestmodseq: ModSeq,
+
+    // "Internal" Counters
+    pub internalseq: ImapUid,
+    pub internalmodseq: ModSeq,
+}
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub enum UidIndexOp {
+    MailAdd(UniqueIdent, ImapUid, ModSeq, Vec<Flag>),
+    MailDel(UniqueIdent),
+    FlagAdd(UniqueIdent, ModSeq, Vec<Flag>),
+    FlagDel(UniqueIdent, ModSeq, Vec<Flag>),
+    FlagSet(UniqueIdent, ModSeq, Vec<Flag>),
+    BumpUidvalidity(u32),
+}
+
+impl UidIndex {
+    #[must_use]
+    pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
+        UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags)
+    }
+
+    #[must_use]
+    pub fn op_mail_del(&self, ident: UniqueIdent) -> UidIndexOp {
+        UidIndexOp::MailDel(ident)
+    }
+
+    #[must_use]
+    pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
+        UidIndexOp::FlagAdd(ident, self.internalmodseq, flags)
+    }
+
+    #[must_use]
+    pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
+        UidIndexOp::FlagDel(ident, self.internalmodseq, flags)
+    }
+
+    #[must_use]
+    pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
+        UidIndexOp::FlagSet(ident, self.internalmodseq, flags)
+    }
+
+    #[must_use]
+    pub fn op_bump_uidvalidity(&self, count: u32) -> UidIndexOp {
+        UidIndexOp::BumpUidvalidity(count)
+    }
+
+    // INTERNAL functions to keep state consistent
+
+    fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) {
+        // Insert the email in our table
+        self.table.insert(ident, (uid, modseq, flags.to_owned()));
+
+        // Update the indexes/caches
+        self.idx_by_uid.insert(uid, ident);
+        self.idx_by_flag.insert(uid, flags);
+        self.idx_by_modseq.insert(modseq, ident);
+    }
+
+    fn unreg_email(&mut self, ident: &UniqueIdent) {
+        // We do nothing if the mail does not exist
+        let (uid, modseq, flags) = match self.table.get(ident) {
+            Some(v) => v,
+            None => return,
+        };
+
+        // Delete all cache entries
+        self.idx_by_uid.remove(uid);
+        self.idx_by_flag.remove(*uid, flags);
+        self.idx_by_modseq.remove(modseq);
+
+        // Remove from source of trust
+        self.table.remove(ident);
+    }
+}
+
+impl Default for UidIndex {
+    fn default() -> Self {
+        Self {
+            table: OrdMap::new(),
+
+            idx_by_uid: OrdMap::new(),
+            idx_by_modseq: OrdMap::new(),
+            idx_by_flag: FlagIndex::new(),
+
+            uidvalidity: NonZeroU32::new(1).unwrap(),
+            uidnext: NonZeroU32::new(1).unwrap(),
+            highestmodseq: NonZeroU64::new(1).unwrap(),
+
+            internalseq: NonZeroU32::new(1).unwrap(),
+            internalmodseq: NonZeroU64::new(1).unwrap(),
+        }
+    }
+}
+
+impl BayouState for UidIndex {
+    type Op = UidIndexOp;
+
+    fn apply(&self, op: &UidIndexOp) -> Self {
+        let mut new = self.clone();
+        match op {
+            UidIndexOp::MailAdd(ident, uid, modseq, flags) => {
+                // Change UIDValidity if there is a UID conflict or a MODSEQ conflict
+                // @FIXME Need to prove that summing work
+                // The intuition: we increase the UIDValidity by the number of possible conflicts
+                if *uid < new.internalseq || *modseq < new.internalmodseq {
+                    let bump_uid = new.internalseq.get() - uid.get();
+                    let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32;
+                    new.uidvalidity =
+                        NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap();
+                }
+
+                // Assign the real uid of the email
+                let new_uid = new.internalseq;
+
+                // Assign the real modseq of the email and its new flags
+                let new_modseq = new.internalmodseq;
+
+                // Delete the previous entry if any.
+                // Our proof has no assumption on `ident` uniqueness,
+                // so we must handle this case even it is very unlikely
+                // In this case, we overwrite the email.
+                // Note: assigning a new UID is mandatory.
+                new.unreg_email(ident);
+
+                // We record our email and update ou caches
+                new.reg_email(*ident, new_uid, new_modseq, flags);
+
+                // Update counters
+                new.highestmodseq = new.internalmodseq;
+
+                new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
+                new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
+
+                new.uidnext = new.internalseq;
+            }
+            UidIndexOp::MailDel(ident) => {
+                // If the email is known locally, we remove its references in all our indexes
+                new.unreg_email(ident);
+
+                // We update the counter
+                new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
+            }
+            UidIndexOp::FlagAdd(ident, candidate_modseq, new_flags) => {
+                if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
+                    // Bump UIDValidity if required
+                    if *candidate_modseq < new.internalmodseq {
+                        let bump_modseq =
+                            (new.internalmodseq.get() - candidate_modseq.get()) as u32;
+                        new.uidvalidity =
+                            NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
+                    }
+
+                    // Add flags to the source of trust and the cache
+                    let mut to_add: Vec<Flag> = new_flags
+                        .iter()
+                        .filter(|f| !existing_flags.contains(f))
+                        .cloned()
+                        .collect();
+                    new.idx_by_flag.insert(*uid, &to_add);
+                    *email_modseq = new.internalmodseq;
+                    new.idx_by_modseq.insert(new.internalmodseq, *ident);
+                    existing_flags.append(&mut to_add);
+
+                    // Update counters
+                    new.highestmodseq = new.internalmodseq;
+                    new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
+                }
+            }
+            UidIndexOp::FlagDel(ident, candidate_modseq, rm_flags) => {
+                if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
+                    // Bump UIDValidity if required
+                    if *candidate_modseq < new.internalmodseq {
+                        let bump_modseq =
+                            (new.internalmodseq.get() - candidate_modseq.get()) as u32;
+                        new.uidvalidity =
+                            NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
+                    }
+
+                    // Remove flags from the source of trust and the cache
+                    existing_flags.retain(|x| !rm_flags.contains(x));
+                    new.idx_by_flag.remove(*uid, rm_flags);
+
+                    // Register that email has been modified
+                    new.idx_by_modseq.insert(new.internalmodseq, *ident);
+                    *email_modseq = new.internalmodseq;
+
+                    // Update counters
+                    new.highestmodseq = new.internalmodseq;
+                    new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
+                }
+            }
+            UidIndexOp::FlagSet(ident, candidate_modseq, new_flags) => {
+                if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
+                    // Bump UIDValidity if required
+                    if *candidate_modseq < new.internalmodseq {
+                        let bump_modseq =
+                            (new.internalmodseq.get() - candidate_modseq.get()) as u32;
+                        new.uidvalidity =
+                            NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
+                    }
+
+                    // Remove flags from the source of trust and the cache
+                    let (keep_flags, rm_flags): (Vec<String>, Vec<String>) = existing_flags
+                        .iter()
+                        .cloned()
+                        .partition(|x| new_flags.contains(x));
+                    *existing_flags = keep_flags;
+                    let mut to_add: Vec<Flag> = new_flags
+                        .iter()
+                        .filter(|f| !existing_flags.contains(f))
+                        .cloned()
+                        .collect();
+                    existing_flags.append(&mut to_add);
+                    new.idx_by_flag.remove(*uid, &rm_flags);
+                    new.idx_by_flag.insert(*uid, &to_add);
+
+                    // Register that email has been modified
+                    new.idx_by_modseq.insert(new.internalmodseq, *ident);
+                    *email_modseq = new.internalmodseq;
+
+                    // Update counters
+                    new.highestmodseq = new.internalmodseq;
+                    new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
+                }
+            }
+            UidIndexOp::BumpUidvalidity(count) => {
+                new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count)
+                    .unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap());
+            }
+        }
+        new
+    }
+}
+
+// ---- FlagIndex implementation ----
+
+#[derive(Clone)]
+pub struct FlagIndex(HashMap<Flag, OrdSet<ImapUid>>);
+pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet<ImapUid>>;
+
+impl FlagIndex {
+    fn new() -> Self {
+        Self(HashMap::new())
+    }
+    fn insert(&mut self, uid: ImapUid, flags: &[Flag]) {
+        flags.iter().for_each(|flag| {
+            self.0
+                .entry(flag.clone())
+                .or_insert(OrdSet::new())
+                .insert(uid);
+        });
+    }
+    fn remove(&mut self, uid: ImapUid, flags: &[Flag]) {
+        for flag in flags.iter() {
+            if let Some(set) = self.0.get_mut(flag) {
+                set.remove(&uid);
+                if set.is_empty() {
+                    self.0.remove(flag);
+                }
+            }
+        }
+    }
+
+    pub fn get(&self, f: &Flag) -> Option<&OrdSet<ImapUid>> {
+        self.0.get(f)
+    }
+
+    pub fn flags(&self) -> FlagIter {
+        self.0.keys()
+    }
+}
+
+// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ----
+
+#[derive(Serialize, Deserialize)]
+struct UidIndexSerializedRepr {
+    mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec<Flag>)>,
+
+    uidvalidity: ImapUidvalidity,
+    uidnext: ImapUid,
+    highestmodseq: ModSeq,
+
+    internalseq: ImapUid,
+    internalmodseq: ModSeq,
+}
+
+impl<'de> Deserialize<'de> for UidIndex {
+    fn deserialize<D>(d: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?;
+
+        let mut uidindex = UidIndex {
+            table: OrdMap::new(),
+
+            idx_by_uid: OrdMap::new(),
+            idx_by_modseq: OrdMap::new(),
+            idx_by_flag: FlagIndex::new(),
+
+            uidvalidity: val.uidvalidity,
+            uidnext: val.uidnext,
+            highestmodseq: val.highestmodseq,
+
+            internalseq: val.internalseq,
+            internalmodseq: val.internalmodseq,
+        };
+
+        val.mails
+            .iter()
+            .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags));
+
+        Ok(uidindex)
+    }
+}
+
+impl Serialize for UidIndex {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut mails = vec![];
+        for (ident, (uid, modseq, flags)) in self.table.iter() {
+            mails.push((*uid, *modseq, *ident, flags.clone()));
+        }
+
+        let val = UidIndexSerializedRepr {
+            mails,
+            uidvalidity: self.uidvalidity,
+            uidnext: self.uidnext,
+            highestmodseq: self.highestmodseq,
+            internalseq: self.internalseq,
+            internalmodseq: self.internalmodseq,
+        };
+
+        val.serialize(serializer)
+    }
+}
+
+// ---- TESTS ----
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_uidindex() {
+        let mut state = UidIndex::default();
+
+        // Add message 1
+        {
+            let m = UniqueIdent([0x01; 24]);
+            let f = vec!["\\Recent".to_string(), "\\Archive".to_string()];
+            let ev = state.op_mail_add(m, f);
+            state = state.apply(&ev);
+
+            // Early checks
+            assert_eq!(state.table.len(), 1);
+            let (uid, modseq, flags) = state.table.get(&m).unwrap();
+            assert_eq!(*uid, NonZeroU32::new(1).unwrap());
+            assert_eq!(*modseq, NonZeroU64::new(1).unwrap());
+            assert_eq!(flags.len(), 2);
+            let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap();
+            assert_eq!(&m, ident);
+            let recent = state.idx_by_flag.0.get("\\Recent").unwrap();
+            assert_eq!(recent.len(), 1);
+            assert_eq!(recent.iter().next().unwrap(), &NonZeroU32::new(1).unwrap());
+            assert_eq!(state.uidnext, NonZeroU32::new(2).unwrap());
+            assert_eq!(state.uidvalidity, NonZeroU32::new(1).unwrap());
+        }
+
+        // Add message 2
+        {
+            let m = UniqueIdent([0x02; 24]);
+            let f = vec!["\\Seen".to_string(), "\\Archive".to_string()];
+            let ev = state.op_mail_add(m, f);
+            state = state.apply(&ev);
+
+            let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+            assert_eq!(archive.len(), 2);
+        }
+
+        // Add flags to message 1
+        {
+            let m = UniqueIdent([0x01; 24]);
+            let f = vec!["Important".to_string(), "$cl_1".to_string()];
+            let ev = state.op_flag_add(m, f);
+            state = state.apply(&ev);
+        }
+
+        // Delete flags from message 1
+        {
+            let m = UniqueIdent([0x01; 24]);
+            let f = vec!["\\Recent".to_string()];
+            let ev = state.op_flag_del(m, f);
+            state = state.apply(&ev);
+
+            let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+            assert_eq!(archive.len(), 2);
+        }
+
+        // Delete message 2
+        {
+            let m = UniqueIdent([0x02; 24]);
+            let ev = state.op_mail_del(m);
+            state = state.apply(&ev);
+
+            let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+            assert_eq!(archive.len(), 1);
+        }
+
+        // Add a message 3 concurrent to message 1 (trigger a uid validity change)
+        {
+            let m = UniqueIdent([0x03; 24]);
+            let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
+            let ev = UidIndexOp::MailAdd(
+                m,
+                NonZeroU32::new(1).unwrap(),
+                NonZeroU64::new(1).unwrap(),
+                f,
+            );
+            state = state.apply(&ev);
+        }
+
+        // Checks
+        {
+            assert_eq!(state.table.len(), 2);
+            assert!(state.uidvalidity > NonZeroU32::new(1).unwrap());
+
+            let (last_uid, ident) = state.idx_by_uid.get_max().unwrap();
+            assert_eq!(ident, &UniqueIdent([0x03; 24]));
+
+            let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+            assert_eq!(archive.len(), 2);
+            let mut iter = archive.iter();
+            assert_eq!(iter.next().unwrap(), &NonZeroU32::new(1).unwrap());
+            assert_eq!(iter.next().unwrap(), last_uid);
+        }
+    }
+}
diff --git a/aero-collections/mail/unique_ident.rs b/aero-collections/mail/unique_ident.rs
new file mode 100644
index 0000000..0e629db
--- /dev/null
+++ b/aero-collections/mail/unique_ident.rs
@@ -0,0 +1,101 @@
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use lazy_static::lazy_static;
+use rand::prelude::*;
+use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
+
+use crate::timestamp::now_msec;
+
+/// An internal Mail Identifier is composed of two components:
+/// - a process identifier, 128 bits, itself composed of:
+///   - the timestamp of when the process started, 64 bits
+///   - a 64-bit random number
+/// - a sequence number, 64 bits
+/// They are not part of the protocol but an internal representation
+/// required by Aerogramme.
+/// Their main property is to be unique without having to rely
+/// on synchronization between IMAP processes.
+#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
+pub struct UniqueIdent(pub [u8; 24]);
+
+struct IdentGenerator {
+    pid: u128,
+    sn: AtomicU64,
+}
+
+impl IdentGenerator {
+    fn new() -> Self {
+        let time = now_msec() as u128;
+        let rand = thread_rng().gen::<u64>() as u128;
+        Self {
+            pid: (time << 64) | rand,
+            sn: AtomicU64::new(0),
+        }
+    }
+
+    fn gen(&self) -> UniqueIdent {
+        let sn = self.sn.fetch_add(1, Ordering::Relaxed);
+        let mut res = [0u8; 24];
+        res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid));
+        res[16..24].copy_from_slice(&u64::to_be_bytes(sn));
+        UniqueIdent(res)
+    }
+}
+
+lazy_static! {
+    static ref GENERATOR: IdentGenerator = IdentGenerator::new();
+}
+
+pub fn gen_ident() -> UniqueIdent {
+    GENERATOR.gen()
+}
+
+// -- serde --
+
+impl<'de> Deserialize<'de> for UniqueIdent {
+    fn deserialize<D>(d: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let v = String::deserialize(d)?;
+        UniqueIdent::from_str(&v).map_err(D::Error::custom)
+    }
+}
+
+impl Serialize for UniqueIdent {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        serializer.serialize_str(&self.to_string())
+    }
+}
+
+impl std::fmt::Display for UniqueIdent {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", hex::encode(self.0))
+    }
+}
+
+impl std::fmt::Debug for UniqueIdent {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", hex::encode(self.0))
+    }
+}
+
+impl FromStr for UniqueIdent {
+    type Err = &'static str;
+
+    fn from_str(s: &str) -> Result<UniqueIdent, &'static str> {
+        let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
+
+        if bytes.len() != 24 {
+            return Err("bad length");
+        }
+
+        let mut tmp = [0u8; 24];
+        tmp[..].copy_from_slice(&bytes);
+        Ok(UniqueIdent(tmp))
+    }
+}
diff --git a/aero-collections/user.rs b/aero-collections/user.rs
new file mode 100644
index 0000000..a38b9c1
--- /dev/null
+++ b/aero-collections/user.rs
@@ -0,0 +1,313 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, Weak};
+
+use anyhow::{anyhow, bail, Result};
+use lazy_static::lazy_static;
+use serde::{Deserialize, Serialize};
+use tokio::sync::watch;
+
+use crate::cryptoblob::{open_deserialize, seal_serialize};
+use crate::login::Credentials;
+use crate::mail::incoming::incoming_mail_watch_process;
+use crate::mail::mailbox::Mailbox;
+use crate::mail::uidindex::ImapUidvalidity;
+use crate::mail::unique_ident::{gen_ident, UniqueIdent};
+use crate::storage;
+use crate::timestamp::now_msec;
+
+use crate::mail::namespace::{MAILBOX_HIERARCHY_DELIMITER, INBOX, DRAFTS, ARCHIVE, SENT, TRASH, MAILBOX_LIST_PK, MAILBOX_LIST_SK,MailboxList,CreatedMailbox};
+
+//@FIXME User should be totally rewriten
+//to extract the local mailbox list
+//to the mail/namespace.rs file (and mailbox list should be reworded as mail namespace)
+
+pub struct User {
+    pub username: String,
+    pub creds: Credentials,
+    pub storage: storage::Store,
+    pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>,
+
+    tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>,
+}
+
+impl User {
+    pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> {
+        let cache_key = (username.clone(), creds.storage.unique());
+
+        {
+            let cache = USER_CACHE.lock().unwrap();
+            if let Some(u) = cache.get(&cache_key).and_then(Weak::upgrade) {
+                return Ok(u);
+            }
+        }
+
+        let user = Self::open(username, creds).await?;
+
+        let mut cache = USER_CACHE.lock().unwrap();
+        if let Some(concurrent_user) = cache.get(&cache_key).and_then(Weak::upgrade) {
+            drop(user);
+            Ok(concurrent_user)
+        } else {
+            cache.insert(cache_key, Arc::downgrade(&user));
+            Ok(user)
+        }
+    }
+
+    /// Lists user's available mailboxes
+    pub async fn list_mailboxes(&self) -> Result<Vec<String>> {
+        let (list, _ct) = self.load_mailbox_list().await?;
+        Ok(list.existing_mailbox_names())
+    }
+
+    /// Opens an existing mailbox given its IMAP name.
+    pub async fn open_mailbox(&self, name: &str) -> Result<Option<Arc<Mailbox>>> {
+        let (mut list, ct) = self.load_mailbox_list().await?;
+
+        //@FIXME it could be a trace or an opentelemtry trace thing.
+        // Be careful to not leak sensible data
+        /*
+        eprintln!("List of mailboxes:");
+        for ent in list.0.iter() {
+            eprintln!(" - {:?}", ent);
+        }
+        */
+
+        if let Some((uidvalidity, Some(mbid))) = list.get_mailbox(name) {
+            let mb = self.open_mailbox_by_id(mbid, uidvalidity).await?;
+            let mb_uidvalidity = mb.current_uid_index().await.uidvalidity;
+            if mb_uidvalidity > uidvalidity {
+                list.update_uidvalidity(name, mb_uidvalidity);
+                self.save_mailbox_list(&list, ct).await?;
+            }
+            Ok(Some(mb))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Check whether mailbox exists
+    pub async fn has_mailbox(&self, name: &str) -> Result<bool> {
+        let (list, _ct) = self.load_mailbox_list().await?;
+        Ok(list.has_mailbox(name))
+    }
+
+    /// Creates a new mailbox in the user's IMAP namespace.
+    pub async fn create_mailbox(&self, name: &str) -> Result<()> {
+        if name.ends_with(MAILBOX_HIERARCHY_DELIMITER) {
+            bail!("Invalid mailbox name: {}", name);
+        }
+
+        let (mut list, ct) = self.load_mailbox_list().await?;
+        match list.create_mailbox(name) {
+            CreatedMailbox::Created(_, _) => {
+                self.save_mailbox_list(&list, ct).await?;
+                Ok(())
+            }
+            CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)),
+        }
+    }
+
+    /// Deletes a mailbox in the user's IMAP namespace.
+    pub async fn delete_mailbox(&self, name: &str) -> Result<()> {
+        if name == INBOX {
+            bail!("Cannot delete INBOX");
+        }
+
+        let (mut list, ct) = self.load_mailbox_list().await?;
+        if list.has_mailbox(name) {
+            //@TODO: actually delete mailbox contents
+            list.set_mailbox(name, None);
+            self.save_mailbox_list(&list, ct).await?;
+            Ok(())
+        } else {
+            bail!("Mailbox {} does not exist", name);
+        }
+    }
+
+    /// Renames a mailbox in the user's IMAP namespace.
+    pub async fn rename_mailbox(&self, old_name: &str, new_name: &str) -> Result<()> {
+        let (mut list, ct) = self.load_mailbox_list().await?;
+
+        if old_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) {
+            bail!("Invalid mailbox name: {}", old_name);
+        }
+        if new_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) {
+            bail!("Invalid mailbox name: {}", new_name);
+        }
+
+        if old_name == INBOX {
+            list.rename_mailbox(old_name, new_name)?;
+            if !self.ensure_inbox_exists(&mut list, &ct).await? {
+                self.save_mailbox_list(&list, ct).await?;
+            }
+        } else {
+            let names = list.existing_mailbox_names();
+
+            let old_name_w_delim = format!("{}{}", old_name, MAILBOX_HIERARCHY_DELIMITER);
+            let new_name_w_delim = format!("{}{}", new_name, MAILBOX_HIERARCHY_DELIMITER);
+
+            if names
+                .iter()
+                .any(|x| x == new_name || x.starts_with(&new_name_w_delim))
+            {
+                bail!("Mailbox {} already exists", new_name);
+            }
+
+            for name in names.iter() {
+                if name == old_name {
+                    list.rename_mailbox(name, new_name)?;
+                } else if let Some(tail) = name.strip_prefix(&old_name_w_delim) {
+                    let nnew = format!("{}{}", new_name_w_delim, tail);
+                    list.rename_mailbox(name, &nnew)?;
+                }
+            }
+
+            self.save_mailbox_list(&list, ct).await?;
+        }
+        Ok(())
+    }
+
+    // ---- Internal user & mailbox management ----
+
+    async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
+        let storage = creds.storage.build().await?;
+
+        let (tx_inbox_id, rx_inbox_id) = watch::channel(None);
+
+        let user = Arc::new(Self {
+            username,
+            creds: creds.clone(),
+            storage,
+            tx_inbox_id,
+            mailboxes: std::sync::Mutex::new(HashMap::new()),
+        });
+
+        // Ensure INBOX exists (done inside load_mailbox_list)
+        user.load_mailbox_list().await?;
+
+        tokio::spawn(incoming_mail_watch_process(
+            Arc::downgrade(&user),
+            user.creds.clone(),
+            rx_inbox_id,
+        ));
+
+        Ok(user)
+    }
+
+    pub(super) async fn open_mailbox_by_id(
+        &self,
+        id: UniqueIdent,
+        min_uidvalidity: ImapUidvalidity,
+    ) -> Result<Arc<Mailbox>> {
+        {
+            let cache = self.mailboxes.lock().unwrap();
+            if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) {
+                return Ok(mb);
+            }
+        }
+
+        let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?);
+
+        let mut cache = self.mailboxes.lock().unwrap();
+        if let Some(concurrent_mb) = cache.get(&id).and_then(Weak::upgrade) {
+            drop(mb); // we worked for nothing but at least we didn't starve someone else
+            Ok(concurrent_mb)
+        } else {
+            cache.insert(id, Arc::downgrade(&mb));
+            Ok(mb)
+        }
+    }
+
+    // ---- Mailbox list management ----
+
+    async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> {
+        let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK);
+        let (mut list, row) = match self
+            .storage
+            .row_fetch(&storage::Selector::Single(&row_ref))
+            .await
+        {
+            Err(storage::StorageError::NotFound) => (MailboxList::new(), None),
+            Err(e) => return Err(e.into()),
+            Ok(rv) => {
+                let mut list = MailboxList::new();
+                let (row_ref, row_vals) = match rv.into_iter().next() {
+                    Some(row_val) => (row_val.row_ref, row_val.value),
+                    None => (row_ref, vec![]),
+                };
+
+                for v in row_vals {
+                    if let storage::Alternative::Value(vbytes) = v {
+                        let list2 =
+                            open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
+                        list.merge(list2);
+                    }
+                }
+                (list, Some(row_ref))
+            }
+        };
+
+        let is_default_mbx_missing = [DRAFTS, ARCHIVE, SENT, TRASH]
+            .iter()
+            .map(|mbx| list.create_mailbox(mbx))
+            .fold(false, |acc, r| {
+                acc || matches!(r, CreatedMailbox::Created(..))
+            });
+        let is_inbox_missing = self.ensure_inbox_exists(&mut list, &row).await?;
+        if is_default_mbx_missing && !is_inbox_missing {
+            // It's the only case where we created some mailboxes and not saved them
+            // So we save them!
+            self.save_mailbox_list(&list, row.clone()).await?;
+        }
+
+        Ok((list, row))
+    }
+
+    async fn ensure_inbox_exists(
+        &self,
+        list: &mut MailboxList,
+        ct: &Option<storage::RowRef>,
+    ) -> Result<bool> {
+        // If INBOX doesn't exist, create a new mailbox with that name
+        // and save new mailbox list.
+        // Also, ensure that the mpsc::watch that keeps track of the
+        // inbox id is up-to-date.
+        let saved;
+        let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) {
+            CreatedMailbox::Created(i, v) => {
+                self.save_mailbox_list(list, ct.clone()).await?;
+                saved = true;
+                (i, v)
+            }
+            CreatedMailbox::Existed(i, v) => {
+                saved = false;
+                (i, v)
+            }
+        };
+        let inbox_id = Some((inbox_id, inbox_uidvalidity));
+        if *self.tx_inbox_id.borrow() != inbox_id {
+            self.tx_inbox_id.send(inbox_id).unwrap();
+        }
+
+        Ok(saved)
+    }
+
+    async fn save_mailbox_list(
+        &self,
+        list: &MailboxList,
+        ct: Option<storage::RowRef>,
+    ) -> Result<()> {
+        let list_blob = seal_serialize(list, &self.creds.keys.master)?;
+        let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK));
+        let row_val = storage::RowVal::new(rref, list_blob);
+        self.storage.row_insert(vec![row_val]).await?;
+        Ok(())
+    }
+}
+
+// ---- User cache ----
+
+lazy_static! {
+    static ref USER_CACHE: std::sync::Mutex<HashMap<(String, storage::UnicityBuffer), Weak<User>>> =
+        std::sync::Mutex::new(HashMap::new());
+}
-- 
cgit v1.2.3