aboutsummaryrefslogblamecommitdiff
path: root/src/mail/mailbox.rs
blob: 19a95e08affe30d9e39be1bccf99e77851866be4 (plain) (tree)
1
2
3
4
5
6
7
8
9
                                   
                          
                                                


                                                                                             

                                    
                        

                        
                                                                              
                              
                             
                                 
                     
                          
 
                    
                               
                                  
 
 
              




                                         

                                                 
 


                                                                       

                                                        






                                                                                        

         

                         

                                                
                                               
                                                      



                                     


                             

     
                                    







                                                                 

     

                                                  


                                                            
                                                        

     






















                                                                                           




                                                                                      
                                        






                                                               

     














                                                                               


                                                                   

     

                                                                                            




                                                                                    
                                     







                                                 



                                                                                            













                                                                                    
     






                                                                               
                    
                   

                        
 



                               
 
 
                      
                                                  



                                     




                                                          

                                                  


















                                                                              
 
                                  
                                                              



                                                                  














                                                                                          

                 




                                                                              


















                                                                                         











                                                                                     




                                                                                     





                                             





















                                                                             
                                                                             



                                                                                      

                                               
           
 
                                       








                                                                       

                                                
                              
































                                                                                      

                                               


                                       


                              
                                                         

                                                



                                                                  



                                                               
























                                                                        










                                                                                                 

















                                                                                     



















                                                                                              
                                                


                                       
                                                                            



                                                

















                                                             















                                                     












                                                                                 
use anyhow::{anyhow, bail, Result};
use k2v_client::K2vClient;
use k2v_client::{BatchReadOp, Filter, K2vValue};
use rusoto_s3::{
    CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3,
};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;

use crate::bayou::Bayou;
use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
use crate::login::Credentials;
use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
use crate::mail::IMF;
use crate::time::now_msec;

pub struct Mailbox {
    pub(super) id: UniqueIdent,
    mbox: RwLock<MailboxInternal>,
}

impl Mailbox {
    pub(super) async fn open(
        creds: &Credentials,
        id: UniqueIdent,
        min_uidvalidity: ImapUidvalidity,
    ) -> Result<Self> {
        let index_path = format!("index/{}", id);
        let mail_path = format!("mail/{}", id);

        let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?;
        uid_index.sync().await?;

        let uidvalidity = uid_index.state().uidvalidity;
        if uidvalidity < min_uidvalidity {
            uid_index
                .push(
                    uid_index
                        .state()
                        .op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()),
                )
                .await?;
        }

        dump(&uid_index);

        let mbox = RwLock::new(MailboxInternal {
            id,
            bucket: creds.bucket().to_string(),
            encryption_key: creds.keys.master.clone(),
            k2v: creds.k2v_client()?,
            s3: creds.s3_client()?,
            uid_index,
            mail_path,
        });

        Ok(Self { id, mbox })
    }

    /// Sync data with backing store
    pub async fn force_sync(&self) -> Result<()> {
        self.mbox.write().await.force_sync().await
    }

    /// Sync data with backing store only if changes are detected
    /// or last sync is too old
    pub async fn opportunistic_sync(&self) -> Result<()> {
        self.mbox.write().await.opportunistic_sync().await
    }

    // ---- Functions for reading the mailbox ----

    /// Get a clone of the current UID Index of this mailbox
    /// (cloning is cheap so don't hesitate to use this)
    pub async fn current_uid_index(&self) -> UidIndex {
        self.mbox.read().await.uid_index.state().clone()
    }

    /// Fetch the metadata (headers + some more info) of the specified
    /// mail IDs
    pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
        self.mbox.read().await.fetch_meta(ids).await
    }

    /// Fetch an entire e-mail
    pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
        self.mbox.read().await.fetch_full(id, message_key).await
    }

    // ---- Functions for changing the mailbox ----

    /// Add flags to message
    pub async fn add_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
        self.mbox.write().await.add_flags(id, flags).await
    }

    /// Delete flags from message
    pub async fn del_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
        self.mbox.write().await.del_flags(id, flags).await
    }

    /// Define the new flags for this message
    pub async fn set_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
        self.mbox.write().await.set_flags(id, flags).await
    }

    /// Insert an email into the mailbox
    pub async fn append<'a>(
        &self,
        msg: IMF<'a>,
        ident: Option<UniqueIdent>,
        flags: &[Flag],
    ) -> Result<(ImapUidvalidity, ImapUid)> {
        self.mbox.write().await.append(msg, ident, flags).await
    }

    /// Insert an email into the mailbox, copying it from an existing S3 object
    pub async fn append_from_s3<'a>(
        &self,
        msg: IMF<'a>,
        ident: UniqueIdent,
        s3_key: &str,
        message_key: Key,
    ) -> Result<()> {
        self.mbox
            .write()
            .await
            .append_from_s3(msg, ident, s3_key, message_key)
            .await
    }

    /// Delete a message definitively from the mailbox
    pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> {
        self.mbox.write().await.delete(id).await
    }

    /// Copy an email from an other Mailbox to this mailbox
    /// (use this when possible, as it allows for a certain number of storage optimizations)
    pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> {
        if self.id == from.id {
            bail!("Cannot copy into same mailbox");
        }

        let (mut selflock, fromlock);
        if self.id < from.id {
            selflock = self.mbox.write().await;
            fromlock = from.mbox.write().await;
        } else {
            fromlock = from.mbox.write().await;
            selflock = self.mbox.write().await;
        };
        selflock.copy_from(&fromlock, uuid).await
    }

    /// Move an email from an other Mailbox to this mailbox
    /// (use this when possible, as it allows for a certain number of storage optimizations)
    pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> {
        if self.id == from.id {
            bail!("Cannot copy move same mailbox");
        }

        let (mut selflock, mut fromlock);
        if self.id < from.id {
            selflock = self.mbox.write().await;
            fromlock = from.mbox.write().await;
        } else {
            fromlock = from.mbox.write().await;
            selflock = self.mbox.write().await;
        };
        selflock.move_from(&mut fromlock, uuid).await
    }
}

// ----

// Non standard but common flags:
// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml
struct MailboxInternal {
    id: UniqueIdent,
    bucket: String,
    mail_path: String,
    encryption_key: Key,

    k2v: K2vClient,
    s3: S3Client,

    uid_index: Bayou<UidIndex>,
}

impl MailboxInternal {
    async fn force_sync(&mut self) -> Result<()> {
        self.uid_index.sync().await?;
        Ok(())
    }

    async fn opportunistic_sync(&mut self) -> Result<()> {
        self.uid_index.opportunistic_sync().await?;
        Ok(())
    }

    // ---- Functions for reading the mailbox ----

    async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
        let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
        let ops = ids
            .iter()
            .map(|id| BatchReadOp {
                partition_key: &self.mail_path,
                filter: Filter {
                    start: Some(id),
                    end: None,
                    prefix: None,
                    limit: None,
                    reverse: false,
                },
                single_item: true,
                conflicts_only: false,
                tombstones: false,
            })
            .collect::<Vec<_>>();
        let res_vec = self.k2v.read_batch(&ops).await?;

        let mut meta_vec = vec![];
        for (op, res) in ops.iter().zip(res_vec.into_iter()) {
            if res.items.len() != 1 {
                bail!("Expected 1 item, got {}", res.items.len());
            }
            let (_, cv) = res.items.iter().next().unwrap();
            let mut meta_opt = None;
            for v in cv.value.iter() {
                match v {
                    K2vValue::Tombstone => (),
                    K2vValue::Value(v) => {
                        let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
                        match meta_opt.as_mut() {
                            None => {
                                meta_opt = Some(meta);
                            }
                            Some(prevmeta) => {
                                prevmeta.try_merge(meta)?;
                            }
                        }
                    }
                }
            }
            if let Some(meta) = meta_opt {
                meta_vec.push(meta);
            } else {
                bail!("No valid meta value in k2v for {:?}", op.filter.start);
            }
        }

        Ok(meta_vec)
    }

    async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
        let mut gor = GetObjectRequest::default();
        gor.bucket = self.bucket.clone();
        gor.key = format!("{}/{}", self.mail_path, id);

        let obj_res = self.s3.get_object(gor).await?;

        let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
        let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
        obj_body.into_async_read().read_to_end(&mut buf).await?;

        Ok(cryptoblob::open(&buf, &message_key)?)
    }

    // ---- Functions for changing the mailbox ----

    async fn add_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
        let add_flag_op = self.uid_index.state().op_flag_add(ident, flags.to_vec());
        self.uid_index.push(add_flag_op).await
    }

    async fn del_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
        let del_flag_op = self.uid_index.state().op_flag_del(ident, flags.to_vec());
        self.uid_index.push(del_flag_op).await
    }

    async fn set_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
        let set_flag_op = self.uid_index.state().op_flag_set(ident, flags.to_vec());
        self.uid_index.push(set_flag_op).await
    }

    async fn append(
        &mut self,
        mail: IMF<'_>,
        ident: Option<UniqueIdent>,
        flags: &[Flag],
    ) -> Result<(ImapUidvalidity, ImapUid)> {
        let ident = ident.unwrap_or_else(|| gen_ident());
        let message_key = gen_key();

        futures::try_join!(
            async {
                // Encrypt and save mail body
                let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
                let mut por = PutObjectRequest::default();
                por.bucket = self.bucket.clone();
                por.key = format!("{}/{}", self.mail_path, ident);
                por.body = Some(message_blob.into());
                self.s3.put_object(por).await?;
                Ok::<_, anyhow::Error>(())
            },
            async {
                // Save mail meta
                let meta = MailMeta {
                    internaldate: now_msec(),
                    headers: mail.raw[..mail.parsed.offset_body].to_vec(),
                    message_key: message_key.clone(),
                    rfc822_size: mail.raw.len(),
                };
                let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
                self.k2v
                    .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
                    .await?;
                Ok::<_, anyhow::Error>(())
            },
            self.uid_index.opportunistic_sync()
        )?;

        // Add mail to Bayou mail index
        let uid_state = self.uid_index.state();
        let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec());

        let uidvalidity = uid_state.uidvalidity;
        let uid = match add_mail_op {
            UidIndexOp::MailAdd(_, uid, _) => uid,
            _ => unreachable!(),
        };

        self.uid_index.push(add_mail_op).await?;

        Ok((uidvalidity, uid))
    }

    async fn append_from_s3<'a>(
        &mut self,
        mail: IMF<'a>,
        ident: UniqueIdent,
        s3_key: &str,
        message_key: Key,
    ) -> Result<()> {
        futures::try_join!(
            async {
                // Copy mail body from previous location
                let mut cor = CopyObjectRequest::default();
                cor.bucket = self.bucket.clone();
                cor.key = format!("{}/{}", self.mail_path, ident);
                cor.copy_source = format!("{}/{}", self.bucket, s3_key);
                cor.metadata_directive = Some("REPLACE".into());
                self.s3.copy_object(cor).await?;
                Ok::<_, anyhow::Error>(())
            },
            async {
                // Save mail meta
                let meta = MailMeta {
                    internaldate: now_msec(),
                    headers: mail.raw[..mail.parsed.offset_body].to_vec(),
                    message_key: message_key.clone(),
                    rfc822_size: mail.raw.len(),
                };
                let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
                self.k2v
                    .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
                    .await?;
                Ok::<_, anyhow::Error>(())
            },
            self.uid_index.opportunistic_sync()
        )?;

        // Add mail to Bayou mail index
        let add_mail_op = self
            .uid_index
            .state()
            .op_mail_add(ident, vec!["\\Unseen".into()]);
        self.uid_index.push(add_mail_op).await?;

        Ok(())
    }

    async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
        if !self.uid_index.state().table.contains_key(&ident) {
            bail!("Cannot delete mail that doesn't exit");
        }

        let del_mail_op = self.uid_index.state().op_mail_del(ident);
        self.uid_index.push(del_mail_op).await?;

        futures::try_join!(
            async {
                // Delete mail body from S3
                let mut dor = DeleteObjectRequest::default();
                dor.bucket = self.bucket.clone();
                dor.key = format!("{}/{}", self.mail_path, ident);
                self.s3.delete_object(dor).await?;
                Ok::<_, anyhow::Error>(())
            },
            async {
                // Delete mail meta from K2V
                let sk = ident.to_string();
                let v = self.k2v.read_item(&self.mail_path, &sk).await?;
                self.k2v
                    .delete_item(&self.mail_path, &sk, v.causality)
                    .await?;
                Ok::<_, anyhow::Error>(())
            }
        )?;
        Ok(())
    }

    async fn copy_from(&mut self, from: &MailboxInternal, source_id: UniqueIdent) -> Result<()> {
        let new_id = gen_ident();
        self.copy_internal(from, source_id, new_id).await
    }

    async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> {
        self.copy_internal(from, id, id).await?;
        from.delete(id).await?;
        Ok(())
    }

    async fn copy_internal(
        &mut self,
        from: &MailboxInternal,
        source_id: UniqueIdent,
        new_id: UniqueIdent,
    ) -> Result<()> {
        if self.bucket != from.bucket || self.encryption_key != from.encryption_key {
            bail!("Message to be copied/moved does not belong to same account.");
        }

        let flags = from
            .uid_index
            .state()
            .table
            .get(&source_id)
            .ok_or(anyhow!("Source mail not found"))?
            .1
            .clone();

        futures::try_join!(
            async {
                // Copy mail body from S3
                let mut cor = CopyObjectRequest::default();
                cor.bucket = self.bucket.clone();
                cor.key = format!("{}/{}", self.mail_path, new_id);
                cor.copy_source = format!("{}/{}/{}", from.bucket, from.mail_path, source_id);
                self.s3.copy_object(cor).await?;
                Ok::<_, anyhow::Error>(())
            },
            async {
                // Copy mail meta in K2V
                let meta = &from.fetch_meta(&[source_id]).await?[0];
                let meta_blob = seal_serialize(meta, &self.encryption_key)?;
                self.k2v
                    .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None)
                    .await?;
                Ok::<_, anyhow::Error>(())
            },
            self.uid_index.opportunistic_sync(),
        )?;

        // Add mail to Bayou mail index
        let add_mail_op = self.uid_index.state().op_mail_add(new_id, flags);
        self.uid_index.push(add_mail_op).await?;

        Ok(())
    }
}

fn dump(uid_index: &Bayou<UidIndex>) {
    let s = uid_index.state();
    println!("---- MAILBOX STATE ----");
    println!("UIDVALIDITY {}", s.uidvalidity);
    println!("UIDNEXT {}", s.uidnext);
    println!("INTERNALSEQ {}", s.internalseq);
    for (uid, ident) in s.idx_by_uid.iter() {
        println!(
            "{} {} {}",
            uid,
            hex::encode(ident.0),
            s.table.get(ident).cloned().unwrap().1.join(", ")
        );
    }
    println!("");
}

// ----

/// The metadata of a message that is stored in K2V
/// at pk = mail/<mailbox uuid>, sk = <message uuid>
#[derive(Serialize, Deserialize)]
pub struct MailMeta {
    /// INTERNALDATE field (milliseconds since epoch)
    pub internaldate: u64,
    /// Headers of the message
    pub headers: Vec<u8>,
    /// Secret key for decrypting entire message
    pub message_key: Key,
    /// RFC822 size
    pub rfc822_size: usize,
}

impl MailMeta {
    fn try_merge(&mut self, other: Self) -> Result<()> {
        if self.headers != other.headers
            || self.message_key != other.message_key
            || self.rfc822_size != other.rfc822_size
        {
            bail!("Conflicting MailMeta values.");
        }
        self.internaldate = std::cmp::max(self.internaldate, other.internaldate);
        Ok(())
    }
}