From 1a43ce5ac7033c148f64a033f2b1d335e95e11d5 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 8 Mar 2024 08:17:03 +0100 Subject: WIP refactor --- aero-collections/mail/incoming.rs | 445 +++++++++++++++++++++++++++++ aero-collections/mail/mailbox.rs | 524 ++++++++++++++++++++++++++++++++++ aero-collections/mail/mod.rs | 27 ++ aero-collections/mail/namespace.rs | 209 ++++++++++++++ aero-collections/mail/query.rs | 137 +++++++++ aero-collections/mail/snapshot.rs | 60 ++++ aero-collections/mail/uidindex.rs | 474 ++++++++++++++++++++++++++++++ aero-collections/mail/unique_ident.rs | 101 +++++++ aero-collections/user.rs | 313 ++++++++++++++++++++ 9 files changed, 2290 insertions(+) create mode 100644 aero-collections/mail/incoming.rs create mode 100644 aero-collections/mail/mailbox.rs create mode 100644 aero-collections/mail/mod.rs create mode 100644 aero-collections/mail/namespace.rs create mode 100644 aero-collections/mail/query.rs create mode 100644 aero-collections/mail/snapshot.rs create mode 100644 aero-collections/mail/uidindex.rs create mode 100644 aero-collections/mail/unique_ident.rs create mode 100644 aero-collections/user.rs (limited to 'aero-collections') diff --git a/aero-collections/mail/incoming.rs b/aero-collections/mail/incoming.rs new file mode 100644 index 0000000..e2ad97d --- /dev/null +++ b/aero-collections/mail/incoming.rs @@ -0,0 +1,445 @@ +//use std::collections::HashMap; +use std::convert::TryFrom; + +use std::sync::{Arc, Weak}; +use std::time::Duration; + +use anyhow::{anyhow, bail, Result}; +use base64::Engine; +use futures::{future::BoxFuture, FutureExt}; +//use tokio::io::AsyncReadExt; +use tokio::sync::watch; +use tracing::{debug, error, info, warn}; + +use crate::cryptoblob; +use crate::login::{Credentials, PublicCredentials}; +use crate::mail::mailbox::Mailbox; +use crate::mail::uidindex::ImapUidvalidity; +use crate::mail::unique_ident::*; +use crate::user::User; +use crate::mail::IMF; +use crate::storage; +use crate::timestamp::now_msec; + +const INCOMING_PK: &str = "incoming"; +const INCOMING_LOCK_SK: &str = "lock"; +const INCOMING_WATCH_SK: &str = "watch"; + +const MESSAGE_KEY: &str = "message-key"; + +// When a lock is held, it is held for LOCK_DURATION (here 5 minutes) +// It is renewed every LOCK_DURATION/3 +// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we +// lost the lock. +const LOCK_DURATION: Duration = Duration::from_secs(300); + +// In addition to checking when notified, also check for new mail every 10 minutes +const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600); + +pub async fn incoming_mail_watch_process( + user: Weak, + creds: Credentials, + rx_inbox_id: watch::Receiver>, +) { + if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await { + error!("Error in incoming mail watch process: {}", e); + } +} + +async fn incoming_mail_watch_process_internal( + user: Weak, + creds: Credentials, + mut rx_inbox_id: watch::Receiver>, +) -> Result<()> { + let mut lock_held = k2v_lock_loop( + creds.storage.build().await?, + storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK), + ); + let storage = creds.storage.build().await?; + + let mut inbox: Option> = None; + let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + + loop { + let maybe_updated_incoming_key = if *lock_held.borrow() { + debug!("incoming lock held"); + + let wait_new_mail = async { + loop { + match storage.row_poll(&incoming_key).await { + Ok(row_val) => break row_val.row_ref, + Err(e) => { + error!("Error in wait_new_mail: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + } + }; + + tokio::select! { + inc_k = wait_new_mail => Some(inc_k), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, + } + } else { + debug!("incoming lock not held"); + tokio::select! { + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, + } + }; + + let user = match Weak::upgrade(&user) { + Some(user) => user, + None => { + debug!("User no longer available, exiting incoming loop."); + break; + } + }; + debug!("User still available"); + + // If INBOX no longer is same mailbox, open new mailbox + let inbox_id = *rx_inbox_id.borrow(); + if let Some((id, uidvalidity)) = inbox_id { + if Some(id) != inbox.as_ref().map(|b| b.id) { + match user.open_mailbox_by_id(id, uidvalidity).await { + Ok(mb) => { + inbox = Some(mb); + } + Err(e) => { + inbox = None; + error!("Error when opening inbox ({}): {}", id, e); + tokio::time::sleep(Duration::from_secs(30)).await; + continue; + } + } + } + } + + // If we were able to open INBOX, and we have mail, + // fetch new mail + if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { + match handle_incoming_mail(&user, &storage, inbox, &lock_held).await { + Ok(()) => { + incoming_key = updated_incoming_key; + } + Err(e) => { + error!("Could not fetch incoming mail: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + } + } + drop(rx_inbox_id); + Ok(()) +} + +async fn handle_incoming_mail( + user: &Arc, + storage: &storage::Store, + inbox: &Arc, + lock_held: &watch::Receiver, +) -> Result<()> { + let mails_res = storage.blob_list("incoming/").await?; + + for object in mails_res { + if !*lock_held.borrow() { + break; + } + let key = object.0; + if let Some(mail_id) = key.strip_prefix("incoming/") { + if let Ok(mail_id) = mail_id.parse::() { + move_incoming_message(user, storage, inbox, mail_id).await?; + } + } + } + + Ok(()) +} + +async fn move_incoming_message( + user: &Arc, + storage: &storage::Store, + inbox: &Arc, + id: UniqueIdent, +) -> Result<()> { + info!("Moving incoming message: {}", id); + + let object_key = format!("incoming/{}", id); + + // 1. Fetch message from S3 + let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?; + + // 1.a decrypt message key from headers + //info!("Object metadata: {:?}", get_result.metadata); + let key_encrypted_b64 = object + .meta + .get(MESSAGE_KEY) + .ok_or(anyhow!("Missing key in metadata"))?; + let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; + let message_key = sodiumoxide::crypto::sealedbox::open( + &key_encrypted, + &user.creds.keys.public, + &user.creds.keys.secret, + ) + .map_err(|_| anyhow!("Cannot decrypt message key"))?; + let message_key = + cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; + + // 1.b retrieve message body + let obj_body = object.value; + let plain_mail = cryptoblob::open(&obj_body, &message_key) + .map_err(|_| anyhow!("Cannot decrypt email content"))?; + + // 2 parse mail and add to inbox + let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; + inbox + .append_from_s3(msg, id, object.blob_ref.clone(), message_key) + .await?; + + // 3 delete from incoming + storage.blob_rm(&object.blob_ref).await?; + + Ok(()) +} + +// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- + +fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver { + let (held_tx, held_rx) = watch::channel(false); + + tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx)); + + held_rx +} + +#[derive(Clone, Debug)] +enum LockState { + Unknown, + Empty, + Held(UniqueIdent, u64, storage::RowRef), +} + +async fn k2v_lock_loop_internal( + storage: storage::Store, + row_ref: storage::RowRef, + held_tx: watch::Sender, +) { + let (state_tx, mut state_rx) = watch::channel::(LockState::Unknown); + let mut state_rx_2 = state_rx.clone(); + + let our_pid = gen_ident(); + + // Loop 1: watch state of lock in K2V, save that in corresponding watch channel + let watch_lock_loop: BoxFuture> = async { + let mut ct = row_ref.clone(); + loop { + debug!("k2v watch lock loop iter: ct = {:?}", ct); + match storage.row_poll(&ct).await { + Err(e) => { + error!( + "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", + e + ); + state_tx.send(LockState::Unknown)?; + tokio::time::sleep(Duration::from_secs(30)).await; + } + Ok(cv) => { + let mut lock_state = None; + for v in cv.value.iter() { + if let storage::Alternative::Value(vbytes) = v { + if vbytes.len() == 32 { + let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); + let pid = UniqueIdent(vbytes[8..].try_into().unwrap()); + if lock_state + .map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2)) + .unwrap_or(true) + { + lock_state = Some((pid, ts)); + } + } + } + } + let new_ct = cv.row_ref; + + debug!( + "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", + ct, new_ct, lock_state + ); + state_tx.send( + lock_state + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone())) + .unwrap_or(LockState::Empty), + )?; + ct = new_ct; + } + } + } + } + .boxed(); + + // Loop 2: notify user whether we are holding the lock or not + let lock_notify_loop: BoxFuture> = async { + loop { + let now = now_msec(); + let held_with_expiration_time = match &*state_rx.borrow_and_update() { + LockState::Held(pid, ts, _ct) if *pid == our_pid => { + let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64; + if now < expiration_time { + Some(expiration_time) + } else { + None + } + } + _ => None, + }; + let held = held_with_expiration_time.is_some(); + if held != *held_tx.borrow() { + held_tx.send(held)?; + } + + let await_expired = async { + match held_with_expiration_time { + None => futures::future::pending().await, + Some(expiration_time) => { + tokio::time::sleep(Duration::from_millis(expiration_time - now)).await + } + }; + }; + + tokio::select!( + r = state_rx.changed() => { + r?; + } + _ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"), + _ = await_expired => continue, + ); + } + } + .boxed(); + + // Loop 3: acquire lock when relevant + let take_lock_loop: BoxFuture> = async { + loop { + let now = now_msec(); + let state: LockState = state_rx_2.borrow_and_update().clone(); + let (acquire_at, ct) = match state { + LockState::Unknown => { + // If state of the lock is unknown, don't try to acquire + state_rx_2.changed().await?; + continue; + } + LockState::Empty => (now, None), + LockState::Held(pid, ts, ct) => { + if pid == our_pid { + (ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct)) + } else { + (ts, Some(ct)) + } + } + }; + + // Wait until it is time to acquire lock + if acquire_at > now { + tokio::select!( + r = state_rx_2.changed() => { + // If lock state changed in the meantime, don't acquire and loop around + r?; + continue; + } + _ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => () + ); + } + + // Acquire lock + let mut lock = vec![0u8; 32]; + lock[..8].copy_from_slice(&u64::to_be_bytes( + now_msec() + LOCK_DURATION.as_millis() as u64, + )); + lock[8..].copy_from_slice(&our_pid.0); + let row = match ct { + Some(existing) => existing, + None => row_ref.clone(), + }; + if let Err(e) = storage + .row_insert(vec![storage::RowVal::new(row, lock)]) + .await + { + error!("Could not take lock: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + + // Wait for new information to loop back + state_rx_2.changed().await?; + } + } + .boxed(); + + let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop); + + debug!("lock loop exited, releasing"); + + if !held_tx.is_closed() { + warn!("weird..."); + let _ = held_tx.send(false); + } + + // If lock is ours, release it + let release = match &*state_rx.borrow() { + LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()), + _ => None, + }; + if let Some(ct) = release { + match storage.row_rm(&storage::Selector::Single(&ct)).await { + Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), + Ok(_) => (), + }; + } +} + +// ---- LMTP SIDE: storing messages encrypted with user's pubkey ---- + +pub struct EncryptedMessage { + key: cryptoblob::Key, + encrypted_body: Vec, +} + +impl EncryptedMessage { + pub fn new(body: Vec) -> Result { + let key = cryptoblob::gen_key(); + let encrypted_body = cryptoblob::seal(&body, &key)?; + Ok(Self { + key, + encrypted_body, + }) + } + + pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { + let storage = creds.storage.build().await?; + + // Get causality token of previous watch key + let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await { + Err(_) => query, + Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query), + }; + + // Write mail to encrypted storage + let encrypted_key = + sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key); + + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("incoming/{}", gen_ident())), + self.encrypted_body.clone().into(), + ) + .with_meta(MESSAGE_KEY.to_string(), key_header); + storage.blob_insert(blob_val).await?; + + // Update watch key to signal new mail + let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec()); + storage.row_insert(vec![watch_val]).await?; + Ok(()) + } +} diff --git a/aero-collections/mail/mailbox.rs b/aero-collections/mail/mailbox.rs new file mode 100644 index 0000000..d1a5473 --- /dev/null +++ b/aero-collections/mail/mailbox.rs @@ -0,0 +1,524 @@ +use anyhow::{anyhow, bail, Result}; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; + +use crate::bayou::Bayou; +use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; +use crate::login::Credentials; +use crate::mail::uidindex::*; +use crate::mail::unique_ident::*; +use crate::mail::IMF; +use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; +use crate::timestamp::now_msec; + +pub struct Mailbox { + pub(super) id: UniqueIdent, + mbox: RwLock, +} + +impl Mailbox { + pub(crate) async fn open( + creds: &Credentials, + id: UniqueIdent, + min_uidvalidity: ImapUidvalidity, + ) -> Result { + let index_path = format!("index/{}", id); + let mail_path = format!("mail/{}", id); + + let mut uid_index = Bayou::::new(creds, index_path).await?; + uid_index.sync().await?; + + let uidvalidity = uid_index.state().uidvalidity; + if uidvalidity < min_uidvalidity { + uid_index + .push( + uid_index + .state() + .op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()), + ) + .await?; + } + + // @FIXME reporting through opentelemetry or some logs + // info on the "shape" of the mailbox would be welcomed + /* + dump(&uid_index); + */ + + let mbox = RwLock::new(MailboxInternal { + id, + encryption_key: creds.keys.master.clone(), + storage: creds.storage.build().await?, + uid_index, + mail_path, + }); + + Ok(Self { id, mbox }) + } + + /// Sync data with backing store + pub async fn force_sync(&self) -> Result<()> { + self.mbox.write().await.force_sync().await + } + + /// Sync data with backing store only if changes are detected + /// or last sync is too old + pub async fn opportunistic_sync(&self) -> Result<()> { + self.mbox.write().await.opportunistic_sync().await + } + + /// Block until a sync has been done (due to changes in the event log) + pub async fn notify(&self) -> std::sync::Weak { + self.mbox.read().await.notifier() + } + + // ---- Functions for reading the mailbox ---- + + /// Get a clone of the current UID Index of this mailbox + /// (cloning is cheap so don't hesitate to use this) + pub async fn current_uid_index(&self) -> UidIndex { + self.mbox.read().await.uid_index.state().clone() + } + + /// Fetch the metadata (headers + some more info) of the specified + /// mail IDs + pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { + self.mbox.read().await.fetch_meta(ids).await + } + + /// Fetch an entire e-mail + pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { + self.mbox.read().await.fetch_full(id, message_key).await + } + + pub async fn frozen(self: &std::sync::Arc) -> super::snapshot::FrozenMailbox { + super::snapshot::FrozenMailbox::new(self.clone()).await + } + + // ---- Functions for changing the mailbox ---- + + /// Add flags to message + pub async fn add_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> { + self.mbox.write().await.add_flags(id, flags).await + } + + /// Delete flags from message + pub async fn del_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> { + self.mbox.write().await.del_flags(id, flags).await + } + + /// Define the new flags for this message + pub async fn set_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> { + self.mbox.write().await.set_flags(id, flags).await + } + + /// Insert an email into the mailbox + pub async fn append<'a>( + &self, + msg: IMF<'a>, + ident: Option, + flags: &[Flag], + ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> { + self.mbox.write().await.append(msg, ident, flags).await + } + + /// Insert an email into the mailbox, copying it from an existing S3 object + pub async fn append_from_s3<'a>( + &self, + msg: IMF<'a>, + ident: UniqueIdent, + blob_ref: storage::BlobRef, + message_key: Key, + ) -> Result<()> { + self.mbox + .write() + .await + .append_from_s3(msg, ident, blob_ref, message_key) + .await + } + + /// Delete a message definitively from the mailbox + pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> { + self.mbox.write().await.delete(id).await + } + + /// Copy an email from an other Mailbox to this mailbox + /// (use this when possible, as it allows for a certain number of storage optimizations) + pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result { + if self.id == from.id { + bail!("Cannot copy into same mailbox"); + } + + let (mut selflock, fromlock); + if self.id < from.id { + selflock = self.mbox.write().await; + fromlock = from.mbox.write().await; + } else { + fromlock = from.mbox.write().await; + selflock = self.mbox.write().await; + }; + selflock.copy_from(&fromlock, uuid).await + } + + /// Move an email from an other Mailbox to this mailbox + /// (use this when possible, as it allows for a certain number of storage optimizations) + pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> { + if self.id == from.id { + bail!("Cannot copy move same mailbox"); + } + + let (mut selflock, mut fromlock); + if self.id < from.id { + selflock = self.mbox.write().await; + fromlock = from.mbox.write().await; + } else { + fromlock = from.mbox.write().await; + selflock = self.mbox.write().await; + }; + selflock.move_from(&mut fromlock, uuid).await + } +} + +// ---- + +// Non standard but common flags: +// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml +struct MailboxInternal { + // 2023-05-15 will probably be used later. + #[allow(dead_code)] + id: UniqueIdent, + mail_path: String, + encryption_key: Key, + storage: Store, + uid_index: Bayou, +} + +impl MailboxInternal { + async fn force_sync(&mut self) -> Result<()> { + self.uid_index.sync().await?; + Ok(()) + } + + async fn opportunistic_sync(&mut self) -> Result<()> { + self.uid_index.opportunistic_sync().await?; + Ok(()) + } + + fn notifier(&self) -> std::sync::Weak { + self.uid_index.notifier() + } + + // ---- Functions for reading the mailbox ---- + + async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { + let ids = ids.iter().map(|x| x.to_string()).collect::>(); + let ops = ids + .iter() + .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())) + .collect::>(); + let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; + + let mut meta_vec = vec![]; + for res in res_vec.into_iter() { + let mut meta_opt = None; + + // Resolve conflicts + for v in res.value.iter() { + match v { + storage::Alternative::Tombstone => (), + storage::Alternative::Value(v) => { + let meta = open_deserialize::(v, &self.encryption_key)?; + match meta_opt.as_mut() { + None => { + meta_opt = Some(meta); + } + Some(prevmeta) => { + prevmeta.try_merge(meta)?; + } + } + } + } + } + if let Some(meta) = meta_opt { + meta_vec.push(meta); + } else { + bail!("No valid meta value in k2v for {:?}", res.row_ref); + } + } + + Ok(meta_vec) + } + + async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { + let obj_res = self + .storage + .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))) + .await?; + let body = obj_res.value; + cryptoblob::open(&body, message_key) + } + + // ---- Functions for changing the mailbox ---- + + async fn add_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> { + let add_flag_op = self.uid_index.state().op_flag_add(ident, flags.to_vec()); + self.uid_index.push(add_flag_op).await + } + + async fn del_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> { + let del_flag_op = self.uid_index.state().op_flag_del(ident, flags.to_vec()); + self.uid_index.push(del_flag_op).await + } + + async fn set_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> { + let set_flag_op = self.uid_index.state().op_flag_set(ident, flags.to_vec()); + self.uid_index.push(set_flag_op).await + } + + async fn append( + &mut self, + mail: IMF<'_>, + ident: Option, + flags: &[Flag], + ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> { + let ident = ident.unwrap_or_else(gen_ident); + let message_key = gen_key(); + + futures::try_join!( + async { + // Encrypt and save mail body + let message_blob = cryptoblob::seal(mail.raw, &message_key)?; + self.storage + .blob_insert(BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )) + .await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Save mail meta + let meta = MailMeta { + internaldate: now_msec(), + headers: mail.parsed.raw_headers.to_vec(), + message_key: message_key.clone(), + rfc822_size: mail.raw.len(), + }; + let meta_blob = seal_serialize(&meta, &self.encryption_key)?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) + .await?; + Ok::<_, anyhow::Error>(()) + }, + self.uid_index.opportunistic_sync() + )?; + + // Add mail to Bayou mail index + let uid_state = self.uid_index.state(); + let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec()); + + let uidvalidity = uid_state.uidvalidity; + let (uid, modseq) = match add_mail_op { + UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq), + _ => unreachable!(), + }; + + self.uid_index.push(add_mail_op).await?; + + Ok((uidvalidity, uid, modseq)) + } + + async fn append_from_s3<'a>( + &mut self, + mail: IMF<'a>, + ident: UniqueIdent, + blob_src: storage::BlobRef, + message_key: Key, + ) -> Result<()> { + futures::try_join!( + async { + // Copy mail body from previous location + let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); + self.storage.blob_copy(&blob_src, &blob_dst).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Save mail meta + let meta = MailMeta { + internaldate: now_msec(), + headers: mail.parsed.raw_headers.to_vec(), + message_key: message_key.clone(), + rfc822_size: mail.raw.len(), + }; + let meta_blob = seal_serialize(&meta, &self.encryption_key)?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) + .await?; + Ok::<_, anyhow::Error>(()) + }, + self.uid_index.opportunistic_sync() + )?; + + // Add mail to Bayou mail index + let add_mail_op = self.uid_index.state().op_mail_add(ident, vec![]); + self.uid_index.push(add_mail_op).await?; + + Ok(()) + } + + async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { + if !self.uid_index.state().table.contains_key(&ident) { + bail!("Cannot delete mail that doesn't exit"); + } + + let del_mail_op = self.uid_index.state().op_mail_del(ident); + self.uid_index.push(del_mail_op).await?; + + futures::try_join!( + async { + // Delete mail body from S3 + self.storage + .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))) + .await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Delete mail meta from K2V + let sk = ident.to_string(); + let res = self + .storage + .row_fetch(&storage::Selector::Single(&RowRef::new( + &self.mail_path, + &sk, + ))) + .await?; + if let Some(row_val) = res.into_iter().next() { + self.storage + .row_rm(&storage::Selector::Single(&row_val.row_ref)) + .await?; + } + Ok::<_, anyhow::Error>(()) + } + )?; + Ok(()) + } + + async fn copy_from( + &mut self, + from: &MailboxInternal, + source_id: UniqueIdent, + ) -> Result { + let new_id = gen_ident(); + self.copy_internal(from, source_id, new_id).await?; + Ok(new_id) + } + + async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> { + self.copy_internal(from, id, id).await?; + from.delete(id).await?; + Ok(()) + } + + async fn copy_internal( + &mut self, + from: &MailboxInternal, + source_id: UniqueIdent, + new_id: UniqueIdent, + ) -> Result<()> { + if self.encryption_key != from.encryption_key { + bail!("Message to be copied/moved does not belong to same account."); + } + + let flags = from + .uid_index + .state() + .table + .get(&source_id) + .ok_or(anyhow!("Source mail not found"))? + .2 + .clone(); + + futures::try_join!( + async { + let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); + let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); + self.storage.blob_copy(&src, &dst).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Copy mail meta in K2V + let meta = &from.fetch_meta(&[source_id]).await?[0]; + let meta_blob = seal_serialize(meta, &self.encryption_key)?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]) + .await?; + Ok::<_, anyhow::Error>(()) + }, + self.uid_index.opportunistic_sync(), + )?; + + // Add mail to Bayou mail index + let add_mail_op = self.uid_index.state().op_mail_add(new_id, flags); + self.uid_index.push(add_mail_op).await?; + + Ok(()) + } +} + +// Can be useful to debug so we want this code +// to be available to developers +#[allow(dead_code)] +fn dump(uid_index: &Bayou) { + let s = uid_index.state(); + println!("---- MAILBOX STATE ----"); + println!("UIDVALIDITY {}", s.uidvalidity); + println!("UIDNEXT {}", s.uidnext); + println!("INTERNALSEQ {}", s.internalseq); + for (uid, ident) in s.idx_by_uid.iter() { + println!( + "{} {} {}", + uid, + hex::encode(ident.0), + s.table.get(ident).cloned().unwrap().2.join(", ") + ); + } + println!(); +} + +// ---- + +/// The metadata of a message that is stored in K2V +/// at pk = mail/, sk = +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MailMeta { + /// INTERNALDATE field (milliseconds since epoch) + pub internaldate: u64, + /// Headers of the message + pub headers: Vec, + /// Secret key for decrypting entire message + pub message_key: Key, + /// RFC822 size + pub rfc822_size: usize, +} + +impl MailMeta { + fn try_merge(&mut self, other: Self) -> Result<()> { + if self.headers != other.headers + || self.message_key != other.message_key + || self.rfc822_size != other.rfc822_size + { + bail!("Conflicting MailMeta values."); + } + self.internaldate = std::cmp::max(self.internaldate, other.internaldate); + Ok(()) + } +} diff --git a/aero-collections/mail/mod.rs b/aero-collections/mail/mod.rs new file mode 100644 index 0000000..03e85cd --- /dev/null +++ b/aero-collections/mail/mod.rs @@ -0,0 +1,27 @@ +use std::convert::TryFrom; + +pub mod incoming; +pub mod mailbox; +pub mod query; +pub mod snapshot; +pub mod uidindex; +pub mod unique_ident; +pub mod namespace; + +// Internet Message Format +// aka RFC 822 - RFC 2822 - RFC 5322 +// 2023-05-15 don't want to refactor this struct now. +#[allow(clippy::upper_case_acronyms)] +pub struct IMF<'a> { + raw: &'a [u8], + parsed: eml_codec::part::composite::Message<'a>, +} + +impl<'a> TryFrom<&'a [u8]> for IMF<'a> { + type Error = (); + + fn try_from(body: &'a [u8]) -> Result, ()> { + let parsed = eml_codec::parse_message(body).or(Err(()))?.1; + Ok(Self { raw: body, parsed }) + } +} diff --git a/aero-collections/mail/namespace.rs b/aero-collections/mail/namespace.rs new file mode 100644 index 0000000..5e67173 --- /dev/null +++ b/aero-collections/mail/namespace.rs @@ -0,0 +1,209 @@ +use std::collections::{BTreeMap, HashMap}; +use std::sync::{Arc, Weak}; + +use anyhow::{anyhow, bail, Result}; +use lazy_static::lazy_static; +use serde::{Deserialize, Serialize}; +use tokio::sync::watch; + +use crate::cryptoblob::{open_deserialize, seal_serialize}; +use crate::login::Credentials; +use crate::mail::incoming::incoming_mail_watch_process; +use crate::mail::mailbox::Mailbox; +use crate::mail::uidindex::ImapUidvalidity; +use crate::mail::unique_ident::{gen_ident, UniqueIdent}; +use crate::storage; +use crate::timestamp::now_msec; + +pub const MAILBOX_HIERARCHY_DELIMITER: char = '.'; + +/// INBOX is the only mailbox that must always exist. +/// It is created automatically when the account is created. +/// IMAP allows the user to rename INBOX to something else, +/// in this case all messages from INBOX are moved to a mailbox +/// with the new name and the INBOX mailbox still exists and is empty. +/// In our implementation, we indeed move the underlying mailbox +/// to the new name (i.e. the new name has the same id as the previous +/// INBOX), and we create a new empty mailbox for INBOX. +pub const INBOX: &str = "INBOX"; + +/// For convenience purpose, we also create some special mailbox +/// that are described in RFC6154 SPECIAL-USE +/// @FIXME maybe it should be a configuration parameter +/// @FIXME maybe we should have a per-mailbox flag mechanism, either an enum or a string, so we +/// track which mailbox is used for what. +/// @FIXME Junk could be useful but we don't have any antispam solution yet so... +/// @FIXME IMAP supports virtual mailbox. \All or \Flagged are intended to be virtual mailboxes. +/// \Trash might be one, or not one. I don't know what we should do there. +pub const DRAFTS: &str = "Drafts"; +pub const ARCHIVE: &str = "Archive"; +pub const SENT: &str = "Sent"; +pub const TRASH: &str = "Trash"; + +pub(crate) const MAILBOX_LIST_PK: &str = "mailboxes"; +pub(crate) const MAILBOX_LIST_SK: &str = "list"; + +// ---- User's mailbox list (serialized in K2V) ---- + +#[derive(Serialize, Deserialize)] +pub(crate) struct MailboxList(BTreeMap); + +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] +pub(crate) struct MailboxListEntry { + id_lww: (u64, Option), + uidvalidity: ImapUidvalidity, +} + +impl MailboxListEntry { + fn merge(&mut self, other: &Self) { + // Simple CRDT merge rule + if other.id_lww.0 > self.id_lww.0 + || (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1) + { + self.id_lww = other.id_lww; + } + self.uidvalidity = std::cmp::max(self.uidvalidity, other.uidvalidity); + } +} + +impl MailboxList { + pub(crate) fn new() -> Self { + Self(BTreeMap::new()) + } + + pub(crate) fn merge(&mut self, list2: Self) { + for (k, v) in list2.0.into_iter() { + if let Some(e) = self.0.get_mut(&k) { + e.merge(&v); + } else { + self.0.insert(k, v); + } + } + } + + pub(crate) fn existing_mailbox_names(&self) -> Vec { + self.0 + .iter() + .filter(|(_, v)| v.id_lww.1.is_some()) + .map(|(k, _)| k.to_string()) + .collect() + } + + pub(crate) fn has_mailbox(&self, name: &str) -> bool { + matches!( + self.0.get(name), + Some(MailboxListEntry { + id_lww: (_, Some(_)), + .. + }) + ) + } + + pub(crate) fn get_mailbox(&self, name: &str) -> Option<(ImapUidvalidity, Option)> { + self.0.get(name).map( + |MailboxListEntry { + id_lww: (_, mailbox_id), + uidvalidity, + }| (*uidvalidity, *mailbox_id), + ) + } + + /// Ensures mailbox `name` maps to id `id`. + /// If it already mapped to that, returns None. + /// If a change had to be done, returns Some(new uidvalidity in mailbox). + pub(crate) fn set_mailbox(&mut self, name: &str, id: Option) -> Option { + let (ts, id, uidvalidity) = match self.0.get_mut(name) { + None => { + if id.is_none() { + return None; + } else { + (now_msec(), id, ImapUidvalidity::new(1).unwrap()) + } + } + Some(MailboxListEntry { + id_lww, + uidvalidity, + }) => { + if id_lww.1 == id { + return None; + } else { + ( + std::cmp::max(id_lww.0 + 1, now_msec()), + id, + ImapUidvalidity::new(uidvalidity.get() + 1).unwrap(), + ) + } + } + }; + + self.0.insert( + name.into(), + MailboxListEntry { + id_lww: (ts, id), + uidvalidity, + }, + ); + Some(uidvalidity) + } + + pub(crate) fn update_uidvalidity(&mut self, name: &str, new_uidvalidity: ImapUidvalidity) { + match self.0.get_mut(name) { + None => { + self.0.insert( + name.into(), + MailboxListEntry { + id_lww: (now_msec(), None), + uidvalidity: new_uidvalidity, + }, + ); + } + Some(MailboxListEntry { uidvalidity, .. }) => { + *uidvalidity = std::cmp::max(*uidvalidity, new_uidvalidity); + } + } + } + + pub(crate) fn create_mailbox(&mut self, name: &str) -> CreatedMailbox { + if let Some(MailboxListEntry { + id_lww: (_, Some(id)), + uidvalidity, + }) = self.0.get(name) + { + return CreatedMailbox::Existed(*id, *uidvalidity); + } + + let id = gen_ident(); + let uidvalidity = self.set_mailbox(name, Some(id)).unwrap(); + CreatedMailbox::Created(id, uidvalidity) + } + + pub(crate) fn rename_mailbox(&mut self, old_name: &str, new_name: &str) -> Result<()> { + if let Some((uidvalidity, Some(mbid))) = self.get_mailbox(old_name) { + if self.has_mailbox(new_name) { + bail!( + "Cannot rename {} into {}: {} already exists", + old_name, + new_name, + new_name + ); + } + + self.set_mailbox(old_name, None); + self.set_mailbox(new_name, Some(mbid)); + self.update_uidvalidity(new_name, uidvalidity); + Ok(()) + } else { + bail!( + "Cannot rename {} into {}: {} doesn't exist", + old_name, + new_name, + old_name + ); + } + } +} + +pub(crate) enum CreatedMailbox { + Created(UniqueIdent, ImapUidvalidity), + Existed(UniqueIdent, ImapUidvalidity), +} diff --git a/aero-collections/mail/query.rs b/aero-collections/mail/query.rs new file mode 100644 index 0000000..3e6fe99 --- /dev/null +++ b/aero-collections/mail/query.rs @@ -0,0 +1,137 @@ +use super::mailbox::MailMeta; +use super::snapshot::FrozenMailbox; +use super::unique_ident::UniqueIdent; +use anyhow::Result; +use futures::future::FutureExt; +use futures::stream::{BoxStream, Stream, StreamExt}; + +/// Query is in charge of fetching efficiently +/// requested data for a list of emails +pub struct Query<'a, 'b> { + pub frozen: &'a FrozenMailbox, + pub emails: &'b [UniqueIdent], + pub scope: QueryScope, +} + +#[derive(Debug)] +pub enum QueryScope { + Index, + Partial, + Full, +} +impl QueryScope { + pub fn union(&self, other: &QueryScope) -> QueryScope { + match (self, other) { + (QueryScope::Full, _) | (_, QueryScope::Full) => QueryScope::Full, + (QueryScope::Partial, _) | (_, QueryScope::Partial) => QueryScope::Partial, + (QueryScope::Index, QueryScope::Index) => QueryScope::Index, + } + } +} + +//type QueryResultStream = Box>>; + +impl<'a, 'b> Query<'a, 'b> { + pub fn fetch(&self) -> BoxStream> { + match self.scope { + QueryScope::Index => Box::pin( + futures::stream::iter(self.emails) + .map(|&uuid| Ok(QueryResult::IndexResult { uuid })), + ), + QueryScope::Partial => Box::pin(self.partial()), + QueryScope::Full => Box::pin(self.full()), + } + } + + // --- functions below are private *for reasons* + fn partial<'d>(&'d self) -> impl Stream> + 'd + Send { + async move { + let maybe_meta_list: Result> = + self.frozen.mailbox.fetch_meta(self.emails).await; + let list_res = maybe_meta_list + .map(|meta_list| { + meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + }) + .unwrap_or_else(|e| vec![Err(e)]); + + futures::stream::iter(list_res) + } + .flatten_stream() + } + + fn full<'d>(&'d self) -> impl Stream> + 'd + Send { + self.partial().then(move |maybe_meta| async move { + let meta = maybe_meta?; + + let content = self + .frozen + .mailbox + .fetch_full( + *meta.uuid(), + &meta + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) + .await?; + + Ok(meta.into_full(content).expect("meta to be PartialResult")) + }) + } +} + +#[derive(Debug, Clone)] +pub enum QueryResult { + IndexResult { + uuid: UniqueIdent, + }, + PartialResult { + uuid: UniqueIdent, + metadata: MailMeta, + }, + FullResult { + uuid: UniqueIdent, + metadata: MailMeta, + content: Vec, + }, +} +impl QueryResult { + pub fn uuid(&self) -> &UniqueIdent { + match self { + Self::IndexResult { uuid, .. } => uuid, + Self::PartialResult { uuid, .. } => uuid, + Self::FullResult { uuid, .. } => uuid, + } + } + + pub fn metadata(&self) -> Option<&MailMeta> { + match self { + Self::IndexResult { .. } => None, + Self::PartialResult { metadata, .. } => Some(metadata), + Self::FullResult { metadata, .. } => Some(metadata), + } + } + + #[allow(dead_code)] + pub fn content(&self) -> Option<&[u8]> { + match self { + Self::FullResult { content, .. } => Some(content), + _ => None, + } + } + + fn into_full(self, content: Vec) -> Option { + match self { + Self::PartialResult { uuid, metadata } => Some(Self::FullResult { + uuid, + metadata, + content, + }), + _ => None, + } + } +} diff --git a/aero-collections/mail/snapshot.rs b/aero-collections/mail/snapshot.rs new file mode 100644 index 0000000..ed756b5 --- /dev/null +++ b/aero-collections/mail/snapshot.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use anyhow::Result; + +use super::mailbox::Mailbox; +use super::query::{Query, QueryScope}; +use super::uidindex::UidIndex; +use super::unique_ident::UniqueIdent; + +/// A Frozen Mailbox has a snapshot of the current mailbox +/// state that is desynchronized with the real mailbox state. +/// It's up to the user to choose when their snapshot must be updated +/// to give useful information to their clients +pub struct FrozenMailbox { + pub mailbox: Arc, + pub snapshot: UidIndex, +} + +impl FrozenMailbox { + /// Create a snapshot from a mailbox, the mailbox + the snapshot + /// becomes the "Frozen Mailbox". + pub async fn new(mailbox: Arc) -> Self { + let state = mailbox.current_uid_index().await; + + Self { + mailbox, + snapshot: state, + } + } + + /// Force the synchronization of the inner mailbox + /// but do not update the local snapshot + pub async fn sync(&self) -> Result<()> { + self.mailbox.opportunistic_sync().await + } + + /// Peek snapshot without updating the frozen mailbox + /// Can be useful if you want to plan some writes + /// while sending a diff to the client later + pub async fn peek(&self) -> UidIndex { + self.mailbox.current_uid_index().await + } + + /// Update the FrozenMailbox local snapshot. + /// Returns the old snapshot, so you can build a diff + pub async fn update(&mut self) -> UidIndex { + let old_snapshot = self.snapshot.clone(); + self.snapshot = self.mailbox.current_uid_index().await; + + old_snapshot + } + + pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> { + Query { + frozen: self, + emails: uuids, + scope, + } + } +} diff --git a/aero-collections/mail/uidindex.rs b/aero-collections/mail/uidindex.rs new file mode 100644 index 0000000..5a06670 --- /dev/null +++ b/aero-collections/mail/uidindex.rs @@ -0,0 +1,474 @@ +use std::num::{NonZeroU32, NonZeroU64}; + +use im::{HashMap, OrdMap, OrdSet}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use crate::bayou::*; +use crate::mail::unique_ident::UniqueIdent; + +pub type ModSeq = NonZeroU64; +pub type ImapUid = NonZeroU32; +pub type ImapUidvalidity = NonZeroU32; +pub type Flag = String; +pub type IndexEntry = (ImapUid, ModSeq, Vec); + +/// A UidIndex handles the mutable part of a mailbox +/// It is built by running the event log on it +/// Each applied log generates a new UidIndex by cloning the previous one +/// and applying the event. This is why we use immutable datastructures: +/// they are cheap to clone. +#[derive(Clone)] +pub struct UidIndex { + // Source of trust + pub table: OrdMap, + + // Indexes optimized for queries + pub idx_by_uid: OrdMap, + pub idx_by_modseq: OrdMap, + pub idx_by_flag: FlagIndex, + + // "Public" Counters + pub uidvalidity: ImapUidvalidity, + pub uidnext: ImapUid, + pub highestmodseq: ModSeq, + + // "Internal" Counters + pub internalseq: ImapUid, + pub internalmodseq: ModSeq, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub enum UidIndexOp { + MailAdd(UniqueIdent, ImapUid, ModSeq, Vec), + MailDel(UniqueIdent), + FlagAdd(UniqueIdent, ModSeq, Vec), + FlagDel(UniqueIdent, ModSeq, Vec), + FlagSet(UniqueIdent, ModSeq, Vec), + BumpUidvalidity(u32), +} + +impl UidIndex { + #[must_use] + pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_mail_del(&self, ident: UniqueIdent) -> UidIndexOp { + UidIndexOp::MailDel(ident) + } + + #[must_use] + pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagAdd(ident, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagDel(ident, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagSet(ident, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_bump_uidvalidity(&self, count: u32) -> UidIndexOp { + UidIndexOp::BumpUidvalidity(count) + } + + // INTERNAL functions to keep state consistent + + fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) { + // Insert the email in our table + self.table.insert(ident, (uid, modseq, flags.to_owned())); + + // Update the indexes/caches + self.idx_by_uid.insert(uid, ident); + self.idx_by_flag.insert(uid, flags); + self.idx_by_modseq.insert(modseq, ident); + } + + fn unreg_email(&mut self, ident: &UniqueIdent) { + // We do nothing if the mail does not exist + let (uid, modseq, flags) = match self.table.get(ident) { + Some(v) => v, + None => return, + }; + + // Delete all cache entries + self.idx_by_uid.remove(uid); + self.idx_by_flag.remove(*uid, flags); + self.idx_by_modseq.remove(modseq); + + // Remove from source of trust + self.table.remove(ident); + } +} + +impl Default for UidIndex { + fn default() -> Self { + Self { + table: OrdMap::new(), + + idx_by_uid: OrdMap::new(), + idx_by_modseq: OrdMap::new(), + idx_by_flag: FlagIndex::new(), + + uidvalidity: NonZeroU32::new(1).unwrap(), + uidnext: NonZeroU32::new(1).unwrap(), + highestmodseq: NonZeroU64::new(1).unwrap(), + + internalseq: NonZeroU32::new(1).unwrap(), + internalmodseq: NonZeroU64::new(1).unwrap(), + } + } +} + +impl BayouState for UidIndex { + type Op = UidIndexOp; + + fn apply(&self, op: &UidIndexOp) -> Self { + let mut new = self.clone(); + match op { + UidIndexOp::MailAdd(ident, uid, modseq, flags) => { + // Change UIDValidity if there is a UID conflict or a MODSEQ conflict + // @FIXME Need to prove that summing work + // The intuition: we increase the UIDValidity by the number of possible conflicts + if *uid < new.internalseq || *modseq < new.internalmodseq { + let bump_uid = new.internalseq.get() - uid.get(); + let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap(); + } + + // Assign the real uid of the email + let new_uid = new.internalseq; + + // Assign the real modseq of the email and its new flags + let new_modseq = new.internalmodseq; + + // Delete the previous entry if any. + // Our proof has no assumption on `ident` uniqueness, + // so we must handle this case even it is very unlikely + // In this case, we overwrite the email. + // Note: assigning a new UID is mandatory. + new.unreg_email(ident); + + // We record our email and update ou caches + new.reg_email(*ident, new_uid, new_modseq, flags); + + // Update counters + new.highestmodseq = new.internalmodseq; + + new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + + new.uidnext = new.internalseq; + } + UidIndexOp::MailDel(ident) => { + // If the email is known locally, we remove its references in all our indexes + new.unreg_email(ident); + + // We update the counter + new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); + } + UidIndexOp::FlagAdd(ident, candidate_modseq, new_flags) => { + if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *candidate_modseq < new.internalmodseq { + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); + } + + // Add flags to the source of trust and the cache + let mut to_add: Vec = new_flags + .iter() + .filter(|f| !existing_flags.contains(f)) + .cloned() + .collect(); + new.idx_by_flag.insert(*uid, &to_add); + *email_modseq = new.internalmodseq; + new.idx_by_modseq.insert(new.internalmodseq, *ident); + existing_flags.append(&mut to_add); + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + } + } + UidIndexOp::FlagDel(ident, candidate_modseq, rm_flags) => { + if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *candidate_modseq < new.internalmodseq { + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); + } + + // Remove flags from the source of trust and the cache + existing_flags.retain(|x| !rm_flags.contains(x)); + new.idx_by_flag.remove(*uid, rm_flags); + + // Register that email has been modified + new.idx_by_modseq.insert(new.internalmodseq, *ident); + *email_modseq = new.internalmodseq; + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + } + } + UidIndexOp::FlagSet(ident, candidate_modseq, new_flags) => { + if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *candidate_modseq < new.internalmodseq { + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); + } + + // Remove flags from the source of trust and the cache + let (keep_flags, rm_flags): (Vec, Vec) = existing_flags + .iter() + .cloned() + .partition(|x| new_flags.contains(x)); + *existing_flags = keep_flags; + let mut to_add: Vec = new_flags + .iter() + .filter(|f| !existing_flags.contains(f)) + .cloned() + .collect(); + existing_flags.append(&mut to_add); + new.idx_by_flag.remove(*uid, &rm_flags); + new.idx_by_flag.insert(*uid, &to_add); + + // Register that email has been modified + new.idx_by_modseq.insert(new.internalmodseq, *ident); + *email_modseq = new.internalmodseq; + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + } + } + UidIndexOp::BumpUidvalidity(count) => { + new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count) + .unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap()); + } + } + new + } +} + +// ---- FlagIndex implementation ---- + +#[derive(Clone)] +pub struct FlagIndex(HashMap>); +pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet>; + +impl FlagIndex { + fn new() -> Self { + Self(HashMap::new()) + } + fn insert(&mut self, uid: ImapUid, flags: &[Flag]) { + flags.iter().for_each(|flag| { + self.0 + .entry(flag.clone()) + .or_insert(OrdSet::new()) + .insert(uid); + }); + } + fn remove(&mut self, uid: ImapUid, flags: &[Flag]) { + for flag in flags.iter() { + if let Some(set) = self.0.get_mut(flag) { + set.remove(&uid); + if set.is_empty() { + self.0.remove(flag); + } + } + } + } + + pub fn get(&self, f: &Flag) -> Option<&OrdSet> { + self.0.get(f) + } + + pub fn flags(&self) -> FlagIter { + self.0.keys() + } +} + +// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ---- + +#[derive(Serialize, Deserialize)] +struct UidIndexSerializedRepr { + mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec)>, + + uidvalidity: ImapUidvalidity, + uidnext: ImapUid, + highestmodseq: ModSeq, + + internalseq: ImapUid, + internalmodseq: ModSeq, +} + +impl<'de> Deserialize<'de> for UidIndex { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?; + + let mut uidindex = UidIndex { + table: OrdMap::new(), + + idx_by_uid: OrdMap::new(), + idx_by_modseq: OrdMap::new(), + idx_by_flag: FlagIndex::new(), + + uidvalidity: val.uidvalidity, + uidnext: val.uidnext, + highestmodseq: val.highestmodseq, + + internalseq: val.internalseq, + internalmodseq: val.internalmodseq, + }; + + val.mails + .iter() + .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags)); + + Ok(uidindex) + } +} + +impl Serialize for UidIndex { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut mails = vec![]; + for (ident, (uid, modseq, flags)) in self.table.iter() { + mails.push((*uid, *modseq, *ident, flags.clone())); + } + + let val = UidIndexSerializedRepr { + mails, + uidvalidity: self.uidvalidity, + uidnext: self.uidnext, + highestmodseq: self.highestmodseq, + internalseq: self.internalseq, + internalmodseq: self.internalmodseq, + }; + + val.serialize(serializer) + } +} + +// ---- TESTS ---- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_uidindex() { + let mut state = UidIndex::default(); + + // Add message 1 + { + let m = UniqueIdent([0x01; 24]); + let f = vec!["\\Recent".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + // Early checks + assert_eq!(state.table.len(), 1); + let (uid, modseq, flags) = state.table.get(&m).unwrap(); + assert_eq!(*uid, NonZeroU32::new(1).unwrap()); + assert_eq!(*modseq, NonZeroU64::new(1).unwrap()); + assert_eq!(flags.len(), 2); + let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap(); + assert_eq!(&m, ident); + let recent = state.idx_by_flag.0.get("\\Recent").unwrap(); + assert_eq!(recent.len(), 1); + assert_eq!(recent.iter().next().unwrap(), &NonZeroU32::new(1).unwrap()); + assert_eq!(state.uidnext, NonZeroU32::new(2).unwrap()); + assert_eq!(state.uidvalidity, NonZeroU32::new(1).unwrap()); + } + + // Add message 2 + { + let m = UniqueIdent([0x02; 24]); + let f = vec!["\\Seen".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Add flags to message 1 + { + let m = UniqueIdent([0x01; 24]); + let f = vec!["Important".to_string(), "$cl_1".to_string()]; + let ev = state.op_flag_add(m, f); + state = state.apply(&ev); + } + + // Delete flags from message 1 + { + let m = UniqueIdent([0x01; 24]); + let f = vec!["\\Recent".to_string()]; + let ev = state.op_flag_del(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Delete message 2 + { + let m = UniqueIdent([0x02; 24]); + let ev = state.op_mail_del(m); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 1); + } + + // Add a message 3 concurrent to message 1 (trigger a uid validity change) + { + let m = UniqueIdent([0x03; 24]); + let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; + let ev = UidIndexOp::MailAdd( + m, + NonZeroU32::new(1).unwrap(), + NonZeroU64::new(1).unwrap(), + f, + ); + state = state.apply(&ev); + } + + // Checks + { + assert_eq!(state.table.len(), 2); + assert!(state.uidvalidity > NonZeroU32::new(1).unwrap()); + + let (last_uid, ident) = state.idx_by_uid.get_max().unwrap(); + assert_eq!(ident, &UniqueIdent([0x03; 24])); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + let mut iter = archive.iter(); + assert_eq!(iter.next().unwrap(), &NonZeroU32::new(1).unwrap()); + assert_eq!(iter.next().unwrap(), last_uid); + } + } +} diff --git a/aero-collections/mail/unique_ident.rs b/aero-collections/mail/unique_ident.rs new file mode 100644 index 0000000..0e629db --- /dev/null +++ b/aero-collections/mail/unique_ident.rs @@ -0,0 +1,101 @@ +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; + +use lazy_static::lazy_static; +use rand::prelude::*; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; + +use crate::timestamp::now_msec; + +/// An internal Mail Identifier is composed of two components: +/// - a process identifier, 128 bits, itself composed of: +/// - the timestamp of when the process started, 64 bits +/// - a 64-bit random number +/// - a sequence number, 64 bits +/// They are not part of the protocol but an internal representation +/// required by Aerogramme. +/// Their main property is to be unique without having to rely +/// on synchronization between IMAP processes. +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] +pub struct UniqueIdent(pub [u8; 24]); + +struct IdentGenerator { + pid: u128, + sn: AtomicU64, +} + +impl IdentGenerator { + fn new() -> Self { + let time = now_msec() as u128; + let rand = thread_rng().gen::() as u128; + Self { + pid: (time << 64) | rand, + sn: AtomicU64::new(0), + } + } + + fn gen(&self) -> UniqueIdent { + let sn = self.sn.fetch_add(1, Ordering::Relaxed); + let mut res = [0u8; 24]; + res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); + res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); + UniqueIdent(res) + } +} + +lazy_static! { + static ref GENERATOR: IdentGenerator = IdentGenerator::new(); +} + +pub fn gen_ident() -> UniqueIdent { + GENERATOR.gen() +} + +// -- serde -- + +impl<'de> Deserialize<'de> for UniqueIdent { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let v = String::deserialize(d)?; + UniqueIdent::from_str(&v).map_err(D::Error::custom) + } +} + +impl Serialize for UniqueIdent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl std::fmt::Display for UniqueIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(self.0)) + } +} + +impl std::fmt::Debug for UniqueIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(self.0)) + } +} + +impl FromStr for UniqueIdent { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; + + if bytes.len() != 24 { + return Err("bad length"); + } + + let mut tmp = [0u8; 24]; + tmp[..].copy_from_slice(&bytes); + Ok(UniqueIdent(tmp)) + } +} diff --git a/aero-collections/user.rs b/aero-collections/user.rs new file mode 100644 index 0000000..a38b9c1 --- /dev/null +++ b/aero-collections/user.rs @@ -0,0 +1,313 @@ +use std::collections::{BTreeMap, HashMap}; +use std::sync::{Arc, Weak}; + +use anyhow::{anyhow, bail, Result}; +use lazy_static::lazy_static; +use serde::{Deserialize, Serialize}; +use tokio::sync::watch; + +use crate::cryptoblob::{open_deserialize, seal_serialize}; +use crate::login::Credentials; +use crate::mail::incoming::incoming_mail_watch_process; +use crate::mail::mailbox::Mailbox; +use crate::mail::uidindex::ImapUidvalidity; +use crate::mail::unique_ident::{gen_ident, UniqueIdent}; +use crate::storage; +use crate::timestamp::now_msec; + +use crate::mail::namespace::{MAILBOX_HIERARCHY_DELIMITER, INBOX, DRAFTS, ARCHIVE, SENT, TRASH, MAILBOX_LIST_PK, MAILBOX_LIST_SK,MailboxList,CreatedMailbox}; + +//@FIXME User should be totally rewriten +//to extract the local mailbox list +//to the mail/namespace.rs file (and mailbox list should be reworded as mail namespace) + +pub struct User { + pub username: String, + pub creds: Credentials, + pub storage: storage::Store, + pub mailboxes: std::sync::Mutex>>, + + tx_inbox_id: watch::Sender>, +} + +impl User { + pub async fn new(username: String, creds: Credentials) -> Result> { + let cache_key = (username.clone(), creds.storage.unique()); + + { + let cache = USER_CACHE.lock().unwrap(); + if let Some(u) = cache.get(&cache_key).and_then(Weak::upgrade) { + return Ok(u); + } + } + + let user = Self::open(username, creds).await?; + + let mut cache = USER_CACHE.lock().unwrap(); + if let Some(concurrent_user) = cache.get(&cache_key).and_then(Weak::upgrade) { + drop(user); + Ok(concurrent_user) + } else { + cache.insert(cache_key, Arc::downgrade(&user)); + Ok(user) + } + } + + /// Lists user's available mailboxes + pub async fn list_mailboxes(&self) -> Result> { + let (list, _ct) = self.load_mailbox_list().await?; + Ok(list.existing_mailbox_names()) + } + + /// Opens an existing mailbox given its IMAP name. + pub async fn open_mailbox(&self, name: &str) -> Result>> { + let (mut list, ct) = self.load_mailbox_list().await?; + + //@FIXME it could be a trace or an opentelemtry trace thing. + // Be careful to not leak sensible data + /* + eprintln!("List of mailboxes:"); + for ent in list.0.iter() { + eprintln!(" - {:?}", ent); + } + */ + + if let Some((uidvalidity, Some(mbid))) = list.get_mailbox(name) { + let mb = self.open_mailbox_by_id(mbid, uidvalidity).await?; + let mb_uidvalidity = mb.current_uid_index().await.uidvalidity; + if mb_uidvalidity > uidvalidity { + list.update_uidvalidity(name, mb_uidvalidity); + self.save_mailbox_list(&list, ct).await?; + } + Ok(Some(mb)) + } else { + Ok(None) + } + } + + /// Check whether mailbox exists + pub async fn has_mailbox(&self, name: &str) -> Result { + let (list, _ct) = self.load_mailbox_list().await?; + Ok(list.has_mailbox(name)) + } + + /// Creates a new mailbox in the user's IMAP namespace. + pub async fn create_mailbox(&self, name: &str) -> Result<()> { + if name.ends_with(MAILBOX_HIERARCHY_DELIMITER) { + bail!("Invalid mailbox name: {}", name); + } + + let (mut list, ct) = self.load_mailbox_list().await?; + match list.create_mailbox(name) { + CreatedMailbox::Created(_, _) => { + self.save_mailbox_list(&list, ct).await?; + Ok(()) + } + CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)), + } + } + + /// Deletes a mailbox in the user's IMAP namespace. + pub async fn delete_mailbox(&self, name: &str) -> Result<()> { + if name == INBOX { + bail!("Cannot delete INBOX"); + } + + let (mut list, ct) = self.load_mailbox_list().await?; + if list.has_mailbox(name) { + //@TODO: actually delete mailbox contents + list.set_mailbox(name, None); + self.save_mailbox_list(&list, ct).await?; + Ok(()) + } else { + bail!("Mailbox {} does not exist", name); + } + } + + /// Renames a mailbox in the user's IMAP namespace. + pub async fn rename_mailbox(&self, old_name: &str, new_name: &str) -> Result<()> { + let (mut list, ct) = self.load_mailbox_list().await?; + + if old_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) { + bail!("Invalid mailbox name: {}", old_name); + } + if new_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) { + bail!("Invalid mailbox name: {}", new_name); + } + + if old_name == INBOX { + list.rename_mailbox(old_name, new_name)?; + if !self.ensure_inbox_exists(&mut list, &ct).await? { + self.save_mailbox_list(&list, ct).await?; + } + } else { + let names = list.existing_mailbox_names(); + + let old_name_w_delim = format!("{}{}", old_name, MAILBOX_HIERARCHY_DELIMITER); + let new_name_w_delim = format!("{}{}", new_name, MAILBOX_HIERARCHY_DELIMITER); + + if names + .iter() + .any(|x| x == new_name || x.starts_with(&new_name_w_delim)) + { + bail!("Mailbox {} already exists", new_name); + } + + for name in names.iter() { + if name == old_name { + list.rename_mailbox(name, new_name)?; + } else if let Some(tail) = name.strip_prefix(&old_name_w_delim) { + let nnew = format!("{}{}", new_name_w_delim, tail); + list.rename_mailbox(name, &nnew)?; + } + } + + self.save_mailbox_list(&list, ct).await?; + } + Ok(()) + } + + // ---- Internal user & mailbox management ---- + + async fn open(username: String, creds: Credentials) -> Result> { + let storage = creds.storage.build().await?; + + let (tx_inbox_id, rx_inbox_id) = watch::channel(None); + + let user = Arc::new(Self { + username, + creds: creds.clone(), + storage, + tx_inbox_id, + mailboxes: std::sync::Mutex::new(HashMap::new()), + }); + + // Ensure INBOX exists (done inside load_mailbox_list) + user.load_mailbox_list().await?; + + tokio::spawn(incoming_mail_watch_process( + Arc::downgrade(&user), + user.creds.clone(), + rx_inbox_id, + )); + + Ok(user) + } + + pub(super) async fn open_mailbox_by_id( + &self, + id: UniqueIdent, + min_uidvalidity: ImapUidvalidity, + ) -> Result> { + { + let cache = self.mailboxes.lock().unwrap(); + if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) { + return Ok(mb); + } + } + + let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?); + + let mut cache = self.mailboxes.lock().unwrap(); + if let Some(concurrent_mb) = cache.get(&id).and_then(Weak::upgrade) { + drop(mb); // we worked for nothing but at least we didn't starve someone else + Ok(concurrent_mb) + } else { + cache.insert(id, Arc::downgrade(&mb)); + Ok(mb) + } + } + + // ---- Mailbox list management ---- + + async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { + let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); + let (mut list, row) = match self + .storage + .row_fetch(&storage::Selector::Single(&row_ref)) + .await + { + Err(storage::StorageError::NotFound) => (MailboxList::new(), None), + Err(e) => return Err(e.into()), + Ok(rv) => { + let mut list = MailboxList::new(); + let (row_ref, row_vals) = match rv.into_iter().next() { + Some(row_val) => (row_val.row_ref, row_val.value), + None => (row_ref, vec![]), + }; + + for v in row_vals { + if let storage::Alternative::Value(vbytes) = v { + let list2 = + open_deserialize::(&vbytes, &self.creds.keys.master)?; + list.merge(list2); + } + } + (list, Some(row_ref)) + } + }; + + let is_default_mbx_missing = [DRAFTS, ARCHIVE, SENT, TRASH] + .iter() + .map(|mbx| list.create_mailbox(mbx)) + .fold(false, |acc, r| { + acc || matches!(r, CreatedMailbox::Created(..)) + }); + let is_inbox_missing = self.ensure_inbox_exists(&mut list, &row).await?; + if is_default_mbx_missing && !is_inbox_missing { + // It's the only case where we created some mailboxes and not saved them + // So we save them! + self.save_mailbox_list(&list, row.clone()).await?; + } + + Ok((list, row)) + } + + async fn ensure_inbox_exists( + &self, + list: &mut MailboxList, + ct: &Option, + ) -> Result { + // If INBOX doesn't exist, create a new mailbox with that name + // and save new mailbox list. + // Also, ensure that the mpsc::watch that keeps track of the + // inbox id is up-to-date. + let saved; + let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) { + CreatedMailbox::Created(i, v) => { + self.save_mailbox_list(list, ct.clone()).await?; + saved = true; + (i, v) + } + CreatedMailbox::Existed(i, v) => { + saved = false; + (i, v) + } + }; + let inbox_id = Some((inbox_id, inbox_uidvalidity)); + if *self.tx_inbox_id.borrow() != inbox_id { + self.tx_inbox_id.send(inbox_id).unwrap(); + } + + Ok(saved) + } + + async fn save_mailbox_list( + &self, + list: &MailboxList, + ct: Option, + ) -> Result<()> { + let list_blob = seal_serialize(list, &self.creds.keys.master)?; + let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK)); + let row_val = storage::RowVal::new(rref, list_blob); + self.storage.row_insert(vec![row_val]).await?; + Ok(()) + } +} + +// ---- User cache ---- + +lazy_static! { + static ref USER_CACHE: std::sync::Mutex>> = + std::sync::Mutex::new(HashMap::new()); +} -- cgit v1.2.3 From 1edf0b15ecaa73d55bb72c6f3c6e25d4f231f322 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 8 Mar 2024 08:43:28 +0100 Subject: Re-enable collections --- aero-collections/Cargo.toml | 24 ++ aero-collections/mail/incoming.rs | 445 ------------------------- aero-collections/mail/mailbox.rs | 524 ----------------------------- aero-collections/mail/mod.rs | 27 -- aero-collections/mail/namespace.rs | 209 ------------ aero-collections/mail/query.rs | 137 -------- aero-collections/mail/snapshot.rs | 60 ---- aero-collections/mail/uidindex.rs | 474 --------------------------- aero-collections/mail/unique_ident.rs | 101 ------ aero-collections/src/calendar/mod.rs | 1 + aero-collections/src/lib.rs | 3 + aero-collections/src/mail/incoming.rs | 443 +++++++++++++++++++++++++ aero-collections/src/mail/mailbox.rs | 525 ++++++++++++++++++++++++++++++ aero-collections/src/mail/mod.rs | 25 ++ aero-collections/src/mail/namespace.rs | 202 ++++++++++++ aero-collections/src/mail/query.rs | 137 ++++++++ aero-collections/src/mail/snapshot.rs | 60 ++++ aero-collections/src/mail/uidindex.rs | 474 +++++++++++++++++++++++++++ aero-collections/src/mail/unique_ident.rs | 101 ++++++ aero-collections/src/user.rs | 311 ++++++++++++++++++ aero-collections/user.rs | 313 ------------------ 21 files changed, 2306 insertions(+), 2290 deletions(-) create mode 100644 aero-collections/Cargo.toml delete mode 100644 aero-collections/mail/incoming.rs delete mode 100644 aero-collections/mail/mailbox.rs delete mode 100644 aero-collections/mail/mod.rs delete mode 100644 aero-collections/mail/namespace.rs delete mode 100644 aero-collections/mail/query.rs delete mode 100644 aero-collections/mail/snapshot.rs delete mode 100644 aero-collections/mail/uidindex.rs delete mode 100644 aero-collections/mail/unique_ident.rs create mode 100644 aero-collections/src/calendar/mod.rs create mode 100644 aero-collections/src/lib.rs create mode 100644 aero-collections/src/mail/incoming.rs create mode 100644 aero-collections/src/mail/mailbox.rs create mode 100644 aero-collections/src/mail/mod.rs create mode 100644 aero-collections/src/mail/namespace.rs create mode 100644 aero-collections/src/mail/query.rs create mode 100644 aero-collections/src/mail/snapshot.rs create mode 100644 aero-collections/src/mail/uidindex.rs create mode 100644 aero-collections/src/mail/unique_ident.rs create mode 100644 aero-collections/src/user.rs delete mode 100644 aero-collections/user.rs (limited to 'aero-collections') diff --git a/aero-collections/Cargo.toml b/aero-collections/Cargo.toml new file mode 100644 index 0000000..90d285e --- /dev/null +++ b/aero-collections/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "aero-collections" +version = "0.3.0" +authors = ["Alex Auvolat ", "Quentin Dufour "] +edition = "2021" +license = "EUPL-1.2" +description = "Aerogramme own representation of the different objects it manipulates" + +[dependencies] +aero-user.workspace = true +aero-bayou.workspace = true + +anyhow.workspace = true +base64.workspace = true +futures.workspace = true +lazy_static.workspace = true +serde.workspace = true +hex.workspace = true +tokio.workspace = true +tracing.workspace = true +rand.workspace = true +im.workspace = true +sodiumoxide.workspace = true +eml-codec.workspace = true diff --git a/aero-collections/mail/incoming.rs b/aero-collections/mail/incoming.rs deleted file mode 100644 index e2ad97d..0000000 --- a/aero-collections/mail/incoming.rs +++ /dev/null @@ -1,445 +0,0 @@ -//use std::collections::HashMap; -use std::convert::TryFrom; - -use std::sync::{Arc, Weak}; -use std::time::Duration; - -use anyhow::{anyhow, bail, Result}; -use base64::Engine; -use futures::{future::BoxFuture, FutureExt}; -//use tokio::io::AsyncReadExt; -use tokio::sync::watch; -use tracing::{debug, error, info, warn}; - -use crate::cryptoblob; -use crate::login::{Credentials, PublicCredentials}; -use crate::mail::mailbox::Mailbox; -use crate::mail::uidindex::ImapUidvalidity; -use crate::mail::unique_ident::*; -use crate::user::User; -use crate::mail::IMF; -use crate::storage; -use crate::timestamp::now_msec; - -const INCOMING_PK: &str = "incoming"; -const INCOMING_LOCK_SK: &str = "lock"; -const INCOMING_WATCH_SK: &str = "watch"; - -const MESSAGE_KEY: &str = "message-key"; - -// When a lock is held, it is held for LOCK_DURATION (here 5 minutes) -// It is renewed every LOCK_DURATION/3 -// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we -// lost the lock. -const LOCK_DURATION: Duration = Duration::from_secs(300); - -// In addition to checking when notified, also check for new mail every 10 minutes -const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600); - -pub async fn incoming_mail_watch_process( - user: Weak, - creds: Credentials, - rx_inbox_id: watch::Receiver>, -) { - if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await { - error!("Error in incoming mail watch process: {}", e); - } -} - -async fn incoming_mail_watch_process_internal( - user: Weak, - creds: Credentials, - mut rx_inbox_id: watch::Receiver>, -) -> Result<()> { - let mut lock_held = k2v_lock_loop( - creds.storage.build().await?, - storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK), - ); - let storage = creds.storage.build().await?; - - let mut inbox: Option> = None; - let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); - - loop { - let maybe_updated_incoming_key = if *lock_held.borrow() { - debug!("incoming lock held"); - - let wait_new_mail = async { - loop { - match storage.row_poll(&incoming_key).await { - Ok(row_val) => break row_val.row_ref, - Err(e) => { - error!("Error in wait_new_mail: {}", e); - tokio::time::sleep(Duration::from_secs(30)).await; - } - } - } - }; - - tokio::select! { - inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), - _ = lock_held.changed() => None, - _ = rx_inbox_id.changed() => None, - } - } else { - debug!("incoming lock not held"); - tokio::select! { - _ = lock_held.changed() => None, - _ = rx_inbox_id.changed() => None, - } - }; - - let user = match Weak::upgrade(&user) { - Some(user) => user, - None => { - debug!("User no longer available, exiting incoming loop."); - break; - } - }; - debug!("User still available"); - - // If INBOX no longer is same mailbox, open new mailbox - let inbox_id = *rx_inbox_id.borrow(); - if let Some((id, uidvalidity)) = inbox_id { - if Some(id) != inbox.as_ref().map(|b| b.id) { - match user.open_mailbox_by_id(id, uidvalidity).await { - Ok(mb) => { - inbox = Some(mb); - } - Err(e) => { - inbox = None; - error!("Error when opening inbox ({}): {}", id, e); - tokio::time::sleep(Duration::from_secs(30)).await; - continue; - } - } - } - } - - // If we were able to open INBOX, and we have mail, - // fetch new mail - if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { - match handle_incoming_mail(&user, &storage, inbox, &lock_held).await { - Ok(()) => { - incoming_key = updated_incoming_key; - } - Err(e) => { - error!("Could not fetch incoming mail: {}", e); - tokio::time::sleep(Duration::from_secs(30)).await; - } - } - } - } - drop(rx_inbox_id); - Ok(()) -} - -async fn handle_incoming_mail( - user: &Arc, - storage: &storage::Store, - inbox: &Arc, - lock_held: &watch::Receiver, -) -> Result<()> { - let mails_res = storage.blob_list("incoming/").await?; - - for object in mails_res { - if !*lock_held.borrow() { - break; - } - let key = object.0; - if let Some(mail_id) = key.strip_prefix("incoming/") { - if let Ok(mail_id) = mail_id.parse::() { - move_incoming_message(user, storage, inbox, mail_id).await?; - } - } - } - - Ok(()) -} - -async fn move_incoming_message( - user: &Arc, - storage: &storage::Store, - inbox: &Arc, - id: UniqueIdent, -) -> Result<()> { - info!("Moving incoming message: {}", id); - - let object_key = format!("incoming/{}", id); - - // 1. Fetch message from S3 - let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?; - - // 1.a decrypt message key from headers - //info!("Object metadata: {:?}", get_result.metadata); - let key_encrypted_b64 = object - .meta - .get(MESSAGE_KEY) - .ok_or(anyhow!("Missing key in metadata"))?; - let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; - let message_key = sodiumoxide::crypto::sealedbox::open( - &key_encrypted, - &user.creds.keys.public, - &user.creds.keys.secret, - ) - .map_err(|_| anyhow!("Cannot decrypt message key"))?; - let message_key = - cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; - - // 1.b retrieve message body - let obj_body = object.value; - let plain_mail = cryptoblob::open(&obj_body, &message_key) - .map_err(|_| anyhow!("Cannot decrypt email content"))?; - - // 2 parse mail and add to inbox - let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; - inbox - .append_from_s3(msg, id, object.blob_ref.clone(), message_key) - .await?; - - // 3 delete from incoming - storage.blob_rm(&object.blob_ref).await?; - - Ok(()) -} - -// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- - -fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver { - let (held_tx, held_rx) = watch::channel(false); - - tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx)); - - held_rx -} - -#[derive(Clone, Debug)] -enum LockState { - Unknown, - Empty, - Held(UniqueIdent, u64, storage::RowRef), -} - -async fn k2v_lock_loop_internal( - storage: storage::Store, - row_ref: storage::RowRef, - held_tx: watch::Sender, -) { - let (state_tx, mut state_rx) = watch::channel::(LockState::Unknown); - let mut state_rx_2 = state_rx.clone(); - - let our_pid = gen_ident(); - - // Loop 1: watch state of lock in K2V, save that in corresponding watch channel - let watch_lock_loop: BoxFuture> = async { - let mut ct = row_ref.clone(); - loop { - debug!("k2v watch lock loop iter: ct = {:?}", ct); - match storage.row_poll(&ct).await { - Err(e) => { - error!( - "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", - e - ); - state_tx.send(LockState::Unknown)?; - tokio::time::sleep(Duration::from_secs(30)).await; - } - Ok(cv) => { - let mut lock_state = None; - for v in cv.value.iter() { - if let storage::Alternative::Value(vbytes) = v { - if vbytes.len() == 32 { - let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); - let pid = UniqueIdent(vbytes[8..].try_into().unwrap()); - if lock_state - .map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2)) - .unwrap_or(true) - { - lock_state = Some((pid, ts)); - } - } - } - } - let new_ct = cv.row_ref; - - debug!( - "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", - ct, new_ct, lock_state - ); - state_tx.send( - lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone())) - .unwrap_or(LockState::Empty), - )?; - ct = new_ct; - } - } - } - } - .boxed(); - - // Loop 2: notify user whether we are holding the lock or not - let lock_notify_loop: BoxFuture> = async { - loop { - let now = now_msec(); - let held_with_expiration_time = match &*state_rx.borrow_and_update() { - LockState::Held(pid, ts, _ct) if *pid == our_pid => { - let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64; - if now < expiration_time { - Some(expiration_time) - } else { - None - } - } - _ => None, - }; - let held = held_with_expiration_time.is_some(); - if held != *held_tx.borrow() { - held_tx.send(held)?; - } - - let await_expired = async { - match held_with_expiration_time { - None => futures::future::pending().await, - Some(expiration_time) => { - tokio::time::sleep(Duration::from_millis(expiration_time - now)).await - } - }; - }; - - tokio::select!( - r = state_rx.changed() => { - r?; - } - _ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"), - _ = await_expired => continue, - ); - } - } - .boxed(); - - // Loop 3: acquire lock when relevant - let take_lock_loop: BoxFuture> = async { - loop { - let now = now_msec(); - let state: LockState = state_rx_2.borrow_and_update().clone(); - let (acquire_at, ct) = match state { - LockState::Unknown => { - // If state of the lock is unknown, don't try to acquire - state_rx_2.changed().await?; - continue; - } - LockState::Empty => (now, None), - LockState::Held(pid, ts, ct) => { - if pid == our_pid { - (ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct)) - } else { - (ts, Some(ct)) - } - } - }; - - // Wait until it is time to acquire lock - if acquire_at > now { - tokio::select!( - r = state_rx_2.changed() => { - // If lock state changed in the meantime, don't acquire and loop around - r?; - continue; - } - _ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => () - ); - } - - // Acquire lock - let mut lock = vec![0u8; 32]; - lock[..8].copy_from_slice(&u64::to_be_bytes( - now_msec() + LOCK_DURATION.as_millis() as u64, - )); - lock[8..].copy_from_slice(&our_pid.0); - let row = match ct { - Some(existing) => existing, - None => row_ref.clone(), - }; - if let Err(e) = storage - .row_insert(vec![storage::RowVal::new(row, lock)]) - .await - { - error!("Could not take lock: {}", e); - tokio::time::sleep(Duration::from_secs(30)).await; - } - - // Wait for new information to loop back - state_rx_2.changed().await?; - } - } - .boxed(); - - let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop); - - debug!("lock loop exited, releasing"); - - if !held_tx.is_closed() { - warn!("weird..."); - let _ = held_tx.send(false); - } - - // If lock is ours, release it - let release = match &*state_rx.borrow() { - LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()), - _ => None, - }; - if let Some(ct) = release { - match storage.row_rm(&storage::Selector::Single(&ct)).await { - Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), - Ok(_) => (), - }; - } -} - -// ---- LMTP SIDE: storing messages encrypted with user's pubkey ---- - -pub struct EncryptedMessage { - key: cryptoblob::Key, - encrypted_body: Vec, -} - -impl EncryptedMessage { - pub fn new(body: Vec) -> Result { - let key = cryptoblob::gen_key(); - let encrypted_body = cryptoblob::seal(&body, &key)?; - Ok(Self { - key, - encrypted_body, - }) - } - - pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { - let storage = creds.storage.build().await?; - - // Get causality token of previous watch key - let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); - let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await { - Err(_) => query, - Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query), - }; - - // Write mail to encrypted storage - let encrypted_key = - sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); - let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key); - - let blob_val = storage::BlobVal::new( - storage::BlobRef(format!("incoming/{}", gen_ident())), - self.encrypted_body.clone().into(), - ) - .with_meta(MESSAGE_KEY.to_string(), key_header); - storage.blob_insert(blob_val).await?; - - // Update watch key to signal new mail - let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec()); - storage.row_insert(vec![watch_val]).await?; - Ok(()) - } -} diff --git a/aero-collections/mail/mailbox.rs b/aero-collections/mail/mailbox.rs deleted file mode 100644 index d1a5473..0000000 --- a/aero-collections/mail/mailbox.rs +++ /dev/null @@ -1,524 +0,0 @@ -use anyhow::{anyhow, bail, Result}; -use serde::{Deserialize, Serialize}; -use tokio::sync::RwLock; - -use crate::bayou::Bayou; -use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; -use crate::login::Credentials; -use crate::mail::uidindex::*; -use crate::mail::unique_ident::*; -use crate::mail::IMF; -use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; -use crate::timestamp::now_msec; - -pub struct Mailbox { - pub(super) id: UniqueIdent, - mbox: RwLock, -} - -impl Mailbox { - pub(crate) async fn open( - creds: &Credentials, - id: UniqueIdent, - min_uidvalidity: ImapUidvalidity, - ) -> Result { - let index_path = format!("index/{}", id); - let mail_path = format!("mail/{}", id); - - let mut uid_index = Bayou::::new(creds, index_path).await?; - uid_index.sync().await?; - - let uidvalidity = uid_index.state().uidvalidity; - if uidvalidity < min_uidvalidity { - uid_index - .push( - uid_index - .state() - .op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()), - ) - .await?; - } - - // @FIXME reporting through opentelemetry or some logs - // info on the "shape" of the mailbox would be welcomed - /* - dump(&uid_index); - */ - - let mbox = RwLock::new(MailboxInternal { - id, - encryption_key: creds.keys.master.clone(), - storage: creds.storage.build().await?, - uid_index, - mail_path, - }); - - Ok(Self { id, mbox }) - } - - /// Sync data with backing store - pub async fn force_sync(&self) -> Result<()> { - self.mbox.write().await.force_sync().await - } - - /// Sync data with backing store only if changes are detected - /// or last sync is too old - pub async fn opportunistic_sync(&self) -> Result<()> { - self.mbox.write().await.opportunistic_sync().await - } - - /// Block until a sync has been done (due to changes in the event log) - pub async fn notify(&self) -> std::sync::Weak { - self.mbox.read().await.notifier() - } - - // ---- Functions for reading the mailbox ---- - - /// Get a clone of the current UID Index of this mailbox - /// (cloning is cheap so don't hesitate to use this) - pub async fn current_uid_index(&self) -> UidIndex { - self.mbox.read().await.uid_index.state().clone() - } - - /// Fetch the metadata (headers + some more info) of the specified - /// mail IDs - pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { - self.mbox.read().await.fetch_meta(ids).await - } - - /// Fetch an entire e-mail - pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { - self.mbox.read().await.fetch_full(id, message_key).await - } - - pub async fn frozen(self: &std::sync::Arc) -> super::snapshot::FrozenMailbox { - super::snapshot::FrozenMailbox::new(self.clone()).await - } - - // ---- Functions for changing the mailbox ---- - - /// Add flags to message - pub async fn add_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> { - self.mbox.write().await.add_flags(id, flags).await - } - - /// Delete flags from message - pub async fn del_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> { - self.mbox.write().await.del_flags(id, flags).await - } - - /// Define the new flags for this message - pub async fn set_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> { - self.mbox.write().await.set_flags(id, flags).await - } - - /// Insert an email into the mailbox - pub async fn append<'a>( - &self, - msg: IMF<'a>, - ident: Option, - flags: &[Flag], - ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> { - self.mbox.write().await.append(msg, ident, flags).await - } - - /// Insert an email into the mailbox, copying it from an existing S3 object - pub async fn append_from_s3<'a>( - &self, - msg: IMF<'a>, - ident: UniqueIdent, - blob_ref: storage::BlobRef, - message_key: Key, - ) -> Result<()> { - self.mbox - .write() - .await - .append_from_s3(msg, ident, blob_ref, message_key) - .await - } - - /// Delete a message definitively from the mailbox - pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> { - self.mbox.write().await.delete(id).await - } - - /// Copy an email from an other Mailbox to this mailbox - /// (use this when possible, as it allows for a certain number of storage optimizations) - pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result { - if self.id == from.id { - bail!("Cannot copy into same mailbox"); - } - - let (mut selflock, fromlock); - if self.id < from.id { - selflock = self.mbox.write().await; - fromlock = from.mbox.write().await; - } else { - fromlock = from.mbox.write().await; - selflock = self.mbox.write().await; - }; - selflock.copy_from(&fromlock, uuid).await - } - - /// Move an email from an other Mailbox to this mailbox - /// (use this when possible, as it allows for a certain number of storage optimizations) - pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> { - if self.id == from.id { - bail!("Cannot copy move same mailbox"); - } - - let (mut selflock, mut fromlock); - if self.id < from.id { - selflock = self.mbox.write().await; - fromlock = from.mbox.write().await; - } else { - fromlock = from.mbox.write().await; - selflock = self.mbox.write().await; - }; - selflock.move_from(&mut fromlock, uuid).await - } -} - -// ---- - -// Non standard but common flags: -// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml -struct MailboxInternal { - // 2023-05-15 will probably be used later. - #[allow(dead_code)] - id: UniqueIdent, - mail_path: String, - encryption_key: Key, - storage: Store, - uid_index: Bayou, -} - -impl MailboxInternal { - async fn force_sync(&mut self) -> Result<()> { - self.uid_index.sync().await?; - Ok(()) - } - - async fn opportunistic_sync(&mut self) -> Result<()> { - self.uid_index.opportunistic_sync().await?; - Ok(()) - } - - fn notifier(&self) -> std::sync::Weak { - self.uid_index.notifier() - } - - // ---- Functions for reading the mailbox ---- - - async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { - let ids = ids.iter().map(|x| x.to_string()).collect::>(); - let ops = ids - .iter() - .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())) - .collect::>(); - let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; - - let mut meta_vec = vec![]; - for res in res_vec.into_iter() { - let mut meta_opt = None; - - // Resolve conflicts - for v in res.value.iter() { - match v { - storage::Alternative::Tombstone => (), - storage::Alternative::Value(v) => { - let meta = open_deserialize::(v, &self.encryption_key)?; - match meta_opt.as_mut() { - None => { - meta_opt = Some(meta); - } - Some(prevmeta) => { - prevmeta.try_merge(meta)?; - } - } - } - } - } - if let Some(meta) = meta_opt { - meta_vec.push(meta); - } else { - bail!("No valid meta value in k2v for {:?}", res.row_ref); - } - } - - Ok(meta_vec) - } - - async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { - let obj_res = self - .storage - .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))) - .await?; - let body = obj_res.value; - cryptoblob::open(&body, message_key) - } - - // ---- Functions for changing the mailbox ---- - - async fn add_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> { - let add_flag_op = self.uid_index.state().op_flag_add(ident, flags.to_vec()); - self.uid_index.push(add_flag_op).await - } - - async fn del_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> { - let del_flag_op = self.uid_index.state().op_flag_del(ident, flags.to_vec()); - self.uid_index.push(del_flag_op).await - } - - async fn set_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> { - let set_flag_op = self.uid_index.state().op_flag_set(ident, flags.to_vec()); - self.uid_index.push(set_flag_op).await - } - - async fn append( - &mut self, - mail: IMF<'_>, - ident: Option, - flags: &[Flag], - ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> { - let ident = ident.unwrap_or_else(gen_ident); - let message_key = gen_key(); - - futures::try_join!( - async { - // Encrypt and save mail body - let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.storage - .blob_insert(BlobVal::new( - BlobRef(format!("{}/{}", self.mail_path, ident)), - message_blob, - )) - .await?; - Ok::<_, anyhow::Error>(()) - }, - async { - // Save mail meta - let meta = MailMeta { - internaldate: now_msec(), - headers: mail.parsed.raw_headers.to_vec(), - message_key: message_key.clone(), - rfc822_size: mail.raw.len(), - }; - let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.storage - .row_insert(vec![RowVal::new( - RowRef::new(&self.mail_path, &ident.to_string()), - meta_blob, - )]) - .await?; - Ok::<_, anyhow::Error>(()) - }, - self.uid_index.opportunistic_sync() - )?; - - // Add mail to Bayou mail index - let uid_state = self.uid_index.state(); - let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec()); - - let uidvalidity = uid_state.uidvalidity; - let (uid, modseq) = match add_mail_op { - UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq), - _ => unreachable!(), - }; - - self.uid_index.push(add_mail_op).await?; - - Ok((uidvalidity, uid, modseq)) - } - - async fn append_from_s3<'a>( - &mut self, - mail: IMF<'a>, - ident: UniqueIdent, - blob_src: storage::BlobRef, - message_key: Key, - ) -> Result<()> { - futures::try_join!( - async { - // Copy mail body from previous location - let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); - self.storage.blob_copy(&blob_src, &blob_dst).await?; - Ok::<_, anyhow::Error>(()) - }, - async { - // Save mail meta - let meta = MailMeta { - internaldate: now_msec(), - headers: mail.parsed.raw_headers.to_vec(), - message_key: message_key.clone(), - rfc822_size: mail.raw.len(), - }; - let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.storage - .row_insert(vec![RowVal::new( - RowRef::new(&self.mail_path, &ident.to_string()), - meta_blob, - )]) - .await?; - Ok::<_, anyhow::Error>(()) - }, - self.uid_index.opportunistic_sync() - )?; - - // Add mail to Bayou mail index - let add_mail_op = self.uid_index.state().op_mail_add(ident, vec![]); - self.uid_index.push(add_mail_op).await?; - - Ok(()) - } - - async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { - if !self.uid_index.state().table.contains_key(&ident) { - bail!("Cannot delete mail that doesn't exit"); - } - - let del_mail_op = self.uid_index.state().op_mail_del(ident); - self.uid_index.push(del_mail_op).await?; - - futures::try_join!( - async { - // Delete mail body from S3 - self.storage - .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))) - .await?; - Ok::<_, anyhow::Error>(()) - }, - async { - // Delete mail meta from K2V - let sk = ident.to_string(); - let res = self - .storage - .row_fetch(&storage::Selector::Single(&RowRef::new( - &self.mail_path, - &sk, - ))) - .await?; - if let Some(row_val) = res.into_iter().next() { - self.storage - .row_rm(&storage::Selector::Single(&row_val.row_ref)) - .await?; - } - Ok::<_, anyhow::Error>(()) - } - )?; - Ok(()) - } - - async fn copy_from( - &mut self, - from: &MailboxInternal, - source_id: UniqueIdent, - ) -> Result { - let new_id = gen_ident(); - self.copy_internal(from, source_id, new_id).await?; - Ok(new_id) - } - - async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> { - self.copy_internal(from, id, id).await?; - from.delete(id).await?; - Ok(()) - } - - async fn copy_internal( - &mut self, - from: &MailboxInternal, - source_id: UniqueIdent, - new_id: UniqueIdent, - ) -> Result<()> { - if self.encryption_key != from.encryption_key { - bail!("Message to be copied/moved does not belong to same account."); - } - - let flags = from - .uid_index - .state() - .table - .get(&source_id) - .ok_or(anyhow!("Source mail not found"))? - .2 - .clone(); - - futures::try_join!( - async { - let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); - let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); - self.storage.blob_copy(&src, &dst).await?; - Ok::<_, anyhow::Error>(()) - }, - async { - // Copy mail meta in K2V - let meta = &from.fetch_meta(&[source_id]).await?[0]; - let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.storage - .row_insert(vec![RowVal::new( - RowRef::new(&self.mail_path, &new_id.to_string()), - meta_blob, - )]) - .await?; - Ok::<_, anyhow::Error>(()) - }, - self.uid_index.opportunistic_sync(), - )?; - - // Add mail to Bayou mail index - let add_mail_op = self.uid_index.state().op_mail_add(new_id, flags); - self.uid_index.push(add_mail_op).await?; - - Ok(()) - } -} - -// Can be useful to debug so we want this code -// to be available to developers -#[allow(dead_code)] -fn dump(uid_index: &Bayou) { - let s = uid_index.state(); - println!("---- MAILBOX STATE ----"); - println!("UIDVALIDITY {}", s.uidvalidity); - println!("UIDNEXT {}", s.uidnext); - println!("INTERNALSEQ {}", s.internalseq); - for (uid, ident) in s.idx_by_uid.iter() { - println!( - "{} {} {}", - uid, - hex::encode(ident.0), - s.table.get(ident).cloned().unwrap().2.join(", ") - ); - } - println!(); -} - -// ---- - -/// The metadata of a message that is stored in K2V -/// at pk = mail/, sk = -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct MailMeta { - /// INTERNALDATE field (milliseconds since epoch) - pub internaldate: u64, - /// Headers of the message - pub headers: Vec, - /// Secret key for decrypting entire message - pub message_key: Key, - /// RFC822 size - pub rfc822_size: usize, -} - -impl MailMeta { - fn try_merge(&mut self, other: Self) -> Result<()> { - if self.headers != other.headers - || self.message_key != other.message_key - || self.rfc822_size != other.rfc822_size - { - bail!("Conflicting MailMeta values."); - } - self.internaldate = std::cmp::max(self.internaldate, other.internaldate); - Ok(()) - } -} diff --git a/aero-collections/mail/mod.rs b/aero-collections/mail/mod.rs deleted file mode 100644 index 03e85cd..0000000 --- a/aero-collections/mail/mod.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::convert::TryFrom; - -pub mod incoming; -pub mod mailbox; -pub mod query; -pub mod snapshot; -pub mod uidindex; -pub mod unique_ident; -pub mod namespace; - -// Internet Message Format -// aka RFC 822 - RFC 2822 - RFC 5322 -// 2023-05-15 don't want to refactor this struct now. -#[allow(clippy::upper_case_acronyms)] -pub struct IMF<'a> { - raw: &'a [u8], - parsed: eml_codec::part::composite::Message<'a>, -} - -impl<'a> TryFrom<&'a [u8]> for IMF<'a> { - type Error = (); - - fn try_from(body: &'a [u8]) -> Result, ()> { - let parsed = eml_codec::parse_message(body).or(Err(()))?.1; - Ok(Self { raw: body, parsed }) - } -} diff --git a/aero-collections/mail/namespace.rs b/aero-collections/mail/namespace.rs deleted file mode 100644 index 5e67173..0000000 --- a/aero-collections/mail/namespace.rs +++ /dev/null @@ -1,209 +0,0 @@ -use std::collections::{BTreeMap, HashMap}; -use std::sync::{Arc, Weak}; - -use anyhow::{anyhow, bail, Result}; -use lazy_static::lazy_static; -use serde::{Deserialize, Serialize}; -use tokio::sync::watch; - -use crate::cryptoblob::{open_deserialize, seal_serialize}; -use crate::login::Credentials; -use crate::mail::incoming::incoming_mail_watch_process; -use crate::mail::mailbox::Mailbox; -use crate::mail::uidindex::ImapUidvalidity; -use crate::mail::unique_ident::{gen_ident, UniqueIdent}; -use crate::storage; -use crate::timestamp::now_msec; - -pub const MAILBOX_HIERARCHY_DELIMITER: char = '.'; - -/// INBOX is the only mailbox that must always exist. -/// It is created automatically when the account is created. -/// IMAP allows the user to rename INBOX to something else, -/// in this case all messages from INBOX are moved to a mailbox -/// with the new name and the INBOX mailbox still exists and is empty. -/// In our implementation, we indeed move the underlying mailbox -/// to the new name (i.e. the new name has the same id as the previous -/// INBOX), and we create a new empty mailbox for INBOX. -pub const INBOX: &str = "INBOX"; - -/// For convenience purpose, we also create some special mailbox -/// that are described in RFC6154 SPECIAL-USE -/// @FIXME maybe it should be a configuration parameter -/// @FIXME maybe we should have a per-mailbox flag mechanism, either an enum or a string, so we -/// track which mailbox is used for what. -/// @FIXME Junk could be useful but we don't have any antispam solution yet so... -/// @FIXME IMAP supports virtual mailbox. \All or \Flagged are intended to be virtual mailboxes. -/// \Trash might be one, or not one. I don't know what we should do there. -pub const DRAFTS: &str = "Drafts"; -pub const ARCHIVE: &str = "Archive"; -pub const SENT: &str = "Sent"; -pub const TRASH: &str = "Trash"; - -pub(crate) const MAILBOX_LIST_PK: &str = "mailboxes"; -pub(crate) const MAILBOX_LIST_SK: &str = "list"; - -// ---- User's mailbox list (serialized in K2V) ---- - -#[derive(Serialize, Deserialize)] -pub(crate) struct MailboxList(BTreeMap); - -#[derive(Serialize, Deserialize, Clone, Copy, Debug)] -pub(crate) struct MailboxListEntry { - id_lww: (u64, Option), - uidvalidity: ImapUidvalidity, -} - -impl MailboxListEntry { - fn merge(&mut self, other: &Self) { - // Simple CRDT merge rule - if other.id_lww.0 > self.id_lww.0 - || (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1) - { - self.id_lww = other.id_lww; - } - self.uidvalidity = std::cmp::max(self.uidvalidity, other.uidvalidity); - } -} - -impl MailboxList { - pub(crate) fn new() -> Self { - Self(BTreeMap::new()) - } - - pub(crate) fn merge(&mut self, list2: Self) { - for (k, v) in list2.0.into_iter() { - if let Some(e) = self.0.get_mut(&k) { - e.merge(&v); - } else { - self.0.insert(k, v); - } - } - } - - pub(crate) fn existing_mailbox_names(&self) -> Vec { - self.0 - .iter() - .filter(|(_, v)| v.id_lww.1.is_some()) - .map(|(k, _)| k.to_string()) - .collect() - } - - pub(crate) fn has_mailbox(&self, name: &str) -> bool { - matches!( - self.0.get(name), - Some(MailboxListEntry { - id_lww: (_, Some(_)), - .. - }) - ) - } - - pub(crate) fn get_mailbox(&self, name: &str) -> Option<(ImapUidvalidity, Option)> { - self.0.get(name).map( - |MailboxListEntry { - id_lww: (_, mailbox_id), - uidvalidity, - }| (*uidvalidity, *mailbox_id), - ) - } - - /// Ensures mailbox `name` maps to id `id`. - /// If it already mapped to that, returns None. - /// If a change had to be done, returns Some(new uidvalidity in mailbox). - pub(crate) fn set_mailbox(&mut self, name: &str, id: Option) -> Option { - let (ts, id, uidvalidity) = match self.0.get_mut(name) { - None => { - if id.is_none() { - return None; - } else { - (now_msec(), id, ImapUidvalidity::new(1).unwrap()) - } - } - Some(MailboxListEntry { - id_lww, - uidvalidity, - }) => { - if id_lww.1 == id { - return None; - } else { - ( - std::cmp::max(id_lww.0 + 1, now_msec()), - id, - ImapUidvalidity::new(uidvalidity.get() + 1).unwrap(), - ) - } - } - }; - - self.0.insert( - name.into(), - MailboxListEntry { - id_lww: (ts, id), - uidvalidity, - }, - ); - Some(uidvalidity) - } - - pub(crate) fn update_uidvalidity(&mut self, name: &str, new_uidvalidity: ImapUidvalidity) { - match self.0.get_mut(name) { - None => { - self.0.insert( - name.into(), - MailboxListEntry { - id_lww: (now_msec(), None), - uidvalidity: new_uidvalidity, - }, - ); - } - Some(MailboxListEntry { uidvalidity, .. }) => { - *uidvalidity = std::cmp::max(*uidvalidity, new_uidvalidity); - } - } - } - - pub(crate) fn create_mailbox(&mut self, name: &str) -> CreatedMailbox { - if let Some(MailboxListEntry { - id_lww: (_, Some(id)), - uidvalidity, - }) = self.0.get(name) - { - return CreatedMailbox::Existed(*id, *uidvalidity); - } - - let id = gen_ident(); - let uidvalidity = self.set_mailbox(name, Some(id)).unwrap(); - CreatedMailbox::Created(id, uidvalidity) - } - - pub(crate) fn rename_mailbox(&mut self, old_name: &str, new_name: &str) -> Result<()> { - if let Some((uidvalidity, Some(mbid))) = self.get_mailbox(old_name) { - if self.has_mailbox(new_name) { - bail!( - "Cannot rename {} into {}: {} already exists", - old_name, - new_name, - new_name - ); - } - - self.set_mailbox(old_name, None); - self.set_mailbox(new_name, Some(mbid)); - self.update_uidvalidity(new_name, uidvalidity); - Ok(()) - } else { - bail!( - "Cannot rename {} into {}: {} doesn't exist", - old_name, - new_name, - old_name - ); - } - } -} - -pub(crate) enum CreatedMailbox { - Created(UniqueIdent, ImapUidvalidity), - Existed(UniqueIdent, ImapUidvalidity), -} diff --git a/aero-collections/mail/query.rs b/aero-collections/mail/query.rs deleted file mode 100644 index 3e6fe99..0000000 --- a/aero-collections/mail/query.rs +++ /dev/null @@ -1,137 +0,0 @@ -use super::mailbox::MailMeta; -use super::snapshot::FrozenMailbox; -use super::unique_ident::UniqueIdent; -use anyhow::Result; -use futures::future::FutureExt; -use futures::stream::{BoxStream, Stream, StreamExt}; - -/// Query is in charge of fetching efficiently -/// requested data for a list of emails -pub struct Query<'a, 'b> { - pub frozen: &'a FrozenMailbox, - pub emails: &'b [UniqueIdent], - pub scope: QueryScope, -} - -#[derive(Debug)] -pub enum QueryScope { - Index, - Partial, - Full, -} -impl QueryScope { - pub fn union(&self, other: &QueryScope) -> QueryScope { - match (self, other) { - (QueryScope::Full, _) | (_, QueryScope::Full) => QueryScope::Full, - (QueryScope::Partial, _) | (_, QueryScope::Partial) => QueryScope::Partial, - (QueryScope::Index, QueryScope::Index) => QueryScope::Index, - } - } -} - -//type QueryResultStream = Box>>; - -impl<'a, 'b> Query<'a, 'b> { - pub fn fetch(&self) -> BoxStream> { - match self.scope { - QueryScope::Index => Box::pin( - futures::stream::iter(self.emails) - .map(|&uuid| Ok(QueryResult::IndexResult { uuid })), - ), - QueryScope::Partial => Box::pin(self.partial()), - QueryScope::Full => Box::pin(self.full()), - } - } - - // --- functions below are private *for reasons* - fn partial<'d>(&'d self) -> impl Stream> + 'd + Send { - async move { - let maybe_meta_list: Result> = - self.frozen.mailbox.fetch_meta(self.emails).await; - let list_res = maybe_meta_list - .map(|meta_list| { - meta_list - .into_iter() - .zip(self.emails) - .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) - .collect() - }) - .unwrap_or_else(|e| vec![Err(e)]); - - futures::stream::iter(list_res) - } - .flatten_stream() - } - - fn full<'d>(&'d self) -> impl Stream> + 'd + Send { - self.partial().then(move |maybe_meta| async move { - let meta = maybe_meta?; - - let content = self - .frozen - .mailbox - .fetch_full( - *meta.uuid(), - &meta - .metadata() - .expect("meta to be PartialResult") - .message_key, - ) - .await?; - - Ok(meta.into_full(content).expect("meta to be PartialResult")) - }) - } -} - -#[derive(Debug, Clone)] -pub enum QueryResult { - IndexResult { - uuid: UniqueIdent, - }, - PartialResult { - uuid: UniqueIdent, - metadata: MailMeta, - }, - FullResult { - uuid: UniqueIdent, - metadata: MailMeta, - content: Vec, - }, -} -impl QueryResult { - pub fn uuid(&self) -> &UniqueIdent { - match self { - Self::IndexResult { uuid, .. } => uuid, - Self::PartialResult { uuid, .. } => uuid, - Self::FullResult { uuid, .. } => uuid, - } - } - - pub fn metadata(&self) -> Option<&MailMeta> { - match self { - Self::IndexResult { .. } => None, - Self::PartialResult { metadata, .. } => Some(metadata), - Self::FullResult { metadata, .. } => Some(metadata), - } - } - - #[allow(dead_code)] - pub fn content(&self) -> Option<&[u8]> { - match self { - Self::FullResult { content, .. } => Some(content), - _ => None, - } - } - - fn into_full(self, content: Vec) -> Option { - match self { - Self::PartialResult { uuid, metadata } => Some(Self::FullResult { - uuid, - metadata, - content, - }), - _ => None, - } - } -} diff --git a/aero-collections/mail/snapshot.rs b/aero-collections/mail/snapshot.rs deleted file mode 100644 index ed756b5..0000000 --- a/aero-collections/mail/snapshot.rs +++ /dev/null @@ -1,60 +0,0 @@ -use std::sync::Arc; - -use anyhow::Result; - -use super::mailbox::Mailbox; -use super::query::{Query, QueryScope}; -use super::uidindex::UidIndex; -use super::unique_ident::UniqueIdent; - -/// A Frozen Mailbox has a snapshot of the current mailbox -/// state that is desynchronized with the real mailbox state. -/// It's up to the user to choose when their snapshot must be updated -/// to give useful information to their clients -pub struct FrozenMailbox { - pub mailbox: Arc, - pub snapshot: UidIndex, -} - -impl FrozenMailbox { - /// Create a snapshot from a mailbox, the mailbox + the snapshot - /// becomes the "Frozen Mailbox". - pub async fn new(mailbox: Arc) -> Self { - let state = mailbox.current_uid_index().await; - - Self { - mailbox, - snapshot: state, - } - } - - /// Force the synchronization of the inner mailbox - /// but do not update the local snapshot - pub async fn sync(&self) -> Result<()> { - self.mailbox.opportunistic_sync().await - } - - /// Peek snapshot without updating the frozen mailbox - /// Can be useful if you want to plan some writes - /// while sending a diff to the client later - pub async fn peek(&self) -> UidIndex { - self.mailbox.current_uid_index().await - } - - /// Update the FrozenMailbox local snapshot. - /// Returns the old snapshot, so you can build a diff - pub async fn update(&mut self) -> UidIndex { - let old_snapshot = self.snapshot.clone(); - self.snapshot = self.mailbox.current_uid_index().await; - - old_snapshot - } - - pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> { - Query { - frozen: self, - emails: uuids, - scope, - } - } -} diff --git a/aero-collections/mail/uidindex.rs b/aero-collections/mail/uidindex.rs deleted file mode 100644 index 5a06670..0000000 --- a/aero-collections/mail/uidindex.rs +++ /dev/null @@ -1,474 +0,0 @@ -use std::num::{NonZeroU32, NonZeroU64}; - -use im::{HashMap, OrdMap, OrdSet}; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; - -use crate::bayou::*; -use crate::mail::unique_ident::UniqueIdent; - -pub type ModSeq = NonZeroU64; -pub type ImapUid = NonZeroU32; -pub type ImapUidvalidity = NonZeroU32; -pub type Flag = String; -pub type IndexEntry = (ImapUid, ModSeq, Vec); - -/// A UidIndex handles the mutable part of a mailbox -/// It is built by running the event log on it -/// Each applied log generates a new UidIndex by cloning the previous one -/// and applying the event. This is why we use immutable datastructures: -/// they are cheap to clone. -#[derive(Clone)] -pub struct UidIndex { - // Source of trust - pub table: OrdMap, - - // Indexes optimized for queries - pub idx_by_uid: OrdMap, - pub idx_by_modseq: OrdMap, - pub idx_by_flag: FlagIndex, - - // "Public" Counters - pub uidvalidity: ImapUidvalidity, - pub uidnext: ImapUid, - pub highestmodseq: ModSeq, - - // "Internal" Counters - pub internalseq: ImapUid, - pub internalmodseq: ModSeq, -} - -#[derive(Clone, Serialize, Deserialize, Debug)] -pub enum UidIndexOp { - MailAdd(UniqueIdent, ImapUid, ModSeq, Vec), - MailDel(UniqueIdent), - FlagAdd(UniqueIdent, ModSeq, Vec), - FlagDel(UniqueIdent, ModSeq, Vec), - FlagSet(UniqueIdent, ModSeq, Vec), - BumpUidvalidity(u32), -} - -impl UidIndex { - #[must_use] - pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags) - } - - #[must_use] - pub fn op_mail_del(&self, ident: UniqueIdent) -> UidIndexOp { - UidIndexOp::MailDel(ident) - } - - #[must_use] - pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::FlagAdd(ident, self.internalmodseq, flags) - } - - #[must_use] - pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::FlagDel(ident, self.internalmodseq, flags) - } - - #[must_use] - pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::FlagSet(ident, self.internalmodseq, flags) - } - - #[must_use] - pub fn op_bump_uidvalidity(&self, count: u32) -> UidIndexOp { - UidIndexOp::BumpUidvalidity(count) - } - - // INTERNAL functions to keep state consistent - - fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) { - // Insert the email in our table - self.table.insert(ident, (uid, modseq, flags.to_owned())); - - // Update the indexes/caches - self.idx_by_uid.insert(uid, ident); - self.idx_by_flag.insert(uid, flags); - self.idx_by_modseq.insert(modseq, ident); - } - - fn unreg_email(&mut self, ident: &UniqueIdent) { - // We do nothing if the mail does not exist - let (uid, modseq, flags) = match self.table.get(ident) { - Some(v) => v, - None => return, - }; - - // Delete all cache entries - self.idx_by_uid.remove(uid); - self.idx_by_flag.remove(*uid, flags); - self.idx_by_modseq.remove(modseq); - - // Remove from source of trust - self.table.remove(ident); - } -} - -impl Default for UidIndex { - fn default() -> Self { - Self { - table: OrdMap::new(), - - idx_by_uid: OrdMap::new(), - idx_by_modseq: OrdMap::new(), - idx_by_flag: FlagIndex::new(), - - uidvalidity: NonZeroU32::new(1).unwrap(), - uidnext: NonZeroU32::new(1).unwrap(), - highestmodseq: NonZeroU64::new(1).unwrap(), - - internalseq: NonZeroU32::new(1).unwrap(), - internalmodseq: NonZeroU64::new(1).unwrap(), - } - } -} - -impl BayouState for UidIndex { - type Op = UidIndexOp; - - fn apply(&self, op: &UidIndexOp) -> Self { - let mut new = self.clone(); - match op { - UidIndexOp::MailAdd(ident, uid, modseq, flags) => { - // Change UIDValidity if there is a UID conflict or a MODSEQ conflict - // @FIXME Need to prove that summing work - // The intuition: we increase the UIDValidity by the number of possible conflicts - if *uid < new.internalseq || *modseq < new.internalmodseq { - let bump_uid = new.internalseq.get() - uid.get(); - let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32; - new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap(); - } - - // Assign the real uid of the email - let new_uid = new.internalseq; - - // Assign the real modseq of the email and its new flags - let new_modseq = new.internalmodseq; - - // Delete the previous entry if any. - // Our proof has no assumption on `ident` uniqueness, - // so we must handle this case even it is very unlikely - // In this case, we overwrite the email. - // Note: assigning a new UID is mandatory. - new.unreg_email(ident); - - // We record our email and update ou caches - new.reg_email(*ident, new_uid, new_modseq, flags); - - // Update counters - new.highestmodseq = new.internalmodseq; - - new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); - new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); - - new.uidnext = new.internalseq; - } - UidIndexOp::MailDel(ident) => { - // If the email is known locally, we remove its references in all our indexes - new.unreg_email(ident); - - // We update the counter - new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); - } - UidIndexOp::FlagAdd(ident, candidate_modseq, new_flags) => { - if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { - // Bump UIDValidity if required - if *candidate_modseq < new.internalmodseq { - let bump_modseq = - (new.internalmodseq.get() - candidate_modseq.get()) as u32; - new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); - } - - // Add flags to the source of trust and the cache - let mut to_add: Vec = new_flags - .iter() - .filter(|f| !existing_flags.contains(f)) - .cloned() - .collect(); - new.idx_by_flag.insert(*uid, &to_add); - *email_modseq = new.internalmodseq; - new.idx_by_modseq.insert(new.internalmodseq, *ident); - existing_flags.append(&mut to_add); - - // Update counters - new.highestmodseq = new.internalmodseq; - new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); - } - } - UidIndexOp::FlagDel(ident, candidate_modseq, rm_flags) => { - if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { - // Bump UIDValidity if required - if *candidate_modseq < new.internalmodseq { - let bump_modseq = - (new.internalmodseq.get() - candidate_modseq.get()) as u32; - new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); - } - - // Remove flags from the source of trust and the cache - existing_flags.retain(|x| !rm_flags.contains(x)); - new.idx_by_flag.remove(*uid, rm_flags); - - // Register that email has been modified - new.idx_by_modseq.insert(new.internalmodseq, *ident); - *email_modseq = new.internalmodseq; - - // Update counters - new.highestmodseq = new.internalmodseq; - new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); - } - } - UidIndexOp::FlagSet(ident, candidate_modseq, new_flags) => { - if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { - // Bump UIDValidity if required - if *candidate_modseq < new.internalmodseq { - let bump_modseq = - (new.internalmodseq.get() - candidate_modseq.get()) as u32; - new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); - } - - // Remove flags from the source of trust and the cache - let (keep_flags, rm_flags): (Vec, Vec) = existing_flags - .iter() - .cloned() - .partition(|x| new_flags.contains(x)); - *existing_flags = keep_flags; - let mut to_add: Vec = new_flags - .iter() - .filter(|f| !existing_flags.contains(f)) - .cloned() - .collect(); - existing_flags.append(&mut to_add); - new.idx_by_flag.remove(*uid, &rm_flags); - new.idx_by_flag.insert(*uid, &to_add); - - // Register that email has been modified - new.idx_by_modseq.insert(new.internalmodseq, *ident); - *email_modseq = new.internalmodseq; - - // Update counters - new.highestmodseq = new.internalmodseq; - new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); - } - } - UidIndexOp::BumpUidvalidity(count) => { - new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count) - .unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap()); - } - } - new - } -} - -// ---- FlagIndex implementation ---- - -#[derive(Clone)] -pub struct FlagIndex(HashMap>); -pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet>; - -impl FlagIndex { - fn new() -> Self { - Self(HashMap::new()) - } - fn insert(&mut self, uid: ImapUid, flags: &[Flag]) { - flags.iter().for_each(|flag| { - self.0 - .entry(flag.clone()) - .or_insert(OrdSet::new()) - .insert(uid); - }); - } - fn remove(&mut self, uid: ImapUid, flags: &[Flag]) { - for flag in flags.iter() { - if let Some(set) = self.0.get_mut(flag) { - set.remove(&uid); - if set.is_empty() { - self.0.remove(flag); - } - } - } - } - - pub fn get(&self, f: &Flag) -> Option<&OrdSet> { - self.0.get(f) - } - - pub fn flags(&self) -> FlagIter { - self.0.keys() - } -} - -// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ---- - -#[derive(Serialize, Deserialize)] -struct UidIndexSerializedRepr { - mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec)>, - - uidvalidity: ImapUidvalidity, - uidnext: ImapUid, - highestmodseq: ModSeq, - - internalseq: ImapUid, - internalmodseq: ModSeq, -} - -impl<'de> Deserialize<'de> for UidIndex { - fn deserialize(d: D) -> Result - where - D: Deserializer<'de>, - { - let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?; - - let mut uidindex = UidIndex { - table: OrdMap::new(), - - idx_by_uid: OrdMap::new(), - idx_by_modseq: OrdMap::new(), - idx_by_flag: FlagIndex::new(), - - uidvalidity: val.uidvalidity, - uidnext: val.uidnext, - highestmodseq: val.highestmodseq, - - internalseq: val.internalseq, - internalmodseq: val.internalmodseq, - }; - - val.mails - .iter() - .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags)); - - Ok(uidindex) - } -} - -impl Serialize for UidIndex { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut mails = vec![]; - for (ident, (uid, modseq, flags)) in self.table.iter() { - mails.push((*uid, *modseq, *ident, flags.clone())); - } - - let val = UidIndexSerializedRepr { - mails, - uidvalidity: self.uidvalidity, - uidnext: self.uidnext, - highestmodseq: self.highestmodseq, - internalseq: self.internalseq, - internalmodseq: self.internalmodseq, - }; - - val.serialize(serializer) - } -} - -// ---- TESTS ---- - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_uidindex() { - let mut state = UidIndex::default(); - - // Add message 1 - { - let m = UniqueIdent([0x01; 24]); - let f = vec!["\\Recent".to_string(), "\\Archive".to_string()]; - let ev = state.op_mail_add(m, f); - state = state.apply(&ev); - - // Early checks - assert_eq!(state.table.len(), 1); - let (uid, modseq, flags) = state.table.get(&m).unwrap(); - assert_eq!(*uid, NonZeroU32::new(1).unwrap()); - assert_eq!(*modseq, NonZeroU64::new(1).unwrap()); - assert_eq!(flags.len(), 2); - let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap(); - assert_eq!(&m, ident); - let recent = state.idx_by_flag.0.get("\\Recent").unwrap(); - assert_eq!(recent.len(), 1); - assert_eq!(recent.iter().next().unwrap(), &NonZeroU32::new(1).unwrap()); - assert_eq!(state.uidnext, NonZeroU32::new(2).unwrap()); - assert_eq!(state.uidvalidity, NonZeroU32::new(1).unwrap()); - } - - // Add message 2 - { - let m = UniqueIdent([0x02; 24]); - let f = vec!["\\Seen".to_string(), "\\Archive".to_string()]; - let ev = state.op_mail_add(m, f); - state = state.apply(&ev); - - let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); - assert_eq!(archive.len(), 2); - } - - // Add flags to message 1 - { - let m = UniqueIdent([0x01; 24]); - let f = vec!["Important".to_string(), "$cl_1".to_string()]; - let ev = state.op_flag_add(m, f); - state = state.apply(&ev); - } - - // Delete flags from message 1 - { - let m = UniqueIdent([0x01; 24]); - let f = vec!["\\Recent".to_string()]; - let ev = state.op_flag_del(m, f); - state = state.apply(&ev); - - let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); - assert_eq!(archive.len(), 2); - } - - // Delete message 2 - { - let m = UniqueIdent([0x02; 24]); - let ev = state.op_mail_del(m); - state = state.apply(&ev); - - let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); - assert_eq!(archive.len(), 1); - } - - // Add a message 3 concurrent to message 1 (trigger a uid validity change) - { - let m = UniqueIdent([0x03; 24]); - let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; - let ev = UidIndexOp::MailAdd( - m, - NonZeroU32::new(1).unwrap(), - NonZeroU64::new(1).unwrap(), - f, - ); - state = state.apply(&ev); - } - - // Checks - { - assert_eq!(state.table.len(), 2); - assert!(state.uidvalidity > NonZeroU32::new(1).unwrap()); - - let (last_uid, ident) = state.idx_by_uid.get_max().unwrap(); - assert_eq!(ident, &UniqueIdent([0x03; 24])); - - let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); - assert_eq!(archive.len(), 2); - let mut iter = archive.iter(); - assert_eq!(iter.next().unwrap(), &NonZeroU32::new(1).unwrap()); - assert_eq!(iter.next().unwrap(), last_uid); - } - } -} diff --git a/aero-collections/mail/unique_ident.rs b/aero-collections/mail/unique_ident.rs deleted file mode 100644 index 0e629db..0000000 --- a/aero-collections/mail/unique_ident.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; - -use lazy_static::lazy_static; -use rand::prelude::*; -use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; - -use crate::timestamp::now_msec; - -/// An internal Mail Identifier is composed of two components: -/// - a process identifier, 128 bits, itself composed of: -/// - the timestamp of when the process started, 64 bits -/// - a 64-bit random number -/// - a sequence number, 64 bits -/// They are not part of the protocol but an internal representation -/// required by Aerogramme. -/// Their main property is to be unique without having to rely -/// on synchronization between IMAP processes. -#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] -pub struct UniqueIdent(pub [u8; 24]); - -struct IdentGenerator { - pid: u128, - sn: AtomicU64, -} - -impl IdentGenerator { - fn new() -> Self { - let time = now_msec() as u128; - let rand = thread_rng().gen::() as u128; - Self { - pid: (time << 64) | rand, - sn: AtomicU64::new(0), - } - } - - fn gen(&self) -> UniqueIdent { - let sn = self.sn.fetch_add(1, Ordering::Relaxed); - let mut res = [0u8; 24]; - res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); - res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); - UniqueIdent(res) - } -} - -lazy_static! { - static ref GENERATOR: IdentGenerator = IdentGenerator::new(); -} - -pub fn gen_ident() -> UniqueIdent { - GENERATOR.gen() -} - -// -- serde -- - -impl<'de> Deserialize<'de> for UniqueIdent { - fn deserialize(d: D) -> Result - where - D: Deserializer<'de>, - { - let v = String::deserialize(d)?; - UniqueIdent::from_str(&v).map_err(D::Error::custom) - } -} - -impl Serialize for UniqueIdent { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&self.to_string()) - } -} - -impl std::fmt::Display for UniqueIdent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", hex::encode(self.0)) - } -} - -impl std::fmt::Debug for UniqueIdent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", hex::encode(self.0)) - } -} - -impl FromStr for UniqueIdent { - type Err = &'static str; - - fn from_str(s: &str) -> Result { - let bytes = hex::decode(s).map_err(|_| "invalid hex")?; - - if bytes.len() != 24 { - return Err("bad length"); - } - - let mut tmp = [0u8; 24]; - tmp[..].copy_from_slice(&bytes); - Ok(UniqueIdent(tmp)) - } -} diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs new file mode 100644 index 0000000..19e3340 --- /dev/null +++ b/aero-collections/src/calendar/mod.rs @@ -0,0 +1 @@ +//@FIXME Event Index diff --git a/aero-collections/src/lib.rs b/aero-collections/src/lib.rs new file mode 100644 index 0000000..adcfc93 --- /dev/null +++ b/aero-collections/src/lib.rs @@ -0,0 +1,3 @@ +pub mod user; +pub mod mail; +pub mod calendar; diff --git a/aero-collections/src/mail/incoming.rs b/aero-collections/src/mail/incoming.rs new file mode 100644 index 0000000..8220461 --- /dev/null +++ b/aero-collections/src/mail/incoming.rs @@ -0,0 +1,443 @@ +use std::sync::{Arc, Weak}; +use std::time::Duration; + +use anyhow::{anyhow, bail, Result}; +use base64::Engine; +use futures::{future::BoxFuture, FutureExt}; +//use tokio::io::AsyncReadExt; +use tokio::sync::watch; +use tracing::{debug, error, info, warn}; + +use aero_user::cryptoblob; +use aero_user::login::{Credentials, PublicCredentials}; +use aero_user::storage; +use aero_bayou::timestamp::now_msec; + +use crate::mail::mailbox::Mailbox; +use crate::mail::uidindex::ImapUidvalidity; +use crate::mail::unique_ident::*; +use crate::user::User; +use crate::mail::IMF; + +const INCOMING_PK: &str = "incoming"; +const INCOMING_LOCK_SK: &str = "lock"; +const INCOMING_WATCH_SK: &str = "watch"; + +const MESSAGE_KEY: &str = "message-key"; + +// When a lock is held, it is held for LOCK_DURATION (here 5 minutes) +// It is renewed every LOCK_DURATION/3 +// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we +// lost the lock. +const LOCK_DURATION: Duration = Duration::from_secs(300); + +// In addition to checking when notified, also check for new mail every 10 minutes +const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600); + +pub async fn incoming_mail_watch_process( + user: Weak, + creds: Credentials, + rx_inbox_id: watch::Receiver>, +) { + if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await { + error!("Error in incoming mail watch process: {}", e); + } +} + +async fn incoming_mail_watch_process_internal( + user: Weak, + creds: Credentials, + mut rx_inbox_id: watch::Receiver>, +) -> Result<()> { + let mut lock_held = k2v_lock_loop( + creds.storage.build().await?, + storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK), + ); + let storage = creds.storage.build().await?; + + let mut inbox: Option> = None; + let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + + loop { + let maybe_updated_incoming_key = if *lock_held.borrow() { + debug!("incoming lock held"); + + let wait_new_mail = async { + loop { + match storage.row_poll(&incoming_key).await { + Ok(row_val) => break row_val.row_ref, + Err(e) => { + error!("Error in wait_new_mail: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + } + }; + + tokio::select! { + inc_k = wait_new_mail => Some(inc_k), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, + } + } else { + debug!("incoming lock not held"); + tokio::select! { + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, + } + }; + + let user = match Weak::upgrade(&user) { + Some(user) => user, + None => { + debug!("User no longer available, exiting incoming loop."); + break; + } + }; + debug!("User still available"); + + // If INBOX no longer is same mailbox, open new mailbox + let inbox_id = *rx_inbox_id.borrow(); + if let Some((id, uidvalidity)) = inbox_id { + if Some(id) != inbox.as_ref().map(|b| b.id) { + match user.open_mailbox_by_id(id, uidvalidity).await { + Ok(mb) => { + inbox = Some(mb); + } + Err(e) => { + inbox = None; + error!("Error when opening inbox ({}): {}", id, e); + tokio::time::sleep(Duration::from_secs(30)).await; + continue; + } + } + } + } + + // If we were able to open INBOX, and we have mail, + // fetch new mail + if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { + match handle_incoming_mail(&user, &storage, inbox, &lock_held).await { + Ok(()) => { + incoming_key = updated_incoming_key; + } + Err(e) => { + error!("Could not fetch incoming mail: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + } + } + drop(rx_inbox_id); + Ok(()) +} + +async fn handle_incoming_mail( + user: &Arc, + storage: &storage::Store, + inbox: &Arc, + lock_held: &watch::Receiver, +) -> Result<()> { + let mails_res = storage.blob_list("incoming/").await?; + + for object in mails_res { + if !*lock_held.borrow() { + break; + } + let key = object.0; + if let Some(mail_id) = key.strip_prefix("incoming/") { + if let Ok(mail_id) = mail_id.parse::() { + move_incoming_message(user, storage, inbox, mail_id).await?; + } + } + } + + Ok(()) +} + +async fn move_incoming_message( + user: &Arc, + storage: &storage::Store, + inbox: &Arc, + id: UniqueIdent, +) -> Result<()> { + info!("Moving incoming message: {}", id); + + let object_key = format!("incoming/{}", id); + + // 1. Fetch message from S3 + let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?; + + // 1.a decrypt message key from headers + //info!("Object metadata: {:?}", get_result.metadata); + let key_encrypted_b64 = object + .meta + .get(MESSAGE_KEY) + .ok_or(anyhow!("Missing key in metadata"))?; + let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; + let message_key = sodiumoxide::crypto::sealedbox::open( + &key_encrypted, + &user.creds.keys.public, + &user.creds.keys.secret, + ) + .map_err(|_| anyhow!("Cannot decrypt message key"))?; + let message_key = + cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; + + // 1.b retrieve message body + let obj_body = object.value; + let plain_mail = cryptoblob::open(&obj_body, &message_key) + .map_err(|_| anyhow!("Cannot decrypt email content"))?; + + // 2 parse mail and add to inbox + let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; + inbox + .append_from_s3(msg, id, object.blob_ref.clone(), message_key) + .await?; + + // 3 delete from incoming + storage.blob_rm(&object.blob_ref).await?; + + Ok(()) +} + +// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- + +fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver { + let (held_tx, held_rx) = watch::channel(false); + + tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx)); + + held_rx +} + +#[derive(Clone, Debug)] +enum LockState { + Unknown, + Empty, + Held(UniqueIdent, u64, storage::RowRef), +} + +async fn k2v_lock_loop_internal( + storage: storage::Store, + row_ref: storage::RowRef, + held_tx: watch::Sender, +) { + let (state_tx, mut state_rx) = watch::channel::(LockState::Unknown); + let mut state_rx_2 = state_rx.clone(); + + let our_pid = gen_ident(); + + // Loop 1: watch state of lock in K2V, save that in corresponding watch channel + let watch_lock_loop: BoxFuture> = async { + let mut ct = row_ref.clone(); + loop { + debug!("k2v watch lock loop iter: ct = {:?}", ct); + match storage.row_poll(&ct).await { + Err(e) => { + error!( + "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", + e + ); + state_tx.send(LockState::Unknown)?; + tokio::time::sleep(Duration::from_secs(30)).await; + } + Ok(cv) => { + let mut lock_state = None; + for v in cv.value.iter() { + if let storage::Alternative::Value(vbytes) = v { + if vbytes.len() == 32 { + let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); + let pid = UniqueIdent(vbytes[8..].try_into().unwrap()); + if lock_state + .map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2)) + .unwrap_or(true) + { + lock_state = Some((pid, ts)); + } + } + } + } + let new_ct = cv.row_ref; + + debug!( + "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", + ct, new_ct, lock_state + ); + state_tx.send( + lock_state + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone())) + .unwrap_or(LockState::Empty), + )?; + ct = new_ct; + } + } + } + } + .boxed(); + + // Loop 2: notify user whether we are holding the lock or not + let lock_notify_loop: BoxFuture> = async { + loop { + let now = now_msec(); + let held_with_expiration_time = match &*state_rx.borrow_and_update() { + LockState::Held(pid, ts, _ct) if *pid == our_pid => { + let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64; + if now < expiration_time { + Some(expiration_time) + } else { + None + } + } + _ => None, + }; + let held = held_with_expiration_time.is_some(); + if held != *held_tx.borrow() { + held_tx.send(held)?; + } + + let await_expired = async { + match held_with_expiration_time { + None => futures::future::pending().await, + Some(expiration_time) => { + tokio::time::sleep(Duration::from_millis(expiration_time - now)).await + } + }; + }; + + tokio::select!( + r = state_rx.changed() => { + r?; + } + _ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"), + _ = await_expired => continue, + ); + } + } + .boxed(); + + // Loop 3: acquire lock when relevant + let take_lock_loop: BoxFuture> = async { + loop { + let now = now_msec(); + let state: LockState = state_rx_2.borrow_and_update().clone(); + let (acquire_at, ct) = match state { + LockState::Unknown => { + // If state of the lock is unknown, don't try to acquire + state_rx_2.changed().await?; + continue; + } + LockState::Empty => (now, None), + LockState::Held(pid, ts, ct) => { + if pid == our_pid { + (ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct)) + } else { + (ts, Some(ct)) + } + } + }; + + // Wait until it is time to acquire lock + if acquire_at > now { + tokio::select!( + r = state_rx_2.changed() => { + // If lock state changed in the meantime, don't acquire and loop around + r?; + continue; + } + _ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => () + ); + } + + // Acquire lock + let mut lock = vec![0u8; 32]; + lock[..8].copy_from_slice(&u64::to_be_bytes( + now_msec() + LOCK_DURATION.as_millis() as u64, + )); + lock[8..].copy_from_slice(&our_pid.0); + let row = match ct { + Some(existing) => existing, + None => row_ref.clone(), + }; + if let Err(e) = storage + .row_insert(vec![storage::RowVal::new(row, lock)]) + .await + { + error!("Could not take lock: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + + // Wait for new information to loop back + state_rx_2.changed().await?; + } + } + .boxed(); + + let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop); + + debug!("lock loop exited, releasing"); + + if !held_tx.is_closed() { + warn!("weird..."); + let _ = held_tx.send(false); + } + + // If lock is ours, release it + let release = match &*state_rx.borrow() { + LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()), + _ => None, + }; + if let Some(ct) = release { + match storage.row_rm(&storage::Selector::Single(&ct)).await { + Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), + Ok(_) => (), + }; + } +} + +// ---- LMTP SIDE: storing messages encrypted with user's pubkey ---- + +pub struct EncryptedMessage { + key: cryptoblob::Key, + encrypted_body: Vec, +} + +impl EncryptedMessage { + pub fn new(body: Vec) -> Result { + let key = cryptoblob::gen_key(); + let encrypted_body = cryptoblob::seal(&body, &key)?; + Ok(Self { + key, + encrypted_body, + }) + } + + pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { + let storage = creds.storage.build().await?; + + // Get causality token of previous watch key + let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await { + Err(_) => query, + Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query), + }; + + // Write mail to encrypted storage + let encrypted_key = + sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key); + + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("incoming/{}", gen_ident())), + self.encrypted_body.clone().into(), + ) + .with_meta(MESSAGE_KEY.to_string(), key_header); + storage.blob_insert(blob_val).await?; + + // Update watch key to signal new mail + let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec()); + storage.row_insert(vec![watch_val]).await?; + Ok(()) + } +} diff --git a/aero-collections/src/mail/mailbox.rs b/aero-collections/src/mail/mailbox.rs new file mode 100644 index 0000000..a767678 --- /dev/null +++ b/aero-collections/src/mail/mailbox.rs @@ -0,0 +1,525 @@ +use anyhow::{anyhow, bail, Result}; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; + +use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; +use aero_user::login::Credentials; +use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; +use aero_bayou::Bayou; +use aero_bayou::timestamp::now_msec; + +use crate::mail::uidindex::*; +use crate::mail::unique_ident::*; +use crate::mail::IMF; + +pub struct Mailbox { + pub(super) id: UniqueIdent, + mbox: RwLock, +} + +impl Mailbox { + pub(crate) async fn open( + creds: &Credentials, + id: UniqueIdent, + min_uidvalidity: ImapUidvalidity, + ) -> Result { + let index_path = format!("index/{}", id); + let mail_path = format!("mail/{}", id); + + let mut uid_index = Bayou::::new(creds, index_path).await?; + uid_index.sync().await?; + + let uidvalidity = uid_index.state().uidvalidity; + if uidvalidity < min_uidvalidity { + uid_index + .push( + uid_index + .state() + .op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()), + ) + .await?; + } + + // @FIXME reporting through opentelemetry or some logs + // info on the "shape" of the mailbox would be welcomed + /* + dump(&uid_index); + */ + + let mbox = RwLock::new(MailboxInternal { + id, + encryption_key: creds.keys.master.clone(), + storage: creds.storage.build().await?, + uid_index, + mail_path, + }); + + Ok(Self { id, mbox }) + } + + /// Sync data with backing store + pub async fn force_sync(&self) -> Result<()> { + self.mbox.write().await.force_sync().await + } + + /// Sync data with backing store only if changes are detected + /// or last sync is too old + pub async fn opportunistic_sync(&self) -> Result<()> { + self.mbox.write().await.opportunistic_sync().await + } + + /// Block until a sync has been done (due to changes in the event log) + pub async fn notify(&self) -> std::sync::Weak { + self.mbox.read().await.notifier() + } + + // ---- Functions for reading the mailbox ---- + + /// Get a clone of the current UID Index of this mailbox + /// (cloning is cheap so don't hesitate to use this) + pub async fn current_uid_index(&self) -> UidIndex { + self.mbox.read().await.uid_index.state().clone() + } + + /// Fetch the metadata (headers + some more info) of the specified + /// mail IDs + pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { + self.mbox.read().await.fetch_meta(ids).await + } + + /// Fetch an entire e-mail + pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { + self.mbox.read().await.fetch_full(id, message_key).await + } + + pub async fn frozen(self: &std::sync::Arc) -> super::snapshot::FrozenMailbox { + super::snapshot::FrozenMailbox::new(self.clone()).await + } + + // ---- Functions for changing the mailbox ---- + + /// Add flags to message + pub async fn add_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> { + self.mbox.write().await.add_flags(id, flags).await + } + + /// Delete flags from message + pub async fn del_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> { + self.mbox.write().await.del_flags(id, flags).await + } + + /// Define the new flags for this message + pub async fn set_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> { + self.mbox.write().await.set_flags(id, flags).await + } + + /// Insert an email into the mailbox + pub async fn append<'a>( + &self, + msg: IMF<'a>, + ident: Option, + flags: &[Flag], + ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> { + self.mbox.write().await.append(msg, ident, flags).await + } + + /// Insert an email into the mailbox, copying it from an existing S3 object + pub async fn append_from_s3<'a>( + &self, + msg: IMF<'a>, + ident: UniqueIdent, + blob_ref: storage::BlobRef, + message_key: Key, + ) -> Result<()> { + self.mbox + .write() + .await + .append_from_s3(msg, ident, blob_ref, message_key) + .await + } + + /// Delete a message definitively from the mailbox + pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> { + self.mbox.write().await.delete(id).await + } + + /// Copy an email from an other Mailbox to this mailbox + /// (use this when possible, as it allows for a certain number of storage optimizations) + pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result { + if self.id == from.id { + bail!("Cannot copy into same mailbox"); + } + + let (mut selflock, fromlock); + if self.id < from.id { + selflock = self.mbox.write().await; + fromlock = from.mbox.write().await; + } else { + fromlock = from.mbox.write().await; + selflock = self.mbox.write().await; + }; + selflock.copy_from(&fromlock, uuid).await + } + + /// Move an email from an other Mailbox to this mailbox + /// (use this when possible, as it allows for a certain number of storage optimizations) + pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> { + if self.id == from.id { + bail!("Cannot copy move same mailbox"); + } + + let (mut selflock, mut fromlock); + if self.id < from.id { + selflock = self.mbox.write().await; + fromlock = from.mbox.write().await; + } else { + fromlock = from.mbox.write().await; + selflock = self.mbox.write().await; + }; + selflock.move_from(&mut fromlock, uuid).await + } +} + +// ---- + +// Non standard but common flags: +// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml +struct MailboxInternal { + // 2023-05-15 will probably be used later. + #[allow(dead_code)] + id: UniqueIdent, + mail_path: String, + encryption_key: Key, + storage: Store, + uid_index: Bayou, +} + +impl MailboxInternal { + async fn force_sync(&mut self) -> Result<()> { + self.uid_index.sync().await?; + Ok(()) + } + + async fn opportunistic_sync(&mut self) -> Result<()> { + self.uid_index.opportunistic_sync().await?; + Ok(()) + } + + fn notifier(&self) -> std::sync::Weak { + self.uid_index.notifier() + } + + // ---- Functions for reading the mailbox ---- + + async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { + let ids = ids.iter().map(|x| x.to_string()).collect::>(); + let ops = ids + .iter() + .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())) + .collect::>(); + let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; + + let mut meta_vec = vec![]; + for res in res_vec.into_iter() { + let mut meta_opt = None; + + // Resolve conflicts + for v in res.value.iter() { + match v { + storage::Alternative::Tombstone => (), + storage::Alternative::Value(v) => { + let meta = open_deserialize::(v, &self.encryption_key)?; + match meta_opt.as_mut() { + None => { + meta_opt = Some(meta); + } + Some(prevmeta) => { + prevmeta.try_merge(meta)?; + } + } + } + } + } + if let Some(meta) = meta_opt { + meta_vec.push(meta); + } else { + bail!("No valid meta value in k2v for {:?}", res.row_ref); + } + } + + Ok(meta_vec) + } + + async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { + let obj_res = self + .storage + .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))) + .await?; + let body = obj_res.value; + cryptoblob::open(&body, message_key) + } + + // ---- Functions for changing the mailbox ---- + + async fn add_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> { + let add_flag_op = self.uid_index.state().op_flag_add(ident, flags.to_vec()); + self.uid_index.push(add_flag_op).await + } + + async fn del_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> { + let del_flag_op = self.uid_index.state().op_flag_del(ident, flags.to_vec()); + self.uid_index.push(del_flag_op).await + } + + async fn set_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> { + let set_flag_op = self.uid_index.state().op_flag_set(ident, flags.to_vec()); + self.uid_index.push(set_flag_op).await + } + + async fn append( + &mut self, + mail: IMF<'_>, + ident: Option, + flags: &[Flag], + ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> { + let ident = ident.unwrap_or_else(gen_ident); + let message_key = gen_key(); + + futures::try_join!( + async { + // Encrypt and save mail body + let message_blob = cryptoblob::seal(mail.raw, &message_key)?; + self.storage + .blob_insert(BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )) + .await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Save mail meta + let meta = MailMeta { + internaldate: now_msec(), + headers: mail.parsed.raw_headers.to_vec(), + message_key: message_key.clone(), + rfc822_size: mail.raw.len(), + }; + let meta_blob = seal_serialize(&meta, &self.encryption_key)?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) + .await?; + Ok::<_, anyhow::Error>(()) + }, + self.uid_index.opportunistic_sync() + )?; + + // Add mail to Bayou mail index + let uid_state = self.uid_index.state(); + let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec()); + + let uidvalidity = uid_state.uidvalidity; + let (uid, modseq) = match add_mail_op { + UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq), + _ => unreachable!(), + }; + + self.uid_index.push(add_mail_op).await?; + + Ok((uidvalidity, uid, modseq)) + } + + async fn append_from_s3<'a>( + &mut self, + mail: IMF<'a>, + ident: UniqueIdent, + blob_src: storage::BlobRef, + message_key: Key, + ) -> Result<()> { + futures::try_join!( + async { + // Copy mail body from previous location + let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); + self.storage.blob_copy(&blob_src, &blob_dst).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Save mail meta + let meta = MailMeta { + internaldate: now_msec(), + headers: mail.parsed.raw_headers.to_vec(), + message_key: message_key.clone(), + rfc822_size: mail.raw.len(), + }; + let meta_blob = seal_serialize(&meta, &self.encryption_key)?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) + .await?; + Ok::<_, anyhow::Error>(()) + }, + self.uid_index.opportunistic_sync() + )?; + + // Add mail to Bayou mail index + let add_mail_op = self.uid_index.state().op_mail_add(ident, vec![]); + self.uid_index.push(add_mail_op).await?; + + Ok(()) + } + + async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { + if !self.uid_index.state().table.contains_key(&ident) { + bail!("Cannot delete mail that doesn't exit"); + } + + let del_mail_op = self.uid_index.state().op_mail_del(ident); + self.uid_index.push(del_mail_op).await?; + + futures::try_join!( + async { + // Delete mail body from S3 + self.storage + .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))) + .await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Delete mail meta from K2V + let sk = ident.to_string(); + let res = self + .storage + .row_fetch(&storage::Selector::Single(&RowRef::new( + &self.mail_path, + &sk, + ))) + .await?; + if let Some(row_val) = res.into_iter().next() { + self.storage + .row_rm(&storage::Selector::Single(&row_val.row_ref)) + .await?; + } + Ok::<_, anyhow::Error>(()) + } + )?; + Ok(()) + } + + async fn copy_from( + &mut self, + from: &MailboxInternal, + source_id: UniqueIdent, + ) -> Result { + let new_id = gen_ident(); + self.copy_internal(from, source_id, new_id).await?; + Ok(new_id) + } + + async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> { + self.copy_internal(from, id, id).await?; + from.delete(id).await?; + Ok(()) + } + + async fn copy_internal( + &mut self, + from: &MailboxInternal, + source_id: UniqueIdent, + new_id: UniqueIdent, + ) -> Result<()> { + if self.encryption_key != from.encryption_key { + bail!("Message to be copied/moved does not belong to same account."); + } + + let flags = from + .uid_index + .state() + .table + .get(&source_id) + .ok_or(anyhow!("Source mail not found"))? + .2 + .clone(); + + futures::try_join!( + async { + let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); + let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); + self.storage.blob_copy(&src, &dst).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Copy mail meta in K2V + let meta = &from.fetch_meta(&[source_id]).await?[0]; + let meta_blob = seal_serialize(meta, &self.encryption_key)?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]) + .await?; + Ok::<_, anyhow::Error>(()) + }, + self.uid_index.opportunistic_sync(), + )?; + + // Add mail to Bayou mail index + let add_mail_op = self.uid_index.state().op_mail_add(new_id, flags); + self.uid_index.push(add_mail_op).await?; + + Ok(()) + } +} + +// Can be useful to debug so we want this code +// to be available to developers +#[allow(dead_code)] +fn dump(uid_index: &Bayou) { + let s = uid_index.state(); + println!("---- MAILBOX STATE ----"); + println!("UIDVALIDITY {}", s.uidvalidity); + println!("UIDNEXT {}", s.uidnext); + println!("INTERNALSEQ {}", s.internalseq); + for (uid, ident) in s.idx_by_uid.iter() { + println!( + "{} {} {}", + uid, + hex::encode(ident.0), + s.table.get(ident).cloned().unwrap().2.join(", ") + ); + } + println!(); +} + +// ---- + +/// The metadata of a message that is stored in K2V +/// at pk = mail/, sk = +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MailMeta { + /// INTERNALDATE field (milliseconds since epoch) + pub internaldate: u64, + /// Headers of the message + pub headers: Vec, + /// Secret key for decrypting entire message + pub message_key: Key, + /// RFC822 size + pub rfc822_size: usize, +} + +impl MailMeta { + fn try_merge(&mut self, other: Self) -> Result<()> { + if self.headers != other.headers + || self.message_key != other.message_key + || self.rfc822_size != other.rfc822_size + { + bail!("Conflicting MailMeta values."); + } + self.internaldate = std::cmp::max(self.internaldate, other.internaldate); + Ok(()) + } +} diff --git a/aero-collections/src/mail/mod.rs b/aero-collections/src/mail/mod.rs new file mode 100644 index 0000000..85361f3 --- /dev/null +++ b/aero-collections/src/mail/mod.rs @@ -0,0 +1,25 @@ +pub mod incoming; +pub mod mailbox; +pub mod query; +pub mod snapshot; +pub mod uidindex; +pub mod unique_ident; +pub mod namespace; + +// Internet Message Format +// aka RFC 822 - RFC 2822 - RFC 5322 +// 2023-05-15 don't want to refactor this struct now. +#[allow(clippy::upper_case_acronyms)] +pub struct IMF<'a> { + raw: &'a [u8], + parsed: eml_codec::part::composite::Message<'a>, +} + +impl<'a> TryFrom<&'a [u8]> for IMF<'a> { + type Error = (); + + fn try_from(body: &'a [u8]) -> Result, ()> { + let parsed = eml_codec::parse_message(body).or(Err(()))?.1; + Ok(Self { raw: body, parsed }) + } +} diff --git a/aero-collections/src/mail/namespace.rs b/aero-collections/src/mail/namespace.rs new file mode 100644 index 0000000..452ac68 --- /dev/null +++ b/aero-collections/src/mail/namespace.rs @@ -0,0 +1,202 @@ +use std::collections::BTreeMap; + +use anyhow::{bail, Result}; +use serde::{Deserialize, Serialize}; + +use aero_bayou::timestamp::now_msec; + +use crate::mail::uidindex::ImapUidvalidity; +use crate::mail::unique_ident::{gen_ident, UniqueIdent}; + +pub const MAILBOX_HIERARCHY_DELIMITER: char = '.'; + +/// INBOX is the only mailbox that must always exist. +/// It is created automatically when the account is created. +/// IMAP allows the user to rename INBOX to something else, +/// in this case all messages from INBOX are moved to a mailbox +/// with the new name and the INBOX mailbox still exists and is empty. +/// In our implementation, we indeed move the underlying mailbox +/// to the new name (i.e. the new name has the same id as the previous +/// INBOX), and we create a new empty mailbox for INBOX. +pub const INBOX: &str = "INBOX"; + +/// For convenience purpose, we also create some special mailbox +/// that are described in RFC6154 SPECIAL-USE +/// @FIXME maybe it should be a configuration parameter +/// @FIXME maybe we should have a per-mailbox flag mechanism, either an enum or a string, so we +/// track which mailbox is used for what. +/// @FIXME Junk could be useful but we don't have any antispam solution yet so... +/// @FIXME IMAP supports virtual mailbox. \All or \Flagged are intended to be virtual mailboxes. +/// \Trash might be one, or not one. I don't know what we should do there. +pub const DRAFTS: &str = "Drafts"; +pub const ARCHIVE: &str = "Archive"; +pub const SENT: &str = "Sent"; +pub const TRASH: &str = "Trash"; + +pub(crate) const MAILBOX_LIST_PK: &str = "mailboxes"; +pub(crate) const MAILBOX_LIST_SK: &str = "list"; + +// ---- User's mailbox list (serialized in K2V) ---- + +#[derive(Serialize, Deserialize)] +pub(crate) struct MailboxList(BTreeMap); + +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] +pub(crate) struct MailboxListEntry { + id_lww: (u64, Option), + uidvalidity: ImapUidvalidity, +} + +impl MailboxListEntry { + fn merge(&mut self, other: &Self) { + // Simple CRDT merge rule + if other.id_lww.0 > self.id_lww.0 + || (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1) + { + self.id_lww = other.id_lww; + } + self.uidvalidity = std::cmp::max(self.uidvalidity, other.uidvalidity); + } +} + +impl MailboxList { + pub(crate) fn new() -> Self { + Self(BTreeMap::new()) + } + + pub(crate) fn merge(&mut self, list2: Self) { + for (k, v) in list2.0.into_iter() { + if let Some(e) = self.0.get_mut(&k) { + e.merge(&v); + } else { + self.0.insert(k, v); + } + } + } + + pub(crate) fn existing_mailbox_names(&self) -> Vec { + self.0 + .iter() + .filter(|(_, v)| v.id_lww.1.is_some()) + .map(|(k, _)| k.to_string()) + .collect() + } + + pub(crate) fn has_mailbox(&self, name: &str) -> bool { + matches!( + self.0.get(name), + Some(MailboxListEntry { + id_lww: (_, Some(_)), + .. + }) + ) + } + + pub(crate) fn get_mailbox(&self, name: &str) -> Option<(ImapUidvalidity, Option)> { + self.0.get(name).map( + |MailboxListEntry { + id_lww: (_, mailbox_id), + uidvalidity, + }| (*uidvalidity, *mailbox_id), + ) + } + + /// Ensures mailbox `name` maps to id `id`. + /// If it already mapped to that, returns None. + /// If a change had to be done, returns Some(new uidvalidity in mailbox). + pub(crate) fn set_mailbox(&mut self, name: &str, id: Option) -> Option { + let (ts, id, uidvalidity) = match self.0.get_mut(name) { + None => { + if id.is_none() { + return None; + } else { + (now_msec(), id, ImapUidvalidity::new(1).unwrap()) + } + } + Some(MailboxListEntry { + id_lww, + uidvalidity, + }) => { + if id_lww.1 == id { + return None; + } else { + ( + std::cmp::max(id_lww.0 + 1, now_msec()), + id, + ImapUidvalidity::new(uidvalidity.get() + 1).unwrap(), + ) + } + } + }; + + self.0.insert( + name.into(), + MailboxListEntry { + id_lww: (ts, id), + uidvalidity, + }, + ); + Some(uidvalidity) + } + + pub(crate) fn update_uidvalidity(&mut self, name: &str, new_uidvalidity: ImapUidvalidity) { + match self.0.get_mut(name) { + None => { + self.0.insert( + name.into(), + MailboxListEntry { + id_lww: (now_msec(), None), + uidvalidity: new_uidvalidity, + }, + ); + } + Some(MailboxListEntry { uidvalidity, .. }) => { + *uidvalidity = std::cmp::max(*uidvalidity, new_uidvalidity); + } + } + } + + pub(crate) fn create_mailbox(&mut self, name: &str) -> CreatedMailbox { + if let Some(MailboxListEntry { + id_lww: (_, Some(id)), + uidvalidity, + }) = self.0.get(name) + { + return CreatedMailbox::Existed(*id, *uidvalidity); + } + + let id = gen_ident(); + let uidvalidity = self.set_mailbox(name, Some(id)).unwrap(); + CreatedMailbox::Created(id, uidvalidity) + } + + pub(crate) fn rename_mailbox(&mut self, old_name: &str, new_name: &str) -> Result<()> { + if let Some((uidvalidity, Some(mbid))) = self.get_mailbox(old_name) { + if self.has_mailbox(new_name) { + bail!( + "Cannot rename {} into {}: {} already exists", + old_name, + new_name, + new_name + ); + } + + self.set_mailbox(old_name, None); + self.set_mailbox(new_name, Some(mbid)); + self.update_uidvalidity(new_name, uidvalidity); + Ok(()) + } else { + bail!( + "Cannot rename {} into {}: {} doesn't exist", + old_name, + new_name, + old_name + ); + } + } +} + +pub(crate) enum CreatedMailbox { + Created(UniqueIdent, ImapUidvalidity), + Existed(UniqueIdent, ImapUidvalidity), +} diff --git a/aero-collections/src/mail/query.rs b/aero-collections/src/mail/query.rs new file mode 100644 index 0000000..3e6fe99 --- /dev/null +++ b/aero-collections/src/mail/query.rs @@ -0,0 +1,137 @@ +use super::mailbox::MailMeta; +use super::snapshot::FrozenMailbox; +use super::unique_ident::UniqueIdent; +use anyhow::Result; +use futures::future::FutureExt; +use futures::stream::{BoxStream, Stream, StreamExt}; + +/// Query is in charge of fetching efficiently +/// requested data for a list of emails +pub struct Query<'a, 'b> { + pub frozen: &'a FrozenMailbox, + pub emails: &'b [UniqueIdent], + pub scope: QueryScope, +} + +#[derive(Debug)] +pub enum QueryScope { + Index, + Partial, + Full, +} +impl QueryScope { + pub fn union(&self, other: &QueryScope) -> QueryScope { + match (self, other) { + (QueryScope::Full, _) | (_, QueryScope::Full) => QueryScope::Full, + (QueryScope::Partial, _) | (_, QueryScope::Partial) => QueryScope::Partial, + (QueryScope::Index, QueryScope::Index) => QueryScope::Index, + } + } +} + +//type QueryResultStream = Box>>; + +impl<'a, 'b> Query<'a, 'b> { + pub fn fetch(&self) -> BoxStream> { + match self.scope { + QueryScope::Index => Box::pin( + futures::stream::iter(self.emails) + .map(|&uuid| Ok(QueryResult::IndexResult { uuid })), + ), + QueryScope::Partial => Box::pin(self.partial()), + QueryScope::Full => Box::pin(self.full()), + } + } + + // --- functions below are private *for reasons* + fn partial<'d>(&'d self) -> impl Stream> + 'd + Send { + async move { + let maybe_meta_list: Result> = + self.frozen.mailbox.fetch_meta(self.emails).await; + let list_res = maybe_meta_list + .map(|meta_list| { + meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + }) + .unwrap_or_else(|e| vec![Err(e)]); + + futures::stream::iter(list_res) + } + .flatten_stream() + } + + fn full<'d>(&'d self) -> impl Stream> + 'd + Send { + self.partial().then(move |maybe_meta| async move { + let meta = maybe_meta?; + + let content = self + .frozen + .mailbox + .fetch_full( + *meta.uuid(), + &meta + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) + .await?; + + Ok(meta.into_full(content).expect("meta to be PartialResult")) + }) + } +} + +#[derive(Debug, Clone)] +pub enum QueryResult { + IndexResult { + uuid: UniqueIdent, + }, + PartialResult { + uuid: UniqueIdent, + metadata: MailMeta, + }, + FullResult { + uuid: UniqueIdent, + metadata: MailMeta, + content: Vec, + }, +} +impl QueryResult { + pub fn uuid(&self) -> &UniqueIdent { + match self { + Self::IndexResult { uuid, .. } => uuid, + Self::PartialResult { uuid, .. } => uuid, + Self::FullResult { uuid, .. } => uuid, + } + } + + pub fn metadata(&self) -> Option<&MailMeta> { + match self { + Self::IndexResult { .. } => None, + Self::PartialResult { metadata, .. } => Some(metadata), + Self::FullResult { metadata, .. } => Some(metadata), + } + } + + #[allow(dead_code)] + pub fn content(&self) -> Option<&[u8]> { + match self { + Self::FullResult { content, .. } => Some(content), + _ => None, + } + } + + fn into_full(self, content: Vec) -> Option { + match self { + Self::PartialResult { uuid, metadata } => Some(Self::FullResult { + uuid, + metadata, + content, + }), + _ => None, + } + } +} diff --git a/aero-collections/src/mail/snapshot.rs b/aero-collections/src/mail/snapshot.rs new file mode 100644 index 0000000..ed756b5 --- /dev/null +++ b/aero-collections/src/mail/snapshot.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use anyhow::Result; + +use super::mailbox::Mailbox; +use super::query::{Query, QueryScope}; +use super::uidindex::UidIndex; +use super::unique_ident::UniqueIdent; + +/// A Frozen Mailbox has a snapshot of the current mailbox +/// state that is desynchronized with the real mailbox state. +/// It's up to the user to choose when their snapshot must be updated +/// to give useful information to their clients +pub struct FrozenMailbox { + pub mailbox: Arc, + pub snapshot: UidIndex, +} + +impl FrozenMailbox { + /// Create a snapshot from a mailbox, the mailbox + the snapshot + /// becomes the "Frozen Mailbox". + pub async fn new(mailbox: Arc) -> Self { + let state = mailbox.current_uid_index().await; + + Self { + mailbox, + snapshot: state, + } + } + + /// Force the synchronization of the inner mailbox + /// but do not update the local snapshot + pub async fn sync(&self) -> Result<()> { + self.mailbox.opportunistic_sync().await + } + + /// Peek snapshot without updating the frozen mailbox + /// Can be useful if you want to plan some writes + /// while sending a diff to the client later + pub async fn peek(&self) -> UidIndex { + self.mailbox.current_uid_index().await + } + + /// Update the FrozenMailbox local snapshot. + /// Returns the old snapshot, so you can build a diff + pub async fn update(&mut self) -> UidIndex { + let old_snapshot = self.snapshot.clone(); + self.snapshot = self.mailbox.current_uid_index().await; + + old_snapshot + } + + pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> { + Query { + frozen: self, + emails: uuids, + scope, + } + } +} diff --git a/aero-collections/src/mail/uidindex.rs b/aero-collections/src/mail/uidindex.rs new file mode 100644 index 0000000..637a1ac --- /dev/null +++ b/aero-collections/src/mail/uidindex.rs @@ -0,0 +1,474 @@ +use std::num::{NonZeroU32, NonZeroU64}; + +use im::{HashMap, OrdMap, OrdSet}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use aero_bayou::*; +use crate::mail::unique_ident::UniqueIdent; + +pub type ModSeq = NonZeroU64; +pub type ImapUid = NonZeroU32; +pub type ImapUidvalidity = NonZeroU32; +pub type Flag = String; +pub type IndexEntry = (ImapUid, ModSeq, Vec); + +/// A UidIndex handles the mutable part of a mailbox +/// It is built by running the event log on it +/// Each applied log generates a new UidIndex by cloning the previous one +/// and applying the event. This is why we use immutable datastructures: +/// they are cheap to clone. +#[derive(Clone)] +pub struct UidIndex { + // Source of trust + pub table: OrdMap, + + // Indexes optimized for queries + pub idx_by_uid: OrdMap, + pub idx_by_modseq: OrdMap, + pub idx_by_flag: FlagIndex, + + // "Public" Counters + pub uidvalidity: ImapUidvalidity, + pub uidnext: ImapUid, + pub highestmodseq: ModSeq, + + // "Internal" Counters + pub internalseq: ImapUid, + pub internalmodseq: ModSeq, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub enum UidIndexOp { + MailAdd(UniqueIdent, ImapUid, ModSeq, Vec), + MailDel(UniqueIdent), + FlagAdd(UniqueIdent, ModSeq, Vec), + FlagDel(UniqueIdent, ModSeq, Vec), + FlagSet(UniqueIdent, ModSeq, Vec), + BumpUidvalidity(u32), +} + +impl UidIndex { + #[must_use] + pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_mail_del(&self, ident: UniqueIdent) -> UidIndexOp { + UidIndexOp::MailDel(ident) + } + + #[must_use] + pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagAdd(ident, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagDel(ident, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { + UidIndexOp::FlagSet(ident, self.internalmodseq, flags) + } + + #[must_use] + pub fn op_bump_uidvalidity(&self, count: u32) -> UidIndexOp { + UidIndexOp::BumpUidvalidity(count) + } + + // INTERNAL functions to keep state consistent + + fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) { + // Insert the email in our table + self.table.insert(ident, (uid, modseq, flags.to_owned())); + + // Update the indexes/caches + self.idx_by_uid.insert(uid, ident); + self.idx_by_flag.insert(uid, flags); + self.idx_by_modseq.insert(modseq, ident); + } + + fn unreg_email(&mut self, ident: &UniqueIdent) { + // We do nothing if the mail does not exist + let (uid, modseq, flags) = match self.table.get(ident) { + Some(v) => v, + None => return, + }; + + // Delete all cache entries + self.idx_by_uid.remove(uid); + self.idx_by_flag.remove(*uid, flags); + self.idx_by_modseq.remove(modseq); + + // Remove from source of trust + self.table.remove(ident); + } +} + +impl Default for UidIndex { + fn default() -> Self { + Self { + table: OrdMap::new(), + + idx_by_uid: OrdMap::new(), + idx_by_modseq: OrdMap::new(), + idx_by_flag: FlagIndex::new(), + + uidvalidity: NonZeroU32::new(1).unwrap(), + uidnext: NonZeroU32::new(1).unwrap(), + highestmodseq: NonZeroU64::new(1).unwrap(), + + internalseq: NonZeroU32::new(1).unwrap(), + internalmodseq: NonZeroU64::new(1).unwrap(), + } + } +} + +impl BayouState for UidIndex { + type Op = UidIndexOp; + + fn apply(&self, op: &UidIndexOp) -> Self { + let mut new = self.clone(); + match op { + UidIndexOp::MailAdd(ident, uid, modseq, flags) => { + // Change UIDValidity if there is a UID conflict or a MODSEQ conflict + // @FIXME Need to prove that summing work + // The intuition: we increase the UIDValidity by the number of possible conflicts + if *uid < new.internalseq || *modseq < new.internalmodseq { + let bump_uid = new.internalseq.get() - uid.get(); + let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap(); + } + + // Assign the real uid of the email + let new_uid = new.internalseq; + + // Assign the real modseq of the email and its new flags + let new_modseq = new.internalmodseq; + + // Delete the previous entry if any. + // Our proof has no assumption on `ident` uniqueness, + // so we must handle this case even it is very unlikely + // In this case, we overwrite the email. + // Note: assigning a new UID is mandatory. + new.unreg_email(ident); + + // We record our email and update ou caches + new.reg_email(*ident, new_uid, new_modseq, flags); + + // Update counters + new.highestmodseq = new.internalmodseq; + + new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + + new.uidnext = new.internalseq; + } + UidIndexOp::MailDel(ident) => { + // If the email is known locally, we remove its references in all our indexes + new.unreg_email(ident); + + // We update the counter + new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); + } + UidIndexOp::FlagAdd(ident, candidate_modseq, new_flags) => { + if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *candidate_modseq < new.internalmodseq { + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); + } + + // Add flags to the source of trust and the cache + let mut to_add: Vec = new_flags + .iter() + .filter(|f| !existing_flags.contains(f)) + .cloned() + .collect(); + new.idx_by_flag.insert(*uid, &to_add); + *email_modseq = new.internalmodseq; + new.idx_by_modseq.insert(new.internalmodseq, *ident); + existing_flags.append(&mut to_add); + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + } + } + UidIndexOp::FlagDel(ident, candidate_modseq, rm_flags) => { + if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *candidate_modseq < new.internalmodseq { + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); + } + + // Remove flags from the source of trust and the cache + existing_flags.retain(|x| !rm_flags.contains(x)); + new.idx_by_flag.remove(*uid, rm_flags); + + // Register that email has been modified + new.idx_by_modseq.insert(new.internalmodseq, *ident); + *email_modseq = new.internalmodseq; + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + } + } + UidIndexOp::FlagSet(ident, candidate_modseq, new_flags) => { + if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *candidate_modseq < new.internalmodseq { + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); + } + + // Remove flags from the source of trust and the cache + let (keep_flags, rm_flags): (Vec, Vec) = existing_flags + .iter() + .cloned() + .partition(|x| new_flags.contains(x)); + *existing_flags = keep_flags; + let mut to_add: Vec = new_flags + .iter() + .filter(|f| !existing_flags.contains(f)) + .cloned() + .collect(); + existing_flags.append(&mut to_add); + new.idx_by_flag.remove(*uid, &rm_flags); + new.idx_by_flag.insert(*uid, &to_add); + + // Register that email has been modified + new.idx_by_modseq.insert(new.internalmodseq, *ident); + *email_modseq = new.internalmodseq; + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap(); + } + } + UidIndexOp::BumpUidvalidity(count) => { + new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count) + .unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap()); + } + } + new + } +} + +// ---- FlagIndex implementation ---- + +#[derive(Clone)] +pub struct FlagIndex(HashMap>); +pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet>; + +impl FlagIndex { + fn new() -> Self { + Self(HashMap::new()) + } + fn insert(&mut self, uid: ImapUid, flags: &[Flag]) { + flags.iter().for_each(|flag| { + self.0 + .entry(flag.clone()) + .or_insert(OrdSet::new()) + .insert(uid); + }); + } + fn remove(&mut self, uid: ImapUid, flags: &[Flag]) { + for flag in flags.iter() { + if let Some(set) = self.0.get_mut(flag) { + set.remove(&uid); + if set.is_empty() { + self.0.remove(flag); + } + } + } + } + + pub fn get(&self, f: &Flag) -> Option<&OrdSet> { + self.0.get(f) + } + + pub fn flags(&self) -> FlagIter { + self.0.keys() + } +} + +// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ---- + +#[derive(Serialize, Deserialize)] +struct UidIndexSerializedRepr { + mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec)>, + + uidvalidity: ImapUidvalidity, + uidnext: ImapUid, + highestmodseq: ModSeq, + + internalseq: ImapUid, + internalmodseq: ModSeq, +} + +impl<'de> Deserialize<'de> for UidIndex { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?; + + let mut uidindex = UidIndex { + table: OrdMap::new(), + + idx_by_uid: OrdMap::new(), + idx_by_modseq: OrdMap::new(), + idx_by_flag: FlagIndex::new(), + + uidvalidity: val.uidvalidity, + uidnext: val.uidnext, + highestmodseq: val.highestmodseq, + + internalseq: val.internalseq, + internalmodseq: val.internalmodseq, + }; + + val.mails + .iter() + .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags)); + + Ok(uidindex) + } +} + +impl Serialize for UidIndex { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut mails = vec![]; + for (ident, (uid, modseq, flags)) in self.table.iter() { + mails.push((*uid, *modseq, *ident, flags.clone())); + } + + let val = UidIndexSerializedRepr { + mails, + uidvalidity: self.uidvalidity, + uidnext: self.uidnext, + highestmodseq: self.highestmodseq, + internalseq: self.internalseq, + internalmodseq: self.internalmodseq, + }; + + val.serialize(serializer) + } +} + +// ---- TESTS ---- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_uidindex() { + let mut state = UidIndex::default(); + + // Add message 1 + { + let m = UniqueIdent([0x01; 24]); + let f = vec!["\\Recent".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + // Early checks + assert_eq!(state.table.len(), 1); + let (uid, modseq, flags) = state.table.get(&m).unwrap(); + assert_eq!(*uid, NonZeroU32::new(1).unwrap()); + assert_eq!(*modseq, NonZeroU64::new(1).unwrap()); + assert_eq!(flags.len(), 2); + let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap(); + assert_eq!(&m, ident); + let recent = state.idx_by_flag.0.get("\\Recent").unwrap(); + assert_eq!(recent.len(), 1); + assert_eq!(recent.iter().next().unwrap(), &NonZeroU32::new(1).unwrap()); + assert_eq!(state.uidnext, NonZeroU32::new(2).unwrap()); + assert_eq!(state.uidvalidity, NonZeroU32::new(1).unwrap()); + } + + // Add message 2 + { + let m = UniqueIdent([0x02; 24]); + let f = vec!["\\Seen".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Add flags to message 1 + { + let m = UniqueIdent([0x01; 24]); + let f = vec!["Important".to_string(), "$cl_1".to_string()]; + let ev = state.op_flag_add(m, f); + state = state.apply(&ev); + } + + // Delete flags from message 1 + { + let m = UniqueIdent([0x01; 24]); + let f = vec!["\\Recent".to_string()]; + let ev = state.op_flag_del(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Delete message 2 + { + let m = UniqueIdent([0x02; 24]); + let ev = state.op_mail_del(m); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 1); + } + + // Add a message 3 concurrent to message 1 (trigger a uid validity change) + { + let m = UniqueIdent([0x03; 24]); + let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; + let ev = UidIndexOp::MailAdd( + m, + NonZeroU32::new(1).unwrap(), + NonZeroU64::new(1).unwrap(), + f, + ); + state = state.apply(&ev); + } + + // Checks + { + assert_eq!(state.table.len(), 2); + assert!(state.uidvalidity > NonZeroU32::new(1).unwrap()); + + let (last_uid, ident) = state.idx_by_uid.get_max().unwrap(); + assert_eq!(ident, &UniqueIdent([0x03; 24])); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + let mut iter = archive.iter(); + assert_eq!(iter.next().unwrap(), &NonZeroU32::new(1).unwrap()); + assert_eq!(iter.next().unwrap(), last_uid); + } + } +} diff --git a/aero-collections/src/mail/unique_ident.rs b/aero-collections/src/mail/unique_ident.rs new file mode 100644 index 0000000..0987a2c --- /dev/null +++ b/aero-collections/src/mail/unique_ident.rs @@ -0,0 +1,101 @@ +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; + +use lazy_static::lazy_static; +use rand::prelude::*; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; + +use aero_bayou::timestamp::now_msec; + +/// An internal Mail Identifier is composed of two components: +/// - a process identifier, 128 bits, itself composed of: +/// - the timestamp of when the process started, 64 bits +/// - a 64-bit random number +/// - a sequence number, 64 bits +/// They are not part of the protocol but an internal representation +/// required by Aerogramme. +/// Their main property is to be unique without having to rely +/// on synchronization between IMAP processes. +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] +pub struct UniqueIdent(pub [u8; 24]); + +struct IdentGenerator { + pid: u128, + sn: AtomicU64, +} + +impl IdentGenerator { + fn new() -> Self { + let time = now_msec() as u128; + let rand = thread_rng().gen::() as u128; + Self { + pid: (time << 64) | rand, + sn: AtomicU64::new(0), + } + } + + fn gen(&self) -> UniqueIdent { + let sn = self.sn.fetch_add(1, Ordering::Relaxed); + let mut res = [0u8; 24]; + res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); + res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); + UniqueIdent(res) + } +} + +lazy_static! { + static ref GENERATOR: IdentGenerator = IdentGenerator::new(); +} + +pub fn gen_ident() -> UniqueIdent { + GENERATOR.gen() +} + +// -- serde -- + +impl<'de> Deserialize<'de> for UniqueIdent { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let v = String::deserialize(d)?; + UniqueIdent::from_str(&v).map_err(D::Error::custom) + } +} + +impl Serialize for UniqueIdent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl std::fmt::Display for UniqueIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(self.0)) + } +} + +impl std::fmt::Debug for UniqueIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(self.0)) + } +} + +impl FromStr for UniqueIdent { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; + + if bytes.len() != 24 { + return Err("bad length"); + } + + let mut tmp = [0u8; 24]; + tmp[..].copy_from_slice(&bytes); + Ok(UniqueIdent(tmp)) + } +} diff --git a/aero-collections/src/user.rs b/aero-collections/src/user.rs new file mode 100644 index 0000000..193ce90 --- /dev/null +++ b/aero-collections/src/user.rs @@ -0,0 +1,311 @@ +use std::collections::HashMap; +use std::sync::{Arc, Weak}; + +use anyhow::{anyhow, bail, Result}; +use lazy_static::lazy_static; +use tokio::sync::watch; + +use aero_user::cryptoblob::{open_deserialize, seal_serialize}; +use aero_user::login::Credentials; +use aero_user::storage; + +use crate::mail::incoming::incoming_mail_watch_process; +use crate::mail::mailbox::Mailbox; +use crate::mail::uidindex::ImapUidvalidity; +use crate::mail::unique_ident::UniqueIdent; +use crate::mail::namespace::{MAILBOX_HIERARCHY_DELIMITER, INBOX, DRAFTS, ARCHIVE, SENT, TRASH, MAILBOX_LIST_PK, MAILBOX_LIST_SK,MailboxList,CreatedMailbox}; + +//@FIXME User should be totally rewriten +//to extract the local mailbox list +//to the mail/namespace.rs file (and mailbox list should be reworded as mail namespace) + +pub struct User { + pub username: String, + pub creds: Credentials, + pub storage: storage::Store, + pub mailboxes: std::sync::Mutex>>, + + tx_inbox_id: watch::Sender>, +} + +impl User { + pub async fn new(username: String, creds: Credentials) -> Result> { + let cache_key = (username.clone(), creds.storage.unique()); + + { + let cache = USER_CACHE.lock().unwrap(); + if let Some(u) = cache.get(&cache_key).and_then(Weak::upgrade) { + return Ok(u); + } + } + + let user = Self::open(username, creds).await?; + + let mut cache = USER_CACHE.lock().unwrap(); + if let Some(concurrent_user) = cache.get(&cache_key).and_then(Weak::upgrade) { + drop(user); + Ok(concurrent_user) + } else { + cache.insert(cache_key, Arc::downgrade(&user)); + Ok(user) + } + } + + /// Lists user's available mailboxes + pub async fn list_mailboxes(&self) -> Result> { + let (list, _ct) = self.load_mailbox_list().await?; + Ok(list.existing_mailbox_names()) + } + + /// Opens an existing mailbox given its IMAP name. + pub async fn open_mailbox(&self, name: &str) -> Result>> { + let (mut list, ct) = self.load_mailbox_list().await?; + + //@FIXME it could be a trace or an opentelemtry trace thing. + // Be careful to not leak sensible data + /* + eprintln!("List of mailboxes:"); + for ent in list.0.iter() { + eprintln!(" - {:?}", ent); + } + */ + + if let Some((uidvalidity, Some(mbid))) = list.get_mailbox(name) { + let mb = self.open_mailbox_by_id(mbid, uidvalidity).await?; + let mb_uidvalidity = mb.current_uid_index().await.uidvalidity; + if mb_uidvalidity > uidvalidity { + list.update_uidvalidity(name, mb_uidvalidity); + self.save_mailbox_list(&list, ct).await?; + } + Ok(Some(mb)) + } else { + Ok(None) + } + } + + /// Check whether mailbox exists + pub async fn has_mailbox(&self, name: &str) -> Result { + let (list, _ct) = self.load_mailbox_list().await?; + Ok(list.has_mailbox(name)) + } + + /// Creates a new mailbox in the user's IMAP namespace. + pub async fn create_mailbox(&self, name: &str) -> Result<()> { + if name.ends_with(MAILBOX_HIERARCHY_DELIMITER) { + bail!("Invalid mailbox name: {}", name); + } + + let (mut list, ct) = self.load_mailbox_list().await?; + match list.create_mailbox(name) { + CreatedMailbox::Created(_, _) => { + self.save_mailbox_list(&list, ct).await?; + Ok(()) + } + CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)), + } + } + + /// Deletes a mailbox in the user's IMAP namespace. + pub async fn delete_mailbox(&self, name: &str) -> Result<()> { + if name == INBOX { + bail!("Cannot delete INBOX"); + } + + let (mut list, ct) = self.load_mailbox_list().await?; + if list.has_mailbox(name) { + //@TODO: actually delete mailbox contents + list.set_mailbox(name, None); + self.save_mailbox_list(&list, ct).await?; + Ok(()) + } else { + bail!("Mailbox {} does not exist", name); + } + } + + /// Renames a mailbox in the user's IMAP namespace. + pub async fn rename_mailbox(&self, old_name: &str, new_name: &str) -> Result<()> { + let (mut list, ct) = self.load_mailbox_list().await?; + + if old_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) { + bail!("Invalid mailbox name: {}", old_name); + } + if new_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) { + bail!("Invalid mailbox name: {}", new_name); + } + + if old_name == INBOX { + list.rename_mailbox(old_name, new_name)?; + if !self.ensure_inbox_exists(&mut list, &ct).await? { + self.save_mailbox_list(&list, ct).await?; + } + } else { + let names = list.existing_mailbox_names(); + + let old_name_w_delim = format!("{}{}", old_name, MAILBOX_HIERARCHY_DELIMITER); + let new_name_w_delim = format!("{}{}", new_name, MAILBOX_HIERARCHY_DELIMITER); + + if names + .iter() + .any(|x| x == new_name || x.starts_with(&new_name_w_delim)) + { + bail!("Mailbox {} already exists", new_name); + } + + for name in names.iter() { + if name == old_name { + list.rename_mailbox(name, new_name)?; + } else if let Some(tail) = name.strip_prefix(&old_name_w_delim) { + let nnew = format!("{}{}", new_name_w_delim, tail); + list.rename_mailbox(name, &nnew)?; + } + } + + self.save_mailbox_list(&list, ct).await?; + } + Ok(()) + } + + // ---- Internal user & mailbox management ---- + + async fn open(username: String, creds: Credentials) -> Result> { + let storage = creds.storage.build().await?; + + let (tx_inbox_id, rx_inbox_id) = watch::channel(None); + + let user = Arc::new(Self { + username, + creds: creds.clone(), + storage, + tx_inbox_id, + mailboxes: std::sync::Mutex::new(HashMap::new()), + }); + + // Ensure INBOX exists (done inside load_mailbox_list) + user.load_mailbox_list().await?; + + tokio::spawn(incoming_mail_watch_process( + Arc::downgrade(&user), + user.creds.clone(), + rx_inbox_id, + )); + + Ok(user) + } + + pub(super) async fn open_mailbox_by_id( + &self, + id: UniqueIdent, + min_uidvalidity: ImapUidvalidity, + ) -> Result> { + { + let cache = self.mailboxes.lock().unwrap(); + if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) { + return Ok(mb); + } + } + + let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?); + + let mut cache = self.mailboxes.lock().unwrap(); + if let Some(concurrent_mb) = cache.get(&id).and_then(Weak::upgrade) { + drop(mb); // we worked for nothing but at least we didn't starve someone else + Ok(concurrent_mb) + } else { + cache.insert(id, Arc::downgrade(&mb)); + Ok(mb) + } + } + + // ---- Mailbox list management ---- + + async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { + let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); + let (mut list, row) = match self + .storage + .row_fetch(&storage::Selector::Single(&row_ref)) + .await + { + Err(storage::StorageError::NotFound) => (MailboxList::new(), None), + Err(e) => return Err(e.into()), + Ok(rv) => { + let mut list = MailboxList::new(); + let (row_ref, row_vals) = match rv.into_iter().next() { + Some(row_val) => (row_val.row_ref, row_val.value), + None => (row_ref, vec![]), + }; + + for v in row_vals { + if let storage::Alternative::Value(vbytes) = v { + let list2 = + open_deserialize::(&vbytes, &self.creds.keys.master)?; + list.merge(list2); + } + } + (list, Some(row_ref)) + } + }; + + let is_default_mbx_missing = [DRAFTS, ARCHIVE, SENT, TRASH] + .iter() + .map(|mbx| list.create_mailbox(mbx)) + .fold(false, |acc, r| { + acc || matches!(r, CreatedMailbox::Created(..)) + }); + let is_inbox_missing = self.ensure_inbox_exists(&mut list, &row).await?; + if is_default_mbx_missing && !is_inbox_missing { + // It's the only case where we created some mailboxes and not saved them + // So we save them! + self.save_mailbox_list(&list, row.clone()).await?; + } + + Ok((list, row)) + } + + async fn ensure_inbox_exists( + &self, + list: &mut MailboxList, + ct: &Option, + ) -> Result { + // If INBOX doesn't exist, create a new mailbox with that name + // and save new mailbox list. + // Also, ensure that the mpsc::watch that keeps track of the + // inbox id is up-to-date. + let saved; + let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) { + CreatedMailbox::Created(i, v) => { + self.save_mailbox_list(list, ct.clone()).await?; + saved = true; + (i, v) + } + CreatedMailbox::Existed(i, v) => { + saved = false; + (i, v) + } + }; + let inbox_id = Some((inbox_id, inbox_uidvalidity)); + if *self.tx_inbox_id.borrow() != inbox_id { + self.tx_inbox_id.send(inbox_id).unwrap(); + } + + Ok(saved) + } + + async fn save_mailbox_list( + &self, + list: &MailboxList, + ct: Option, + ) -> Result<()> { + let list_blob = seal_serialize(list, &self.creds.keys.master)?; + let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK)); + let row_val = storage::RowVal::new(rref, list_blob); + self.storage.row_insert(vec![row_val]).await?; + Ok(()) + } +} + +// ---- User cache ---- + +lazy_static! { + static ref USER_CACHE: std::sync::Mutex>> = + std::sync::Mutex::new(HashMap::new()); +} diff --git a/aero-collections/user.rs b/aero-collections/user.rs deleted file mode 100644 index a38b9c1..0000000 --- a/aero-collections/user.rs +++ /dev/null @@ -1,313 +0,0 @@ -use std::collections::{BTreeMap, HashMap}; -use std::sync::{Arc, Weak}; - -use anyhow::{anyhow, bail, Result}; -use lazy_static::lazy_static; -use serde::{Deserialize, Serialize}; -use tokio::sync::watch; - -use crate::cryptoblob::{open_deserialize, seal_serialize}; -use crate::login::Credentials; -use crate::mail::incoming::incoming_mail_watch_process; -use crate::mail::mailbox::Mailbox; -use crate::mail::uidindex::ImapUidvalidity; -use crate::mail::unique_ident::{gen_ident, UniqueIdent}; -use crate::storage; -use crate::timestamp::now_msec; - -use crate::mail::namespace::{MAILBOX_HIERARCHY_DELIMITER, INBOX, DRAFTS, ARCHIVE, SENT, TRASH, MAILBOX_LIST_PK, MAILBOX_LIST_SK,MailboxList,CreatedMailbox}; - -//@FIXME User should be totally rewriten -//to extract the local mailbox list -//to the mail/namespace.rs file (and mailbox list should be reworded as mail namespace) - -pub struct User { - pub username: String, - pub creds: Credentials, - pub storage: storage::Store, - pub mailboxes: std::sync::Mutex>>, - - tx_inbox_id: watch::Sender>, -} - -impl User { - pub async fn new(username: String, creds: Credentials) -> Result> { - let cache_key = (username.clone(), creds.storage.unique()); - - { - let cache = USER_CACHE.lock().unwrap(); - if let Some(u) = cache.get(&cache_key).and_then(Weak::upgrade) { - return Ok(u); - } - } - - let user = Self::open(username, creds).await?; - - let mut cache = USER_CACHE.lock().unwrap(); - if let Some(concurrent_user) = cache.get(&cache_key).and_then(Weak::upgrade) { - drop(user); - Ok(concurrent_user) - } else { - cache.insert(cache_key, Arc::downgrade(&user)); - Ok(user) - } - } - - /// Lists user's available mailboxes - pub async fn list_mailboxes(&self) -> Result> { - let (list, _ct) = self.load_mailbox_list().await?; - Ok(list.existing_mailbox_names()) - } - - /// Opens an existing mailbox given its IMAP name. - pub async fn open_mailbox(&self, name: &str) -> Result>> { - let (mut list, ct) = self.load_mailbox_list().await?; - - //@FIXME it could be a trace or an opentelemtry trace thing. - // Be careful to not leak sensible data - /* - eprintln!("List of mailboxes:"); - for ent in list.0.iter() { - eprintln!(" - {:?}", ent); - } - */ - - if let Some((uidvalidity, Some(mbid))) = list.get_mailbox(name) { - let mb = self.open_mailbox_by_id(mbid, uidvalidity).await?; - let mb_uidvalidity = mb.current_uid_index().await.uidvalidity; - if mb_uidvalidity > uidvalidity { - list.update_uidvalidity(name, mb_uidvalidity); - self.save_mailbox_list(&list, ct).await?; - } - Ok(Some(mb)) - } else { - Ok(None) - } - } - - /// Check whether mailbox exists - pub async fn has_mailbox(&self, name: &str) -> Result { - let (list, _ct) = self.load_mailbox_list().await?; - Ok(list.has_mailbox(name)) - } - - /// Creates a new mailbox in the user's IMAP namespace. - pub async fn create_mailbox(&self, name: &str) -> Result<()> { - if name.ends_with(MAILBOX_HIERARCHY_DELIMITER) { - bail!("Invalid mailbox name: {}", name); - } - - let (mut list, ct) = self.load_mailbox_list().await?; - match list.create_mailbox(name) { - CreatedMailbox::Created(_, _) => { - self.save_mailbox_list(&list, ct).await?; - Ok(()) - } - CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)), - } - } - - /// Deletes a mailbox in the user's IMAP namespace. - pub async fn delete_mailbox(&self, name: &str) -> Result<()> { - if name == INBOX { - bail!("Cannot delete INBOX"); - } - - let (mut list, ct) = self.load_mailbox_list().await?; - if list.has_mailbox(name) { - //@TODO: actually delete mailbox contents - list.set_mailbox(name, None); - self.save_mailbox_list(&list, ct).await?; - Ok(()) - } else { - bail!("Mailbox {} does not exist", name); - } - } - - /// Renames a mailbox in the user's IMAP namespace. - pub async fn rename_mailbox(&self, old_name: &str, new_name: &str) -> Result<()> { - let (mut list, ct) = self.load_mailbox_list().await?; - - if old_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) { - bail!("Invalid mailbox name: {}", old_name); - } - if new_name.ends_with(MAILBOX_HIERARCHY_DELIMITER) { - bail!("Invalid mailbox name: {}", new_name); - } - - if old_name == INBOX { - list.rename_mailbox(old_name, new_name)?; - if !self.ensure_inbox_exists(&mut list, &ct).await? { - self.save_mailbox_list(&list, ct).await?; - } - } else { - let names = list.existing_mailbox_names(); - - let old_name_w_delim = format!("{}{}", old_name, MAILBOX_HIERARCHY_DELIMITER); - let new_name_w_delim = format!("{}{}", new_name, MAILBOX_HIERARCHY_DELIMITER); - - if names - .iter() - .any(|x| x == new_name || x.starts_with(&new_name_w_delim)) - { - bail!("Mailbox {} already exists", new_name); - } - - for name in names.iter() { - if name == old_name { - list.rename_mailbox(name, new_name)?; - } else if let Some(tail) = name.strip_prefix(&old_name_w_delim) { - let nnew = format!("{}{}", new_name_w_delim, tail); - list.rename_mailbox(name, &nnew)?; - } - } - - self.save_mailbox_list(&list, ct).await?; - } - Ok(()) - } - - // ---- Internal user & mailbox management ---- - - async fn open(username: String, creds: Credentials) -> Result> { - let storage = creds.storage.build().await?; - - let (tx_inbox_id, rx_inbox_id) = watch::channel(None); - - let user = Arc::new(Self { - username, - creds: creds.clone(), - storage, - tx_inbox_id, - mailboxes: std::sync::Mutex::new(HashMap::new()), - }); - - // Ensure INBOX exists (done inside load_mailbox_list) - user.load_mailbox_list().await?; - - tokio::spawn(incoming_mail_watch_process( - Arc::downgrade(&user), - user.creds.clone(), - rx_inbox_id, - )); - - Ok(user) - } - - pub(super) async fn open_mailbox_by_id( - &self, - id: UniqueIdent, - min_uidvalidity: ImapUidvalidity, - ) -> Result> { - { - let cache = self.mailboxes.lock().unwrap(); - if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) { - return Ok(mb); - } - } - - let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?); - - let mut cache = self.mailboxes.lock().unwrap(); - if let Some(concurrent_mb) = cache.get(&id).and_then(Weak::upgrade) { - drop(mb); // we worked for nothing but at least we didn't starve someone else - Ok(concurrent_mb) - } else { - cache.insert(id, Arc::downgrade(&mb)); - Ok(mb) - } - } - - // ---- Mailbox list management ---- - - async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { - let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); - let (mut list, row) = match self - .storage - .row_fetch(&storage::Selector::Single(&row_ref)) - .await - { - Err(storage::StorageError::NotFound) => (MailboxList::new(), None), - Err(e) => return Err(e.into()), - Ok(rv) => { - let mut list = MailboxList::new(); - let (row_ref, row_vals) = match rv.into_iter().next() { - Some(row_val) => (row_val.row_ref, row_val.value), - None => (row_ref, vec![]), - }; - - for v in row_vals { - if let storage::Alternative::Value(vbytes) = v { - let list2 = - open_deserialize::(&vbytes, &self.creds.keys.master)?; - list.merge(list2); - } - } - (list, Some(row_ref)) - } - }; - - let is_default_mbx_missing = [DRAFTS, ARCHIVE, SENT, TRASH] - .iter() - .map(|mbx| list.create_mailbox(mbx)) - .fold(false, |acc, r| { - acc || matches!(r, CreatedMailbox::Created(..)) - }); - let is_inbox_missing = self.ensure_inbox_exists(&mut list, &row).await?; - if is_default_mbx_missing && !is_inbox_missing { - // It's the only case where we created some mailboxes and not saved them - // So we save them! - self.save_mailbox_list(&list, row.clone()).await?; - } - - Ok((list, row)) - } - - async fn ensure_inbox_exists( - &self, - list: &mut MailboxList, - ct: &Option, - ) -> Result { - // If INBOX doesn't exist, create a new mailbox with that name - // and save new mailbox list. - // Also, ensure that the mpsc::watch that keeps track of the - // inbox id is up-to-date. - let saved; - let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) { - CreatedMailbox::Created(i, v) => { - self.save_mailbox_list(list, ct.clone()).await?; - saved = true; - (i, v) - } - CreatedMailbox::Existed(i, v) => { - saved = false; - (i, v) - } - }; - let inbox_id = Some((inbox_id, inbox_uidvalidity)); - if *self.tx_inbox_id.borrow() != inbox_id { - self.tx_inbox_id.send(inbox_id).unwrap(); - } - - Ok(saved) - } - - async fn save_mailbox_list( - &self, - list: &MailboxList, - ct: Option, - ) -> Result<()> { - let list_blob = seal_serialize(list, &self.creds.keys.master)?; - let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK)); - let row_val = storage::RowVal::new(rref, list_blob); - self.storage.row_insert(vec![row_val]).await?; - Ok(()) - } -} - -// ---- User cache ---- - -lazy_static! { - static ref USER_CACHE: std::sync::Mutex>> = - std::sync::Mutex::new(HashMap::new()); -} -- cgit v1.2.3 From ed47855ef1a6c9d10d48080367ff8b280530e362 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 20 Mar 2024 17:31:54 +0100 Subject: Share UniqueIdent between collections --- aero-collections/src/calendar/mod.rs | 6 +- aero-collections/src/calendar/namespace.rs | 47 ++++++++++++++ aero-collections/src/lib.rs | 1 + aero-collections/src/mail/incoming.rs | 2 +- aero-collections/src/mail/mailbox.rs | 2 +- aero-collections/src/mail/mod.rs | 1 - aero-collections/src/mail/namespace.rs | 2 +- aero-collections/src/mail/query.rs | 2 +- aero-collections/src/mail/snapshot.rs | 2 +- aero-collections/src/mail/uidindex.rs | 2 +- aero-collections/src/mail/unique_ident.rs | 101 ----------------------------- aero-collections/src/unique_ident.rs | 101 +++++++++++++++++++++++++++++ aero-collections/src/user.rs | 19 +++++- 13 files changed, 176 insertions(+), 112 deletions(-) create mode 100644 aero-collections/src/calendar/namespace.rs delete mode 100644 aero-collections/src/mail/unique_ident.rs create mode 100644 aero-collections/src/unique_ident.rs (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 19e3340..708e1f1 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -1 +1,5 @@ -//@FIXME Event Index +pub mod namespace; + +pub struct Calendar { + a: u64, +} diff --git a/aero-collections/src/calendar/namespace.rs b/aero-collections/src/calendar/namespace.rs new file mode 100644 index 0000000..cf8a159 --- /dev/null +++ b/aero-collections/src/calendar/namespace.rs @@ -0,0 +1,47 @@ +use anyhow::Result; +use std::collections::{HashMap, BTreeMap}; +use std::sync::{Weak, Arc}; + +use serde::{Deserialize, Serialize}; + +use aero_user::storage; + +use crate::unique_ident::UniqueIdent; +use crate::user::User; +use super::Calendar; + +pub(crate) const CAL_LIST_PK: &str = "calendars"; +pub(crate) const CAL_LIST_SK: &str = "list"; + +pub(crate) struct CalendarNs(std::sync::Mutex>>); +impl CalendarNs { + pub fn new() -> Self { + Self(std::sync::Mutex::new(HashMap::new())) + } + + pub fn list(&self) { + todo!(); + } +} + +#[derive(Serialize, Deserialize)] +pub(crate) struct CalendarList(BTreeMap); + +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] +pub(crate) struct CalendarListEntry { + id_lww: (u64, Option), +} + +impl CalendarList { + pub(crate) async fn load(user: &Arc) -> Result<(Self, Option)> { + todo!(); + } + + pub(crate) async fn save(user: &Arc, ct: Option) -> Result<()> { + todo!(); + } + + pub(crate) fn new() -> Self { + Self(BTreeMap::new()) + } +} diff --git a/aero-collections/src/lib.rs b/aero-collections/src/lib.rs index adcfc93..269cd13 100644 --- a/aero-collections/src/lib.rs +++ b/aero-collections/src/lib.rs @@ -1,3 +1,4 @@ +pub mod unique_ident; pub mod user; pub mod mail; pub mod calendar; diff --git a/aero-collections/src/mail/incoming.rs b/aero-collections/src/mail/incoming.rs index 8220461..cd2f8fd 100644 --- a/aero-collections/src/mail/incoming.rs +++ b/aero-collections/src/mail/incoming.rs @@ -15,7 +15,7 @@ use aero_bayou::timestamp::now_msec; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; -use crate::mail::unique_ident::*; +use crate::unique_ident::*; use crate::user::User; use crate::mail::IMF; diff --git a/aero-collections/src/mail/mailbox.rs b/aero-collections/src/mail/mailbox.rs index a767678..25aacf5 100644 --- a/aero-collections/src/mail/mailbox.rs +++ b/aero-collections/src/mail/mailbox.rs @@ -9,7 +9,7 @@ use aero_bayou::Bayou; use aero_bayou::timestamp::now_msec; use crate::mail::uidindex::*; -use crate::mail::unique_ident::*; +use crate::unique_ident::*; use crate::mail::IMF; pub struct Mailbox { diff --git a/aero-collections/src/mail/mod.rs b/aero-collections/src/mail/mod.rs index 85361f3..ca9b08b 100644 --- a/aero-collections/src/mail/mod.rs +++ b/aero-collections/src/mail/mod.rs @@ -3,7 +3,6 @@ pub mod mailbox; pub mod query; pub mod snapshot; pub mod uidindex; -pub mod unique_ident; pub mod namespace; // Internet Message Format diff --git a/aero-collections/src/mail/namespace.rs b/aero-collections/src/mail/namespace.rs index 452ac68..b1f6a70 100644 --- a/aero-collections/src/mail/namespace.rs +++ b/aero-collections/src/mail/namespace.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use aero_bayou::timestamp::now_msec; use crate::mail::uidindex::ImapUidvalidity; -use crate::mail::unique_ident::{gen_ident, UniqueIdent}; +use crate::unique_ident::{gen_ident, UniqueIdent}; pub const MAILBOX_HIERARCHY_DELIMITER: char = '.'; diff --git a/aero-collections/src/mail/query.rs b/aero-collections/src/mail/query.rs index 3e6fe99..7faba41 100644 --- a/aero-collections/src/mail/query.rs +++ b/aero-collections/src/mail/query.rs @@ -1,6 +1,6 @@ use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; -use super::unique_ident::UniqueIdent; +use crate::unique_ident::UniqueIdent; use anyhow::Result; use futures::future::FutureExt; use futures::stream::{BoxStream, Stream, StreamExt}; diff --git a/aero-collections/src/mail/snapshot.rs b/aero-collections/src/mail/snapshot.rs index ed756b5..9503d4d 100644 --- a/aero-collections/src/mail/snapshot.rs +++ b/aero-collections/src/mail/snapshot.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use anyhow::Result; +use crate::unique_ident::UniqueIdent; use super::mailbox::Mailbox; use super::query::{Query, QueryScope}; use super::uidindex::UidIndex; -use super::unique_ident::UniqueIdent; /// A Frozen Mailbox has a snapshot of the current mailbox /// state that is desynchronized with the real mailbox state. diff --git a/aero-collections/src/mail/uidindex.rs b/aero-collections/src/mail/uidindex.rs index 637a1ac..ca975a3 100644 --- a/aero-collections/src/mail/uidindex.rs +++ b/aero-collections/src/mail/uidindex.rs @@ -4,7 +4,7 @@ use im::{HashMap, OrdMap, OrdSet}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use aero_bayou::*; -use crate::mail::unique_ident::UniqueIdent; +use crate::unique_ident::UniqueIdent; pub type ModSeq = NonZeroU64; pub type ImapUid = NonZeroU32; diff --git a/aero-collections/src/mail/unique_ident.rs b/aero-collections/src/mail/unique_ident.rs deleted file mode 100644 index 0987a2c..0000000 --- a/aero-collections/src/mail/unique_ident.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; - -use lazy_static::lazy_static; -use rand::prelude::*; -use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; - -use aero_bayou::timestamp::now_msec; - -/// An internal Mail Identifier is composed of two components: -/// - a process identifier, 128 bits, itself composed of: -/// - the timestamp of when the process started, 64 bits -/// - a 64-bit random number -/// - a sequence number, 64 bits -/// They are not part of the protocol but an internal representation -/// required by Aerogramme. -/// Their main property is to be unique without having to rely -/// on synchronization between IMAP processes. -#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] -pub struct UniqueIdent(pub [u8; 24]); - -struct IdentGenerator { - pid: u128, - sn: AtomicU64, -} - -impl IdentGenerator { - fn new() -> Self { - let time = now_msec() as u128; - let rand = thread_rng().gen::() as u128; - Self { - pid: (time << 64) | rand, - sn: AtomicU64::new(0), - } - } - - fn gen(&self) -> UniqueIdent { - let sn = self.sn.fetch_add(1, Ordering::Relaxed); - let mut res = [0u8; 24]; - res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); - res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); - UniqueIdent(res) - } -} - -lazy_static! { - static ref GENERATOR: IdentGenerator = IdentGenerator::new(); -} - -pub fn gen_ident() -> UniqueIdent { - GENERATOR.gen() -} - -// -- serde -- - -impl<'de> Deserialize<'de> for UniqueIdent { - fn deserialize(d: D) -> Result - where - D: Deserializer<'de>, - { - let v = String::deserialize(d)?; - UniqueIdent::from_str(&v).map_err(D::Error::custom) - } -} - -impl Serialize for UniqueIdent { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&self.to_string()) - } -} - -impl std::fmt::Display for UniqueIdent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", hex::encode(self.0)) - } -} - -impl std::fmt::Debug for UniqueIdent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", hex::encode(self.0)) - } -} - -impl FromStr for UniqueIdent { - type Err = &'static str; - - fn from_str(s: &str) -> Result { - let bytes = hex::decode(s).map_err(|_| "invalid hex")?; - - if bytes.len() != 24 { - return Err("bad length"); - } - - let mut tmp = [0u8; 24]; - tmp[..].copy_from_slice(&bytes); - Ok(UniqueIdent(tmp)) - } -} diff --git a/aero-collections/src/unique_ident.rs b/aero-collections/src/unique_ident.rs new file mode 100644 index 0000000..e4eea7a --- /dev/null +++ b/aero-collections/src/unique_ident.rs @@ -0,0 +1,101 @@ +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; + +use lazy_static::lazy_static; +use rand::prelude::*; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; + +use aero_bayou::timestamp::now_msec; + +/// An internal Aerogramme identifier is composed of two components: +/// - a process identifier, 128 bits, itself composed of: +/// - the timestamp of when the process started, 64 bits +/// - a 64-bit random number +/// - a sequence number, 64 bits +/// They are not part of the protocol but an internal representation +/// required by Aerogramme. +/// Their main property is to be unique without having to rely +/// on synchronization between (IMAP) processes. +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] +pub struct UniqueIdent(pub [u8; 24]); + +struct IdentGenerator { + pid: u128, + sn: AtomicU64, +} + +impl IdentGenerator { + fn new() -> Self { + let time = now_msec() as u128; + let rand = thread_rng().gen::() as u128; + Self { + pid: (time << 64) | rand, + sn: AtomicU64::new(0), + } + } + + fn gen(&self) -> UniqueIdent { + let sn = self.sn.fetch_add(1, Ordering::Relaxed); + let mut res = [0u8; 24]; + res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); + res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); + UniqueIdent(res) + } +} + +lazy_static! { + static ref GENERATOR: IdentGenerator = IdentGenerator::new(); +} + +pub fn gen_ident() -> UniqueIdent { + GENERATOR.gen() +} + +// -- serde -- + +impl<'de> Deserialize<'de> for UniqueIdent { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let v = String::deserialize(d)?; + UniqueIdent::from_str(&v).map_err(D::Error::custom) + } +} + +impl Serialize for UniqueIdent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl std::fmt::Display for UniqueIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(self.0)) + } +} + +impl std::fmt::Debug for UniqueIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(self.0)) + } +} + +impl FromStr for UniqueIdent { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; + + if bytes.len() != 24 { + return Err("bad length"); + } + + let mut tmp = [0u8; 24]; + tmp[..].copy_from_slice(&bytes); + Ok(UniqueIdent(tmp)) + } +} diff --git a/aero-collections/src/user.rs b/aero-collections/src/user.rs index 193ce90..0c6b931 100644 --- a/aero-collections/src/user.rs +++ b/aero-collections/src/user.rs @@ -12,19 +12,27 @@ use aero_user::storage; use crate::mail::incoming::incoming_mail_watch_process; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; -use crate::mail::unique_ident::UniqueIdent; +use crate::unique_ident::UniqueIdent; use crate::mail::namespace::{MAILBOX_HIERARCHY_DELIMITER, INBOX, DRAFTS, ARCHIVE, SENT, TRASH, MAILBOX_LIST_PK, MAILBOX_LIST_SK,MailboxList,CreatedMailbox}; +use crate::calendar::Calendar; //@FIXME User should be totally rewriten -//to extract the local mailbox list -//to the mail/namespace.rs file (and mailbox list should be reworded as mail namespace) +// to extract the local mailbox list +// to the mail/namespace.rs file (and mailbox list should be reworded as mail namespace) + +//@FIXME User should be run in a LocalSet +// to remove most - if not all - synchronizations types. +// Especially RwLock & co. pub struct User { pub username: String, pub creds: Credentials, pub storage: storage::Store, pub mailboxes: std::sync::Mutex>>, + pub calendars: std::sync::Mutex>>, + // Handle on worker processing received email + // (moving emails from the mailqueue to the user's INBOX) tx_inbox_id: watch::Sender>, } @@ -178,6 +186,7 @@ impl User { storage, tx_inbox_id, mailboxes: std::sync::Mutex::new(HashMap::new()), + calendars: std::sync::Mutex::new(HashMap::new()), }); // Ensure INBOX exists (done inside load_mailbox_list) @@ -204,6 +213,10 @@ impl User { } } + // The idea here is that: + // 1. Opening a mailbox that is not already opened takes a significant amount of time + // 2. We don't want to lock the whole HashMap that contain the mailboxes during this + // operation which is why we droppped the lock above but take it again below. let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?); let mut cache = self.mailboxes.lock().unwrap(); -- cgit v1.2.3 From bc0f897803cbb9b7537010e9d4714a2a0b2a6872 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 26 Mar 2024 15:08:04 +0100 Subject: Calendar Namespace --- aero-collections/src/calendar/mod.rs | 15 ++ aero-collections/src/calendar/namespace.rs | 302 +++++++++++++++++++++++++++-- aero-collections/src/mail/mailbox.rs | 2 +- aero-collections/src/user.rs | 6 +- 4 files changed, 310 insertions(+), 15 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 708e1f1..d2217b8 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -1,5 +1,20 @@ pub mod namespace; +use anyhow::Result; + +use aero_user::login::Credentials; + +use crate::unique_ident::*; + pub struct Calendar { a: u64, } + +impl Calendar { + pub(crate) async fn open( + creds: &Credentials, + id: UniqueIdent, + ) -> Result { + todo!(); + } +} diff --git a/aero-collections/src/calendar/namespace.rs b/aero-collections/src/calendar/namespace.rs index cf8a159..2fbc364 100644 --- a/aero-collections/src/calendar/namespace.rs +++ b/aero-collections/src/calendar/namespace.rs @@ -1,47 +1,327 @@ -use anyhow::Result; +use anyhow::{bail, Result}; use std::collections::{HashMap, BTreeMap}; use std::sync::{Weak, Arc}; use serde::{Deserialize, Serialize}; +use aero_bayou::timestamp::now_msec; use aero_user::storage; +use aero_user::cryptoblob::{open_deserialize, seal_serialize}; -use crate::unique_ident::UniqueIdent; +use crate::unique_ident::{gen_ident, UniqueIdent}; use crate::user::User; use super::Calendar; pub(crate) const CAL_LIST_PK: &str = "calendars"; pub(crate) const CAL_LIST_SK: &str = "list"; +pub(crate) const MAIN_CAL: &str = "Personal"; +pub(crate) const MAX_CALNAME_CHARS: usize = 32; pub(crate) struct CalendarNs(std::sync::Mutex>>); + impl CalendarNs { + /// Create a new calendar namespace pub fn new() -> Self { Self(std::sync::Mutex::new(HashMap::new())) } - pub fn list(&self) { - todo!(); + /// Open a calendar by name + pub async fn open(&self, user: &Arc, name: &str) -> Result>> { + let (list, _ct) = CalendarList::load(user).await?; + + match list.get(name) { + None => Ok(None), + Some(ident) => Ok(Some(self.open_by_id(user, ident).await?)), + } + } + + /// Open a calendar by unique id + /// Check user.rs::open_mailbox_by_id to understand this function + pub async fn open_by_id(&self, user: &Arc, id: UniqueIdent) -> Result> { + { + let cache = self.0.lock().unwrap(); + if let Some(cal) = cache.get(&id).and_then(Weak::upgrade) { + return Ok(cal); + } + } + + let cal = Arc::new(Calendar::open(&user.creds, id).await?); + + let mut cache = self.0.lock().unwrap(); + if let Some(concurrent_cal) = cache.get(&id).and_then(Weak::upgrade) { + drop(cal); // we worked for nothing but at least we didn't starve someone else + Ok(concurrent_cal) + } else { + cache.insert(id, Arc::downgrade(&cal)); + Ok(cal) + } + } + + /// List calendars + pub async fn list(&self, user: &Arc) -> Result> { + CalendarList::load(user).await.map(|(list, _)| list.names()) + } + + /// Delete a calendar from the index + pub async fn delete(&self, user: &Arc, name: &str) -> Result<()> { + // We currently assume that main cal is a bit specific + if name == MAIN_CAL { + bail!("Cannot delete main calendar"); + } + + let (mut list, ct) = CalendarList::load(user).await?; + if list.has(name) { + //@TODO: actually delete calendar content + list.bind(name, None); + list.save(user, ct).await?; + Ok(()) + } else { + bail!("Calendar {} does not exist", name); + } + } + + /// Rename a calendar in the index + pub async fn rename(&self, user: &Arc, old: &str, new: &str) -> Result<()> { + if old == MAIN_CAL { + bail!("Renaming main calendar is not supported currently"); + } + if !new.chars().all(char::is_alphanumeric) { + bail!("Unsupported characters in new calendar name, only alphanumeric characters are allowed currently"); + } + if new.len() > MAX_CALNAME_CHARS { + bail!("Calendar name can't contain more than 32 characters"); + } + + let (mut list, ct) = CalendarList::load(user).await?; + list.rename(old, new)?; + list.save(user, ct).await?; + + Ok(()) + } + + /// Create calendar + pub async fn create(&self, user: &Arc, name: &str) -> Result<()> { + if name == MAIN_CAL { + bail!("Main calendar is automatically created, can't create it manually"); + } + if !name.chars().all(char::is_alphanumeric) { + bail!("Unsupported characters in new calendar name, only alphanumeric characters are allowed"); + } + if name.len() > MAX_CALNAME_CHARS { + bail!("Calendar name can't contain more than 32 characters"); + } + + let (mut list, ct) = CalendarList::load(user).await?; + match list.create(name) { + CalendarExists::Existed(_) => bail!("Calendar {} already exists", name), + CalendarExists::Created(_) => (), + } + list.save(user, ct).await?; + + Ok(()) + } + + /// Has calendar + pub async fn has(&self, user: &Arc, name: &str) -> Result { + CalendarList::load(user).await.map(|(list, _)| list.has(name)) } } +// ------ +// ------ From this point, implementation is hidden from the rest of the crate +// ------ + #[derive(Serialize, Deserialize)] -pub(crate) struct CalendarList(BTreeMap); +struct CalendarList(BTreeMap); #[derive(Serialize, Deserialize, Clone, Copy, Debug)] -pub(crate) struct CalendarListEntry { +struct CalendarListEntry { id_lww: (u64, Option), } impl CalendarList { - pub(crate) async fn load(user: &Arc) -> Result<(Self, Option)> { - todo!(); + // ---- Index persistence related functions + + /// Load from storage + async fn load(user: &Arc) -> Result<(Self, Option)> { + let row_ref = storage::RowRef::new(CAL_LIST_PK, CAL_LIST_SK); + let (mut list, row) = match user + .storage + .row_fetch(&storage::Selector::Single(&row_ref)) + .await + { + Err(storage::StorageError::NotFound) => (Self::new(), None), + Err(e) => return Err(e.into()), + Ok(rv) => { + let mut list = Self::new(); + let (row_ref, row_vals) = match rv.into_iter().next() { + Some(row_val) => (row_val.row_ref, row_val.value), + None => (row_ref, vec![]), + }; + + for v in row_vals { + if let storage::Alternative::Value(vbytes) = v { + let list2 = open_deserialize::(&vbytes, &user.creds.keys.master)?; + list.merge(list2); + } + } + (list, Some(row_ref)) + } + }; + + // Create default calendars (currently only one calendar is created) + let is_default_cal_missing = [MAIN_CAL] + .iter() + .map(|calname| list.create(calname)) + .fold(false, |acc, r| { + acc || matches!(r, CalendarExists::Created(..)) + }); + + // Save the index if we created a new calendar + if is_default_cal_missing { + list.save(user, row.clone()).await?; + } + + Ok((list, row)) } - pub(crate) async fn save(user: &Arc, ct: Option) -> Result<()> { - todo!(); + /// Save an updated index + async fn save(&self, user: &Arc, ct: Option) -> Result<()> { + let list_blob = seal_serialize(self, &user.creds.keys.master)?; + let rref = ct.unwrap_or(storage::RowRef::new(CAL_LIST_PK, CAL_LIST_SK)); + let row_val = storage::RowVal::new(rref, list_blob); + user.storage.row_insert(vec![row_val]).await?; + Ok(()) } - pub(crate) fn new() -> Self { + // ----- Index manipulation functions + + /// Ensure that a given calendar exists + /// (Don't forget to save if it returns CalendarExists::Created) + fn create(&mut self, name: &str) -> CalendarExists { + if let Some(CalendarListEntry { + id_lww: (_, Some(id)) + }) = self.0.get(name) + { + return CalendarExists::Existed(*id); + } + + let id = gen_ident(); + self.bind(name, Some(id)).unwrap(); + CalendarExists::Created(id) + } + + /// Get a list of all calendar names + fn names(&self) -> Vec { + self.0 + .iter() + .filter(|(_, v)| v.id_lww.1.is_some()) + .map(|(k, _)| k.to_string()) + .collect() + } + + /// For a given calendar name, get its Unique Identifier + fn get(&self, name: &str) -> Option { + self.0.get(name).map(|CalendarListEntry { + id_lww: (_, ident), + }| *ident).flatten() + } + + /// Check if a given calendar name exists + fn has(&self, name: &str) -> bool { + self.get(name).is_some() + } + + /// Rename a calendar + fn rename(&mut self, old: &str, new: &str) -> Result<()> { + if self.has(new) { + bail!("Calendar {} already exists", new); + } + let ident = match self.get(old) { + None => bail!("Calendar {} does not exist", old), + Some(ident) => ident, + }; + + self.bind(old, None); + self.bind(new, Some(ident)); + + Ok(()) + } + + // ----- Internal logic + + /// New is not publicly exposed, use `load` instead + fn new() -> Self { Self(BTreeMap::new()) } + + /// Low level index updating logic (used to add/rename/delete) an entry + fn bind(&mut self, name: &str, id: Option) -> Option<()> { + let (ts, id) = match self.0.get_mut(name) { + None => { + if id.is_none() { + // User wants to delete entry with given name (passed id is None) + // Entry does not exist (get_mut is None) + // Nothing to do + return None; + } else { + // User wants entry with given name to be present (id is Some) + // Entry does not exist + // Initialize entry + (now_msec(), id) + } + } + Some(CalendarListEntry { + id_lww, + }) => { + if id_lww.1 == id { + // Entry is already equals to the requested id (Option self.id_lww.0 + || (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1) + { + self.id_lww = other.id_lww; + } + } +} + +pub(crate) enum CalendarExists { + Created(UniqueIdent), + Existed(UniqueIdent), } diff --git a/aero-collections/src/mail/mailbox.rs b/aero-collections/src/mail/mailbox.rs index 25aacf5..f797be6 100644 --- a/aero-collections/src/mail/mailbox.rs +++ b/aero-collections/src/mail/mailbox.rs @@ -8,8 +8,8 @@ use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store use aero_bayou::Bayou; use aero_bayou::timestamp::now_msec; -use crate::mail::uidindex::*; use crate::unique_ident::*; +use crate::mail::uidindex::*; use crate::mail::IMF; pub struct Mailbox { diff --git a/aero-collections/src/user.rs b/aero-collections/src/user.rs index 0c6b931..9ed342f 100644 --- a/aero-collections/src/user.rs +++ b/aero-collections/src/user.rs @@ -14,7 +14,7 @@ use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::unique_ident::UniqueIdent; use crate::mail::namespace::{MAILBOX_HIERARCHY_DELIMITER, INBOX, DRAFTS, ARCHIVE, SENT, TRASH, MAILBOX_LIST_PK, MAILBOX_LIST_SK,MailboxList,CreatedMailbox}; -use crate::calendar::Calendar; +use crate::calendar::namespace::CalendarNs; //@FIXME User should be totally rewriten // to extract the local mailbox list @@ -29,7 +29,7 @@ pub struct User { pub creds: Credentials, pub storage: storage::Store, pub mailboxes: std::sync::Mutex>>, - pub calendars: std::sync::Mutex>>, + pub calendars: CalendarNs, // Handle on worker processing received email // (moving emails from the mailqueue to the user's INBOX) @@ -186,7 +186,7 @@ impl User { storage, tx_inbox_id, mailboxes: std::sync::Mutex::new(HashMap::new()), - calendars: std::sync::Mutex::new(HashMap::new()), + calendars: CalendarNs::new(), }); // Ensure INBOX exists (done inside load_mailbox_list) -- cgit v1.2.3 From 0b57200eeb6780e843c5f798bdc53781eb83d51f Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2024 10:33:46 +0100 Subject: Dav DAG wip --- aero-collections/src/calendar/mod.rs | 16 ++- aero-collections/src/davdag.rs | 185 +++++++++++++++++++++++++++++++++++ aero-collections/src/lib.rs | 1 + 3 files changed, 201 insertions(+), 1 deletion(-) create mode 100644 aero-collections/src/davdag.rs (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index d2217b8..6537a4e 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -1,13 +1,19 @@ pub mod namespace; use anyhow::Result; +use tokio::sync::RwLock; +use aero_bayou::Bayou; use aero_user::login::Credentials; +use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; +use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; use crate::unique_ident::*; +use crate::davdag::DavDag; pub struct Calendar { - a: u64, + pub(super) id: UniqueIdent, + internal: RwLock, } impl Calendar { @@ -18,3 +24,11 @@ impl Calendar { todo!(); } } + +struct CalendarInternal { + id: UniqueIdent, + cal_path: String, + encryption_key: Key, + storage: Store, + uid_index: Bayou, +} diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs new file mode 100644 index 0000000..696b985 --- /dev/null +++ b/aero-collections/src/davdag.rs @@ -0,0 +1,185 @@ +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use im::{OrdMap, OrdSet, ordset}; + +use aero_bayou::*; + +use crate::unique_ident::UniqueIdent; + +/// Parents are only persisted in the event log, +/// not in the checkpoints. +pub type Parents = Vec; +pub type Etag = String; +pub type FileName = String; +pub type IndexEntry = (FileName, Etag); + +#[derive(Clone, Default)] +pub struct DavDag { + /// Source of trust + pub table: OrdMap, + + /// Indexes optimized for queries + pub idx_by_filename: OrdMap, + + /// Partial synchronization graph + /// parent -> direct children + pub successors: OrdMap>, + + /// Head nodes + pub heads: OrdSet, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub enum DavDagOp { + /// Merge is a virtual operation run when multiple heads are discovered + Merge(Parents, UniqueIdent), + + /// Add an item to the collection + Put(Parents, UniqueIdent, IndexEntry), + + /// Delete an item from the collection + Delete(Parents, UniqueIdent), +} + +impl DavDag { + pub fn op_merge(&self, ident: UniqueIdent) -> DavDagOp { + DavDagOp::Merge(self.heads_vec(), ident) + } + + pub fn op_put(&self, ident: UniqueIdent, entry: IndexEntry) -> DavDagOp { + DavDagOp::Put(self.heads_vec(), ident, entry) + } + + pub fn op_delete(&self, ident: UniqueIdent) -> DavDagOp { + DavDagOp::Delete(self.heads_vec(), ident) + } + + // HELPER functions + pub fn heads_vec(&self) -> Vec { + self.heads.clone().into_iter().collect() + } + + // INTERNAL functions + fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) { + // Insert item in the source of trust + self.table.insert(ident, entry.clone()); + + // Update the cache + let (filename, _etag) = entry; + self.idx_by_filename.insert(filename, ident); + } + + fn unregister(&mut self, ident: &UniqueIdent) { + // Query the source of truth to get the information we + // need to clean the indexes + let (filename, _etag) = match self.table.get(ident) { + Some(v) => v, + None => return, + }; + self.idx_by_filename.remove(filename); + + // Finally clear item from the source of trust + self.table.remove(ident); + } + + // @FIXME: maybe in case of error we could simply disable the sync graph + // and ask the client to rely on manual sync. For now, we are skipping the event + // which is midly satisfying. + fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool { + // All parents must exist in successors otherwise we can't accept item + // do the check + update successors + let mut try_successors = self.successors.clone(); + for par in parents.iter() { + match try_successors.get_mut(par) { + None => { + tracing::warn!("Unable to push a Dav DAG sync op into the graph, an event is missing, it's a bug"); + return false + }, + Some(v) => v.insert(*child), + }; + } + self.successors = try_successors; + + // Remove from HEADS this event's parents + parents.iter().for_each(|par| { self.heads.remove(par); }); + + // This event becomes a new HEAD in turn + self.heads.insert(*child); + + // This event is also a future successor + self.successors.insert(*child, ordset![]); + + true + } +} + +impl BayouState for DavDag { + type Op = DavDagOp; + + fn apply(&self, op: &Self::Op) -> Self { + let mut new = self.clone(); + + match op { + DavDagOp::Put(parents, ident, entry) => { + if new.sync_dag(ident, parents.as_slice()) { + new.register(*ident, entry.clone()); + } + }, + DavDagOp::Delete(parents, ident) => { + if new.sync_dag(ident, parents.as_slice()) { + new.unregister(ident); + } + }, + DavDagOp::Merge(parents, ident) => { + new.sync_dag(ident, parents.as_slice()); + } + } + + new + } +} + +// CUSTOM SERIALIZATION & DESERIALIZATION +#[derive(Serialize, Deserialize)] +struct DavDagSerializedRepr { + items: Vec<(UniqueIdent, IndexEntry)>, + heads: Vec, +} + +impl<'de> Deserialize<'de> for DavDag { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let val: DavDagSerializedRepr = DavDagSerializedRepr::deserialize(d)?; + let mut davdag = DavDag::default(); + + // Build the table + index + val.items.into_iter().for_each(|(ident, entry)| davdag.register(ident, entry)); + + // Initialize the synchronization DAG with its roots + val.heads.into_iter().for_each(|ident| { + davdag.successors.insert(ident, ordset![]); + davdag.heads.insert(ident); + }); + + Ok(davdag) + } +} + +impl Serialize for DavDag { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // Indexes are rebuilt on the fly, we serialize only the core database + let items = self.table.iter().map(|(ident, entry)| (*ident, entry.clone())).collect(); + + // We keep only the head entries from the sync graph, + // these entries will be used to initialize it back when deserializing + let heads = self.heads_vec(); + + // Finale serialization object + let val = DavDagSerializedRepr { items, heads }; + val.serialize(serializer) + } +} diff --git a/aero-collections/src/lib.rs b/aero-collections/src/lib.rs index 269cd13..ef8b8d8 100644 --- a/aero-collections/src/lib.rs +++ b/aero-collections/src/lib.rs @@ -1,4 +1,5 @@ pub mod unique_ident; +pub mod davdag; pub mod user; pub mod mail; pub mod calendar; -- cgit v1.2.3 From a146a0babc25547f269c784e090e308fa831ab32 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2024 15:09:18 +0100 Subject: Sync algorithm --- aero-collections/src/davdag.rs | 82 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 5 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index 696b985..59dcc7b 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -1,3 +1,4 @@ +use anyhow::{bail, Result}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use im::{OrdMap, OrdSet, ordset}; @@ -23,9 +24,14 @@ pub struct DavDag { /// Partial synchronization graph /// parent -> direct children pub successors: OrdMap>, - + pub ancestors: OrdMap>, + + /// All nodes + pub all_nodes: OrdSet, /// Head nodes pub heads: OrdSet, + /// Origin nodes + pub origins: OrdSet, } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -54,11 +60,57 @@ impl DavDag { } // HELPER functions + + /// All HEAD events pub fn heads_vec(&self) -> Vec { self.heads.clone().into_iter().collect() } + /// Resolve a sync token + pub fn resolve(&self, known: UniqueIdent) -> Result> { + let already_known = self.all_ancestors(known); + + // We can't capture all missing events if we are not connected + // to all sinks of the graph, ie. if we don't already know all the sinks. + if !self.origins.is_subset(already_known.clone()) { + bail!("Not enough history to produce a correct diff, a full resync is needed"); + } + + // Missing items are all existing graph items from which + // we removed all items known by the given node. + // In other words, all values in all_nodes that are not in already_known. + Ok(self.all_nodes.clone().relative_complement(already_known)) + } + + /// Find all ancestors of a given + fn all_ancestors(&self, known: UniqueIdent) -> OrdSet { + let mut all_known: OrdSet = OrdSet::new(); + let mut to_collect = vec![known]; + loop { + let cursor = match to_collect.pop() { + // Loop stops here + None => break, + Some(v) => v, + }; + + if all_known.insert(cursor).is_some() { + // Item already processed + continue + } + + // Collect parents + let parents = match self.ancestors.get(&cursor) { + None => continue, + Some(c) => c, + }; + to_collect.extend(parents.iter()); + } + all_known + } + // INTERNAL functions + + /// Register a WebDAV item (put, copy, move) fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) { // Insert item in the source of trust self.table.insert(ident, entry.clone()); @@ -68,6 +120,7 @@ impl DavDag { self.idx_by_filename.insert(filename, ident); } + /// Unregister a WebDAV item (delete, move) fn unregister(&mut self, ident: &UniqueIdent) { // Query the source of truth to get the information we // need to clean the indexes @@ -84,8 +137,11 @@ impl DavDag { // @FIXME: maybe in case of error we could simply disable the sync graph // and ask the client to rely on manual sync. For now, we are skipping the event // which is midly satisfying. + + /// When an event is processed, update the synchronization DAG fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool { - // All parents must exist in successors otherwise we can't accept item + // --- Update SUCCESSORS + // All parents must exist in successors otherwise we can't accept item: // do the check + update successors let mut try_successors = self.successors.clone(); for par in parents.iter() { @@ -99,15 +155,29 @@ impl DavDag { } self.successors = try_successors; + // This event is also a future successor + self.successors.insert(*child, ordset![]); + + // --- Update ANCESTORS + // We register ancestors as it is required for the sync algorithm + self.ancestors.insert(*child, parents.iter().fold(ordset![], |mut acc, p| { + acc.insert(*p); + acc + })); + + // --- Update ORIGINS + // If this event has no parents, it's an origin + if parents.is_empty() { + self.origins.insert(*child); + } + + // --- Update HEADS // Remove from HEADS this event's parents parents.iter().for_each(|par| { self.heads.remove(par); }); // This event becomes a new HEAD in turn self.heads.insert(*child); - // This event is also a future successor - self.successors.insert(*child, ordset![]); - true } } @@ -160,6 +230,8 @@ impl<'de> Deserialize<'de> for DavDag { val.heads.into_iter().for_each(|ident| { davdag.successors.insert(ident, ordset![]); davdag.heads.insert(ident); + davdag.origins.insert(ident); + davdag.all_nodes.insert(ident); }); Ok(davdag) -- cgit v1.2.3 From 9afbfeb42794a71170fe4c46c911446bcc217660 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2024 16:16:37 +0100 Subject: Testing DAG sync --- aero-collections/src/davdag.rs | 174 ++++++++++++++++++++++++++--------------- 1 file changed, 113 insertions(+), 61 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index 59dcc7b..63a76a8 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -4,14 +4,18 @@ use im::{OrdMap, OrdSet, ordset}; use aero_bayou::*; -use crate::unique_ident::UniqueIdent; +use crate::unique_ident::{gen_ident, UniqueIdent}; /// Parents are only persisted in the event log, /// not in the checkpoints. -pub type Parents = Vec; +pub type Token = UniqueIdent; +pub type Parents = Vec; +pub type SyncDesc = (Parents, Token); + +pub type BlobId = UniqueIdent; pub type Etag = String; pub type FileName = String; -pub type IndexEntry = (FileName, Etag); +pub type IndexEntry = (BlobId, FileName, Etag); #[derive(Clone, Default)] pub struct DavDag { @@ -22,8 +26,6 @@ pub struct DavDag { pub idx_by_filename: OrdMap, /// Partial synchronization graph - /// parent -> direct children - pub successors: OrdMap>, pub ancestors: OrdMap>, /// All nodes @@ -37,33 +39,46 @@ pub struct DavDag { #[derive(Clone, Serialize, Deserialize, Debug)] pub enum DavDagOp { /// Merge is a virtual operation run when multiple heads are discovered - Merge(Parents, UniqueIdent), + Merge(SyncDesc), /// Add an item to the collection - Put(Parents, UniqueIdent, IndexEntry), + Put(SyncDesc, IndexEntry), /// Delete an item from the collection - Delete(Parents, UniqueIdent), + Delete(SyncDesc, BlobId), +} +impl DavDagOp { + pub fn token(&self) -> Token { + match self { + Self::Merge((_, t)) => *t, + Self::Put((_, t), _) => *t, + Self::Delete((_, t), _) => *t, + } + } } impl DavDag { - pub fn op_merge(&self, ident: UniqueIdent) -> DavDagOp { - DavDagOp::Merge(self.heads_vec(), ident) + pub fn op_merge(&self) -> DavDagOp { + DavDagOp::Merge(self.sync_desc()) } - pub fn op_put(&self, ident: UniqueIdent, entry: IndexEntry) -> DavDagOp { - DavDagOp::Put(self.heads_vec(), ident, entry) + pub fn op_put(&self, entry: IndexEntry) -> DavDagOp { + DavDagOp::Put(self.sync_desc(), entry) } - pub fn op_delete(&self, ident: UniqueIdent) -> DavDagOp { - DavDagOp::Delete(self.heads_vec(), ident) + pub fn op_delete(&self, ident: BlobId) -> DavDagOp { + DavDagOp::Delete(self.sync_desc(), ident) } // HELPER functions - /// All HEAD events - pub fn heads_vec(&self) -> Vec { - self.heads.clone().into_iter().collect() + pub fn heads_vec(&self) -> Vec { + self.heads.clone().into_iter().collect() + } + + /// A sync descriptor + pub fn sync_desc(&self) -> SyncDesc { + (self.heads_vec(), gen_ident()) } /// Resolve a sync token @@ -71,18 +86,21 @@ impl DavDag { let already_known = self.all_ancestors(known); // We can't capture all missing events if we are not connected - // to all sinks of the graph, ie. if we don't already know all the sinks. + // to all sinks of the graph, + // ie. if we don't already know all the sinks, + // ie. if we are missing so much history that + // the event log has been transformed into a checkpoint if !self.origins.is_subset(already_known.clone()) { bail!("Not enough history to produce a correct diff, a full resync is needed"); } - // Missing items are all existing graph items from which - // we removed all items known by the given node. - // In other words, all values in all_nodes that are not in already_known. + // Missing items are *all existing graph items* from which + // we removed *all items known by the given node*. + // In other words, all values in `all_nodes` that are not in `already_known`. Ok(self.all_nodes.clone().relative_complement(already_known)) } - /// Find all ancestors of a given + /// Find all ancestors of a given node fn all_ancestors(&self, known: UniqueIdent) -> OrdSet { let mut all_known: OrdSet = OrdSet::new(); let mut to_collect = vec![known]; @@ -111,21 +129,23 @@ impl DavDag { // INTERNAL functions /// Register a WebDAV item (put, copy, move) - fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) { + fn register(&mut self, entry: IndexEntry) { + let (blob_id, filename, _etag) = entry.clone(); + // Insert item in the source of trust - self.table.insert(ident, entry.clone()); + self.table.insert(blob_id, entry); // Update the cache - let (filename, _etag) = entry; - self.idx_by_filename.insert(filename, ident); + self.idx_by_filename.insert(filename, blob_id); } /// Unregister a WebDAV item (delete, move) fn unregister(&mut self, ident: &UniqueIdent) { // Query the source of truth to get the information we // need to clean the indexes - let (filename, _etag) = match self.table.get(ident) { + let (_blob_id, filename, _etag) = match self.table.get(ident) { Some(v) => v, + // Element does not exist, return early None => return, }; self.idx_by_filename.remove(filename); @@ -134,29 +154,9 @@ impl DavDag { self.table.remove(ident); } - // @FIXME: maybe in case of error we could simply disable the sync graph - // and ask the client to rely on manual sync. For now, we are skipping the event - // which is midly satisfying. - /// When an event is processed, update the synchronization DAG - fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool { - // --- Update SUCCESSORS - // All parents must exist in successors otherwise we can't accept item: - // do the check + update successors - let mut try_successors = self.successors.clone(); - for par in parents.iter() { - match try_successors.get_mut(par) { - None => { - tracing::warn!("Unable to push a Dav DAG sync op into the graph, an event is missing, it's a bug"); - return false - }, - Some(v) => v.insert(*child), - }; - } - self.successors = try_successors; - - // This event is also a future successor - self.successors.insert(*child, ordset![]); + fn sync_dag(&mut self, sync_desc: &SyncDesc) -> bool { + let (parents, child) = sync_desc; // --- Update ANCESTORS // We register ancestors as it is required for the sync algorithm @@ -177,6 +177,9 @@ impl DavDag { // This event becomes a new HEAD in turn self.heads.insert(*child); + + // --- Update ALL NODES + self.all_nodes.insert(*child); true } @@ -189,18 +192,18 @@ impl BayouState for DavDag { let mut new = self.clone(); match op { - DavDagOp::Put(parents, ident, entry) => { - if new.sync_dag(ident, parents.as_slice()) { - new.register(*ident, entry.clone()); + DavDagOp::Put(sync_desc, entry) => { + if new.sync_dag(sync_desc) { + new.register(entry.clone()); } }, - DavDagOp::Delete(parents, ident) => { - if new.sync_dag(ident, parents.as_slice()) { - new.unregister(ident); + DavDagOp::Delete(sync_desc, blob_id) => { + if new.sync_dag(sync_desc) { + new.unregister(blob_id); } }, - DavDagOp::Merge(parents, ident) => { - new.sync_dag(ident, parents.as_slice()); + DavDagOp::Merge(sync_desc) => { + new.sync_dag(sync_desc); } } @@ -211,7 +214,7 @@ impl BayouState for DavDag { // CUSTOM SERIALIZATION & DESERIALIZATION #[derive(Serialize, Deserialize)] struct DavDagSerializedRepr { - items: Vec<(UniqueIdent, IndexEntry)>, + items: Vec, heads: Vec, } @@ -224,11 +227,10 @@ impl<'de> Deserialize<'de> for DavDag { let mut davdag = DavDag::default(); // Build the table + index - val.items.into_iter().for_each(|(ident, entry)| davdag.register(ident, entry)); + val.items.into_iter().for_each(|entry| davdag.register(entry)); // Initialize the synchronization DAG with its roots val.heads.into_iter().for_each(|ident| { - davdag.successors.insert(ident, ordset![]); davdag.heads.insert(ident); davdag.origins.insert(ident); davdag.all_nodes.insert(ident); @@ -244,7 +246,7 @@ impl Serialize for DavDag { S: Serializer, { // Indexes are rebuilt on the fly, we serialize only the core database - let items = self.table.iter().map(|(ident, entry)| (*ident, entry.clone())).collect(); + let items = self.table.iter().map(|(_, entry)| entry.clone()).collect(); // We keep only the head entries from the sync graph, // these entries will be used to initialize it back when deserializing @@ -255,3 +257,53 @@ impl Serialize for DavDag { val.serialize(serializer) } } + +// ---- TESTS ---- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn base() { + let mut state = DavDag::default(); + + // Add item 1 + { + let m = UniqueIdent([0x01; 24]); + let ev = state.op_put((m, "cal.ics".into(), "321-321".into())); + state = state.apply(&ev); + + assert_eq!(state.table.len(), 1); + assert_eq!(state.resolve(ev.token()).unwrap().len(), 0); + } + + // Add 2 concurrent items + let (t1, t2) = { + let blob1 = UniqueIdent([0x02; 24]); + let ev1 = state.op_put((blob1, "cal2.ics".into(), "321-321".into())); + + let blob2 = UniqueIdent([0x01; 24]); + let ev2 = state.op_delete(blob2); + + state = state.apply(&ev1); + state = state.apply(&ev2); + + assert_eq!(state.table.len(), 1); + assert_eq!(state.resolve(ev1.token()).unwrap(), ordset![ev2.token()]); + + (ev1.token(), ev2.token()) + }; + + // Add later a new item + { + let blob3 = UniqueIdent([0x03; 24]); + let ev = state.op_put((blob3, "cal3.ics".into(), "321-321".into())); + + state = state.apply(&ev); + assert_eq!(state.table.len(), 2); + assert_eq!(state.resolve(ev.token()).unwrap().len(), 0); + assert_eq!(state.resolve(t1).unwrap(), ordset![t2, ev.token()]); + } + } +} -- cgit v1.2.3 From f179479c308876f2f41de695cc0375d7fd20b233 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 4 Apr 2024 11:28:15 +0200 Subject: WIP implem storage --- aero-collections/src/calendar/mod.rs | 79 ++++++++++++++++++++++++++++++++++-- aero-collections/src/davdag.rs | 61 +++++++++++++++++----------- 2 files changed, 113 insertions(+), 27 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 6537a4e..936f8c3 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -9,7 +9,7 @@ use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; use crate::unique_ident::*; -use crate::davdag::DavDag; +use crate::davdag::{DavDag, IndexEntry, Token, BlobId, SyncChange}; pub struct Calendar { pub(super) id: UniqueIdent, @@ -20,8 +20,49 @@ impl Calendar { pub(crate) async fn open( creds: &Credentials, id: UniqueIdent, - ) -> Result { - todo!(); + ) -> Result { + let bayou_path = format!("calendar/dag/{}", id); + let cal_path = format!("calendar/events/{}", id); + + let mut davdag = Bayou::::new(creds, bayou_path).await?; + davdag.sync().await?; + + let internal = RwLock::new(CalendarInternal { + id, + encryption_key: creds.keys.master.clone(), + storage: creds.storage.build().await?, + davdag, + cal_path, + }); + + Ok(Self { id, internal }) + } + + /// Sync data with backing store + pub async fn force_sync(&self) -> Result<()> { + self.internal.write().await.force_sync().await + } + + /// Sync data with backing store only if changes are detected + /// or last sync is too old + pub async fn opportunistic_sync(&self) -> Result<()> { + self.internal.write().await.opportunistic_sync().await + } + + pub async fn get(&self, blob_id: UniqueIdent, message_key: &Key) -> Result> { + self.internal.read().await.get(blob_id, message_key).await + } + + pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec)> { + self.internal.read().await.diff(sync_token).await + } + + pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result { + self.internal.write().await.put(entry, evt).await + } + + pub async fn delete(&self, blob_id: UniqueIdent) -> Result { + self.internal.write().await.delete(blob_id).await } } @@ -30,5 +71,35 @@ struct CalendarInternal { cal_path: String, encryption_key: Key, storage: Store, - uid_index: Bayou, + davdag: Bayou, +} + +impl CalendarInternal { + async fn force_sync(&mut self) -> Result<()> { + self.davdag.sync().await?; + Ok(()) + } + + async fn opportunistic_sync(&mut self) -> Result<()> { + self.davdag.opportunistic_sync().await?; + Ok(()) + } + + async fn get(&self, blob_id: BlobId, message_key: &Key) -> Result> { + todo!() + } + + async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result { + //@TODO write event to S3 + //@TODO add entry into Bayou + todo!(); + } + + async fn delete(&mut self, blob_id: BlobId) -> Result { + todo!(); + } + + async fn diff(&self, sync_token: Token) -> Result<(Token, Vec)> { + todo!(); + } } diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index 63a76a8..f668831 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -20,20 +20,31 @@ pub type IndexEntry = (BlobId, FileName, Etag); #[derive(Clone, Default)] pub struct DavDag { /// Source of trust - pub table: OrdMap, + pub table: OrdMap, /// Indexes optimized for queries - pub idx_by_filename: OrdMap, + pub idx_by_filename: OrdMap, + + // ------------ Below this line, data is ephemeral, ie. not checkpointed /// Partial synchronization graph - pub ancestors: OrdMap>, + pub ancestors: OrdMap>, /// All nodes - pub all_nodes: OrdSet, + pub all_nodes: OrdSet, /// Head nodes - pub heads: OrdSet, + pub heads: OrdSet, /// Origin nodes - pub origins: OrdSet, + pub origins: OrdSet, + + /// File change token by token + pub change: OrdMap, +} + +#[derive(Clone, Debug)] +pub enum SyncChange { + Ok(FileName), + NotFound(FileName), } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -66,8 +77,8 @@ impl DavDag { DavDagOp::Put(self.sync_desc(), entry) } - pub fn op_delete(&self, ident: BlobId) -> DavDagOp { - DavDagOp::Delete(self.sync_desc(), ident) + pub fn op_delete(&self, blob_id: BlobId) -> DavDagOp { + DavDagOp::Delete(self.sync_desc(), blob_id) } // HELPER functions @@ -129,33 +140,41 @@ impl DavDag { // INTERNAL functions /// Register a WebDAV item (put, copy, move) - fn register(&mut self, entry: IndexEntry) { + fn register(&mut self, sync_token: Option, entry: IndexEntry) { let (blob_id, filename, _etag) = entry.clone(); // Insert item in the source of trust self.table.insert(blob_id, entry); // Update the cache - self.idx_by_filename.insert(filename, blob_id); + self.idx_by_filename.insert(filename.to_string(), blob_id); + + // Record the change in the ephemeral synchronization map + if let Some(sync_token) = sync_token { + self.change.insert(sync_token, SyncChange::Ok(filename)); + } } /// Unregister a WebDAV item (delete, move) - fn unregister(&mut self, ident: &UniqueIdent) { + fn unregister(&mut self, sync_token: Token, blob_id: &BlobId) { // Query the source of truth to get the information we // need to clean the indexes - let (_blob_id, filename, _etag) = match self.table.get(ident) { + let (_blob_id, filename, _etag) = match self.table.get(blob_id) { Some(v) => v, // Element does not exist, return early None => return, }; self.idx_by_filename.remove(filename); + // Record the change in the ephemeral synchronization map + self.change.insert(sync_token, SyncChange::NotFound(filename.to_string())); + // Finally clear item from the source of trust - self.table.remove(ident); + self.table.remove(blob_id); } /// When an event is processed, update the synchronization DAG - fn sync_dag(&mut self, sync_desc: &SyncDesc) -> bool { + fn sync_dag(&mut self, sync_desc: &SyncDesc) { let (parents, child) = sync_desc; // --- Update ANCESTORS @@ -180,8 +199,6 @@ impl DavDag { // --- Update ALL NODES self.all_nodes.insert(*child); - - true } } @@ -193,14 +210,12 @@ impl BayouState for DavDag { match op { DavDagOp::Put(sync_desc, entry) => { - if new.sync_dag(sync_desc) { - new.register(entry.clone()); - } + new.sync_dag(sync_desc); + new.register(Some(sync_desc.1), entry.clone()); }, DavDagOp::Delete(sync_desc, blob_id) => { - if new.sync_dag(sync_desc) { - new.unregister(blob_id); - } + new.sync_dag(sync_desc); + new.unregister(sync_desc.1, blob_id); }, DavDagOp::Merge(sync_desc) => { new.sync_dag(sync_desc); @@ -227,7 +242,7 @@ impl<'de> Deserialize<'de> for DavDag { let mut davdag = DavDag::default(); // Build the table + index - val.items.into_iter().for_each(|entry| davdag.register(entry)); + val.items.into_iter().for_each(|entry| davdag.register(None, entry)); // Initialize the synchronization DAG with its roots val.heads.into_iter().for_each(|ident| { -- cgit v1.2.3 From 2efdd40b8edd83c3fef3d94f7e62b41b86e49959 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 4 Apr 2024 11:57:32 +0200 Subject: Write PUT --- aero-collections/src/calendar/mod.rs | 53 +++++++++++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 7 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 936f8c3..7e5a8c1 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -38,6 +38,8 @@ impl Calendar { Ok(Self { id, internal }) } + // ---- DAG sync utilities + /// Sync data with backing store pub async fn force_sync(&self) -> Result<()> { self.internal.write().await.force_sync().await @@ -49,24 +51,40 @@ impl Calendar { self.internal.write().await.opportunistic_sync().await } - pub async fn get(&self, blob_id: UniqueIdent, message_key: &Key) -> Result> { - self.internal.read().await.get(blob_id, message_key).await + // ---- Data API + + /// Access the DAG internal data (you can get the list of files for example) + pub async fn dag(&self) -> DavDag { + // Cloning is cheap + self.internal.read().await.davdag.state().clone() } + /// The diff API is a write API as we might need to push a merge node + /// to get a new sync token pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec)> { - self.internal.read().await.diff(sync_token).await + self.internal.write().await.diff(sync_token).await } + /// Get a specific event + pub async fn get(&self, evt_id: UniqueIdent, message_key: &Key) -> Result> { + self.internal.read().await.get(evt_id, message_key).await + } + + /// Put a specific event pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result { self.internal.write().await.put(entry, evt).await } + /// Delete a specific event pub async fn delete(&self, blob_id: UniqueIdent) -> Result { self.internal.write().await.delete(blob_id).await } } +use base64::Engine; +const MESSAGE_KEY: &str = "message-key"; struct CalendarInternal { + #[allow(dead_code)] id: UniqueIdent, cal_path: String, encryption_key: Key, @@ -90,16 +108,37 @@ impl CalendarInternal { } async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result { - //@TODO write event to S3 - //@TODO add entry into Bayou - todo!(); + let message_key = gen_key(); + + let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?; + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key); + + // Write event to S3 + let message_blob = cryptoblob::seal(evt, &message_key)?; + let blob_val = BlobVal::new( + BlobRef(format!("{}/{}", self.cal_path, entry.0)), + message_blob, + ) + .with_meta(MESSAGE_KEY.to_string(), key_header); + + self.storage + .blob_insert(blob_val) + .await?; + + // Add entry to Bayou + let davstate = self.davdag.state(); + let put_op = davstate.op_put(entry); + let token = put_op.token(); + self.davdag.push(put_op).await?; + + Ok(token) } async fn delete(&mut self, blob_id: BlobId) -> Result { todo!(); } - async fn diff(&self, sync_token: Token) -> Result<(Token, Vec)> { + async fn diff(&mut self, sync_token: Token) -> Result<(Token, Vec)> { todo!(); } } -- cgit v1.2.3 From 272b93f04a0640e056fe994f48cb2837eacdad46 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 4 Apr 2024 14:59:47 +0200 Subject: GET logic --- aero-collections/src/calendar/mod.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 7e5a8c1..0e0e65f 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -1,6 +1,6 @@ pub mod namespace; -use anyhow::Result; +use anyhow::{anyhow, Result}; use tokio::sync::RwLock; use aero_bayou::Bayou; @@ -66,8 +66,8 @@ impl Calendar { } /// Get a specific event - pub async fn get(&self, evt_id: UniqueIdent, message_key: &Key) -> Result> { - self.internal.read().await.get(evt_id, message_key).await + pub async fn get(&self, evt_id: UniqueIdent) -> Result> { + self.internal.read().await.get(evt_id).await } /// Put a specific event @@ -103,8 +103,24 @@ impl CalendarInternal { Ok(()) } - async fn get(&self, blob_id: BlobId, message_key: &Key) -> Result> { - todo!() + async fn get(&self, blob_id: BlobId) -> Result> { + // Fetch message from S3 + let blob_ref = storage::BlobRef(format!("{}/{}", self.cal_path, blob_id)); + let object = self.storage.blob_fetch(&blob_ref).await?; + + // Decrypt message key from headers + let key_encrypted_b64 = object + .meta + .get(MESSAGE_KEY) + .ok_or(anyhow!("Missing key in metadata"))?; + let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; + let message_key_raw = cryptoblob::open(&key_encrypted, &self.encryption_key)?; + let message_key = + cryptoblob::Key::from_slice(&message_key_raw).ok_or(anyhow!("Invalid message key"))?; + + // Decrypt body + let body = object.value; + cryptoblob::open(&body, &message_key) } async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result { -- cgit v1.2.3 From 054bd52279faefd327be092ea7ec13f75f0a6163 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 4 Apr 2024 15:40:26 +0200 Subject: Implement diff --- aero-collections/src/calendar/mod.rs | 43 +++++++++++++++++++++++++++++++----- aero-collections/src/davdag.rs | 4 ++-- aero-collections/src/mail/mailbox.rs | 2 +- 3 files changed, 41 insertions(+), 8 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 0e0e65f..127f41b 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -1,12 +1,12 @@ pub mod namespace; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use tokio::sync::RwLock; use aero_bayou::Bayou; use aero_user::login::Credentials; -use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; -use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; +use aero_user::cryptoblob::{self, gen_key, Key}; +use aero_user::storage::{self, BlobRef, BlobVal, Store}; use crate::unique_ident::*; use crate::davdag::{DavDag, IndexEntry, Token, BlobId, SyncChange}; @@ -151,10 +151,43 @@ impl CalendarInternal { } async fn delete(&mut self, blob_id: BlobId) -> Result { - todo!(); + let davstate = self.davdag.state(); + + if davstate.table.contains_key(&blob_id) { + bail!("Cannot delete event that doesn't exist"); + } + + let del_op = davstate.op_delete(blob_id); + let token = del_op.token(); + self.davdag.push(del_op).await?; + + let blob_ref = BlobRef(format!("{}/{}", self.cal_path, blob_id)); + self.storage.blob_rm(&blob_ref).await?; + + Ok(token) } async fn diff(&mut self, sync_token: Token) -> Result<(Token, Vec)> { - todo!(); + let davstate = self.davdag.state(); + + let token_changed = davstate.resolve(sync_token)?; + let changes = token_changed + .iter() + .filter_map(|t: &Token| davstate.change.get(t)) + .map(|s| s.clone()) + .collect(); + + let heads = davstate.heads_vec(); + let token = match heads.as_slice() { + [ token ] => *token, + _ => { + let op_mg = davstate.op_merge(); + let token = op_mg.token(); + self.davdag.push(op_mg).await?; + token + } + }; + + Ok((token, changes)) } } diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index f668831..3aaebb8 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -93,7 +93,7 @@ impl DavDag { } /// Resolve a sync token - pub fn resolve(&self, known: UniqueIdent) -> Result> { + pub fn resolve(&self, known: Token) -> Result> { let already_known = self.all_ancestors(known); // We can't capture all missing events if we are not connected @@ -112,7 +112,7 @@ impl DavDag { } /// Find all ancestors of a given node - fn all_ancestors(&self, known: UniqueIdent) -> OrdSet { + fn all_ancestors(&self, known: Token) -> OrdSet { let mut all_known: OrdSet = OrdSet::new(); let mut to_collect = vec![known]; loop { diff --git a/aero-collections/src/mail/mailbox.rs b/aero-collections/src/mail/mailbox.rs index f797be6..fcdb21e 100644 --- a/aero-collections/src/mail/mailbox.rs +++ b/aero-collections/src/mail/mailbox.rs @@ -375,7 +375,7 @@ impl MailboxInternal { async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { if !self.uid_index.state().table.contains_key(&ident) { - bail!("Cannot delete mail that doesn't exit"); + bail!("Cannot delete mail that doesn't exist"); } let del_mail_op = self.uid_index.state().op_mail_del(ident); -- cgit v1.2.3 From a2d2649ef92324ccd314ee787577ed504522824a Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 5 Apr 2024 10:19:07 +0200 Subject: WIP dav integration --- aero-collections/src/calendar/namespace.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/namespace.rs b/aero-collections/src/calendar/namespace.rs index 2fbc364..9c21d19 100644 --- a/aero-collections/src/calendar/namespace.rs +++ b/aero-collections/src/calendar/namespace.rs @@ -17,7 +17,7 @@ pub(crate) const CAL_LIST_SK: &str = "list"; pub(crate) const MAIN_CAL: &str = "Personal"; pub(crate) const MAX_CALNAME_CHARS: usize = 32; -pub(crate) struct CalendarNs(std::sync::Mutex>>); +pub struct CalendarNs(std::sync::Mutex>>); impl CalendarNs { /// Create a new calendar namespace -- cgit v1.2.3 From 4594e068dbba3d3d704728449fc6ccaaadaa82f1 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 23 Apr 2024 10:35:43 +0200 Subject: PUT seems to work --- aero-collections/src/calendar/mod.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 127f41b..feae73e 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -71,8 +71,8 @@ impl Calendar { } /// Put a specific event - pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result { - self.internal.write().await.put(entry, evt).await + pub async fn put<'a>(&self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> { + self.internal.write().await.put(name, evt).await } /// Delete a specific event @@ -123,8 +123,9 @@ impl CalendarInternal { cryptoblob::open(&body, &message_key) } - async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result { + async fn put<'a>(&mut self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> { let message_key = gen_key(); + let blob_id = gen_ident(); let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?; let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key); @@ -132,22 +133,23 @@ impl CalendarInternal { // Write event to S3 let message_blob = cryptoblob::seal(evt, &message_key)?; let blob_val = BlobVal::new( - BlobRef(format!("{}/{}", self.cal_path, entry.0)), + BlobRef(format!("{}/{}", self.cal_path, blob_id)), message_blob, ) .with_meta(MESSAGE_KEY.to_string(), key_header); - self.storage + let etag = self.storage .blob_insert(blob_val) .await?; // Add entry to Bayou + let entry: IndexEntry = (blob_id, name.to_string(), etag); let davstate = self.davdag.state(); - let put_op = davstate.op_put(entry); + let put_op = davstate.op_put(entry.clone()); let token = put_op.token(); self.davdag.push(put_op).await?; - Ok(token) + Ok((token, entry)) } async fn delete(&mut self, blob_id: BlobId) -> Result { -- cgit v1.2.3 From e1d7cf88afd9baab67d53823e95cb1b7f240802f Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 24 Apr 2024 17:35:00 +0200 Subject: Working ICS GET/PUT/DELETE --- aero-collections/src/calendar/mod.rs | 2 +- aero-collections/src/davdag.rs | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index feae73e..028cf87 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -155,7 +155,7 @@ impl CalendarInternal { async fn delete(&mut self, blob_id: BlobId) -> Result { let davstate = self.davdag.state(); - if davstate.table.contains_key(&blob_id) { + if !davstate.table.contains_key(&blob_id) { bail!("Cannot delete event that doesn't exist"); } diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index 3aaebb8..7335bdc 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -202,6 +202,16 @@ impl DavDag { } } +impl std::fmt::Debug for DavDag { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("DavDag\n")?; + for elem in self.table.iter() { + f.write_fmt(format_args!("\t{:?} => {:?}", elem.0, elem.1))?; + } + Ok(()) + } +} + impl BayouState for DavDag { type Op = DavDagOp; -- cgit v1.2.3 From 6b9542088cd1b66af46e95b787493b601accb495 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 30 Apr 2024 13:02:59 +0200 Subject: Add icalendar dependency --- aero-collections/Cargo.toml | 1 + 1 file changed, 1 insertion(+) (limited to 'aero-collections') diff --git a/aero-collections/Cargo.toml b/aero-collections/Cargo.toml index 90d285e..95ab142 100644 --- a/aero-collections/Cargo.toml +++ b/aero-collections/Cargo.toml @@ -22,3 +22,4 @@ rand.workspace = true im.workspace = true sodiumoxide.workspace = true eml-codec.workspace = true +icalendar.workspace = true -- cgit v1.2.3 From 32dfd25f570b7a55bf43752684d286be0f6b2dc2 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 16 May 2024 17:38:34 +0200 Subject: format + WIP calendar-query --- aero-collections/src/calendar/mod.rs | 17 +++++------- aero-collections/src/calendar/namespace.rs | 43 ++++++++++++++---------------- aero-collections/src/davdag.rs | 39 ++++++++++++++++----------- aero-collections/src/lib.rs | 6 ++--- aero-collections/src/mail/incoming.rs | 4 +-- aero-collections/src/mail/mailbox.rs | 6 ++--- aero-collections/src/mail/mod.rs | 2 +- aero-collections/src/mail/namespace.rs | 6 ++++- aero-collections/src/mail/snapshot.rs | 2 +- aero-collections/src/mail/uidindex.rs | 2 +- aero-collections/src/user.rs | 7 +++-- 11 files changed, 70 insertions(+), 64 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 028cf87..cd05328 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -4,12 +4,12 @@ use anyhow::{anyhow, bail, Result}; use tokio::sync::RwLock; use aero_bayou::Bayou; -use aero_user::login::Credentials; use aero_user::cryptoblob::{self, gen_key, Key}; +use aero_user::login::Credentials; use aero_user::storage::{self, BlobRef, BlobVal, Store}; +use crate::davdag::{BlobId, DavDag, IndexEntry, SyncChange, Token}; use crate::unique_ident::*; -use crate::davdag::{DavDag, IndexEntry, Token, BlobId, SyncChange}; pub struct Calendar { pub(super) id: UniqueIdent, @@ -17,10 +17,7 @@ pub struct Calendar { } impl Calendar { - pub(crate) async fn open( - creds: &Credentials, - id: UniqueIdent, - ) -> Result { + pub(crate) async fn open(creds: &Credentials, id: UniqueIdent) -> Result { let bayou_path = format!("calendar/dag/{}", id); let cal_path = format!("calendar/events/{}", id); @@ -126,7 +123,7 @@ impl CalendarInternal { async fn put<'a>(&mut self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> { let message_key = gen_key(); let blob_id = gen_ident(); - + let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?; let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key); @@ -138,9 +135,7 @@ impl CalendarInternal { ) .with_meta(MESSAGE_KEY.to_string(), key_header); - let etag = self.storage - .blob_insert(blob_val) - .await?; + let etag = self.storage.blob_insert(blob_val).await?; // Add entry to Bayou let entry: IndexEntry = (blob_id, name.to_string(), etag); @@ -181,7 +176,7 @@ impl CalendarInternal { let heads = davstate.heads_vec(); let token = match heads.as_slice() { - [ token ] => *token, + [token] => *token, _ => { let op_mg = davstate.op_merge(); let token = op_mg.token(); diff --git a/aero-collections/src/calendar/namespace.rs b/aero-collections/src/calendar/namespace.rs index 9c21d19..db65703 100644 --- a/aero-collections/src/calendar/namespace.rs +++ b/aero-collections/src/calendar/namespace.rs @@ -1,16 +1,16 @@ use anyhow::{bail, Result}; -use std::collections::{HashMap, BTreeMap}; -use std::sync::{Weak, Arc}; +use std::collections::{BTreeMap, HashMap}; +use std::sync::{Arc, Weak}; use serde::{Deserialize, Serialize}; use aero_bayou::timestamp::now_msec; -use aero_user::storage; use aero_user::cryptoblob::{open_deserialize, seal_serialize}; +use aero_user::storage; +use super::Calendar; use crate::unique_ident::{gen_ident, UniqueIdent}; use crate::user::User; -use super::Calendar; pub(crate) const CAL_LIST_PK: &str = "calendars"; pub(crate) const CAL_LIST_SK: &str = "list"; @@ -46,7 +46,7 @@ impl CalendarNs { } let cal = Arc::new(Calendar::open(&user.creds, id).await?); - + let mut cache = self.0.lock().unwrap(); if let Some(concurrent_cal) = cache.get(&id).and_then(Weak::upgrade) { drop(cal); // we worked for nothing but at least we didn't starve someone else @@ -117,13 +117,15 @@ impl CalendarNs { CalendarExists::Created(_) => (), } list.save(user, ct).await?; - + Ok(()) } /// Has calendar pub async fn has(&self, user: &Arc, name: &str) -> Result { - CalendarList::load(user).await.map(|(list, _)| list.has(name)) + CalendarList::load(user) + .await + .map(|(list, _)| list.has(name)) } } @@ -161,7 +163,8 @@ impl CalendarList { for v in row_vals { if let storage::Alternative::Value(vbytes) = v { - let list2 = open_deserialize::(&vbytes, &user.creds.keys.master)?; + let list2 = + open_deserialize::(&vbytes, &user.creds.keys.master)?; list.merge(list2); } } @@ -200,7 +203,7 @@ impl CalendarList { /// (Don't forget to save if it returns CalendarExists::Created) fn create(&mut self, name: &str) -> CalendarExists { if let Some(CalendarListEntry { - id_lww: (_, Some(id)) + id_lww: (_, Some(id)), }) = self.0.get(name) { return CalendarExists::Existed(*id); @@ -222,9 +225,10 @@ impl CalendarList { /// For a given calendar name, get its Unique Identifier fn get(&self, name: &str) -> Option { - self.0.get(name).map(|CalendarListEntry { - id_lww: (_, ident), - }| *ident).flatten() + self.0 + .get(name) + .map(|CalendarListEntry { id_lww: (_, ident) }| *ident) + .flatten() } /// Check if a given calendar name exists @@ -271,9 +275,7 @@ impl CalendarList { (now_msec(), id) } } - Some(CalendarListEntry { - id_lww, - }) => { + Some(CalendarListEntry { id_lww }) => { if id_lww.1 == id { // Entry is already equals to the requested id (Option, // ------------ Below this line, data is ephemeral, ie. not checkpointed - /// Partial synchronization graph pub ancestors: OrdMap>, @@ -84,7 +83,7 @@ impl DavDag { // HELPER functions pub fn heads_vec(&self) -> Vec { - self.heads.clone().into_iter().collect() + self.heads.clone().into_iter().collect() } /// A sync descriptor @@ -99,7 +98,7 @@ impl DavDag { // We can't capture all missing events if we are not connected // to all sinks of the graph, // ie. if we don't already know all the sinks, - // ie. if we are missing so much history that + // ie. if we are missing so much history that // the event log has been transformed into a checkpoint if !self.origins.is_subset(already_known.clone()) { bail!("Not enough history to produce a correct diff, a full resync is needed"); @@ -124,7 +123,7 @@ impl DavDag { if all_known.insert(cursor).is_some() { // Item already processed - continue + continue; } // Collect parents @@ -167,7 +166,8 @@ impl DavDag { self.idx_by_filename.remove(filename); // Record the change in the ephemeral synchronization map - self.change.insert(sync_token, SyncChange::NotFound(filename.to_string())); + self.change + .insert(sync_token, SyncChange::NotFound(filename.to_string())); // Finally clear item from the source of trust self.table.remove(blob_id); @@ -179,10 +179,13 @@ impl DavDag { // --- Update ANCESTORS // We register ancestors as it is required for the sync algorithm - self.ancestors.insert(*child, parents.iter().fold(ordset![], |mut acc, p| { - acc.insert(*p); - acc - })); + self.ancestors.insert( + *child, + parents.iter().fold(ordset![], |mut acc, p| { + acc.insert(*p); + acc + }), + ); // --- Update ORIGINS // If this event has no parents, it's an origin @@ -192,11 +195,13 @@ impl DavDag { // --- Update HEADS // Remove from HEADS this event's parents - parents.iter().for_each(|par| { self.heads.remove(par); }); + parents.iter().for_each(|par| { + self.heads.remove(par); + }); // This event becomes a new HEAD in turn self.heads.insert(*child); - + // --- Update ALL NODES self.all_nodes.insert(*child); } @@ -217,16 +222,16 @@ impl BayouState for DavDag { fn apply(&self, op: &Self::Op) -> Self { let mut new = self.clone(); - + match op { DavDagOp::Put(sync_desc, entry) => { new.sync_dag(sync_desc); new.register(Some(sync_desc.1), entry.clone()); - }, + } DavDagOp::Delete(sync_desc, blob_id) => { new.sync_dag(sync_desc); new.unregister(sync_desc.1, blob_id); - }, + } DavDagOp::Merge(sync_desc) => { new.sync_dag(sync_desc); } @@ -252,7 +257,9 @@ impl<'de> Deserialize<'de> for DavDag { let mut davdag = DavDag::default(); // Build the table + index - val.items.into_iter().for_each(|entry| davdag.register(None, entry)); + val.items + .into_iter() + .for_each(|entry| davdag.register(None, entry)); // Initialize the synchronization DAG with its roots val.heads.into_iter().for_each(|ident| { diff --git a/aero-collections/src/lib.rs b/aero-collections/src/lib.rs index ef8b8d8..eabf61c 100644 --- a/aero-collections/src/lib.rs +++ b/aero-collections/src/lib.rs @@ -1,5 +1,5 @@ -pub mod unique_ident; +pub mod calendar; pub mod davdag; -pub mod user; pub mod mail; -pub mod calendar; +pub mod unique_ident; +pub mod user; diff --git a/aero-collections/src/mail/incoming.rs b/aero-collections/src/mail/incoming.rs index cd2f8fd..55c2515 100644 --- a/aero-collections/src/mail/incoming.rs +++ b/aero-collections/src/mail/incoming.rs @@ -8,16 +8,16 @@ use futures::{future::BoxFuture, FutureExt}; use tokio::sync::watch; use tracing::{debug, error, info, warn}; +use aero_bayou::timestamp::now_msec; use aero_user::cryptoblob; use aero_user::login::{Credentials, PublicCredentials}; use aero_user::storage; -use aero_bayou::timestamp::now_msec; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; +use crate::mail::IMF; use crate::unique_ident::*; use crate::user::User; -use crate::mail::IMF; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; diff --git a/aero-collections/src/mail/mailbox.rs b/aero-collections/src/mail/mailbox.rs index fcdb21e..bec9669 100644 --- a/aero-collections/src/mail/mailbox.rs +++ b/aero-collections/src/mail/mailbox.rs @@ -2,15 +2,15 @@ use anyhow::{anyhow, bail, Result}; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; +use aero_bayou::timestamp::now_msec; +use aero_bayou::Bayou; use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; use aero_user::login::Credentials; use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; -use aero_bayou::Bayou; -use aero_bayou::timestamp::now_msec; -use crate::unique_ident::*; use crate::mail::uidindex::*; use crate::mail::IMF; +use crate::unique_ident::*; pub struct Mailbox { pub(super) id: UniqueIdent, diff --git a/aero-collections/src/mail/mod.rs b/aero-collections/src/mail/mod.rs index ca9b08b..584a9eb 100644 --- a/aero-collections/src/mail/mod.rs +++ b/aero-collections/src/mail/mod.rs @@ -1,9 +1,9 @@ pub mod incoming; pub mod mailbox; +pub mod namespace; pub mod query; pub mod snapshot; pub mod uidindex; -pub mod namespace; // Internet Message Format // aka RFC 822 - RFC 2822 - RFC 5322 diff --git a/aero-collections/src/mail/namespace.rs b/aero-collections/src/mail/namespace.rs index b1f6a70..0f1db7d 100644 --- a/aero-collections/src/mail/namespace.rs +++ b/aero-collections/src/mail/namespace.rs @@ -104,7 +104,11 @@ impl MailboxList { /// Ensures mailbox `name` maps to id `id`. /// If it already mapped to that, returns None. /// If a change had to be done, returns Some(new uidvalidity in mailbox). - pub(crate) fn set_mailbox(&mut self, name: &str, id: Option) -> Option { + pub(crate) fn set_mailbox( + &mut self, + name: &str, + id: Option, + ) -> Option { let (ts, id, uidvalidity) = match self.0.get_mut(name) { None => { if id.is_none() { diff --git a/aero-collections/src/mail/snapshot.rs b/aero-collections/src/mail/snapshot.rs index 9503d4d..6f8a8a8 100644 --- a/aero-collections/src/mail/snapshot.rs +++ b/aero-collections/src/mail/snapshot.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use anyhow::Result; -use crate::unique_ident::UniqueIdent; use super::mailbox::Mailbox; use super::query::{Query, QueryScope}; use super::uidindex::UidIndex; +use crate::unique_ident::UniqueIdent; /// A Frozen Mailbox has a snapshot of the current mailbox /// state that is desynchronized with the real mailbox state. diff --git a/aero-collections/src/mail/uidindex.rs b/aero-collections/src/mail/uidindex.rs index ca975a3..6df3206 100644 --- a/aero-collections/src/mail/uidindex.rs +++ b/aero-collections/src/mail/uidindex.rs @@ -3,8 +3,8 @@ use std::num::{NonZeroU32, NonZeroU64}; use im::{HashMap, OrdMap, OrdSet}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use aero_bayou::*; use crate::unique_ident::UniqueIdent; +use aero_bayou::*; pub type ModSeq = NonZeroU64; pub type ImapUid = NonZeroU32; diff --git a/aero-collections/src/user.rs b/aero-collections/src/user.rs index 9ed342f..f125c46 100644 --- a/aero-collections/src/user.rs +++ b/aero-collections/src/user.rs @@ -9,12 +9,15 @@ use aero_user::cryptoblob::{open_deserialize, seal_serialize}; use aero_user::login::Credentials; use aero_user::storage; +use crate::calendar::namespace::CalendarNs; use crate::mail::incoming::incoming_mail_watch_process; use crate::mail::mailbox::Mailbox; +use crate::mail::namespace::{ + CreatedMailbox, MailboxList, ARCHIVE, DRAFTS, INBOX, MAILBOX_HIERARCHY_DELIMITER, + MAILBOX_LIST_PK, MAILBOX_LIST_SK, SENT, TRASH, +}; use crate::mail::uidindex::ImapUidvalidity; use crate::unique_ident::UniqueIdent; -use crate::mail::namespace::{MAILBOX_HIERARCHY_DELIMITER, INBOX, DRAFTS, ARCHIVE, SENT, TRASH, MAILBOX_LIST_PK, MAILBOX_LIST_SK,MailboxList,CreatedMailbox}; -use crate::calendar::namespace::CalendarNs; //@FIXME User should be totally rewriten // to extract the local mailbox list -- cgit v1.2.3 From 18f2154151b2cf81e03bdda28fa2ea5d685e33d1 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 28 May 2024 16:03:25 +0200 Subject: implement propfind sync-token --- aero-collections/src/calendar/mod.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index cd05328..414426a 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -56,6 +56,11 @@ impl Calendar { self.internal.read().await.davdag.state().clone() } + /// Access the current token + pub async fn token(&self) -> Result { + self.internal.write().await.current_token().await + } + /// The diff API is a write API as we might need to push a merge node /// to get a new sync token pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec)> { @@ -174,6 +179,12 @@ impl CalendarInternal { .map(|s| s.clone()) .collect(); + let token = self.current_token().await?; + Ok((token, changes)) + } + + async fn current_token(&mut self) -> Result { + let davstate = self.davdag.state(); let heads = davstate.heads_vec(); let token = match heads.as_slice() { [token] => *token, @@ -184,7 +195,6 @@ impl CalendarInternal { token } }; - - Ok((token, changes)) + Ok(token) } } -- cgit v1.2.3 From a2f5b451bd32780d60be69c6412cb351a54b765b Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 28 May 2024 17:21:30 +0200 Subject: initial implementation of sync-collection --- aero-collections/src/davdag.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index 36a9016..74e745f 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -42,7 +42,7 @@ pub struct DavDag { #[derive(Clone, Debug)] pub enum SyncChange { - Ok(FileName), + Ok((FileName, BlobId)), NotFound(FileName), } @@ -150,7 +150,8 @@ impl DavDag { // Record the change in the ephemeral synchronization map if let Some(sync_token) = sync_token { - self.change.insert(sync_token, SyncChange::Ok(filename)); + self.change + .insert(sync_token, SyncChange::Ok((filename, blob_id))); } } -- cgit v1.2.3 From f9fab60e5ee77c0cf57744e39b5685902189a38b Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 29 May 2024 08:47:56 +0200 Subject: test report sync-collection --- aero-collections/src/calendar/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 414426a..ac07842 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -177,6 +177,10 @@ impl CalendarInternal { .iter() .filter_map(|t: &Token| davstate.change.get(t)) .map(|s| s.clone()) + .filter(|s| match s { + SyncChange::Ok((filename, _)) => davstate.idx_by_filename.get(filename).is_some(), + SyncChange::NotFound(filename) => davstate.idx_by_filename.get(filename).is_none(), + }) .collect(); let token = self.current_token().await?; -- cgit v1.2.3