aboutsummaryrefslogtreecommitdiff
path: root/aero-collections/src
diff options
context:
space:
mode:
Diffstat (limited to 'aero-collections/src')
-rw-r--r--aero-collections/src/calendar/mod.rs204
-rw-r--r--aero-collections/src/calendar/namespace.rs324
-rw-r--r--aero-collections/src/davdag.rs342
-rw-r--r--aero-collections/src/lib.rs5
-rw-r--r--aero-collections/src/mail/incoming.rs443
-rw-r--r--aero-collections/src/mail/mailbox.rs525
-rw-r--r--aero-collections/src/mail/mod.rs24
-rw-r--r--aero-collections/src/mail/namespace.rs206
-rw-r--r--aero-collections/src/mail/query.rs137
-rw-r--r--aero-collections/src/mail/snapshot.rs60
-rw-r--r--aero-collections/src/mail/uidindex.rs474
-rw-r--r--aero-collections/src/unique_ident.rs101
-rw-r--r--aero-collections/src/user.rs327
13 files changed, 3172 insertions, 0 deletions
diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs
new file mode 100644
index 0000000..ac07842
--- /dev/null
+++ b/aero-collections/src/calendar/mod.rs
@@ -0,0 +1,204 @@
+pub mod namespace;
+
+use anyhow::{anyhow, bail, Result};
+use tokio::sync::RwLock;
+
+use aero_bayou::Bayou;
+use aero_user::cryptoblob::{self, gen_key, Key};
+use aero_user::login::Credentials;
+use aero_user::storage::{self, BlobRef, BlobVal, Store};
+
+use crate::davdag::{BlobId, DavDag, IndexEntry, SyncChange, Token};
+use crate::unique_ident::*;
+
+pub struct Calendar {
+ pub(super) id: UniqueIdent,
+ internal: RwLock<CalendarInternal>,
+}
+
+impl Calendar {
+ pub(crate) async fn open(creds: &Credentials, id: UniqueIdent) -> Result<Self> {
+ let bayou_path = format!("calendar/dag/{}", id);
+ let cal_path = format!("calendar/events/{}", id);
+
+ let mut davdag = Bayou::<DavDag>::new(creds, bayou_path).await?;
+ davdag.sync().await?;
+
+ let internal = RwLock::new(CalendarInternal {
+ id,
+ encryption_key: creds.keys.master.clone(),
+ storage: creds.storage.build().await?,
+ davdag,
+ cal_path,
+ });
+
+ Ok(Self { id, internal })
+ }
+
+ // ---- DAG sync utilities
+
+ /// Sync data with backing store
+ pub async fn force_sync(&self) -> Result<()> {
+ self.internal.write().await.force_sync().await
+ }
+
+ /// Sync data with backing store only if changes are detected
+ /// or last sync is too old
+ pub async fn opportunistic_sync(&self) -> Result<()> {
+ self.internal.write().await.opportunistic_sync().await
+ }
+
+ // ---- Data API
+
+ /// Access the DAG internal data (you can get the list of files for example)
+ pub async fn dag(&self) -> DavDag {
+ // Cloning is cheap
+ self.internal.read().await.davdag.state().clone()
+ }
+
+ /// Access the current token
+ pub async fn token(&self) -> Result<Token> {
+ self.internal.write().await.current_token().await
+ }
+
+ /// The diff API is a write API as we might need to push a merge node
+ /// to get a new sync token
+ pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
+ self.internal.write().await.diff(sync_token).await
+ }
+
+ /// Get a specific event
+ pub async fn get(&self, evt_id: UniqueIdent) -> Result<Vec<u8>> {
+ self.internal.read().await.get(evt_id).await
+ }
+
+ /// Put a specific event
+ pub async fn put<'a>(&self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> {
+ self.internal.write().await.put(name, evt).await
+ }
+
+ /// Delete a specific event
+ pub async fn delete(&self, blob_id: UniqueIdent) -> Result<Token> {
+ self.internal.write().await.delete(blob_id).await
+ }
+}
+
+use base64::Engine;
+const MESSAGE_KEY: &str = "message-key";
+struct CalendarInternal {
+ #[allow(dead_code)]
+ id: UniqueIdent,
+ cal_path: String,
+ encryption_key: Key,
+ storage: Store,
+ davdag: Bayou<DavDag>,
+}
+
+impl CalendarInternal {
+ async fn force_sync(&mut self) -> Result<()> {
+ self.davdag.sync().await?;
+ Ok(())
+ }
+
+ async fn opportunistic_sync(&mut self) -> Result<()> {
+ self.davdag.opportunistic_sync().await?;
+ Ok(())
+ }
+
+ async fn get(&self, blob_id: BlobId) -> Result<Vec<u8>> {
+ // Fetch message from S3
+ let blob_ref = storage::BlobRef(format!("{}/{}", self.cal_path, blob_id));
+ let object = self.storage.blob_fetch(&blob_ref).await?;
+
+ // Decrypt message key from headers
+ let key_encrypted_b64 = object
+ .meta
+ .get(MESSAGE_KEY)
+ .ok_or(anyhow!("Missing key in metadata"))?;
+ let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
+ let message_key_raw = cryptoblob::open(&key_encrypted, &self.encryption_key)?;
+ let message_key =
+ cryptoblob::Key::from_slice(&message_key_raw).ok_or(anyhow!("Invalid message key"))?;
+
+ // Decrypt body
+ let body = object.value;
+ cryptoblob::open(&body, &message_key)
+ }
+
+ async fn put<'a>(&mut self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> {
+ let message_key = gen_key();
+ let blob_id = gen_ident();
+
+ let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?;
+ let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key);
+
+ // Write event to S3
+ let message_blob = cryptoblob::seal(evt, &message_key)?;
+ let blob_val = BlobVal::new(
+ BlobRef(format!("{}/{}", self.cal_path, blob_id)),
+ message_blob,
+ )
+ .with_meta(MESSAGE_KEY.to_string(), key_header);
+
+ let etag = self.storage.blob_insert(blob_val).await?;
+
+ // Add entry to Bayou
+ let entry: IndexEntry = (blob_id, name.to_string(), etag);
+ let davstate = self.davdag.state();
+ let put_op = davstate.op_put(entry.clone());
+ let token = put_op.token();
+ self.davdag.push(put_op).await?;
+
+ Ok((token, entry))
+ }
+
+ async fn delete(&mut self, blob_id: BlobId) -> Result<Token> {
+ let davstate = self.davdag.state();
+
+ if !davstate.table.contains_key(&blob_id) {
+ bail!("Cannot delete event that doesn't exist");
+ }
+
+ let del_op = davstate.op_delete(blob_id);
+ let token = del_op.token();
+ self.davdag.push(del_op).await?;
+
+ let blob_ref = BlobRef(format!("{}/{}", self.cal_path, blob_id));
+ self.storage.blob_rm(&blob_ref).await?;
+
+ Ok(token)
+ }
+
+ async fn diff(&mut self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
+ let davstate = self.davdag.state();
+
+ let token_changed = davstate.resolve(sync_token)?;
+ let changes = token_changed
+ .iter()
+ .filter_map(|t: &Token| davstate.change.get(t))
+ .map(|s| s.clone())
+ .filter(|s| match s {
+ SyncChange::Ok((filename, _)) => davstate.idx_by_filename.get(filename).is_some(),
+ SyncChange::NotFound(filename) => davstate.idx_by_filename.get(filename).is_none(),
+ })
+ .collect();
+
+ let token = self.current_token().await?;
+ Ok((token, changes))
+ }
+
+ async fn current_token(&mut self) -> Result<Token> {
+ let davstate = self.davdag.state();
+ let heads = davstate.heads_vec();
+ let token = match heads.as_slice() {
+ [token] => *token,
+ _ => {
+ let op_mg = davstate.op_merge();
+ let token = op_mg.token();
+ self.davdag.push(op_mg).await?;
+ token
+ }
+ };
+ Ok(token)
+ }
+}
diff --git a/aero-collections/src/calendar/namespace.rs b/aero-collections/src/calendar/namespace.rs
new file mode 100644
index 0000000..db65703
--- /dev/null
+++ b/aero-collections/src/calendar/namespace.rs
@@ -0,0 +1,324 @@
+use anyhow::{bail, Result};
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, Weak};
+
+use serde::{Deserialize, Serialize};
+
+use aero_bayou::timestamp::now_msec;
+use aero_user::cryptoblob::{open_deserialize, seal_serialize};
+use aero_user::storage;
+
+use super::Calendar;
+use crate::unique_ident::{gen_ident, UniqueIdent};
+use crate::user::User;
+
+pub(crate) const CAL_LIST_PK: &str = "calendars";
+pub(crate) const CAL_LIST_SK: &str = "list";
+pub(crate) const MAIN_CAL: &str = "Personal";
+pub(crate) const MAX_CALNAME_CHARS: usize = 32;
+
+pub struct CalendarNs(std::sync::Mutex<HashMap<UniqueIdent, Weak<Calendar>>>);
+
+impl CalendarNs {
+ /// Create a new calendar namespace
+ pub fn new() -> Self {
+ Self(std::sync::Mutex::new(HashMap::new()))
+ }
+
+ /// Open a calendar by name
+ pub async fn open(&self, user: &Arc<User>, name: &str) -> Result<Option<Arc<Calendar>>> {
+ let (list, _ct) = CalendarList::load(user).await?;
+
+ match list.get(name) {
+ None => Ok(None),
+ Some(ident) => Ok(Some(self.open_by_id(user, ident).await?)),
+ }
+ }
+
+ /// Open a calendar by unique id
+ /// Check user.rs::open_mailbox_by_id to understand this function
+ pub async fn open_by_id(&self, user: &Arc<User>, id: UniqueIdent) -> Result<Arc<Calendar>> {
+ {
+ let cache = self.0.lock().unwrap();
+ if let Some(cal) = cache.get(&id).and_then(Weak::upgrade) {
+ return Ok(cal);
+ }
+ }
+
+ let cal = Arc::new(Calendar::open(&user.creds, id).await?);
+
+ let mut cache = self.0.lock().unwrap();
+ if let Some(concurrent_cal) = cache.get(&id).and_then(Weak::upgrade) {
+ drop(cal); // we worked for nothing but at least we didn't starve someone else
+ Ok(concurrent_cal)
+ } else {
+ cache.insert(id, Arc::downgrade(&cal));
+ Ok(cal)
+ }
+ }
+
+ /// List calendars
+ pub async fn list(&self, user: &Arc<User>) -> Result<Vec<String>> {
+ CalendarList::load(user).await.map(|(list, _)| list.names())
+ }
+
+ /// Delete a calendar from the index
+ pub async fn delete(&self, user: &Arc<User>, name: &str) -> Result<()> {
+ // We currently assume that main cal is a bit specific
+ if name == MAIN_CAL {
+ bail!("Cannot delete main calendar");
+ }
+
+ let (mut list, ct) = CalendarList::load(user).await?;
+ if list.has(name) {
+ //@TODO: actually delete calendar content
+ list.bind(name, None);
+ list.save(user, ct).await?;
+ Ok(())
+ } else {
+ bail!("Calendar {} does not exist", name);
+ }
+ }
+
+ /// Rename a calendar in the index
+ pub async fn rename(&self, user: &Arc<User>, old: &str, new: &str) -> Result<()> {
+ if old == MAIN_CAL {
+ bail!("Renaming main calendar is not supported currently");
+ }
+ if !new.chars().all(char::is_alphanumeric) {
+ bail!("Unsupported characters in new calendar name, only alphanumeric characters are allowed currently");
+ }
+ if new.len() > MAX_CALNAME_CHARS {
+ bail!("Calendar name can't contain more than 32 characters");
+ }
+
+ let (mut list, ct) = CalendarList::load(user).await?;
+ list.rename(old, new)?;
+ list.save(user, ct).await?;
+
+ Ok(())
+ }
+
+ /// Create calendar
+ pub async fn create(&self, user: &Arc<User>, name: &str) -> Result<()> {
+ if name == MAIN_CAL {
+ bail!("Main calendar is automatically created, can't create it manually");
+ }
+ if !name.chars().all(char::is_alphanumeric) {
+ bail!("Unsupported characters in new calendar name, only alphanumeric characters are allowed");
+ }
+ if name.len() > MAX_CALNAME_CHARS {
+ bail!("Calendar name can't contain more than 32 characters");
+ }
+
+ let (mut list, ct) = CalendarList::load(user).await?;
+ match list.create(name) {
+ CalendarExists::Existed(_) => bail!("Calendar {} already exists", name),
+ CalendarExists::Created(_) => (),
+ }
+ list.save(user, ct).await?;
+
+ Ok(())
+ }
+
+ /// Has calendar
+ pub async fn has(&self, user: &Arc<User>, name: &str) -> Result<bool> {
+ CalendarList::load(user)
+ .await
+ .map(|(list, _)| list.has(name))
+ }
+}
+
+// ------
+// ------ From this point, implementation is hidden from the rest of the crate
+// ------
+
+#[derive(Serialize, Deserialize)]
+struct CalendarList(BTreeMap<String, CalendarListEntry>);
+
+#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
+struct CalendarListEntry {
+ id_lww: (u64, Option<UniqueIdent>),
+}
+
+impl CalendarList {
+ // ---- Index persistence related functions
+
+ /// Load from storage
+ async fn load(user: &Arc<User>) -> Result<(Self, Option<storage::RowRef>)> {
+ let row_ref = storage::RowRef::new(CAL_LIST_PK, CAL_LIST_SK);
+ let (mut list, row) = match user
+ .storage
+ .row_fetch(&storage::Selector::Single(&row_ref))
+ .await
+ {
+ Err(storage::StorageError::NotFound) => (Self::new(), None),
+ Err(e) => return Err(e.into()),
+ Ok(rv) => {
+ let mut list = Self::new();
+ let (row_ref, row_vals) = match rv.into_iter().next() {
+ Some(row_val) => (row_val.row_ref, row_val.value),
+ None => (row_ref, vec![]),
+ };
+
+ for v in row_vals {
+ if let storage::Alternative::Value(vbytes) = v {
+ let list2 =
+ open_deserialize::<CalendarList>(&vbytes, &user.creds.keys.master)?;
+ list.merge(list2);
+ }
+ }
+ (list, Some(row_ref))
+ }
+ };
+
+ // Create default calendars (currently only one calendar is created)
+ let is_default_cal_missing = [MAIN_CAL]
+ .iter()
+ .map(|calname| list.create(calname))
+ .fold(false, |acc, r| {
+ acc || matches!(r, CalendarExists::Created(..))
+ });
+
+ // Save the index if we created a new calendar
+ if is_default_cal_missing {
+ list.save(user, row.clone()).await?;
+ }
+
+ Ok((list, row))
+ }
+
+ /// Save an updated index
+ async fn save(&self, user: &Arc<User>, ct: Option<storage::RowRef>) -> Result<()> {
+ let list_blob = seal_serialize(self, &user.creds.keys.master)?;
+ let rref = ct.unwrap_or(storage::RowRef::new(CAL_LIST_PK, CAL_LIST_SK));
+ let row_val = storage::RowVal::new(rref, list_blob);
+ user.storage.row_insert(vec![row_val]).await?;
+ Ok(())
+ }
+
+ // ----- Index manipulation functions
+
+ /// Ensure that a given calendar exists
+ /// (Don't forget to save if it returns CalendarExists::Created)
+ fn create(&mut self, name: &str) -> CalendarExists {
+ if let Some(CalendarListEntry {
+ id_lww: (_, Some(id)),
+ }) = self.0.get(name)
+ {
+ return CalendarExists::Existed(*id);
+ }
+
+ let id = gen_ident();
+ self.bind(name, Some(id)).unwrap();
+ CalendarExists::Created(id)
+ }
+
+ /// Get a list of all calendar names
+ fn names(&self) -> Vec<String> {
+ self.0
+ .iter()
+ .filter(|(_, v)| v.id_lww.1.is_some())
+ .map(|(k, _)| k.to_string())
+ .collect()
+ }
+
+ /// For a given calendar name, get its Unique Identifier
+ fn get(&self, name: &str) -> Option<UniqueIdent> {
+ self.0
+ .get(name)
+ .map(|CalendarListEntry { id_lww: (_, ident) }| *ident)
+ .flatten()
+ }
+
+ /// Check if a given calendar name exists
+ fn has(&self, name: &str) -> bool {
+ self.get(name).is_some()
+ }
+
+ /// Rename a calendar
+ fn rename(&mut self, old: &str, new: &str) -> Result<()> {
+ if self.has(new) {
+ bail!("Calendar {} already exists", new);
+ }
+ let ident = match self.get(old) {
+ None => bail!("Calendar {} does not exist", old),
+ Some(ident) => ident,
+ };
+
+ self.bind(old, None);
+ self.bind(new, Some(ident));
+
+ Ok(())
+ }
+
+ // ----- Internal logic
+
+ /// New is not publicly exposed, use `load` instead
+ fn new() -> Self {
+ Self(BTreeMap::new())
+ }
+
+ /// Low level index updating logic (used to add/rename/delete) an entry
+ fn bind(&mut self, name: &str, id: Option<UniqueIdent>) -> Option<()> {
+ let (ts, id) = match self.0.get_mut(name) {
+ None => {
+ if id.is_none() {
+ // User wants to delete entry with given name (passed id is None)
+ // Entry does not exist (get_mut is None)
+ // Nothing to do
+ return None;
+ } else {
+ // User wants entry with given name to be present (id is Some)
+ // Entry does not exist
+ // Initialize entry
+ (now_msec(), id)
+ }
+ }
+ Some(CalendarListEntry { id_lww }) => {
+ if id_lww.1 == id {
+ // Entry is already equals to the requested id (Option<UniqueIdent)
+ // Nothing to do
+ return None;
+ } else {
+ // Entry does not equal to what we know internally
+ // We update the Last Write Win CRDT here with the new id value
+ (std::cmp::max(id_lww.0 + 1, now_msec()), id)
+ }
+ }
+ };
+
+ // If we did not return here, that's because we have to update
+ // something in our internal index.
+ self.0
+ .insert(name.into(), CalendarListEntry { id_lww: (ts, id) });
+ Some(())
+ }
+
+ // Merge 2 calendar lists by applying a LWW logic on each element
+ fn merge(&mut self, list2: Self) {
+ for (k, v) in list2.0.into_iter() {
+ if let Some(e) = self.0.get_mut(&k) {
+ e.merge(&v);
+ } else {
+ self.0.insert(k, v);
+ }
+ }
+ }
+}
+
+impl CalendarListEntry {
+ fn merge(&mut self, other: &Self) {
+ // Simple CRDT merge rule
+ if other.id_lww.0 > self.id_lww.0
+ || (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1)
+ {
+ self.id_lww = other.id_lww;
+ }
+ }
+}
+
+pub(crate) enum CalendarExists {
+ Created(UniqueIdent),
+ Existed(UniqueIdent),
+}
diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs
new file mode 100644
index 0000000..74e745f
--- /dev/null
+++ b/aero-collections/src/davdag.rs
@@ -0,0 +1,342 @@
+use anyhow::{bail, Result};
+use im::{ordset, OrdMap, OrdSet};
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
+
+use aero_bayou::*;
+
+use crate::unique_ident::{gen_ident, UniqueIdent};
+
+/// Parents are only persisted in the event log,
+/// not in the checkpoints.
+pub type Token = UniqueIdent;
+pub type Parents = Vec<Token>;
+pub type SyncDesc = (Parents, Token);
+
+pub type BlobId = UniqueIdent;
+pub type Etag = String;
+pub type FileName = String;
+pub type IndexEntry = (BlobId, FileName, Etag);
+
+#[derive(Clone, Default)]
+pub struct DavDag {
+ /// Source of trust
+ pub table: OrdMap<BlobId, IndexEntry>,
+
+ /// Indexes optimized for queries
+ pub idx_by_filename: OrdMap<FileName, BlobId>,
+
+ // ------------ Below this line, data is ephemeral, ie. not checkpointed
+ /// Partial synchronization graph
+ pub ancestors: OrdMap<Token, OrdSet<Token>>,
+
+ /// All nodes
+ pub all_nodes: OrdSet<Token>,
+ /// Head nodes
+ pub heads: OrdSet<Token>,
+ /// Origin nodes
+ pub origins: OrdSet<Token>,
+
+ /// File change token by token
+ pub change: OrdMap<Token, SyncChange>,
+}
+
+#[derive(Clone, Debug)]
+pub enum SyncChange {
+ Ok((FileName, BlobId)),
+ NotFound(FileName),
+}
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub enum DavDagOp {
+ /// Merge is a virtual operation run when multiple heads are discovered
+ Merge(SyncDesc),
+
+ /// Add an item to the collection
+ Put(SyncDesc, IndexEntry),
+
+ /// Delete an item from the collection
+ Delete(SyncDesc, BlobId),
+}
+impl DavDagOp {
+ pub fn token(&self) -> Token {
+ match self {
+ Self::Merge((_, t)) => *t,
+ Self::Put((_, t), _) => *t,
+ Self::Delete((_, t), _) => *t,
+ }
+ }
+}
+
+impl DavDag {
+ pub fn op_merge(&self) -> DavDagOp {
+ DavDagOp::Merge(self.sync_desc())
+ }
+
+ pub fn op_put(&self, entry: IndexEntry) -> DavDagOp {
+ DavDagOp::Put(self.sync_desc(), entry)
+ }
+
+ pub fn op_delete(&self, blob_id: BlobId) -> DavDagOp {
+ DavDagOp::Delete(self.sync_desc(), blob_id)
+ }
+
+ // HELPER functions
+
+ pub fn heads_vec(&self) -> Vec<Token> {
+ self.heads.clone().into_iter().collect()
+ }
+
+ /// A sync descriptor
+ pub fn sync_desc(&self) -> SyncDesc {
+ (self.heads_vec(), gen_ident())
+ }
+
+ /// Resolve a sync token
+ pub fn resolve(&self, known: Token) -> Result<OrdSet<Token>> {
+ let already_known = self.all_ancestors(known);
+
+ // We can't capture all missing events if we are not connected
+ // to all sinks of the graph,
+ // ie. if we don't already know all the sinks,
+ // ie. if we are missing so much history that
+ // the event log has been transformed into a checkpoint
+ if !self.origins.is_subset(already_known.clone()) {
+ bail!("Not enough history to produce a correct diff, a full resync is needed");
+ }
+
+ // Missing items are *all existing graph items* from which
+ // we removed *all items known by the given node*.
+ // In other words, all values in `all_nodes` that are not in `already_known`.
+ Ok(self.all_nodes.clone().relative_complement(already_known))
+ }
+
+ /// Find all ancestors of a given node
+ fn all_ancestors(&self, known: Token) -> OrdSet<Token> {
+ let mut all_known: OrdSet<UniqueIdent> = OrdSet::new();
+ let mut to_collect = vec![known];
+ loop {
+ let cursor = match to_collect.pop() {
+ // Loop stops here
+ None => break,
+ Some(v) => v,
+ };
+
+ if all_known.insert(cursor).is_some() {
+ // Item already processed
+ continue;
+ }
+
+ // Collect parents
+ let parents = match self.ancestors.get(&cursor) {
+ None => continue,
+ Some(c) => c,
+ };
+ to_collect.extend(parents.iter());
+ }
+ all_known
+ }
+
+ // INTERNAL functions
+
+ /// Register a WebDAV item (put, copy, move)
+ fn register(&mut self, sync_token: Option<Token>, entry: IndexEntry) {
+ let (blob_id, filename, _etag) = entry.clone();
+
+ // Insert item in the source of trust
+ self.table.insert(blob_id, entry);
+
+ // Update the cache
+ self.idx_by_filename.insert(filename.to_string(), blob_id);
+
+ // Record the change in the ephemeral synchronization map
+ if let Some(sync_token) = sync_token {
+ self.change
+ .insert(sync_token, SyncChange::Ok((filename, blob_id)));
+ }
+ }
+
+ /// Unregister a WebDAV item (delete, move)
+ fn unregister(&mut self, sync_token: Token, blob_id: &BlobId) {
+ // Query the source of truth to get the information we
+ // need to clean the indexes
+ let (_blob_id, filename, _etag) = match self.table.get(blob_id) {
+ Some(v) => v,
+ // Element does not exist, return early
+ None => return,
+ };
+ self.idx_by_filename.remove(filename);
+
+ // Record the change in the ephemeral synchronization map
+ self.change
+ .insert(sync_token, SyncChange::NotFound(filename.to_string()));
+
+ // Finally clear item from the source of trust
+ self.table.remove(blob_id);
+ }
+
+ /// When an event is processed, update the synchronization DAG
+ fn sync_dag(&mut self, sync_desc: &SyncDesc) {
+ let (parents, child) = sync_desc;
+
+ // --- Update ANCESTORS
+ // We register ancestors as it is required for the sync algorithm
+ self.ancestors.insert(
+ *child,
+ parents.iter().fold(ordset![], |mut acc, p| {
+ acc.insert(*p);
+ acc
+ }),
+ );
+
+ // --- Update ORIGINS
+ // If this event has no parents, it's an origin
+ if parents.is_empty() {
+ self.origins.insert(*child);
+ }
+
+ // --- Update HEADS
+ // Remove from HEADS this event's parents
+ parents.iter().for_each(|par| {
+ self.heads.remove(par);
+ });
+
+ // This event becomes a new HEAD in turn
+ self.heads.insert(*child);
+
+ // --- Update ALL NODES
+ self.all_nodes.insert(*child);
+ }
+}
+
+impl std::fmt::Debug for DavDag {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str("DavDag\n")?;
+ for elem in self.table.iter() {
+ f.write_fmt(format_args!("\t{:?} => {:?}", elem.0, elem.1))?;
+ }
+ Ok(())
+ }
+}
+
+impl BayouState for DavDag {
+ type Op = DavDagOp;
+
+ fn apply(&self, op: &Self::Op) -> Self {
+ let mut new = self.clone();
+
+ match op {
+ DavDagOp::Put(sync_desc, entry) => {
+ new.sync_dag(sync_desc);
+ new.register(Some(sync_desc.1), entry.clone());
+ }
+ DavDagOp::Delete(sync_desc, blob_id) => {
+ new.sync_dag(sync_desc);
+ new.unregister(sync_desc.1, blob_id);
+ }
+ DavDagOp::Merge(sync_desc) => {
+ new.sync_dag(sync_desc);
+ }
+ }
+
+ new
+ }
+}
+
+// CUSTOM SERIALIZATION & DESERIALIZATION
+#[derive(Serialize, Deserialize)]
+struct DavDagSerializedRepr {
+ items: Vec<IndexEntry>,
+ heads: Vec<UniqueIdent>,
+}
+
+impl<'de> Deserialize<'de> for DavDag {
+ fn deserialize<D>(d: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let val: DavDagSerializedRepr = DavDagSerializedRepr::deserialize(d)?;
+ let mut davdag = DavDag::default();
+
+ // Build the table + index
+ val.items
+ .into_iter()
+ .for_each(|entry| davdag.register(None, entry));
+
+ // Initialize the synchronization DAG with its roots
+ val.heads.into_iter().for_each(|ident| {
+ davdag.heads.insert(ident);
+ davdag.origins.insert(ident);
+ davdag.all_nodes.insert(ident);
+ });
+
+ Ok(davdag)
+ }
+}
+
+impl Serialize for DavDag {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ // Indexes are rebuilt on the fly, we serialize only the core database
+ let items = self.table.iter().map(|(_, entry)| entry.clone()).collect();
+
+ // We keep only the head entries from the sync graph,
+ // these entries will be used to initialize it back when deserializing
+ let heads = self.heads_vec();
+
+ // Finale serialization object
+ let val = DavDagSerializedRepr { items, heads };
+ val.serialize(serializer)
+ }
+}
+
+// ---- TESTS ----
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn base() {
+ let mut state = DavDag::default();
+
+ // Add item 1
+ {
+ let m = UniqueIdent([0x01; 24]);
+ let ev = state.op_put((m, "cal.ics".into(), "321-321".into()));
+ state = state.apply(&ev);
+
+ assert_eq!(state.table.len(), 1);
+ assert_eq!(state.resolve(ev.token()).unwrap().len(), 0);
+ }
+
+ // Add 2 concurrent items
+ let (t1, t2) = {
+ let blob1 = UniqueIdent([0x02; 24]);
+ let ev1 = state.op_put((blob1, "cal2.ics".into(), "321-321".into()));
+
+ let blob2 = UniqueIdent([0x01; 24]);
+ let ev2 = state.op_delete(blob2);
+
+ state = state.apply(&ev1);
+ state = state.apply(&ev2);
+
+ assert_eq!(state.table.len(), 1);
+ assert_eq!(state.resolve(ev1.token()).unwrap(), ordset![ev2.token()]);
+
+ (ev1.token(), ev2.token())
+ };
+
+ // Add later a new item
+ {
+ let blob3 = UniqueIdent([0x03; 24]);
+ let ev = state.op_put((blob3, "cal3.ics".into(), "321-321".into()));
+
+ state = state.apply(&ev);
+ assert_eq!(state.table.len(), 2);
+ assert_eq!(state.resolve(ev.token()).unwrap().len(), 0);
+ assert_eq!(state.resolve(t1).unwrap(), ordset![t2, ev.token()]);
+ }
+ }
+}
diff --git a/aero-collections/src/lib.rs b/aero-collections/src/lib.rs
new file mode 100644
index 0000000..eabf61c
--- /dev/null
+++ b/aero-collections/src/lib.rs
@@ -0,0 +1,5 @@
+pub mod calendar;
+pub mod davdag;
+pub mod mail;
+pub mod unique_ident;
+pub mod user;
diff --git a/aero-collections/src/mail/incoming.rs b/aero-collections/src/mail/incoming.rs
new file mode 100644
index 0000000..55c2515
--- /dev/null
+++ b/aero-collections/src/mail/incoming.rs
@@ -0,0 +1,443 @@
+use std::sync::{Arc, Weak};
+use std::time::Duration;
+
+use anyhow::{anyhow, bail, Result};
+use base64::Engine;
+use futures::{future::BoxFuture, FutureExt};
+//use tokio::io::AsyncReadExt;
+use tokio::sync::watch;
+use tracing::{debug, error, info, warn};
+
+use aero_bayou::timestamp::now_msec;
+use aero_user::cryptoblob;
+use aero_user::login::{Credentials, PublicCredentials};
+use aero_user::storage;
+
+use crate::mail::mailbox::Mailbox;
+use crate::mail::uidindex::ImapUidvalidity;
+use crate::mail::IMF;
+use crate::unique_ident::*;
+use crate::user::User;
+
+const INCOMING_PK: &str = "incoming";
+const INCOMING_LOCK_SK: &str = "lock";
+const INCOMING_WATCH_SK: &str = "watch";
+
+const MESSAGE_KEY: &str = "message-key";
+
+// When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
+// It is renewed every LOCK_DURATION/3
+// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we
+// lost the lock.
+const LOCK_DURATION: Duration = Duration::from_secs(300);
+
+// In addition to checking when notified, also check for new mail every 10 minutes
+const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600);
+
+pub async fn incoming_mail_watch_process(
+ user: Weak<User>,
+ creds: Credentials,
+ rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
+) {
+ if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await {
+ error!("Error in incoming mail watch process: {}", e);
+ }
+}
+
+async fn incoming_mail_watch_process_internal(
+ user: Weak<User>,
+ creds: Credentials,
+ mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
+) -> Result<()> {
+ let mut lock_held = k2v_lock_loop(
+ creds.storage.build().await?,
+ storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK),
+ );
+ let storage = creds.storage.build().await?;
+
+ let mut inbox: Option<Arc<Mailbox>> = None;
+ let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
+
+ loop {
+ let maybe_updated_incoming_key = if *lock_held.borrow() {
+ debug!("incoming lock held");
+
+ let wait_new_mail = async {
+ loop {
+ match storage.row_poll(&incoming_key).await {
+ Ok(row_val) => break row_val.row_ref,
+ Err(e) => {
+ error!("Error in wait_new_mail: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ }
+ }
+ };
+
+ tokio::select! {
+ inc_k = wait_new_mail => Some(inc_k),
+ _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
+ _ = lock_held.changed() => None,
+ _ = rx_inbox_id.changed() => None,
+ }
+ } else {
+ debug!("incoming lock not held");
+ tokio::select! {
+ _ = lock_held.changed() => None,
+ _ = rx_inbox_id.changed() => None,
+ }
+ };
+
+ let user = match Weak::upgrade(&user) {
+ Some(user) => user,
+ None => {
+ debug!("User no longer available, exiting incoming loop.");
+ break;
+ }
+ };
+ debug!("User still available");
+
+ // If INBOX no longer is same mailbox, open new mailbox
+ let inbox_id = *rx_inbox_id.borrow();
+ if let Some((id, uidvalidity)) = inbox_id {
+ if Some(id) != inbox.as_ref().map(|b| b.id) {
+ match user.open_mailbox_by_id(id, uidvalidity).await {
+ Ok(mb) => {
+ inbox = Some(mb);
+ }
+ Err(e) => {
+ inbox = None;
+ error!("Error when opening inbox ({}): {}", id, e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ continue;
+ }
+ }
+ }
+ }
+
+ // If we were able to open INBOX, and we have mail,
+ // fetch new mail
+ if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
+ match handle_incoming_mail(&user, &storage, inbox, &lock_held).await {
+ Ok(()) => {
+ incoming_key = updated_incoming_key;
+ }
+ Err(e) => {
+ error!("Could not fetch incoming mail: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ }
+ }
+ }
+ drop(rx_inbox_id);
+ Ok(())
+}
+
+async fn handle_incoming_mail(
+ user: &Arc<User>,
+ storage: &storage::Store,
+ inbox: &Arc<Mailbox>,
+ lock_held: &watch::Receiver<bool>,
+) -> Result<()> {
+ let mails_res = storage.blob_list("incoming/").await?;
+
+ for object in mails_res {
+ if !*lock_held.borrow() {
+ break;
+ }
+ let key = object.0;
+ if let Some(mail_id) = key.strip_prefix("incoming/") {
+ if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
+ move_incoming_message(user, storage, inbox, mail_id).await?;
+ }
+ }
+ }
+
+ Ok(())
+}
+
+async fn move_incoming_message(
+ user: &Arc<User>,
+ storage: &storage::Store,
+ inbox: &Arc<Mailbox>,
+ id: UniqueIdent,
+) -> Result<()> {
+ info!("Moving incoming message: {}", id);
+
+ let object_key = format!("incoming/{}", id);
+
+ // 1. Fetch message from S3
+ let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?;
+
+ // 1.a decrypt message key from headers
+ //info!("Object metadata: {:?}", get_result.metadata);
+ let key_encrypted_b64 = object
+ .meta
+ .get(MESSAGE_KEY)
+ .ok_or(anyhow!("Missing key in metadata"))?;
+ let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
+ let message_key = sodiumoxide::crypto::sealedbox::open(
+ &key_encrypted,
+ &user.creds.keys.public,
+ &user.creds.keys.secret,
+ )
+ .map_err(|_| anyhow!("Cannot decrypt message key"))?;
+ let message_key =
+ cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
+
+ // 1.b retrieve message body
+ let obj_body = object.value;
+ let plain_mail = cryptoblob::open(&obj_body, &message_key)
+ .map_err(|_| anyhow!("Cannot decrypt email content"))?;
+
+ // 2 parse mail and add to inbox
+ let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
+ inbox
+ .append_from_s3(msg, id, object.blob_ref.clone(), message_key)
+ .await?;
+
+ // 3 delete from incoming
+ storage.blob_rm(&object.blob_ref).await?;
+
+ Ok(())
+}
+
+// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
+
+fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> {
+ let (held_tx, held_rx) = watch::channel(false);
+
+ tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx));
+
+ held_rx
+}
+
+#[derive(Clone, Debug)]
+enum LockState {
+ Unknown,
+ Empty,
+ Held(UniqueIdent, u64, storage::RowRef),
+}
+
+async fn k2v_lock_loop_internal(
+ storage: storage::Store,
+ row_ref: storage::RowRef,
+ held_tx: watch::Sender<bool>,
+) {
+ let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
+ let mut state_rx_2 = state_rx.clone();
+
+ let our_pid = gen_ident();
+
+ // Loop 1: watch state of lock in K2V, save that in corresponding watch channel
+ let watch_lock_loop: BoxFuture<Result<()>> = async {
+ let mut ct = row_ref.clone();
+ loop {
+ debug!("k2v watch lock loop iter: ct = {:?}", ct);
+ match storage.row_poll(&ct).await {
+ Err(e) => {
+ error!(
+ "Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
+ e
+ );
+ state_tx.send(LockState::Unknown)?;
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ Ok(cv) => {
+ let mut lock_state = None;
+ for v in cv.value.iter() {
+ if let storage::Alternative::Value(vbytes) = v {
+ if vbytes.len() == 32 {
+ let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
+ let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
+ if lock_state
+ .map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2))
+ .unwrap_or(true)
+ {
+ lock_state = Some((pid, ts));
+ }
+ }
+ }
+ }
+ let new_ct = cv.row_ref;
+
+ debug!(
+ "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
+ ct, new_ct, lock_state
+ );
+ state_tx.send(
+ lock_state
+ .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone()))
+ .unwrap_or(LockState::Empty),
+ )?;
+ ct = new_ct;
+ }
+ }
+ }
+ }
+ .boxed();
+
+ // Loop 2: notify user whether we are holding the lock or not
+ let lock_notify_loop: BoxFuture<Result<()>> = async {
+ loop {
+ let now = now_msec();
+ let held_with_expiration_time = match &*state_rx.borrow_and_update() {
+ LockState::Held(pid, ts, _ct) if *pid == our_pid => {
+ let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64;
+ if now < expiration_time {
+ Some(expiration_time)
+ } else {
+ None
+ }
+ }
+ _ => None,
+ };
+ let held = held_with_expiration_time.is_some();
+ if held != *held_tx.borrow() {
+ held_tx.send(held)?;
+ }
+
+ let await_expired = async {
+ match held_with_expiration_time {
+ None => futures::future::pending().await,
+ Some(expiration_time) => {
+ tokio::time::sleep(Duration::from_millis(expiration_time - now)).await
+ }
+ };
+ };
+
+ tokio::select!(
+ r = state_rx.changed() => {
+ r?;
+ }
+ _ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"),
+ _ = await_expired => continue,
+ );
+ }
+ }
+ .boxed();
+
+ // Loop 3: acquire lock when relevant
+ let take_lock_loop: BoxFuture<Result<()>> = async {
+ loop {
+ let now = now_msec();
+ let state: LockState = state_rx_2.borrow_and_update().clone();
+ let (acquire_at, ct) = match state {
+ LockState::Unknown => {
+ // If state of the lock is unknown, don't try to acquire
+ state_rx_2.changed().await?;
+ continue;
+ }
+ LockState::Empty => (now, None),
+ LockState::Held(pid, ts, ct) => {
+ if pid == our_pid {
+ (ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct))
+ } else {
+ (ts, Some(ct))
+ }
+ }
+ };
+
+ // Wait until it is time to acquire lock
+ if acquire_at > now {
+ tokio::select!(
+ r = state_rx_2.changed() => {
+ // If lock state changed in the meantime, don't acquire and loop around
+ r?;
+ continue;
+ }
+ _ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => ()
+ );
+ }
+
+ // Acquire lock
+ let mut lock = vec![0u8; 32];
+ lock[..8].copy_from_slice(&u64::to_be_bytes(
+ now_msec() + LOCK_DURATION.as_millis() as u64,
+ ));
+ lock[8..].copy_from_slice(&our_pid.0);
+ let row = match ct {
+ Some(existing) => existing,
+ None => row_ref.clone(),
+ };
+ if let Err(e) = storage
+ .row_insert(vec![storage::RowVal::new(row, lock)])
+ .await
+ {
+ error!("Could not take lock: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+
+ // Wait for new information to loop back
+ state_rx_2.changed().await?;
+ }
+ }
+ .boxed();
+
+ let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop);
+
+ debug!("lock loop exited, releasing");
+
+ if !held_tx.is_closed() {
+ warn!("weird...");
+ let _ = held_tx.send(false);
+ }
+
+ // If lock is ours, release it
+ let release = match &*state_rx.borrow() {
+ LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()),
+ _ => None,
+ };
+ if let Some(ct) = release {
+ match storage.row_rm(&storage::Selector::Single(&ct)).await {
+ Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
+ Ok(_) => (),
+ };
+ }
+}
+
+// ---- LMTP SIDE: storing messages encrypted with user's pubkey ----
+
+pub struct EncryptedMessage {
+ key: cryptoblob::Key,
+ encrypted_body: Vec<u8>,
+}
+
+impl EncryptedMessage {
+ pub fn new(body: Vec<u8>) -> Result<Self> {
+ let key = cryptoblob::gen_key();
+ let encrypted_body = cryptoblob::seal(&body, &key)?;
+ Ok(Self {
+ key,
+ encrypted_body,
+ })
+ }
+
+ pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
+ let storage = creds.storage.build().await?;
+
+ // Get causality token of previous watch key
+ let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
+ let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await {
+ Err(_) => query,
+ Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query),
+ };
+
+ // Write mail to encrypted storage
+ let encrypted_key =
+ sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
+ let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key);
+
+ let blob_val = storage::BlobVal::new(
+ storage::BlobRef(format!("incoming/{}", gen_ident())),
+ self.encrypted_body.clone().into(),
+ )
+ .with_meta(MESSAGE_KEY.to_string(), key_header);
+ storage.blob_insert(blob_val).await?;
+
+ // Update watch key to signal new mail
+ let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec());
+ storage.row_insert(vec![watch_val]).await?;
+ Ok(())
+ }
+}
diff --git a/aero-collections/src/mail/mailbox.rs b/aero-collections/src/mail/mailbox.rs
new file mode 100644
index 0000000..bec9669
--- /dev/null
+++ b/aero-collections/src/mail/mailbox.rs
@@ -0,0 +1,525 @@
+use anyhow::{anyhow, bail, Result};
+use serde::{Deserialize, Serialize};
+use tokio::sync::RwLock;
+
+use aero_bayou::timestamp::now_msec;
+use aero_bayou::Bayou;
+use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
+use aero_user::login::Credentials;
+use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
+
+use crate::mail::uidindex::*;
+use crate::mail::IMF;
+use crate::unique_ident::*;
+
+pub struct Mailbox {
+ pub(super) id: UniqueIdent,
+ mbox: RwLock<MailboxInternal>,
+}
+
+impl Mailbox {
+ pub(crate) async fn open(
+ creds: &Credentials,
+ id: UniqueIdent,
+ min_uidvalidity: ImapUidvalidity,
+ ) -> Result<Self> {
+ let index_path = format!("index/{}", id);
+ let mail_path = format!("mail/{}", id);
+
+ let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?;
+ uid_index.sync().await?;
+
+ let uidvalidity = uid_index.state().uidvalidity;
+ if uidvalidity < min_uidvalidity {
+ uid_index
+ .push(
+ uid_index
+ .state()
+ .op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()),
+ )
+ .await?;
+ }
+
+ // @FIXME reporting through opentelemetry or some logs
+ // info on the "shape" of the mailbox would be welcomed
+ /*
+ dump(&uid_index);
+ */
+
+ let mbox = RwLock::new(MailboxInternal {
+ id,
+ encryption_key: creds.keys.master.clone(),
+ storage: creds.storage.build().await?,
+ uid_index,
+ mail_path,
+ });
+
+ Ok(Self { id, mbox })
+ }
+
+ /// Sync data with backing store
+ pub async fn force_sync(&self) -> Result<()> {
+ self.mbox.write().await.force_sync().await
+ }
+
+ /// Sync data with backing store only if changes are detected
+ /// or last sync is too old
+ pub async fn opportunistic_sync(&self) -> Result<()> {
+ self.mbox.write().await.opportunistic_sync().await
+ }
+
+ /// Block until a sync has been done (due to changes in the event log)
+ pub async fn notify(&self) -> std::sync::Weak<tokio::sync::Notify> {
+ self.mbox.read().await.notifier()
+ }
+
+ // ---- Functions for reading the mailbox ----
+
+ /// Get a clone of the current UID Index of this mailbox
+ /// (cloning is cheap so don't hesitate to use this)
+ pub async fn current_uid_index(&self) -> UidIndex {
+ self.mbox.read().await.uid_index.state().clone()
+ }
+
+ /// Fetch the metadata (headers + some more info) of the specified
+ /// mail IDs
+ pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+ self.mbox.read().await.fetch_meta(ids).await
+ }
+
+ /// Fetch an entire e-mail
+ pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+ self.mbox.read().await.fetch_full(id, message_key).await
+ }
+
+ pub async fn frozen(self: &std::sync::Arc<Self>) -> super::snapshot::FrozenMailbox {
+ super::snapshot::FrozenMailbox::new(self.clone()).await
+ }
+
+ // ---- Functions for changing the mailbox ----
+
+ /// Add flags to message
+ pub async fn add_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ self.mbox.write().await.add_flags(id, flags).await
+ }
+
+ /// Delete flags from message
+ pub async fn del_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ self.mbox.write().await.del_flags(id, flags).await
+ }
+
+ /// Define the new flags for this message
+ pub async fn set_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ self.mbox.write().await.set_flags(id, flags).await
+ }
+
+ /// Insert an email into the mailbox
+ pub async fn append<'a>(
+ &self,
+ msg: IMF<'a>,
+ ident: Option<UniqueIdent>,
+ flags: &[Flag],
+ ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
+ self.mbox.write().await.append(msg, ident, flags).await
+ }
+
+ /// Insert an email into the mailbox, copying it from an existing S3 object
+ pub async fn append_from_s3<'a>(
+ &self,
+ msg: IMF<'a>,
+ ident: UniqueIdent,
+ blob_ref: storage::BlobRef,
+ message_key: Key,
+ ) -> Result<()> {
+ self.mbox
+ .write()
+ .await
+ .append_from_s3(msg, ident, blob_ref, message_key)
+ .await
+ }
+
+ /// Delete a message definitively from the mailbox
+ pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> {
+ self.mbox.write().await.delete(id).await
+ }
+
+ /// Copy an email from an other Mailbox to this mailbox
+ /// (use this when possible, as it allows for a certain number of storage optimizations)
+ pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<UniqueIdent> {
+ if self.id == from.id {
+ bail!("Cannot copy into same mailbox");
+ }
+
+ let (mut selflock, fromlock);
+ if self.id < from.id {
+ selflock = self.mbox.write().await;
+ fromlock = from.mbox.write().await;
+ } else {
+ fromlock = from.mbox.write().await;
+ selflock = self.mbox.write().await;
+ };
+ selflock.copy_from(&fromlock, uuid).await
+ }
+
+ /// Move an email from an other Mailbox to this mailbox
+ /// (use this when possible, as it allows for a certain number of storage optimizations)
+ pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> {
+ if self.id == from.id {
+ bail!("Cannot copy move same mailbox");
+ }
+
+ let (mut selflock, mut fromlock);
+ if self.id < from.id {
+ selflock = self.mbox.write().await;
+ fromlock = from.mbox.write().await;
+ } else {
+ fromlock = from.mbox.write().await;
+ selflock = self.mbox.write().await;
+ };
+ selflock.move_from(&mut fromlock, uuid).await
+ }
+}
+
+// ----
+
+// Non standard but common flags:
+// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml
+struct MailboxInternal {
+ // 2023-05-15 will probably be used later.
+ #[allow(dead_code)]
+ id: UniqueIdent,
+ mail_path: String,
+ encryption_key: Key,
+ storage: Store,
+ uid_index: Bayou<UidIndex>,
+}
+
+impl MailboxInternal {
+ async fn force_sync(&mut self) -> Result<()> {
+ self.uid_index.sync().await?;
+ Ok(())
+ }
+
+ async fn opportunistic_sync(&mut self) -> Result<()> {
+ self.uid_index.opportunistic_sync().await?;
+ Ok(())
+ }
+
+ fn notifier(&self) -> std::sync::Weak<tokio::sync::Notify> {
+ self.uid_index.notifier()
+ }
+
+ // ---- Functions for reading the mailbox ----
+
+ async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+ let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
+ let ops = ids
+ .iter()
+ .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str()))
+ .collect::<Vec<_>>();
+ let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;
+
+ let mut meta_vec = vec![];
+ for res in res_vec.into_iter() {
+ let mut meta_opt = None;
+
+ // Resolve conflicts
+ for v in res.value.iter() {
+ match v {
+ storage::Alternative::Tombstone => (),
+ storage::Alternative::Value(v) => {
+ let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
+ match meta_opt.as_mut() {
+ None => {
+ meta_opt = Some(meta);
+ }
+ Some(prevmeta) => {
+ prevmeta.try_merge(meta)?;
+ }
+ }
+ }
+ }
+ }
+ if let Some(meta) = meta_opt {
+ meta_vec.push(meta);
+ } else {
+ bail!("No valid meta value in k2v for {:?}", res.row_ref);
+ }
+ }
+
+ Ok(meta_vec)
+ }
+
+ async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+ let obj_res = self
+ .storage
+ .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id)))
+ .await?;
+ let body = obj_res.value;
+ cryptoblob::open(&body, message_key)
+ }
+
+ // ---- Functions for changing the mailbox ----
+
+ async fn add_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ let add_flag_op = self.uid_index.state().op_flag_add(ident, flags.to_vec());
+ self.uid_index.push(add_flag_op).await
+ }
+
+ async fn del_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ let del_flag_op = self.uid_index.state().op_flag_del(ident, flags.to_vec());
+ self.uid_index.push(del_flag_op).await
+ }
+
+ async fn set_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ let set_flag_op = self.uid_index.state().op_flag_set(ident, flags.to_vec());
+ self.uid_index.push(set_flag_op).await
+ }
+
+ async fn append(
+ &mut self,
+ mail: IMF<'_>,
+ ident: Option<UniqueIdent>,
+ flags: &[Flag],
+ ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
+ let ident = ident.unwrap_or_else(gen_ident);
+ let message_key = gen_key();
+
+ futures::try_join!(
+ async {
+ // Encrypt and save mail body
+ let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
+ self.storage
+ .blob_insert(BlobVal::new(
+ BlobRef(format!("{}/{}", self.mail_path, ident)),
+ message_blob,
+ ))
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Save mail meta
+ let meta = MailMeta {
+ internaldate: now_msec(),
+ headers: mail.parsed.raw_headers.to_vec(),
+ message_key: message_key.clone(),
+ rfc822_size: mail.raw.len(),
+ };
+ let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )])
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ self.uid_index.opportunistic_sync()
+ )?;
+
+ // Add mail to Bayou mail index
+ let uid_state = self.uid_index.state();
+ let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec());
+
+ let uidvalidity = uid_state.uidvalidity;
+ let (uid, modseq) = match add_mail_op {
+ UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq),
+ _ => unreachable!(),
+ };
+
+ self.uid_index.push(add_mail_op).await?;
+
+ Ok((uidvalidity, uid, modseq))
+ }
+
+ async fn append_from_s3<'a>(
+ &mut self,
+ mail: IMF<'a>,
+ ident: UniqueIdent,
+ blob_src: storage::BlobRef,
+ message_key: Key,
+ ) -> Result<()> {
+ futures::try_join!(
+ async {
+ // Copy mail body from previous location
+ let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident));
+ self.storage.blob_copy(&blob_src, &blob_dst).await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Save mail meta
+ let meta = MailMeta {
+ internaldate: now_msec(),
+ headers: mail.parsed.raw_headers.to_vec(),
+ message_key: message_key.clone(),
+ rfc822_size: mail.raw.len(),
+ };
+ let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )])
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ self.uid_index.opportunistic_sync()
+ )?;
+
+ // Add mail to Bayou mail index
+ let add_mail_op = self.uid_index.state().op_mail_add(ident, vec![]);
+ self.uid_index.push(add_mail_op).await?;
+
+ Ok(())
+ }
+
+ async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
+ if !self.uid_index.state().table.contains_key(&ident) {
+ bail!("Cannot delete mail that doesn't exist");
+ }
+
+ let del_mail_op = self.uid_index.state().op_mail_del(ident);
+ self.uid_index.push(del_mail_op).await?;
+
+ futures::try_join!(
+ async {
+ // Delete mail body from S3
+ self.storage
+ .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident)))
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Delete mail meta from K2V
+ let sk = ident.to_string();
+ let res = self
+ .storage
+ .row_fetch(&storage::Selector::Single(&RowRef::new(
+ &self.mail_path,
+ &sk,
+ )))
+ .await?;
+ if let Some(row_val) = res.into_iter().next() {
+ self.storage
+ .row_rm(&storage::Selector::Single(&row_val.row_ref))
+ .await?;
+ }
+ Ok::<_, anyhow::Error>(())
+ }
+ )?;
+ Ok(())
+ }
+
+ async fn copy_from(
+ &mut self,
+ from: &MailboxInternal,
+ source_id: UniqueIdent,
+ ) -> Result<UniqueIdent> {
+ let new_id = gen_ident();
+ self.copy_internal(from, source_id, new_id).await?;
+ Ok(new_id)
+ }
+
+ async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> {
+ self.copy_internal(from, id, id).await?;
+ from.delete(id).await?;
+ Ok(())
+ }
+
+ async fn copy_internal(
+ &mut self,
+ from: &MailboxInternal,
+ source_id: UniqueIdent,
+ new_id: UniqueIdent,
+ ) -> Result<()> {
+ if self.encryption_key != from.encryption_key {
+ bail!("Message to be copied/moved does not belong to same account.");
+ }
+
+ let flags = from
+ .uid_index
+ .state()
+ .table
+ .get(&source_id)
+ .ok_or(anyhow!("Source mail not found"))?
+ .2
+ .clone();
+
+ futures::try_join!(
+ async {
+ let dst = BlobRef(format!("{}/{}", self.mail_path, new_id));
+ let src = BlobRef(format!("{}/{}", from.mail_path, source_id));
+ self.storage.blob_copy(&src, &dst).await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Copy mail meta in K2V
+ let meta = &from.fetch_meta(&[source_id]).await?[0];
+ let meta_blob = seal_serialize(meta, &self.encryption_key)?;
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &new_id.to_string()),
+ meta_blob,
+ )])
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ self.uid_index.opportunistic_sync(),
+ )?;
+
+ // Add mail to Bayou mail index
+ let add_mail_op = self.uid_index.state().op_mail_add(new_id, flags);
+ self.uid_index.push(add_mail_op).await?;
+
+ Ok(())
+ }
+}
+
+// Can be useful to debug so we want this code
+// to be available to developers
+#[allow(dead_code)]
+fn dump(uid_index: &Bayou<UidIndex>) {
+ let s = uid_index.state();
+ println!("---- MAILBOX STATE ----");
+ println!("UIDVALIDITY {}", s.uidvalidity);
+ println!("UIDNEXT {}", s.uidnext);
+ println!("INTERNALSEQ {}", s.internalseq);
+ for (uid, ident) in s.idx_by_uid.iter() {
+ println!(
+ "{} {} {}",
+ uid,
+ hex::encode(ident.0),
+ s.table.get(ident).cloned().unwrap().2.join(", ")
+ );
+ }
+ println!();
+}
+
+// ----
+
+/// The metadata of a message that is stored in K2V
+/// at pk = mail/<mailbox uuid>, sk = <message uuid>
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MailMeta {
+ /// INTERNALDATE field (milliseconds since epoch)
+ pub internaldate: u64,
+ /// Headers of the message
+ pub headers: Vec<u8>,
+ /// Secret key for decrypting entire message
+ pub message_key: Key,
+ /// RFC822 size
+ pub rfc822_size: usize,
+}
+
+impl MailMeta {
+ fn try_merge(&mut self, other: Self) -> Result<()> {
+ if self.headers != other.headers
+ || self.message_key != other.message_key
+ || self.rfc822_size != other.rfc822_size
+ {
+ bail!("Conflicting MailMeta values.");
+ }
+ self.internaldate = std::cmp::max(self.internaldate, other.internaldate);
+ Ok(())
+ }
+}
diff --git a/aero-collections/src/mail/mod.rs b/aero-collections/src/mail/mod.rs
new file mode 100644
index 0000000..584a9eb
--- /dev/null
+++ b/aero-collections/src/mail/mod.rs
@@ -0,0 +1,24 @@
+pub mod incoming;
+pub mod mailbox;
+pub mod namespace;
+pub mod query;
+pub mod snapshot;
+pub mod uidindex;
+
+// Internet Message Format
+// aka RFC 822 - RFC 2822 - RFC 5322
+// 2023-05-15 don't want to refactor this struct now.
+#[allow(clippy::upper_case_acronyms)]
+pub struct IMF<'a> {
+ raw: &'a [u8],
+ parsed: eml_codec::part::composite::Message<'a>,
+}
+
+impl<'a> TryFrom<&'a [u8]> for IMF<'a> {
+ type Error = ();
+
+ fn try_from(body: &'a [u8]) -> Result<IMF<'a>, ()> {
+ let parsed = eml_codec::parse_message(body).or(Err(()))?.1;
+ Ok(Self { raw: body, parsed })
+ }
+}
diff --git a/aero-collections/src/mail/namespace.rs b/aero-collections/src/mail/namespace.rs
new file mode 100644
index 0000000..0f1db7d
--- /dev/null
+++ b/aero-collections/src/mail/namespace.rs
@@ -0,0 +1,206 @@
+use std::collections::BTreeMap;
+
+use anyhow::{bail, Result};
+use serde::{Deserialize, Serialize};
+
+use aero_bayou::timestamp::now_msec;
+
+use crate::mail::uidindex::ImapUidvalidity;
+use crate::unique_ident::{gen_ident, UniqueIdent};
+
+pub const MAILBOX_HIERARCHY_DELIMITER: char = '.';
+
+/// INBOX is the only mailbox that must always exist.
+/// It is created automatically when the account is created.
+/// IMAP allows the user to rename INBOX to something else,
+/// in this case all messages from INBOX are moved to a mailbox
+/// with the new name and the INBOX mailbox still exists and is empty.
+/// In our implementation, we indeed move the underlying mailbox
+/// to the new name (i.e. the new name has the same id as the previous
+/// INBOX), and we create a new empty mailbox for INBOX.
+pub const INBOX: &str = "INBOX";
+
+/// For convenience purpose, we also create some special mailbox
+/// that are described in RFC6154 SPECIAL-USE
+/// @FIXME maybe it should be a configuration parameter
+/// @FIXME maybe we should have a per-mailbox flag mechanism, either an enum or a string, so we
+/// track which mailbox is used for what.
+/// @FIXME Junk could be useful but we don't have any antispam solution yet so...
+/// @FIXME IMAP supports virtual mailbox. \All or \Flagged are intended to be virtual mailboxes.
+/// \Trash might be one, or not one. I don't know what we should do there.
+pub const DRAFTS: &str = "Drafts";
+pub const ARCHIVE: &str = "Archive";
+pub const SENT: &str = "Sent";
+pub const TRASH: &str = "Trash";
+
+pub(crate) const MAILBOX_LIST_PK: &str = "mailboxes";
+pub(crate) const MAILBOX_LIST_SK: &str = "list";
+
+// ---- User's mailbox list (serialized in K2V) ----
+
+#[derive(Serialize, Deserialize)]
+pub(crate) struct MailboxList(BTreeMap<String, MailboxListEntry>);
+
+#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
+pub(crate) struct MailboxListEntry {
+ id_lww: (u64, Option<UniqueIdent>),
+ uidvalidity: ImapUidvalidity,
+}
+
+impl MailboxListEntry {
+ fn merge(&mut self, other: &Self) {
+ // Simple CRDT merge rule
+ if other.id_lww.0 > self.id_lww.0
+ || (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1)
+ {
+ self.id_lww = other.id_lww;
+ }
+ self.uidvalidity = std::cmp::max(self.uidvalidity, other.uidvalidity);
+ }
+}
+
+impl MailboxList {
+ pub(crate) fn new() -> Self {
+ Self(BTreeMap::new())
+ }
+
+ pub(crate) fn merge(&mut self, list2: Self) {
+ for (k, v) in list2.0.into_iter() {
+ if let Some(e) = self.0.get_mut(&k) {
+ e.merge(&v);
+ } else {
+ self.0.insert(k, v);
+ }
+ }
+ }
+
+ pub(crate) fn existing_mailbox_names(&self) -> Vec<String> {
+ self.0
+ .iter()
+ .filter(|(_, v)| v.id_lww.1.is_some())
+ .map(|(k, _)| k.to_string())
+ .collect()
+ }
+
+ pub(crate) fn has_mailbox(&self, name: &str) -> bool {
+ matches!(
+ self.0.get(name),
+ Some(MailboxListEntry {
+ id_lww: (_, Some(_)),
+ ..
+ })
+ )
+ }
+
+ pub(crate) fn get_mailbox(&self, name: &str) -> Option<(ImapUidvalidity, Option<UniqueIdent>)> {
+ self.0.get(name).map(
+ |MailboxListEntry {
+ id_lww: (_, mailbox_id),
+ uidvalidity,
+ }| (*uidvalidity, *mailbox_id),
+ )
+ }
+
+ /// Ensures mailbox `name` maps to id `id`.
+ /// If it already mapped to that, returns None.
+ /// If a change had to be done, returns Some(new uidvalidity in mailbox).
+ pub(crate) fn set_mailbox(
+ &mut self,
+ name: &str,
+ id: Option<UniqueIdent>,
+ ) -> Option<ImapUidvalidity> {
+ let (ts, id, uidvalidity) = match self.0.get_mut(name) {
+ None => {
+ if id.is_none() {
+ return None;
+ } else {
+ (now_msec(), id, ImapUidvalidity::new(1).unwrap())
+ }
+ }
+ Some(MailboxListEntry {
+ id_lww,
+ uidvalidity,
+ }) => {
+ if id_lww.1 == id {
+ return None;
+ } else {
+ (
+ std::cmp::max(id_lww.0 + 1, now_msec()),
+ id,
+ ImapUidvalidity::new(uidvalidity.get() + 1).unwrap(),
+ )
+ }
+ }
+ };
+
+ self.0.insert(
+ name.into(),
+ MailboxListEntry {
+ id_lww: (ts, id),
+ uidvalidity,
+ },
+ );
+ Some(uidvalidity)
+ }
+
+ pub(crate) fn update_uidvalidity(&mut self, name: &str, new_uidvalidity: ImapUidvalidity) {
+ match self.0.get_mut(name) {
+ None => {
+ self.0.insert(
+ name.into(),
+ MailboxListEntry {
+ id_lww: (now_msec(), None),
+ uidvalidity: new_uidvalidity,
+ },
+ );
+ }
+ Some(MailboxListEntry { uidvalidity, .. }) => {
+ *uidvalidity = std::cmp::max(*uidvalidity, new_uidvalidity);
+ }
+ }
+ }
+
+ pub(crate) fn create_mailbox(&mut self, name: &str) -> CreatedMailbox {
+ if let Some(MailboxListEntry {
+ id_lww: (_, Some(id)),
+ uidvalidity,
+ }) = self.0.get(name)
+ {
+ return CreatedMailbox::Existed(*id, *uidvalidity);
+ }
+
+ let id = gen_ident();
+ let uidvalidity = self.set_mailbox(name, Some(id)).unwrap();
+ CreatedMailbox::Created(id, uidvalidity)
+ }
+
+ pub(crate) fn rename_mailbox(&mut self, old_name: &str, new_name: &str) -> Result<()> {
+ if let Some((uidvalidity, Some(mbid))) = self.get_mailbox(old_name) {
+ if self.has_mailbox(new_name) {
+ bail!(
+ "Cannot rename {} into {}: {} already exists",
+ old_name,
+ new_name,
+ new_name
+ );
+ }
+
+ self.set_mailbox(old_name, None);
+ self.set_mailbox(new_name, Some(mbid));
+ self.update_uidvalidity(new_name, uidvalidity);
+ Ok(())
+ } else {
+ bail!(
+ "Cannot rename {} into {}: {} doesn't exist",
+ old_name,
+ new_name,
+ old_name
+ );
+ }
+ }
+}
+
+pub(crate) enum CreatedMailbox {
+ Created(UniqueIdent, ImapUidvalidity),
+ Existed(UniqueIdent, ImapUidvalidity),
+}
diff --git a/aero-collections/src/mail/query.rs b/aero-collections/src/mail/query.rs
new file mode 100644
index 0000000..7faba41
--- /dev/null
+++ b/aero-collections/src/mail/query.rs
@@ -0,0 +1,137 @@
+use super::mailbox::MailMeta;
+use super::snapshot::FrozenMailbox;
+use crate::unique_ident::UniqueIdent;
+use anyhow::Result;
+use futures::future::FutureExt;
+use futures::stream::{BoxStream, Stream, StreamExt};
+
+/// Query is in charge of fetching efficiently
+/// requested data for a list of emails
+pub struct Query<'a, 'b> {
+ pub frozen: &'a FrozenMailbox,
+ pub emails: &'b [UniqueIdent],
+ pub scope: QueryScope,
+}
+
+#[derive(Debug)]
+pub enum QueryScope {
+ Index,
+ Partial,
+ Full,
+}
+impl QueryScope {
+ pub fn union(&self, other: &QueryScope) -> QueryScope {
+ match (self, other) {
+ (QueryScope::Full, _) | (_, QueryScope::Full) => QueryScope::Full,
+ (QueryScope::Partial, _) | (_, QueryScope::Partial) => QueryScope::Partial,
+ (QueryScope::Index, QueryScope::Index) => QueryScope::Index,
+ }
+ }
+}
+
+//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>;
+
+impl<'a, 'b> Query<'a, 'b> {
+ pub fn fetch(&self) -> BoxStream<Result<QueryResult>> {
+ match self.scope {
+ QueryScope::Index => Box::pin(
+ futures::stream::iter(self.emails)
+ .map(|&uuid| Ok(QueryResult::IndexResult { uuid })),
+ ),
+ QueryScope::Partial => Box::pin(self.partial()),
+ QueryScope::Full => Box::pin(self.full()),
+ }
+ }
+
+ // --- functions below are private *for reasons*
+ fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
+ async move {
+ let maybe_meta_list: Result<Vec<MailMeta>> =
+ self.frozen.mailbox.fetch_meta(self.emails).await;
+ let list_res = maybe_meta_list
+ .map(|meta_list| {
+ meta_list
+ .into_iter()
+ .zip(self.emails)
+ .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata }))
+ .collect()
+ })
+ .unwrap_or_else(|e| vec![Err(e)]);
+
+ futures::stream::iter(list_res)
+ }
+ .flatten_stream()
+ }
+
+ fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
+ self.partial().then(move |maybe_meta| async move {
+ let meta = maybe_meta?;
+
+ let content = self
+ .frozen
+ .mailbox
+ .fetch_full(
+ *meta.uuid(),
+ &meta
+ .metadata()
+ .expect("meta to be PartialResult")
+ .message_key,
+ )
+ .await?;
+
+ Ok(meta.into_full(content).expect("meta to be PartialResult"))
+ })
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum QueryResult {
+ IndexResult {
+ uuid: UniqueIdent,
+ },
+ PartialResult {
+ uuid: UniqueIdent,
+ metadata: MailMeta,
+ },
+ FullResult {
+ uuid: UniqueIdent,
+ metadata: MailMeta,
+ content: Vec<u8>,
+ },
+}
+impl QueryResult {
+ pub fn uuid(&self) -> &UniqueIdent {
+ match self {
+ Self::IndexResult { uuid, .. } => uuid,
+ Self::PartialResult { uuid, .. } => uuid,
+ Self::FullResult { uuid, .. } => uuid,
+ }
+ }
+
+ pub fn metadata(&self) -> Option<&MailMeta> {
+ match self {
+ Self::IndexResult { .. } => None,
+ Self::PartialResult { metadata, .. } => Some(metadata),
+ Self::FullResult { metadata, .. } => Some(metadata),
+ }
+ }
+
+ #[allow(dead_code)]
+ pub fn content(&self) -> Option<&[u8]> {
+ match self {
+ Self::FullResult { content, .. } => Some(content),
+ _ => None,
+ }
+ }
+
+ fn into_full(self, content: Vec<u8>) -> Option<Self> {
+ match self {
+ Self::PartialResult { uuid, metadata } => Some(Self::FullResult {
+ uuid,
+ metadata,
+ content,
+ }),
+ _ => None,
+ }
+ }
+}
diff --git a/aero-collections/src/mail/snapshot.rs b/aero-collections/src/mail/snapshot.rs
new file mode 100644
index 0000000..6f8a8a8
--- /dev/null
+++ b/aero-collections/src/mail/snapshot.rs
@@ -0,0 +1,60 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+
+use super::mailbox::Mailbox;
+use super::query::{Query, QueryScope};
+use super::uidindex::UidIndex;
+use crate::unique_ident::UniqueIdent;
+
+/// A Frozen Mailbox has a snapshot of the current mailbox
+/// state that is desynchronized with the real mailbox state.
+/// It's up to the user to choose when their snapshot must be updated
+/// to give useful information to their clients
+pub struct FrozenMailbox {
+ pub mailbox: Arc<Mailbox>,
+ pub snapshot: UidIndex,
+}
+
+impl FrozenMailbox {
+ /// Create a snapshot from a mailbox, the mailbox + the snapshot
+ /// becomes the "Frozen Mailbox".
+ pub async fn new(mailbox: Arc<Mailbox>) -> Self {
+ let state = mailbox.current_uid_index().await;
+
+ Self {
+ mailbox,
+ snapshot: state,
+ }
+ }
+
+ /// Force the synchronization of the inner mailbox
+ /// but do not update the local snapshot
+ pub async fn sync(&self) -> Result<()> {
+ self.mailbox.opportunistic_sync().await
+ }
+
+ /// Peek snapshot without updating the frozen mailbox
+ /// Can be useful if you want to plan some writes
+ /// while sending a diff to the client later
+ pub async fn peek(&self) -> UidIndex {
+ self.mailbox.current_uid_index().await
+ }
+
+ /// Update the FrozenMailbox local snapshot.
+ /// Returns the old snapshot, so you can build a diff
+ pub async fn update(&mut self) -> UidIndex {
+ let old_snapshot = self.snapshot.clone();
+ self.snapshot = self.mailbox.current_uid_index().await;
+
+ old_snapshot
+ }
+
+ pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> {
+ Query {
+ frozen: self,
+ emails: uuids,
+ scope,
+ }
+ }
+}
diff --git a/aero-collections/src/mail/uidindex.rs b/aero-collections/src/mail/uidindex.rs
new file mode 100644
index 0000000..6df3206
--- /dev/null
+++ b/aero-collections/src/mail/uidindex.rs
@@ -0,0 +1,474 @@
+use std::num::{NonZeroU32, NonZeroU64};
+
+use im::{HashMap, OrdMap, OrdSet};
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
+
+use crate::unique_ident::UniqueIdent;
+use aero_bayou::*;
+
+pub type ModSeq = NonZeroU64;
+pub type ImapUid = NonZeroU32;
+pub type ImapUidvalidity = NonZeroU32;
+pub type Flag = String;
+pub type IndexEntry = (ImapUid, ModSeq, Vec<Flag>);
+
+/// A UidIndex handles the mutable part of a mailbox
+/// It is built by running the event log on it
+/// Each applied log generates a new UidIndex by cloning the previous one
+/// and applying the event. This is why we use immutable datastructures:
+/// they are cheap to clone.
+#[derive(Clone)]
+pub struct UidIndex {
+ // Source of trust
+ pub table: OrdMap<UniqueIdent, IndexEntry>,
+
+ // Indexes optimized for queries
+ pub idx_by_uid: OrdMap<ImapUid, UniqueIdent>,
+ pub idx_by_modseq: OrdMap<ModSeq, UniqueIdent>,
+ pub idx_by_flag: FlagIndex,
+
+ // "Public" Counters
+ pub uidvalidity: ImapUidvalidity,
+ pub uidnext: ImapUid,
+ pub highestmodseq: ModSeq,
+
+ // "Internal" Counters
+ pub internalseq: ImapUid,
+ pub internalmodseq: ModSeq,
+}
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub enum UidIndexOp {
+ MailAdd(UniqueIdent, ImapUid, ModSeq, Vec<Flag>),
+ MailDel(UniqueIdent),
+ FlagAdd(UniqueIdent, ModSeq, Vec<Flag>),
+ FlagDel(UniqueIdent, ModSeq, Vec<Flag>),
+ FlagSet(UniqueIdent, ModSeq, Vec<Flag>),
+ BumpUidvalidity(u32),
+}
+
+impl UidIndex {
+ #[must_use]
+ pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags)
+ }
+
+ #[must_use]
+ pub fn op_mail_del(&self, ident: UniqueIdent) -> UidIndexOp {
+ UidIndexOp::MailDel(ident)
+ }
+
+ #[must_use]
+ pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::FlagAdd(ident, self.internalmodseq, flags)
+ }
+
+ #[must_use]
+ pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::FlagDel(ident, self.internalmodseq, flags)
+ }
+
+ #[must_use]
+ pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::FlagSet(ident, self.internalmodseq, flags)
+ }
+
+ #[must_use]
+ pub fn op_bump_uidvalidity(&self, count: u32) -> UidIndexOp {
+ UidIndexOp::BumpUidvalidity(count)
+ }
+
+ // INTERNAL functions to keep state consistent
+
+ fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) {
+ // Insert the email in our table
+ self.table.insert(ident, (uid, modseq, flags.to_owned()));
+
+ // Update the indexes/caches
+ self.idx_by_uid.insert(uid, ident);
+ self.idx_by_flag.insert(uid, flags);
+ self.idx_by_modseq.insert(modseq, ident);
+ }
+
+ fn unreg_email(&mut self, ident: &UniqueIdent) {
+ // We do nothing if the mail does not exist
+ let (uid, modseq, flags) = match self.table.get(ident) {
+ Some(v) => v,
+ None => return,
+ };
+
+ // Delete all cache entries
+ self.idx_by_uid.remove(uid);
+ self.idx_by_flag.remove(*uid, flags);
+ self.idx_by_modseq.remove(modseq);
+
+ // Remove from source of trust
+ self.table.remove(ident);
+ }
+}
+
+impl Default for UidIndex {
+ fn default() -> Self {
+ Self {
+ table: OrdMap::new(),
+
+ idx_by_uid: OrdMap::new(),
+ idx_by_modseq: OrdMap::new(),
+ idx_by_flag: FlagIndex::new(),
+
+ uidvalidity: NonZeroU32::new(1).unwrap(),
+ uidnext: NonZeroU32::new(1).unwrap(),
+ highestmodseq: NonZeroU64::new(1).unwrap(),
+
+ internalseq: NonZeroU32::new(1).unwrap(),
+ internalmodseq: NonZeroU64::new(1).unwrap(),
+ }
+ }
+}
+
+impl BayouState for UidIndex {
+ type Op = UidIndexOp;
+
+ fn apply(&self, op: &UidIndexOp) -> Self {
+ let mut new = self.clone();
+ match op {
+ UidIndexOp::MailAdd(ident, uid, modseq, flags) => {
+ // Change UIDValidity if there is a UID conflict or a MODSEQ conflict
+ // @FIXME Need to prove that summing work
+ // The intuition: we increase the UIDValidity by the number of possible conflicts
+ if *uid < new.internalseq || *modseq < new.internalmodseq {
+ let bump_uid = new.internalseq.get() - uid.get();
+ let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32;
+ new.uidvalidity =
+ NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap();
+ }
+
+ // Assign the real uid of the email
+ let new_uid = new.internalseq;
+
+ // Assign the real modseq of the email and its new flags
+ let new_modseq = new.internalmodseq;
+
+ // Delete the previous entry if any.
+ // Our proof has no assumption on `ident` uniqueness,
+ // so we must handle this case even it is very unlikely
+ // In this case, we overwrite the email.
+ // Note: assigning a new UID is mandatory.
+ new.unreg_email(ident);
+
+ // We record our email and update ou caches
+ new.reg_email(*ident, new_uid, new_modseq, flags);
+
+ // Update counters
+ new.highestmodseq = new.internalmodseq;
+
+ new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
+ new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
+
+ new.uidnext = new.internalseq;
+ }
+ UidIndexOp::MailDel(ident) => {
+ // If the email is known locally, we remove its references in all our indexes
+ new.unreg_email(ident);
+
+ // We update the counter
+ new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
+ }
+ UidIndexOp::FlagAdd(ident, candidate_modseq, new_flags) => {
+ if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
+ // Bump UIDValidity if required
+ if *candidate_modseq < new.internalmodseq {
+ let bump_modseq =
+ (new.internalmodseq.get() - candidate_modseq.get()) as u32;
+ new.uidvalidity =
+ NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
+ }
+
+ // Add flags to the source of trust and the cache
+ let mut to_add: Vec<Flag> = new_flags
+ .iter()
+ .filter(|f| !existing_flags.contains(f))
+ .cloned()
+ .collect();
+ new.idx_by_flag.insert(*uid, &to_add);
+ *email_modseq = new.internalmodseq;
+ new.idx_by_modseq.insert(new.internalmodseq, *ident);
+ existing_flags.append(&mut to_add);
+
+ // Update counters
+ new.highestmodseq = new.internalmodseq;
+ new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
+ }
+ }
+ UidIndexOp::FlagDel(ident, candidate_modseq, rm_flags) => {
+ if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
+ // Bump UIDValidity if required
+ if *candidate_modseq < new.internalmodseq {
+ let bump_modseq =
+ (new.internalmodseq.get() - candidate_modseq.get()) as u32;
+ new.uidvalidity =
+ NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
+ }
+
+ // Remove flags from the source of trust and the cache
+ existing_flags.retain(|x| !rm_flags.contains(x));
+ new.idx_by_flag.remove(*uid, rm_flags);
+
+ // Register that email has been modified
+ new.idx_by_modseq.insert(new.internalmodseq, *ident);
+ *email_modseq = new.internalmodseq;
+
+ // Update counters
+ new.highestmodseq = new.internalmodseq;
+ new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
+ }
+ }
+ UidIndexOp::FlagSet(ident, candidate_modseq, new_flags) => {
+ if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
+ // Bump UIDValidity if required
+ if *candidate_modseq < new.internalmodseq {
+ let bump_modseq =
+ (new.internalmodseq.get() - candidate_modseq.get()) as u32;
+ new.uidvalidity =
+ NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
+ }
+
+ // Remove flags from the source of trust and the cache
+ let (keep_flags, rm_flags): (Vec<String>, Vec<String>) = existing_flags
+ .iter()
+ .cloned()
+ .partition(|x| new_flags.contains(x));
+ *existing_flags = keep_flags;
+ let mut to_add: Vec<Flag> = new_flags
+ .iter()
+ .filter(|f| !existing_flags.contains(f))
+ .cloned()
+ .collect();
+ existing_flags.append(&mut to_add);
+ new.idx_by_flag.remove(*uid, &rm_flags);
+ new.idx_by_flag.insert(*uid, &to_add);
+
+ // Register that email has been modified
+ new.idx_by_modseq.insert(new.internalmodseq, *ident);
+ *email_modseq = new.internalmodseq;
+
+ // Update counters
+ new.highestmodseq = new.internalmodseq;
+ new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
+ }
+ }
+ UidIndexOp::BumpUidvalidity(count) => {
+ new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count)
+ .unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap());
+ }
+ }
+ new
+ }
+}
+
+// ---- FlagIndex implementation ----
+
+#[derive(Clone)]
+pub struct FlagIndex(HashMap<Flag, OrdSet<ImapUid>>);
+pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet<ImapUid>>;
+
+impl FlagIndex {
+ fn new() -> Self {
+ Self(HashMap::new())
+ }
+ fn insert(&mut self, uid: ImapUid, flags: &[Flag]) {
+ flags.iter().for_each(|flag| {
+ self.0
+ .entry(flag.clone())
+ .or_insert(OrdSet::new())
+ .insert(uid);
+ });
+ }
+ fn remove(&mut self, uid: ImapUid, flags: &[Flag]) {
+ for flag in flags.iter() {
+ if let Some(set) = self.0.get_mut(flag) {
+ set.remove(&uid);
+ if set.is_empty() {
+ self.0.remove(flag);
+ }
+ }
+ }
+ }
+
+ pub fn get(&self, f: &Flag) -> Option<&OrdSet<ImapUid>> {
+ self.0.get(f)
+ }
+
+ pub fn flags(&self) -> FlagIter {
+ self.0.keys()
+ }
+}
+
+// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ----
+
+#[derive(Serialize, Deserialize)]
+struct UidIndexSerializedRepr {
+ mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec<Flag>)>,
+
+ uidvalidity: ImapUidvalidity,
+ uidnext: ImapUid,
+ highestmodseq: ModSeq,
+
+ internalseq: ImapUid,
+ internalmodseq: ModSeq,
+}
+
+impl<'de> Deserialize<'de> for UidIndex {
+ fn deserialize<D>(d: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?;
+
+ let mut uidindex = UidIndex {
+ table: OrdMap::new(),
+
+ idx_by_uid: OrdMap::new(),
+ idx_by_modseq: OrdMap::new(),
+ idx_by_flag: FlagIndex::new(),
+
+ uidvalidity: val.uidvalidity,
+ uidnext: val.uidnext,
+ highestmodseq: val.highestmodseq,
+
+ internalseq: val.internalseq,
+ internalmodseq: val.internalmodseq,
+ };
+
+ val.mails
+ .iter()
+ .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags));
+
+ Ok(uidindex)
+ }
+}
+
+impl Serialize for UidIndex {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let mut mails = vec![];
+ for (ident, (uid, modseq, flags)) in self.table.iter() {
+ mails.push((*uid, *modseq, *ident, flags.clone()));
+ }
+
+ let val = UidIndexSerializedRepr {
+ mails,
+ uidvalidity: self.uidvalidity,
+ uidnext: self.uidnext,
+ highestmodseq: self.highestmodseq,
+ internalseq: self.internalseq,
+ internalmodseq: self.internalmodseq,
+ };
+
+ val.serialize(serializer)
+ }
+}
+
+// ---- TESTS ----
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_uidindex() {
+ let mut state = UidIndex::default();
+
+ // Add message 1
+ {
+ let m = UniqueIdent([0x01; 24]);
+ let f = vec!["\\Recent".to_string(), "\\Archive".to_string()];
+ let ev = state.op_mail_add(m, f);
+ state = state.apply(&ev);
+
+ // Early checks
+ assert_eq!(state.table.len(), 1);
+ let (uid, modseq, flags) = state.table.get(&m).unwrap();
+ assert_eq!(*uid, NonZeroU32::new(1).unwrap());
+ assert_eq!(*modseq, NonZeroU64::new(1).unwrap());
+ assert_eq!(flags.len(), 2);
+ let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap();
+ assert_eq!(&m, ident);
+ let recent = state.idx_by_flag.0.get("\\Recent").unwrap();
+ assert_eq!(recent.len(), 1);
+ assert_eq!(recent.iter().next().unwrap(), &NonZeroU32::new(1).unwrap());
+ assert_eq!(state.uidnext, NonZeroU32::new(2).unwrap());
+ assert_eq!(state.uidvalidity, NonZeroU32::new(1).unwrap());
+ }
+
+ // Add message 2
+ {
+ let m = UniqueIdent([0x02; 24]);
+ let f = vec!["\\Seen".to_string(), "\\Archive".to_string()];
+ let ev = state.op_mail_add(m, f);
+ state = state.apply(&ev);
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 2);
+ }
+
+ // Add flags to message 1
+ {
+ let m = UniqueIdent([0x01; 24]);
+ let f = vec!["Important".to_string(), "$cl_1".to_string()];
+ let ev = state.op_flag_add(m, f);
+ state = state.apply(&ev);
+ }
+
+ // Delete flags from message 1
+ {
+ let m = UniqueIdent([0x01; 24]);
+ let f = vec!["\\Recent".to_string()];
+ let ev = state.op_flag_del(m, f);
+ state = state.apply(&ev);
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 2);
+ }
+
+ // Delete message 2
+ {
+ let m = UniqueIdent([0x02; 24]);
+ let ev = state.op_mail_del(m);
+ state = state.apply(&ev);
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 1);
+ }
+
+ // Add a message 3 concurrent to message 1 (trigger a uid validity change)
+ {
+ let m = UniqueIdent([0x03; 24]);
+ let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
+ let ev = UidIndexOp::MailAdd(
+ m,
+ NonZeroU32::new(1).unwrap(),
+ NonZeroU64::new(1).unwrap(),
+ f,
+ );
+ state = state.apply(&ev);
+ }
+
+ // Checks
+ {
+ assert_eq!(state.table.len(), 2);
+ assert!(state.uidvalidity > NonZeroU32::new(1).unwrap());
+
+ let (last_uid, ident) = state.idx_by_uid.get_max().unwrap();
+ assert_eq!(ident, &UniqueIdent([0x03; 24]));
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 2);
+ let mut iter = archive.iter();
+ assert_eq!(iter.next().unwrap(), &NonZeroU32::new(1).unwrap());
+ assert_eq!(iter.next().unwrap(), last_uid);
+ }
+ }
+}
diff --git a/aero-collections/src/unique_ident.rs b/aero-collections/src/unique_ident.rs
new file mode 100644
index 0000000..e4eea7a
--- /dev/null
+++ b/aero-collections/src/unique_ident.rs
@@ -0,0 +1,101 @@
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use lazy_static::lazy_static;
+use rand::prelude::*;
+use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
+
+use aero_bayou::timestamp::now_msec;
+
+/// An internal Aerogramme identifier is composed of two components:
+/// - a process identifier, 128 bits, itself composed of:
+/// - the timestamp of when the process started, 64 bits
+/// - a 64-bit random number
+/// - a sequence number, 64 bits
+/// They are not part of the protocol but an internal representation
+/// required by Aerogramme.
+/// Their main property is to be unique without having to rely
+/// on synchronization between (IMAP) processes.
+#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
+pub struct UniqueIdent(pub [u8; 24]);
+
+struct IdentGenerator {
+ pid: u128,
+ sn: AtomicU64,
+}
+
+impl IdentGenerator {
+ fn new() -> Self {
+ let time = now_msec() as u128;
+ let rand = thread_rng().gen::<u64>() as u128;
+ Self {
+ pid: (time << 64) | rand,
+ sn: AtomicU64::new(0),
+ }
+ }
+
+ fn gen(&self) -> UniqueIdent {
+ let sn = self.sn.fetch_add(1, Ordering::Relaxed);
+ let mut res = [0u8; 24];
+ res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid));
+ res[16..24].copy_from_slice(&u64::to_be_bytes(sn));
+ UniqueIdent(res)
+ }
+}
+
+lazy_static! {
+ static ref GENERATOR: IdentGenerator = IdentGenerator::new();
+}
+
+pub fn gen_ident() -> UniqueIdent {
+ GENERATOR.gen()
+}
+
+// -- serde --
+
+impl<'de> Deserialize<'de> for UniqueIdent {
+ fn deserialize<D>(d: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let v = String::deserialize(d)?;
+ UniqueIdent::from_str(&v).map_err(D::Error::custom)
+ }
+}
+
+impl Serialize for UniqueIdent {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ serializer.serialize_str(&self.to_string())
+ }
+}
+
+impl std::fmt::Display for UniqueIdent {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", hex::encode(self.0))
+ }
+}
+
+impl std::fmt::Debug for UniqueIdent {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", hex::encode(self.0))
+ }
+}
+
+impl FromStr for UniqueIdent {
+ type Err = &'static str;
+
+ fn from_str(s: &str) -> Result<UniqueIdent, &'static str> {
+ let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
+
+ if bytes.len() != 24 {
+ return Err("bad length");
+ }
+
+ let mut tmp = [0u8; 24];
+ tmp[..].copy_from_slice(&bytes);
+ Ok(UniqueIdent(tmp))
+ }
+}
diff --git a/aero-collections/src/user.rs b/aero-collections/src/user.rs
new file mode 100644
index 0000000..f125c46
--- /dev/null
+++ b/aero-collections/src/user.rs
@@ -0,0 +1,327 @@
+use std::collections::HashMap;
+use std::sync::{Arc, Weak};
+
+use anyhow::{anyhow, bail, Result};
+use lazy_static::lazy_static;
+use tokio::sync::watch;
+
+use aero_user::cryptoblob::{open_deserialize, seal_serialize};
+use aero_user::login::Credentials;
+use aero_user::storage;
+
+use crate::calendar::namespace::CalendarNs;
+use crate::mail::incoming::incoming_mail_watch_process;
+use crate::mail::mailbox::Mailbox;
+use crate::mail::namespace::{
+ CreatedMailbox, MailboxList, ARCHIVE, DRAFTS, INBOX, MAILBOX_HIERARCHY_DELIMITER,
+ MAILBOX_LIST_PK, MAILBOX_LIST_SK, SENT, TRASH,
+};
+use crate::mail::uidindex::ImapUidvalidity;
+use crate::unique_ident::UniqueIdent;
+
+//@FIXME User should be totally rewriten
+// to extract the local mailbox list
+// to the mail/namespace.rs file (and mailbox list should be reworded as mail namespace)
+
+//@FIXME User should be run in a LocalSet
+// to remove most - if not all - synchronizations types.
+// Especially RwLock & co.
+
+pub struct User {
+ pub username: String,
+ pub creds: Credentials,
+ pub storage: storage::Store,
+ pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>,
+ pub calendars: CalendarNs,
+
+ // Handle on worker processing received email
+ // (moving emails from the mailqueue to the user's INBOX)
+ tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>,
+}
+
+impl User {
+ pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> {
+ let cache_key = (username.clone(), creds.storage.unique());
+
+ {
+ let cache = USER_CACHE.lock().unwrap();
+ if let Some(u) = cache.get(&cache_key).and_then(Weak::upgrade) {
+ return Ok(u);
+ }
+ }
+
+ let user = Self::open(username, creds).await?;
+
+ let mut cache = USER_CACHE.lock().unwrap();
+ if let Some(concurrent_user) = cache.get(&cache_key).and_then(Weak::upgrade) {
+ drop(user);
+ Ok(concurrent_user)
+ } else {
+ cache.insert(cache_key, Arc::downgrade(&user));
+ Ok(user)
+ }
+ }
+
+ /// Lists user's available mailboxes
+ pub async fn list_mailboxes(&self) -> Result<Vec<String>> {
+ let (list, _ct) = self.load_mailbox_list().await?;
+ Ok(list.existing_mailbox_names())
+ }
+
+ /// Opens an existing mailbox given its IMAP name.
+ pub async fn open_mailbox(&self, name: &str) -> Result<Option<Arc<Mailbox>>> {
+ let (mut list, ct) = self.load_mailbox_list().await?;
+
+ //@FIXME it could be a trace or an opentelemtry trace thing.
+ // Be careful to not leak sensible data
+ /*
+ eprintln!("List of mailboxes:");
+ for ent in list.0.iter() {
+ eprintln!(" - {:?}", ent);
+ }
+ */
+
+ if let Some((uidvalidity, Some(mbid))) = list.get_mailbox(name) {
+ let mb = self.open_mailbox_by_id(mbid, uidvalidity).await?;
+ let mb_uidvalidity = mb.current_uid_index().await.uidvalidity;
+ if mb_uidvalidity > uidvalidity {
+ list.update_uidvalidity(name, mb_uidvalidity);
+ self.save_mailbox_list(&list, ct).await?;
+ }
+ Ok(Some(mb))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Check whether mailbox exists
+ pub async fn has_mailbox(&self, name: &str) -> Result<bool> {
+ let (list, _ct) = self.load_mailbox_list().await?;
+ Ok(list.has_mailbox(name))
+ }
+
+ /// Creates a new mailbox in the user's IMAP namespace.
+ pub async fn create_mailbox(&self, name: &str) -> Result<()> {
+ if name.ends_with(MAILBOX_HIERARCHY_DELIMITER) {
+ bail!("Invalid mailbox name: {}", name);
+ }
+
+ let (mut list, ct) = self.load_mailbox_list().await?;
+ match list.create_mailbox(name) {
+ CreatedMailbox::Created(_, _) => {
+ self.save_mailbox_list(&list, ct).await?;
+ Ok(())
+ }
+ CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)),
+ }
+ }
+
+ /// Deletes a mailbox in the user's IMAP namespace.
+ pub async fn delete_mailbox(&self, name: &str) -> Result<()> {
+ if name == INBOX {
+ bail!("Cannot delete INBOX");
+ }
+
+ let (mut list, ct) = self.load_mailbox_list().await?;
+ if list.has_mailbox(name) {
+ //@TODO: actually delete mailbox contents
+ list.set_mailbox(name, None);
+ self.save_mailbox_list(&list, ct).await?;
+ Ok(())
+ } else {
+ bail!("Mailbox {} does not exist", name);
+ }
+ }
+
+ /// Renames a mailbox in the user's IMAP namespace.
+ pub async fn rename_mailbox(&self, old_name: &str, new_name: &str) -> Result<()> {
+ let (mut list, ct) = self.load_mailbox_list().await?;
+
+ if old_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) {
+ bail!("Invalid mailbox name: {}", old_name);
+ }
+ if new_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) {
+ bail!("Invalid mailbox name: {}", new_name);
+ }
+
+ if old_name == INBOX {
+ list.rename_mailbox(old_name, new_name)?;
+ if !self.ensure_inbox_exists(&mut list, &ct).await? {
+ self.save_mailbox_list(&list, ct).await?;
+ }
+ } else {
+ let names = list.existing_mailbox_names();
+
+ let old_name_w_delim = format!("{}{}", old_name, MAILBOX_HIERARCHY_DELIMITER);
+ let new_name_w_delim = format!("{}{}", new_name, MAILBOX_HIERARCHY_DELIMITER);
+
+ if names
+ .iter()
+ .any(|x| x == new_name || x.starts_with(&new_name_w_delim))
+ {
+ bail!("Mailbox {} already exists", new_name);
+ }
+
+ for name in names.iter() {
+ if name == old_name {
+ list.rename_mailbox(name, new_name)?;
+ } else if let Some(tail) = name.strip_prefix(&old_name_w_delim) {
+ let nnew = format!("{}{}", new_name_w_delim, tail);
+ list.rename_mailbox(name, &nnew)?;
+ }
+ }
+
+ self.save_mailbox_list(&list, ct).await?;
+ }
+ Ok(())
+ }
+
+ // ---- Internal user & mailbox management ----
+
+ async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
+ let storage = creds.storage.build().await?;
+
+ let (tx_inbox_id, rx_inbox_id) = watch::channel(None);
+
+ let user = Arc::new(Self {
+ username,
+ creds: creds.clone(),
+ storage,
+ tx_inbox_id,
+ mailboxes: std::sync::Mutex::new(HashMap::new()),
+ calendars: CalendarNs::new(),
+ });
+
+ // Ensure INBOX exists (done inside load_mailbox_list)
+ user.load_mailbox_list().await?;
+
+ tokio::spawn(incoming_mail_watch_process(
+ Arc::downgrade(&user),
+ user.creds.clone(),
+ rx_inbox_id,
+ ));
+
+ Ok(user)
+ }
+
+ pub(super) async fn open_mailbox_by_id(
+ &self,
+ id: UniqueIdent,
+ min_uidvalidity: ImapUidvalidity,
+ ) -> Result<Arc<Mailbox>> {
+ {
+ let cache = self.mailboxes.lock().unwrap();
+ if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) {
+ return Ok(mb);
+ }
+ }
+
+ // The idea here is that:
+ // 1. Opening a mailbox that is not already opened takes a significant amount of time
+ // 2. We don't want to lock the whole HashMap that contain the mailboxes during this
+ // operation which is why we droppped the lock above but take it again below.
+ let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?);
+
+ let mut cache = self.mailboxes.lock().unwrap();
+ if let Some(concurrent_mb) = cache.get(&id).and_then(Weak::upgrade) {
+ drop(mb); // we worked for nothing but at least we didn't starve someone else
+ Ok(concurrent_mb)
+ } else {
+ cache.insert(id, Arc::downgrade(&mb));
+ Ok(mb)
+ }
+ }
+
+ // ---- Mailbox list management ----
+
+ async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> {
+ let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK);
+ let (mut list, row) = match self
+ .storage
+ .row_fetch(&storage::Selector::Single(&row_ref))
+ .await
+ {
+ Err(storage::StorageError::NotFound) => (MailboxList::new(), None),
+ Err(e) => return Err(e.into()),
+ Ok(rv) => {
+ let mut list = MailboxList::new();
+ let (row_ref, row_vals) = match rv.into_iter().next() {
+ Some(row_val) => (row_val.row_ref, row_val.value),
+ None => (row_ref, vec![]),
+ };
+
+ for v in row_vals {
+ if let storage::Alternative::Value(vbytes) = v {
+ let list2 =
+ open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
+ list.merge(list2);
+ }
+ }
+ (list, Some(row_ref))
+ }
+ };
+
+ let is_default_mbx_missing = [DRAFTS, ARCHIVE, SENT, TRASH]
+ .iter()
+ .map(|mbx| list.create_mailbox(mbx))
+ .fold(false, |acc, r| {
+ acc || matches!(r, CreatedMailbox::Created(..))
+ });
+ let is_inbox_missing = self.ensure_inbox_exists(&mut list, &row).await?;
+ if is_default_mbx_missing && !is_inbox_missing {
+ // It's the only case where we created some mailboxes and not saved them
+ // So we save them!
+ self.save_mailbox_list(&list, row.clone()).await?;
+ }
+
+ Ok((list, row))
+ }
+
+ async fn ensure_inbox_exists(
+ &self,
+ list: &mut MailboxList,
+ ct: &Option<storage::RowRef>,
+ ) -> Result<bool> {
+ // If INBOX doesn't exist, create a new mailbox with that name
+ // and save new mailbox list.
+ // Also, ensure that the mpsc::watch that keeps track of the
+ // inbox id is up-to-date.
+ let saved;
+ let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) {
+ CreatedMailbox::Created(i, v) => {
+ self.save_mailbox_list(list, ct.clone()).await?;
+ saved = true;
+ (i, v)
+ }
+ CreatedMailbox::Existed(i, v) => {
+ saved = false;
+ (i, v)
+ }
+ };
+ let inbox_id = Some((inbox_id, inbox_uidvalidity));
+ if *self.tx_inbox_id.borrow() != inbox_id {
+ self.tx_inbox_id.send(inbox_id).unwrap();
+ }
+
+ Ok(saved)
+ }
+
+ async fn save_mailbox_list(
+ &self,
+ list: &MailboxList,
+ ct: Option<storage::RowRef>,
+ ) -> Result<()> {
+ let list_blob = seal_serialize(list, &self.creds.keys.master)?;
+ let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK));
+ let row_val = storage::RowVal::new(rref, list_blob);
+ self.storage.row_insert(vec![row_val]).await?;
+ Ok(())
+ }
+}
+
+// ---- User cache ----
+
+lazy_static! {
+ static ref USER_CACHE: std::sync::Mutex<HashMap<(String, storage::UnicityBuffer), Weak<User>>> =
+ std::sync::Mutex::new(HashMap::new());
+}