aboutsummaryrefslogtreecommitdiff
path: root/aero-collections/src/mail/mailbox.rs
diff options
context:
space:
mode:
Diffstat (limited to 'aero-collections/src/mail/mailbox.rs')
-rw-r--r--aero-collections/src/mail/mailbox.rs525
1 files changed, 525 insertions, 0 deletions
diff --git a/aero-collections/src/mail/mailbox.rs b/aero-collections/src/mail/mailbox.rs
new file mode 100644
index 0000000..a767678
--- /dev/null
+++ b/aero-collections/src/mail/mailbox.rs
@@ -0,0 +1,525 @@
+use anyhow::{anyhow, bail, Result};
+use serde::{Deserialize, Serialize};
+use tokio::sync::RwLock;
+
+use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
+use aero_user::login::Credentials;
+use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
+use aero_bayou::Bayou;
+use aero_bayou::timestamp::now_msec;
+
+use crate::mail::uidindex::*;
+use crate::mail::unique_ident::*;
+use crate::mail::IMF;
+
+pub struct Mailbox {
+ pub(super) id: UniqueIdent,
+ mbox: RwLock<MailboxInternal>,
+}
+
+impl Mailbox {
+ pub(crate) async fn open(
+ creds: &Credentials,
+ id: UniqueIdent,
+ min_uidvalidity: ImapUidvalidity,
+ ) -> Result<Self> {
+ let index_path = format!("index/{}", id);
+ let mail_path = format!("mail/{}", id);
+
+ let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?;
+ uid_index.sync().await?;
+
+ let uidvalidity = uid_index.state().uidvalidity;
+ if uidvalidity < min_uidvalidity {
+ uid_index
+ .push(
+ uid_index
+ .state()
+ .op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()),
+ )
+ .await?;
+ }
+
+ // @FIXME reporting through opentelemetry or some logs
+ // info on the "shape" of the mailbox would be welcomed
+ /*
+ dump(&uid_index);
+ */
+
+ let mbox = RwLock::new(MailboxInternal {
+ id,
+ encryption_key: creds.keys.master.clone(),
+ storage: creds.storage.build().await?,
+ uid_index,
+ mail_path,
+ });
+
+ Ok(Self { id, mbox })
+ }
+
+ /// Sync data with backing store
+ pub async fn force_sync(&self) -> Result<()> {
+ self.mbox.write().await.force_sync().await
+ }
+
+ /// Sync data with backing store only if changes are detected
+ /// or last sync is too old
+ pub async fn opportunistic_sync(&self) -> Result<()> {
+ self.mbox.write().await.opportunistic_sync().await
+ }
+
+ /// Block until a sync has been done (due to changes in the event log)
+ pub async fn notify(&self) -> std::sync::Weak<tokio::sync::Notify> {
+ self.mbox.read().await.notifier()
+ }
+
+ // ---- Functions for reading the mailbox ----
+
+ /// Get a clone of the current UID Index of this mailbox
+ /// (cloning is cheap so don't hesitate to use this)
+ pub async fn current_uid_index(&self) -> UidIndex {
+ self.mbox.read().await.uid_index.state().clone()
+ }
+
+ /// Fetch the metadata (headers + some more info) of the specified
+ /// mail IDs
+ pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+ self.mbox.read().await.fetch_meta(ids).await
+ }
+
+ /// Fetch an entire e-mail
+ pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+ self.mbox.read().await.fetch_full(id, message_key).await
+ }
+
+ pub async fn frozen(self: &std::sync::Arc<Self>) -> super::snapshot::FrozenMailbox {
+ super::snapshot::FrozenMailbox::new(self.clone()).await
+ }
+
+ // ---- Functions for changing the mailbox ----
+
+ /// Add flags to message
+ pub async fn add_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ self.mbox.write().await.add_flags(id, flags).await
+ }
+
+ /// Delete flags from message
+ pub async fn del_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ self.mbox.write().await.del_flags(id, flags).await
+ }
+
+ /// Define the new flags for this message
+ pub async fn set_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ self.mbox.write().await.set_flags(id, flags).await
+ }
+
+ /// Insert an email into the mailbox
+ pub async fn append<'a>(
+ &self,
+ msg: IMF<'a>,
+ ident: Option<UniqueIdent>,
+ flags: &[Flag],
+ ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
+ self.mbox.write().await.append(msg, ident, flags).await
+ }
+
+ /// Insert an email into the mailbox, copying it from an existing S3 object
+ pub async fn append_from_s3<'a>(
+ &self,
+ msg: IMF<'a>,
+ ident: UniqueIdent,
+ blob_ref: storage::BlobRef,
+ message_key: Key,
+ ) -> Result<()> {
+ self.mbox
+ .write()
+ .await
+ .append_from_s3(msg, ident, blob_ref, message_key)
+ .await
+ }
+
+ /// Delete a message definitively from the mailbox
+ pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> {
+ self.mbox.write().await.delete(id).await
+ }
+
+ /// Copy an email from an other Mailbox to this mailbox
+ /// (use this when possible, as it allows for a certain number of storage optimizations)
+ pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<UniqueIdent> {
+ if self.id == from.id {
+ bail!("Cannot copy into same mailbox");
+ }
+
+ let (mut selflock, fromlock);
+ if self.id < from.id {
+ selflock = self.mbox.write().await;
+ fromlock = from.mbox.write().await;
+ } else {
+ fromlock = from.mbox.write().await;
+ selflock = self.mbox.write().await;
+ };
+ selflock.copy_from(&fromlock, uuid).await
+ }
+
+ /// Move an email from an other Mailbox to this mailbox
+ /// (use this when possible, as it allows for a certain number of storage optimizations)
+ pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> {
+ if self.id == from.id {
+ bail!("Cannot copy move same mailbox");
+ }
+
+ let (mut selflock, mut fromlock);
+ if self.id < from.id {
+ selflock = self.mbox.write().await;
+ fromlock = from.mbox.write().await;
+ } else {
+ fromlock = from.mbox.write().await;
+ selflock = self.mbox.write().await;
+ };
+ selflock.move_from(&mut fromlock, uuid).await
+ }
+}
+
+// ----
+
+// Non standard but common flags:
+// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml
+struct MailboxInternal {
+ // 2023-05-15 will probably be used later.
+ #[allow(dead_code)]
+ id: UniqueIdent,
+ mail_path: String,
+ encryption_key: Key,
+ storage: Store,
+ uid_index: Bayou<UidIndex>,
+}
+
+impl MailboxInternal {
+ async fn force_sync(&mut self) -> Result<()> {
+ self.uid_index.sync().await?;
+ Ok(())
+ }
+
+ async fn opportunistic_sync(&mut self) -> Result<()> {
+ self.uid_index.opportunistic_sync().await?;
+ Ok(())
+ }
+
+ fn notifier(&self) -> std::sync::Weak<tokio::sync::Notify> {
+ self.uid_index.notifier()
+ }
+
+ // ---- Functions for reading the mailbox ----
+
+ async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
+ let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
+ let ops = ids
+ .iter()
+ .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str()))
+ .collect::<Vec<_>>();
+ let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;
+
+ let mut meta_vec = vec![];
+ for res in res_vec.into_iter() {
+ let mut meta_opt = None;
+
+ // Resolve conflicts
+ for v in res.value.iter() {
+ match v {
+ storage::Alternative::Tombstone => (),
+ storage::Alternative::Value(v) => {
+ let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
+ match meta_opt.as_mut() {
+ None => {
+ meta_opt = Some(meta);
+ }
+ Some(prevmeta) => {
+ prevmeta.try_merge(meta)?;
+ }
+ }
+ }
+ }
+ }
+ if let Some(meta) = meta_opt {
+ meta_vec.push(meta);
+ } else {
+ bail!("No valid meta value in k2v for {:?}", res.row_ref);
+ }
+ }
+
+ Ok(meta_vec)
+ }
+
+ async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+ let obj_res = self
+ .storage
+ .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id)))
+ .await?;
+ let body = obj_res.value;
+ cryptoblob::open(&body, message_key)
+ }
+
+ // ---- Functions for changing the mailbox ----
+
+ async fn add_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ let add_flag_op = self.uid_index.state().op_flag_add(ident, flags.to_vec());
+ self.uid_index.push(add_flag_op).await
+ }
+
+ async fn del_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ let del_flag_op = self.uid_index.state().op_flag_del(ident, flags.to_vec());
+ self.uid_index.push(del_flag_op).await
+ }
+
+ async fn set_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
+ let set_flag_op = self.uid_index.state().op_flag_set(ident, flags.to_vec());
+ self.uid_index.push(set_flag_op).await
+ }
+
+ async fn append(
+ &mut self,
+ mail: IMF<'_>,
+ ident: Option<UniqueIdent>,
+ flags: &[Flag],
+ ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
+ let ident = ident.unwrap_or_else(gen_ident);
+ let message_key = gen_key();
+
+ futures::try_join!(
+ async {
+ // Encrypt and save mail body
+ let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
+ self.storage
+ .blob_insert(BlobVal::new(
+ BlobRef(format!("{}/{}", self.mail_path, ident)),
+ message_blob,
+ ))
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Save mail meta
+ let meta = MailMeta {
+ internaldate: now_msec(),
+ headers: mail.parsed.raw_headers.to_vec(),
+ message_key: message_key.clone(),
+ rfc822_size: mail.raw.len(),
+ };
+ let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )])
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ self.uid_index.opportunistic_sync()
+ )?;
+
+ // Add mail to Bayou mail index
+ let uid_state = self.uid_index.state();
+ let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec());
+
+ let uidvalidity = uid_state.uidvalidity;
+ let (uid, modseq) = match add_mail_op {
+ UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq),
+ _ => unreachable!(),
+ };
+
+ self.uid_index.push(add_mail_op).await?;
+
+ Ok((uidvalidity, uid, modseq))
+ }
+
+ async fn append_from_s3<'a>(
+ &mut self,
+ mail: IMF<'a>,
+ ident: UniqueIdent,
+ blob_src: storage::BlobRef,
+ message_key: Key,
+ ) -> Result<()> {
+ futures::try_join!(
+ async {
+ // Copy mail body from previous location
+ let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident));
+ self.storage.blob_copy(&blob_src, &blob_dst).await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Save mail meta
+ let meta = MailMeta {
+ internaldate: now_msec(),
+ headers: mail.parsed.raw_headers.to_vec(),
+ message_key: message_key.clone(),
+ rfc822_size: mail.raw.len(),
+ };
+ let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )])
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ self.uid_index.opportunistic_sync()
+ )?;
+
+ // Add mail to Bayou mail index
+ let add_mail_op = self.uid_index.state().op_mail_add(ident, vec![]);
+ self.uid_index.push(add_mail_op).await?;
+
+ Ok(())
+ }
+
+ async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
+ if !self.uid_index.state().table.contains_key(&ident) {
+ bail!("Cannot delete mail that doesn't exit");
+ }
+
+ let del_mail_op = self.uid_index.state().op_mail_del(ident);
+ self.uid_index.push(del_mail_op).await?;
+
+ futures::try_join!(
+ async {
+ // Delete mail body from S3
+ self.storage
+ .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident)))
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Delete mail meta from K2V
+ let sk = ident.to_string();
+ let res = self
+ .storage
+ .row_fetch(&storage::Selector::Single(&RowRef::new(
+ &self.mail_path,
+ &sk,
+ )))
+ .await?;
+ if let Some(row_val) = res.into_iter().next() {
+ self.storage
+ .row_rm(&storage::Selector::Single(&row_val.row_ref))
+ .await?;
+ }
+ Ok::<_, anyhow::Error>(())
+ }
+ )?;
+ Ok(())
+ }
+
+ async fn copy_from(
+ &mut self,
+ from: &MailboxInternal,
+ source_id: UniqueIdent,
+ ) -> Result<UniqueIdent> {
+ let new_id = gen_ident();
+ self.copy_internal(from, source_id, new_id).await?;
+ Ok(new_id)
+ }
+
+ async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> {
+ self.copy_internal(from, id, id).await?;
+ from.delete(id).await?;
+ Ok(())
+ }
+
+ async fn copy_internal(
+ &mut self,
+ from: &MailboxInternal,
+ source_id: UniqueIdent,
+ new_id: UniqueIdent,
+ ) -> Result<()> {
+ if self.encryption_key != from.encryption_key {
+ bail!("Message to be copied/moved does not belong to same account.");
+ }
+
+ let flags = from
+ .uid_index
+ .state()
+ .table
+ .get(&source_id)
+ .ok_or(anyhow!("Source mail not found"))?
+ .2
+ .clone();
+
+ futures::try_join!(
+ async {
+ let dst = BlobRef(format!("{}/{}", self.mail_path, new_id));
+ let src = BlobRef(format!("{}/{}", from.mail_path, source_id));
+ self.storage.blob_copy(&src, &dst).await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ async {
+ // Copy mail meta in K2V
+ let meta = &from.fetch_meta(&[source_id]).await?[0];
+ let meta_blob = seal_serialize(meta, &self.encryption_key)?;
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &new_id.to_string()),
+ meta_blob,
+ )])
+ .await?;
+ Ok::<_, anyhow::Error>(())
+ },
+ self.uid_index.opportunistic_sync(),
+ )?;
+
+ // Add mail to Bayou mail index
+ let add_mail_op = self.uid_index.state().op_mail_add(new_id, flags);
+ self.uid_index.push(add_mail_op).await?;
+
+ Ok(())
+ }
+}
+
+// Can be useful to debug so we want this code
+// to be available to developers
+#[allow(dead_code)]
+fn dump(uid_index: &Bayou<UidIndex>) {
+ let s = uid_index.state();
+ println!("---- MAILBOX STATE ----");
+ println!("UIDVALIDITY {}", s.uidvalidity);
+ println!("UIDNEXT {}", s.uidnext);
+ println!("INTERNALSEQ {}", s.internalseq);
+ for (uid, ident) in s.idx_by_uid.iter() {
+ println!(
+ "{} {} {}",
+ uid,
+ hex::encode(ident.0),
+ s.table.get(ident).cloned().unwrap().2.join(", ")
+ );
+ }
+ println!();
+}
+
+// ----
+
+/// The metadata of a message that is stored in K2V
+/// at pk = mail/<mailbox uuid>, sk = <message uuid>
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MailMeta {
+ /// INTERNALDATE field (milliseconds since epoch)
+ pub internaldate: u64,
+ /// Headers of the message
+ pub headers: Vec<u8>,
+ /// Secret key for decrypting entire message
+ pub message_key: Key,
+ /// RFC822 size
+ pub rfc822_size: usize,
+}
+
+impl MailMeta {
+ fn try_merge(&mut self, other: Self) -> Result<()> {
+ if self.headers != other.headers
+ || self.message_key != other.message_key
+ || self.rfc822_size != other.rfc822_size
+ {
+ bail!("Conflicting MailMeta values.");
+ }
+ self.internaldate = std::cmp::max(self.internaldate, other.internaldate);
+ Ok(())
+ }
+}