From 0048b43f27d44133af233f44a3f9d318eabe6694 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 30 Jun 2022 12:07:01 +0200 Subject: Implement move and copy mail --- src/mail/mailbox.rs | 88 +++++++++++++++++++++++++++++++++++++++++++++++----- src/mail/uidindex.rs | 9 ++++++ 2 files changed, 90 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 8057fe7..883bc08 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -1,13 +1,13 @@ use anyhow::{anyhow, bail, Result}; use k2v_client::K2vClient; use k2v_client::{BatchReadOp, Filter, K2vValue}; -use rusoto_s3::{DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3}; +use rusoto_s3::{CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3}; use serde::{Deserialize, Serialize}; use tokio::io::AsyncReadExt; use tokio::sync::RwLock; use crate::bayou::Bayou; -use crate::cryptoblob::{self, gen_key, open_deserialize, Key}; +use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; @@ -88,14 +88,38 @@ impl Mailbox { /// Copy an email from an other Mailbox to this mailbox /// (use this when possible, as it allows for a certain number of storage optimizations) - pub async fn copy_from(&self, _from: &Mailbox, _uuid: UniqueIdent) -> Result<()> { - unimplemented!() + pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> { + if self.id == from.id { + bail!("Cannot copy into same mailbox"); + } + + let (mut selflock, mut fromlock); + if self.id < from.id { + selflock = self.mbox.write().await; + fromlock = from.mbox.write().await; + } else { + fromlock = from.mbox.write().await; + selflock = self.mbox.write().await; + }; + selflock.copy_from(&fromlock, uuid).await } /// Move an email from an other Mailbox to this mailbox /// (use this when possible, as it allows for a certain number of storage optimizations) - pub async fn move_from(&self, _from: &Mailbox, _uuid: UniqueIdent) -> Result<()> { - unimplemented!() + pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> { + if self.id == from.id { + bail!("Cannot copy move same mailbox"); + } + + let (mut selflock, mut fromlock); + if self.id < from.id { + selflock = self.mbox.write().await; + fromlock = from.mbox.write().await; + } else { + fromlock = from.mbox.write().await; + selflock = self.mbox.write().await; + }; + selflock.move_from(&mut fromlock, uuid).await } // ---- @@ -220,7 +244,7 @@ impl MailboxInternal { message_key: message_key.clone(), rfc822_size: mail.raw.len(), }; - let meta_blob = cryptoblob::seal_serialize(&meta, &self.encryption_key)?; + let meta_blob = seal_serialize(&meta, &self.encryption_key)?; self.k2v .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) .await?; @@ -239,6 +263,10 @@ impl MailboxInternal { } async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { + if !self.uid_index.state().table.contains_key(&ident) { + bail!("Cannot delete mail that doesn't exit"); + } + let del_mail_op = self.uid_index.state().op_mail_del(ident); self.uid_index.push(del_mail_op).await?; @@ -264,6 +292,52 @@ impl MailboxInternal { Ok(()) } + async fn copy_from(&mut self, from: &MailboxInternal, source_id: UniqueIdent) -> Result<()> { + let new_id = gen_ident(); + self.copy_internal(from, source_id, new_id).await + } + + async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> { + self.copy_internal(from, id, id).await?; + from.delete(id).await?; + Ok(()) + } + + async fn copy_internal(&mut self, from: &MailboxInternal, source_id: UniqueIdent, new_id: UniqueIdent) -> Result<()> { + let flags = from.uid_index.state() + .table.get(&source_id).ok_or(anyhow!("Source mail not found"))?.1.clone(); + + futures::try_join!( + async { + // Copy mail body from S3 + let mut cor = CopyObjectRequest::default(); + cor.bucket = self.bucket.clone(); + cor.key = format!("{}/{}", self.mail_path, new_id); + cor.copy_source = format!("{}/{}/{}", from.bucket, from.mail_path, source_id); + self.s3.copy_object(cor).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Copy mail meta in K2V + let meta = &from.fetch_meta(&[source_id]).await?[0]; + let meta_blob = seal_serialize(meta, &self.encryption_key)?; + self.k2v + .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None) + .await?; + Ok::<_, anyhow::Error>(()) + }, + )?; + + // Add mail to Bayou mail index + let add_mail_op = self + .uid_index + .state() + .op_mail_add(new_id, flags); + self.uid_index.push(add_mail_op).await?; + + Ok(()) + } + // ---- async fn test(&mut self) -> Result<()> { diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs index 7160ec6..736c763 100644 --- a/src/mail/uidindex.rs +++ b/src/mail/uidindex.rs @@ -36,6 +36,7 @@ pub enum UidIndexOp { MailDel(UniqueIdent), FlagAdd(UniqueIdent, Vec), FlagDel(UniqueIdent, Vec), + BumpUidvalidity(u32), } impl UidIndex { @@ -59,6 +60,11 @@ impl UidIndex { UidIndexOp::FlagDel(ident, flags) } + #[must_use] + pub fn op_bump_uidvalidity(&self, count: u32) -> UidIndexOp { + UidIndexOp::BumpUidvalidity(count) + } + // INTERNAL functions to keep state consistent fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, flags: &Vec) { @@ -156,6 +162,9 @@ impl BayouState for UidIndex { new.idx_by_flag.remove(*uid, rm_flags); } } + UidIndexOp::BumpUidvalidity(count) => { + new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count).unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap()); + } } new } -- cgit v1.2.3