From 8ac3a8ce8ba268a3261e23694b8b62afa6a3ae37 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 1 Nov 2023 16:45:29 +0100 Subject: implement an AnyCredentials --- src/mail/user.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/mail') diff --git a/src/mail/user.rs b/src/mail/user.rs index 5523c2a..9d94563 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -30,9 +30,11 @@ pub const INBOX: &str = "INBOX"; const MAILBOX_LIST_PK: &str = "mailboxes"; const MAILBOX_LIST_SK: &str = "list"; -pub struct User { +use crate::storage::*; + +pub struct User { pub username: String, - pub creds: Credentials, + pub creds: Credentials, pub k2v: K2vClient, pub mailboxes: std::sync::Mutex>>, -- cgit v1.2.3 From cf8b9ac28d6813bd589f363ad3659dd215bd7cea Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 1 Nov 2023 17:18:58 +0100 Subject: mask implementation to the rest of the code --- src/mail/user.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/mail') diff --git a/src/mail/user.rs b/src/mail/user.rs index 9d94563..360786d 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -32,9 +32,9 @@ const MAILBOX_LIST_SK: &str = "list"; use crate::storage::*; -pub struct User { +pub struct User { pub username: String, - pub creds: Credentials, + pub creds: Credentials, pub k2v: K2vClient, pub mailboxes: std::sync::Mutex>>, -- cgit v1.2.3 From 26f14df3f460320b2e2d31deb9d3cef90f43790c Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 2 Nov 2023 09:42:50 +0100 Subject: we are doomed with static types --- src/mail/incoming.rs | 5 +++-- src/mail/user.rs | 2 -- 2 files changed, 3 insertions(+), 4 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index b7d2f48..7094b42 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -23,6 +23,7 @@ use crate::mail::unique_ident::*; use crate::mail::user::User; use crate::mail::IMF; use crate::time::now_msec; +use crate::storage::Sto; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; @@ -139,14 +140,14 @@ async fn incoming_mail_watch_process_internal( Ok(()) } -async fn handle_incoming_mail( +async fn handle_incoming_mail( user: &Arc, s3: &S3Client, inbox: &Arc, lock_held: &watch::Receiver, ) -> Result<()> { let lor = ListObjectsV2Request { - bucket: user.creds.storage.bucket.clone(), + bucket: user.creds.storage.engine::().bucket.clone(), max_keys: Some(1000), prefix: Some("incoming/".into()), ..Default::default() diff --git a/src/mail/user.rs b/src/mail/user.rs index 360786d..5523c2a 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -30,8 +30,6 @@ pub const INBOX: &str = "INBOX"; const MAILBOX_LIST_PK: &str = "mailboxes"; const MAILBOX_LIST_SK: &str = "list"; -use crate::storage::*; - pub struct User { pub username: String, pub creds: Credentials, -- cgit v1.2.3 From 9aa58194d44fef8b0b916f6c96edd124ce13bf7b Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 2 Nov 2023 10:38:47 +0100 Subject: try dynamic dispatch --- src/mail/incoming.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 7094b42..b7d2f48 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -23,7 +23,6 @@ use crate::mail::unique_ident::*; use crate::mail::user::User; use crate::mail::IMF; use crate::time::now_msec; -use crate::storage::Sto; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; @@ -140,14 +139,14 @@ async fn incoming_mail_watch_process_internal( Ok(()) } -async fn handle_incoming_mail( +async fn handle_incoming_mail( user: &Arc, s3: &S3Client, inbox: &Arc, lock_held: &watch::Receiver, ) -> Result<()> { let lor = ListObjectsV2Request { - bucket: user.creds.storage.engine::().bucket.clone(), + bucket: user.creds.storage.bucket.clone(), max_keys: Some(1000), prefix: Some("incoming/".into()), ..Default::default() -- cgit v1.2.3 From 553ea25f1854706b60ce6f087545968533ef6140 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 2 Nov 2023 11:51:03 +0100 Subject: gradually implement our interface --- src/mail/mailbox.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'src/mail') diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index d92140d..614382e 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -14,6 +14,7 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; +use crate::storage::{RowStore, BlobStore}; use crate::time::now_msec; pub struct Mailbox { @@ -50,8 +51,8 @@ impl Mailbox { id, bucket: creds.bucket().to_string(), encryption_key: creds.keys.master.clone(), - k2v: creds.k2v_client()?, - s3: creds.s3_client()?, + k2v: creds.storage.builders.row_store()?, + s3: creds.storage.builders.blob_store()?, uid_index, mail_path, }); @@ -186,8 +187,8 @@ struct MailboxInternal { mail_path: String, encryption_key: Key, - k2v: K2vClient, - s3: S3Client, + k2v: RowStore, + s3: BlobStore, uid_index: Bayou, } -- cgit v1.2.3 From 3b363b2a7803564231e001c215ab427c99c9435b Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 2 Nov 2023 12:18:43 +0100 Subject: implement equality+cmp for builders based on url --- src/mail/user.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/mail') diff --git a/src/mail/user.rs b/src/mail/user.rs index 5523c2a..2104455 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -13,6 +13,7 @@ use crate::mail::incoming::incoming_mail_watch_process; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::{gen_ident, UniqueIdent}; +use crate::storage; use crate::time::now_msec; pub const MAILBOX_HIERARCHY_DELIMITER: char = '.'; @@ -455,6 +456,6 @@ enum CreatedMailbox { // ---- User cache ---- lazy_static! { - static ref USER_CACHE: std::sync::Mutex>> = + static ref USER_CACHE: std::sync::Mutex>> = std::sync::Mutex::new(HashMap::new()); } -- cgit v1.2.3 From 1e192f93d5bf544c82fe91fb799d77e8b5d53afe Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 2 Nov 2023 12:58:45 +0100 Subject: make all our objects send+sync --- src/mail/incoming.rs | 6 +++--- src/mail/user.rs | 34 ++++++++++++++++++---------------- 2 files changed, 21 insertions(+), 19 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index b7d2f48..3ea7d6a 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -54,10 +54,10 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.k2v_client()?, INCOMING_PK, INCOMING_LOCK_SK); + let mut lock_held = k2v_lock_loop(creds.row_client()?, INCOMING_PK, INCOMING_LOCK_SK); - let k2v = creds.k2v_client()?; - let s3 = creds.s3_client()?; + let k2v = creds.row_client()?; + let s3 = creds.blob_client()?; let mut inbox: Option> = None; let mut prev_ct: Option = None; diff --git a/src/mail/user.rs b/src/mail/user.rs index 2104455..3b8d4e7 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -34,7 +34,7 @@ const MAILBOX_LIST_SK: &str = "list"; pub struct User { pub username: String, pub creds: Credentials, - pub k2v: K2vClient, + pub k2v: storage::RowStore, pub mailboxes: std::sync::Mutex>>, tx_inbox_id: watch::Sender>, @@ -174,7 +174,7 @@ impl User { // ---- Internal user & mailbox management ---- async fn open(username: String, creds: Credentials) -> Result> { - let k2v = creds.k2v_client()?; + let k2v = creds.row_client()?; let (tx_inbox_id, rx_inbox_id) = watch::channel(None); @@ -224,32 +224,32 @@ impl User { // ---- Mailbox list management ---- - async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { - let (mut list, ct) = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await { - Err(k2v_client::Error::NotFound) => (MailboxList::new(), None), + async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { + let (mut list, row) = match self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK).fetch().await { + Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), - Ok(cv) => { + Ok(rv) => { let mut list = MailboxList::new(); - for v in cv.value { - if let K2vValue::Value(vbytes) = v { + for v in rv.content() { + if let storage::Alternative::Value(vbytes) = v { let list2 = open_deserialize::(&vbytes, &self.creds.keys.master)?; list.merge(list2); } } - (list, Some(cv.causality)) + (list, Some(rv.to_ref())) } }; - self.ensure_inbox_exists(&mut list, &ct).await?; + self.ensure_inbox_exists(&mut list, &row).await?; - Ok((list, ct)) + Ok((list, row)) } async fn ensure_inbox_exists( &self, list: &mut MailboxList, - ct: &Option, + ct: &Option, ) -> Result { // If INBOX doesn't exist, create a new mailbox with that name // and save new mailbox list. @@ -278,12 +278,14 @@ impl User { async fn save_mailbox_list( &self, list: &MailboxList, - ct: Option, + ct: Option, ) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; - self.k2v - .insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct) - .await?; + let rref = match ct { + Some(x) => x, + None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK), + }; + rref.set_value(list_blob).push().await?; Ok(()) } } -- cgit v1.2.3 From a65f5b25894faa9802d274beb394f40062c65bae Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 2 Nov 2023 15:28:19 +0100 Subject: WIP rewrite mail/incoming --- src/mail/incoming.rs | 81 ++++++++++++++++++---------------------------------- src/mail/mailbox.rs | 6 ++-- 2 files changed, 30 insertions(+), 57 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 3ea7d6a..4e3fc8c 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -22,6 +22,7 @@ use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::*; use crate::mail::user::User; use crate::mail::IMF; +use crate::storage; use crate::time::now_msec; const INCOMING_PK: &str = "incoming"; @@ -60,18 +61,17 @@ async fn incoming_mail_watch_process_internal( let s3 = creds.blob_client()?; let mut inbox: Option> = None; - let mut prev_ct: Option = None; + let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK); loop { - let new_mail = if *lock_held.borrow() { + let maybe_updated_incoming_key = if *lock_held.borrow() { info!("incoming lock held"); let wait_new_mail = async { loop { - match k2v_wait_value_changed(&k2v, INCOMING_PK, INCOMING_WATCH_SK, &prev_ct) - .await + match incoming_key.poll().await { - Ok(cv) => break cv, + Ok(row_val) => break row_val.to_ref(), Err(e) => { error!("Error in wait_new_mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; @@ -81,10 +81,10 @@ async fn incoming_mail_watch_process_internal( }; tokio::select! { - cv = wait_new_mail => Some(cv.causality), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => prev_ct.clone(), - _ = lock_held.changed() => None, - _ = rx_inbox_id.changed() => None, + inc_k = wait_new_mail => Some(inc_k), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, } } else { info!("incoming lock not held"); @@ -123,10 +123,10 @@ async fn incoming_mail_watch_process_internal( // If we were able to open INBOX, and we have mail, // fetch new mail - if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { + if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { Ok(()) => { - prev_ct = Some(new_ct); + incoming_key = updated_incoming_key; } Err(e) => { error!("Could not fetch incoming mail: {}", e); @@ -141,27 +141,20 @@ async fn incoming_mail_watch_process_internal( async fn handle_incoming_mail( user: &Arc, - s3: &S3Client, + blobs: &storage::BlobStore, inbox: &Arc, lock_held: &watch::Receiver, ) -> Result<()> { - let lor = ListObjectsV2Request { - bucket: user.creds.storage.bucket.clone(), - max_keys: Some(1000), - prefix: Some("incoming/".into()), - ..Default::default() - }; - let mails_res = s3.list_objects_v2(lor).await?; + let mails_res = blobs.list("incoming/").await?; - for object in mails_res.contents.unwrap_or_default() { + for object in mails_res { if !*lock_held.borrow() { break; } - if let Some(key) = object.key { - if let Some(mail_id) = key.strip_prefix("incoming/") { - if let Ok(mail_id) = mail_id.parse::() { - move_incoming_message(user, s3, inbox, mail_id).await?; - } + let key = object.key(); + if let Some(mail_id) = key.strip_prefix("incoming/") { + if let Ok(mail_id) = mail_id.parse::() { + move_incoming_message(user, blobs, inbox, mail_id).await?; } } } @@ -171,7 +164,7 @@ async fn handle_incoming_mail( async fn move_incoming_message( user: &Arc, - s3: &S3Client, + s3: &storage::BlobStore, inbox: &Arc, id: UniqueIdent, ) -> Result<()> { @@ -180,20 +173,12 @@ async fn move_incoming_message( let object_key = format!("incoming/{}", id); // 1. Fetch message from S3 - let gor = GetObjectRequest { - bucket: user.creds.storage.bucket.clone(), - key: object_key.clone(), - ..Default::default() - }; - let get_result = s3.get_object(gor).await?; + let object = s3.blob(&object_key).fetch().await?; // 1.a decrypt message key from headers - info!("Object metadata: {:?}", get_result.metadata); - let key_encrypted_b64 = get_result - .metadata - .as_ref() - .ok_or(anyhow!("Missing key in metadata"))? - .get(MESSAGE_KEY) + //info!("Object metadata: {:?}", get_result.metadata); + let key_encrypted_b64 = object + .get_meta(MESSAGE_KEY) .ok_or(anyhow!("Missing key in metadata"))?; let key_encrypted = base64::decode(key_encrypted_b64)?; let message_key = sodiumoxide::crypto::sealedbox::open( @@ -206,13 +191,8 @@ async fn move_incoming_message( cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; // 1.b retrieve message body - let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?; - let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize); - obj_body - .into_async_read() - .read_to_end(&mut mail_buf) - .await?; - let plain_mail = cryptoblob::open(&mail_buf, &message_key) + let obj_body = object.content().ok_or(anyhow!("Missing object body"))?; + let plain_mail = cryptoblob::open(&obj_body, &message_key) .map_err(|_| anyhow!("Cannot decrypt email content"))?; // 2 parse mail and add to inbox @@ -222,19 +202,14 @@ async fn move_incoming_message( .await?; // 3 delete from incoming - let dor = DeleteObjectRequest { - bucket: user.creds.storage.bucket.clone(), - key: object_key.clone(), - ..Default::default() - }; - s3.delete_object(dor).await?; + object.to_ref().rm().await?; Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- -fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver { +fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver { let (held_tx, held_rx) = watch::channel(false); tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); @@ -250,7 +225,7 @@ enum LockState { } async fn k2v_lock_loop_internal( - k2v: K2vClient, + k2v: storage::RowStore, pk: &'static str, sk: &'static str, held_tx: watch::Sender, diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 614382e..581f432 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -49,10 +49,9 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, - bucket: creds.bucket().to_string(), encryption_key: creds.keys.master.clone(), - k2v: creds.storage.builders.row_store()?, - s3: creds.storage.builders.blob_store()?, + k2v: creds.storage.row_store()?, + s3: creds.storage.blob_store()?, uid_index, mail_path, }); @@ -183,7 +182,6 @@ struct MailboxInternal { // 2023-05-15 will probably be used later. #[allow(dead_code)] id: UniqueIdent, - bucket: String, mail_path: String, encryption_key: Key, -- cgit v1.2.3 From bf67935c54f5f66f4cab4ceb58c1b5831b9421b0 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 2 Nov 2023 16:17:11 +0100 Subject: add rust analyzer to the shell --- src/mail/incoming.rs | 2 +- src/mail/mailbox.rs | 18 ++++++------------ 2 files changed, 7 insertions(+), 13 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 4e3fc8c..e550e98 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -198,7 +198,7 @@ async fn move_incoming_message( // 2 parse mail and add to inbox let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; inbox - .append_from_s3(msg, id, &object_key, message_key) + .append_from_s3(msg, id, object.to_ref(), message_key) .await?; // 3 delete from incoming diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 581f432..83039d5 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -14,7 +14,7 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::storage::{RowStore, BlobStore}; +use crate::storage::{RowStore, BlobStore, self}; use crate::time::now_msec; pub struct Mailbox { @@ -121,13 +121,13 @@ impl Mailbox { &self, msg: IMF<'a>, ident: UniqueIdent, - s3_key: &str, + blob_ref: storage::BlobRef, message_key: Key, ) -> Result<()> { self.mbox .write() .await - .append_from_s3(msg, ident, s3_key, message_key) + .append_from_s3(msg, ident, blob_ref, message_key) .await } @@ -348,20 +348,14 @@ impl MailboxInternal { &mut self, mail: IMF<'a>, ident: UniqueIdent, - s3_key: &str, + blob_ref: storage::BlobRef, message_key: Key, ) -> Result<()> { futures::try_join!( async { // Copy mail body from previous location - let cor = CopyObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - copy_source: format!("{}/{}", self.bucket, s3_key), - metadata_directive: Some("REPLACE".into()), - ..Default::default() - }; - self.s3.copy_object(cor).await?; + let dst = self.s3.blob(format!("{}/{}", self.mail_path, ident)); + blob_ref.copy(dst).await?; Ok::<_, anyhow::Error>(()) }, async { -- cgit v1.2.3 From 652da6efd35f198289ba3de26b60eb2e228de73a Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 2 Nov 2023 17:25:56 +0100 Subject: converted incoming mail --- src/mail/mailbox.rs | 106 ++++++++++++---------------------------------------- 1 file changed, 23 insertions(+), 83 deletions(-) (limited to 'src/mail') diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 83039d5..e8111df 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -1,11 +1,5 @@ use anyhow::{anyhow, bail, Result}; -use k2v_client::K2vClient; -use k2v_client::{BatchReadOp, Filter, K2vValue}; -use rusoto_s3::{ - CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3, -}; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncReadExt; use tokio::sync::RwLock; use crate::bayou::Bayou; @@ -206,35 +200,18 @@ impl MailboxInternal { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { let ids = ids.iter().map(|x| x.to_string()).collect::>(); - let ops = ids - .iter() - .map(|id| BatchReadOp { - partition_key: &self.mail_path, - filter: Filter { - start: Some(id), - end: None, - prefix: None, - limit: None, - reverse: false, - }, - single_item: true, - conflicts_only: false, - tombstones: false, - }) - .collect::>(); - let res_vec = self.k2v.read_batch(&ops).await?; + let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::>(); + let res_vec = self.k2v.select(storage::Selector::List(ops)).await?; let mut meta_vec = vec![]; - for (op, res) in ops.iter().zip(res_vec.into_iter()) { - if res.items.len() != 1 { - bail!("Expected 1 item, got {}", res.items.len()); - } - let (_, cv) = res.items.iter().next().unwrap(); + for res in res_vec.into_iter() { let mut meta_opt = None; - for v in cv.value.iter() { + + // Resolve conflicts + for v in res.content().iter() { match v { - K2vValue::Tombstone => (), - K2vValue::Value(v) => { + storage::Alternative::Tombstone => (), + storage::Alternative::Value(v) => { let meta = open_deserialize::(v, &self.encryption_key)?; match meta_opt.as_mut() { None => { @@ -250,7 +227,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", op.filter.start); + bail!("No valid meta value in k2v for {:?}", res.to_ref().sk()); } } @@ -258,19 +235,9 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { - let gor = GetObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, id), - ..Default::default() - }; - - let obj_res = self.s3.get_object(gor).await?; - - let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; - let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); - obj_body.into_async_read().read_to_end(&mut buf).await?; - - cryptoblob::open(&buf, message_key) + let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?; + let body = obj_res.content().ok_or(anyhow!("missing body"))?; + cryptoblob::open(body, message_key) } // ---- Functions for changing the mailbox ---- @@ -303,13 +270,7 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - let por = PutObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - body: Some(message_blob.into()), - ..Default::default() - }; - self.s3.put_object(por).await?; + self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?; Ok::<_, anyhow::Error>(()) }, async { @@ -321,9 +282,7 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) - .await?; + self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -354,8 +313,8 @@ impl MailboxInternal { futures::try_join!( async { // Copy mail body from previous location - let dst = self.s3.blob(format!("{}/{}", self.mail_path, ident)); - blob_ref.copy(dst).await?; + let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident)); + blob_ref.copy(&dst).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -367,9 +326,7 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) - .await?; + self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -393,21 +350,13 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - let dor = DeleteObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - ..Default::default() - }; - self.s3.delete_object(dor).await?; + self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - let v = self.k2v.read_item(&self.mail_path, &sk).await?; - self.k2v - .delete_item(&self.mail_path, &sk, v.causality) - .await?; + self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?; Ok::<_, anyhow::Error>(()) } )?; @@ -438,7 +387,7 @@ impl MailboxInternal { source_id: UniqueIdent, new_id: UniqueIdent, ) -> Result<()> { - if self.bucket != from.bucket || self.encryption_key != from.encryption_key { + if self.encryption_key != from.encryption_key { bail!("Message to be copied/moved does not belong to same account."); } @@ -453,24 +402,15 @@ impl MailboxInternal { futures::try_join!( async { - // Copy mail body from S3 - let cor = CopyObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, new_id), - copy_source: format!("{}/{}/{}", from.bucket, from.mail_path, source_id), - ..Default::default() - }; - - self.s3.copy_object(cor).await?; + let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id)); + self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?; Ok::<_, anyhow::Error>(()) }, async { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None) - .await?; + self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(meta_blob).push().await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), -- cgit v1.2.3 From 916b27d87ec7f5bff41f9dd888914d50ae067fc0 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 15 Nov 2023 15:56:43 +0100 Subject: WIP refactor storage (new timestamp.rs file) --- src/mail/incoming.rs | 3 +-- src/mail/mailbox.rs | 4 ++-- src/mail/unique_ident.rs | 2 +- src/mail/user.rs | 3 +-- 4 files changed, 5 insertions(+), 7 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index e550e98..c3a9390 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -15,7 +15,6 @@ use tokio::sync::watch; use tracing::{error, info, warn}; use crate::cryptoblob; -use crate::k2v_util::k2v_wait_value_changed; use crate::login::{Credentials, PublicCredentials}; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; @@ -23,7 +22,7 @@ use crate::mail::unique_ident::*; use crate::mail::user::User; use crate::mail::IMF; use crate::storage; -use crate::time::now_msec; +use crate::timestamp::now_msec; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index e8111df..f27d50a 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -9,7 +9,7 @@ use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; use crate::storage::{RowStore, BlobStore, self}; -use crate::time::now_msec; +use crate::timestamp::now_msec; pub struct Mailbox { pub(super) id: UniqueIdent, @@ -227,7 +227,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", res.to_ref().sk()); + bail!("No valid meta value in k2v for {:?}", res.to_ref().key()); } } diff --git a/src/mail/unique_ident.rs b/src/mail/unique_ident.rs index 267f66e..0e629db 100644 --- a/src/mail/unique_ident.rs +++ b/src/mail/unique_ident.rs @@ -5,7 +5,7 @@ use lazy_static::lazy_static; use rand::prelude::*; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; -use crate::time::now_msec; +use crate::timestamp::now_msec; /// An internal Mail Identifier is composed of two components: /// - a process identifier, 128 bits, itself composed of: diff --git a/src/mail/user.rs b/src/mail/user.rs index 3b8d4e7..6d3bc1a 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -2,7 +2,6 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Weak}; use anyhow::{anyhow, bail, Result}; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use tokio::sync::watch; @@ -14,7 +13,7 @@ use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::{gen_ident, UniqueIdent}; use crate::storage; -use crate::time::now_msec; +use crate::timestamp::now_msec; pub const MAILBOX_HIERARCHY_DELIMITER: char = '.'; -- cgit v1.2.3 From 6da8b815b694a37d39a2be04c8e1585aac17954a Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 16 Nov 2023 18:27:24 +0100 Subject: not very clear how we pass data across channel --- src/mail/incoming.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index c3a9390..9899ae8 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -81,7 +81,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } -- cgit v1.2.3 From 7eb690e49dd995663e8ea35b1a1f5b14584b4509 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 17 Nov 2023 10:46:13 +0100 Subject: introduce an "orphan" enum --- src/mail/user.rs | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) (limited to 'src/mail') diff --git a/src/mail/user.rs b/src/mail/user.rs index 6d3bc1a..7011dcc 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -81,7 +81,11 @@ impl User { let mb_uidvalidity = mb.current_uid_index().await.uidvalidity; if mb_uidvalidity > uidvalidity { list.update_uidvalidity(name, mb_uidvalidity); - self.save_mailbox_list(&list, ct).await?; + let orphan = match ct { + Some(x) => Some(x.to_orphan()), + None => None, + }; + self.save_mailbox_list(&list, orphan).await?; } Ok(Some(mb)) } else { @@ -104,7 +108,11 @@ impl User { let (mut list, ct) = self.load_mailbox_list().await?; match list.create_mailbox(name) { CreatedMailbox::Created(_, _) => { - self.save_mailbox_list(&list, ct).await?; + let orphan = match ct { + Some(x) => Some(x.to_orphan()), + None => None, + }; + self.save_mailbox_list(&list, orphan).await?; Ok(()) } CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)), @@ -121,7 +129,11 @@ impl User { if list.has_mailbox(name) { // TODO: actually delete mailbox contents list.set_mailbox(name, None); - self.save_mailbox_list(&list, ct).await?; + let orphan = match ct { + Some(x) => Some(x.to_orphan()), + None => None, + }; + self.save_mailbox_list(&list, orphan).await?; Ok(()) } else { bail!("Mailbox {} does not exist", name); @@ -142,7 +154,11 @@ impl User { if old_name == INBOX { list.rename_mailbox(old_name, new_name)?; if !self.ensure_inbox_exists(&mut list, &ct).await? { - self.save_mailbox_list(&list, ct).await?; + let orphan = match ct { + Some(x) => Some(x.to_orphan()), + None => None, + }; + self.save_mailbox_list(&list, orphan).await?; } } else { let names = list.existing_mailbox_names(); @@ -165,7 +181,12 @@ impl User { list.rename_mailbox(name, &nnew)?; } } - self.save_mailbox_list(&list, ct).await?; + + let orphan = match ct { + Some(x) => Some(x.to_orphan()), + None => None, + }; + self.save_mailbox_list(&list, orphan).await?; } Ok(()) } @@ -257,7 +278,11 @@ impl User { let saved; let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) { CreatedMailbox::Created(i, v) => { - self.save_mailbox_list(list, ct.clone()).await?; + let orphan = match ct { + Some(x) => Some(x.to_orphan()), + None => None, + }; + self.save_mailbox_list(list, orphan).await?; saved = true; (i, v) } @@ -277,11 +302,11 @@ impl User { async fn save_mailbox_list( &self, list: &MailboxList, - ct: Option, + ct: Option, ) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; let rref = match ct { - Some(x) => x, + Some(x) => self.k2v.from_orphan(x), None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK), }; rref.set_value(list_blob).push().await?; -- cgit v1.2.3 From 4a33ac2265dae0e8fd1f7fbaec54ab7120334cbe Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 17 Nov 2023 12:15:44 +0100 Subject: incoming has been fully ported --- src/mail/incoming.rs | 74 +++++++++++++++++++++++----------------------------- 1 file changed, 32 insertions(+), 42 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 9899ae8..db22f3e 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +//use std::collections::HashMap; use std::convert::TryFrom; use std::sync::{Arc, Weak}; @@ -6,11 +6,7 @@ use std::time::Duration; use anyhow::{anyhow, bail, Result}; use futures::{future::BoxFuture, FutureExt}; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; -use rusoto_s3::{ - DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, -}; -use tokio::io::AsyncReadExt; +//use tokio::io::AsyncReadExt; use tokio::sync::watch; use tracing::{error, info, warn}; @@ -81,7 +77,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan())), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } @@ -220,7 +216,7 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> enum LockState { Unknown, Empty, - Held(UniqueIdent, u64, CausalityToken), + Held(UniqueIdent, u64, storage::OrphanRowRef), } async fn k2v_lock_loop_internal( @@ -236,10 +232,10 @@ async fn k2v_lock_loop_internal( // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture> = async { - let mut ct = None; + let mut ct = k2v.row(pk, sk); loop { info!("k2v watch lock loop iter: ct = {:?}", ct); - match k2v_wait_value_changed(&k2v, pk, sk, &ct).await { + match ct.poll().await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", @@ -250,8 +246,8 @@ async fn k2v_lock_loop_internal( } Ok(cv) => { let mut lock_state = None; - for v in cv.value.iter() { - if let K2vValue::Value(vbytes) = v { + for v in cv.content().iter() { + if let storage::Alternative::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); let pid = UniqueIdent(vbytes[8..].try_into().unwrap()); @@ -264,16 +260,18 @@ async fn k2v_lock_loop_internal( } } } + let new_ct = cv.to_ref(); + info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", - ct, cv.causality, lock_state + ct, new_ct, lock_state ); state_tx.send( lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone())) + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan())) .unwrap_or(LockState::Empty), )?; - ct = Some(cv.causality); + ct = new_ct; } } } @@ -359,7 +357,11 @@ async fn k2v_lock_loop_internal( now_msec() + LOCK_DURATION.as_millis() as u64, )); lock[8..].copy_from_slice(&our_pid.0); - if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await { + let row = match ct { + Some(orphan) => k2v.from_orphan(orphan), + None => k2v.row(pk, sk), + }; + if let Err(e) = row.set_value(lock).push().await { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -385,7 +387,8 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let _ = k2v.delete_item(pk, sk, ct.clone()).await; + let row = k2v.from_orphan(ct); + let _ = row.rm().await; } } @@ -407,13 +410,14 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.s3_client()?; - let k2v_client = creds.storage.k2v_client()?; + let s3_client = creds.storage.blob_store()?; + let k2v_client = creds.storage.row_store()?; // Get causality token of previous watch key - let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await { - Err(_) => None, - Ok(cv) => Some(cv.causality), + let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match query.fetch().await { + Err(_) => query, + Ok(cv) => cv.to_ref(), }; // Write mail to encrypted storage @@ -421,28 +425,14 @@ impl EncryptedMessage { sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); let key_header = base64::encode(&encrypted_key); - let por = PutObjectRequest { - bucket: creds.storage.bucket.clone(), - key: format!("incoming/{}", gen_ident()), - metadata: Some( - [(MESSAGE_KEY.to_string(), key_header)] - .into_iter() - .collect::>(), - ), - body: Some(self.encrypted_body.clone().into()), - ..Default::default() - }; - s3_client.put_object(por).await?; + let mut send = s3_client + .blob(&format!("incoming/{}", gen_ident())) + .set_value(self.encrypted_body.clone().into()); + send.set_meta(MESSAGE_KEY, &key_header); + send.push().await?; // Update watch key to signal new mail - k2v_client - .insert_item( - INCOMING_PK, - INCOMING_WATCH_SK, - gen_ident().0.to_vec(), - watch_ct, - ) - .await?; + watch_ct.set_value(gen_ident().0.to_vec()).push().await?; Ok(()) } -- cgit v1.2.3 From e92dc35564e91ce4e6a8defa9e8b52eef9e55fae Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 17 Nov 2023 15:02:43 +0100 Subject: fix orphan storage compatibility --- src/mail/incoming.rs | 6 +++--- src/mail/user.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index db22f3e..da4c819 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -77,7 +77,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan())), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan()).expect("Incompatible source & target storage")), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } @@ -358,7 +358,7 @@ async fn k2v_lock_loop_internal( )); lock[8..].copy_from_slice(&our_pid.0); let row = match ct { - Some(orphan) => k2v.from_orphan(orphan), + Some(orphan) => k2v.from_orphan(orphan).expect("Source & target must be storage compatible"), None => k2v.row(pk, sk), }; if let Err(e) = row.set_value(lock).push().await { @@ -387,7 +387,7 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let row = k2v.from_orphan(ct); + let row = k2v.from_orphan(ct).expect("Incompatible source & target storage"); let _ = row.rm().await; } } diff --git a/src/mail/user.rs b/src/mail/user.rs index 7011dcc..7a3e5c7 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -306,7 +306,7 @@ impl User { ) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; let rref = match ct { - Some(x) => self.k2v.from_orphan(x), + Some(x) => self.k2v.from_orphan(x).expect("Source & target must be same storage"), None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK), }; rref.set_value(list_blob).push().await?; -- cgit v1.2.3 From 14c7a96c282e20ff0d5343a7a378554f34983d21 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 23 Nov 2023 15:04:47 +0100 Subject: extract setup logic --- src/mail/user.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/mail') diff --git a/src/mail/user.rs b/src/mail/user.rs index 7a3e5c7..bdfb30c 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::watch; use crate::cryptoblob::{open_deserialize, seal_serialize}; -use crate::login::{Credentials, StorageCredentials}; +use crate::login::Credentials; use crate::mail::incoming::incoming_mail_watch_process; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; @@ -309,7 +309,7 @@ impl User { Some(x) => self.k2v.from_orphan(x).expect("Source & target must be same storage"), None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK), }; - rref.set_value(list_blob).push().await?; + rref.set_value(&list_blob).push().await?; Ok(()) } } -- cgit v1.2.3 From 8cd9801030e24c58621b3bed8723e8a8a4722ef8 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 23 Nov 2023 15:16:44 +0100 Subject: various fixes --- src/mail/incoming.rs | 4 ++-- src/mail/mailbox.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index da4c819..e3c729f 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -361,7 +361,7 @@ async fn k2v_lock_loop_internal( Some(orphan) => k2v.from_orphan(orphan).expect("Source & target must be storage compatible"), None => k2v.row(pk, sk), }; - if let Err(e) = row.set_value(lock).push().await { + if let Err(e) = row.set_value(&lock).push().await { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -432,7 +432,7 @@ impl EncryptedMessage { send.push().await?; // Update watch key to signal new mail - watch_ct.set_value(gen_ident().0.to_vec()).push().await?; + watch_ct.set_value(gen_ident().0.as_ref()).push().await?; Ok(()) } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index f27d50a..060267a 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -282,7 +282,7 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?; + self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -326,7 +326,7 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?; + self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -410,7 +410,7 @@ impl MailboxInternal { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(meta_blob).push().await?; + self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(&meta_blob).push().await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), -- cgit v1.2.3 From 3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 18 Dec 2023 17:09:44 +0100 Subject: Storage trait new implementation --- src/mail/incoming.rs | 103 ++++++++++++++++++++++++++------------------------- src/mail/mailbox.rs | 59 ++++++++++++++++------------- src/mail/user.rs | 68 ++++++++++++---------------------- 3 files changed, 111 insertions(+), 119 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index e3c729f..f6b831d 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::{anyhow, bail, Result}; +use base64::Engine; use futures::{future::BoxFuture, FutureExt}; //use tokio::io::AsyncReadExt; use tokio::sync::watch; @@ -50,13 +51,11 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.row_client()?, INCOMING_PK, INCOMING_LOCK_SK); - - let k2v = creds.row_client()?; - let s3 = creds.blob_client()?; + let mut lock_held = k2v_lock_loop(creds.storage.build()?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); + let storage = creds.storage.build()?; let mut inbox: Option> = None; - let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK); + let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); loop { let maybe_updated_incoming_key = if *lock_held.borrow() { @@ -64,9 +63,9 @@ async fn incoming_mail_watch_process_internal( let wait_new_mail = async { loop { - match incoming_key.poll().await + match storage.row_poll(&incoming_key).await { - Ok(row_val) => break row_val.to_ref(), + Ok(row_val) => break row_val.row_ref, Err(e) => { error!("Error in wait_new_mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; @@ -77,7 +76,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan()).expect("Incompatible source & target storage")), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } @@ -119,7 +118,7 @@ async fn incoming_mail_watch_process_internal( // If we were able to open INBOX, and we have mail, // fetch new mail if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { - match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { + match handle_incoming_mail(&user, &storage, inbox, &lock_held).await { Ok(()) => { incoming_key = updated_incoming_key; } @@ -136,20 +135,20 @@ async fn incoming_mail_watch_process_internal( async fn handle_incoming_mail( user: &Arc, - blobs: &storage::BlobStore, + storage: &storage::Store, inbox: &Arc, lock_held: &watch::Receiver, ) -> Result<()> { - let mails_res = blobs.list("incoming/").await?; + let mails_res = storage.blob_list("incoming/").await?; for object in mails_res { if !*lock_held.borrow() { break; } - let key = object.key(); + let key = object.0; if let Some(mail_id) = key.strip_prefix("incoming/") { if let Ok(mail_id) = mail_id.parse::() { - move_incoming_message(user, blobs, inbox, mail_id).await?; + move_incoming_message(user, storage, inbox, mail_id).await?; } } } @@ -159,7 +158,7 @@ async fn handle_incoming_mail( async fn move_incoming_message( user: &Arc, - s3: &storage::BlobStore, + storage: &storage::Store, inbox: &Arc, id: UniqueIdent, ) -> Result<()> { @@ -168,14 +167,15 @@ async fn move_incoming_message( let object_key = format!("incoming/{}", id); // 1. Fetch message from S3 - let object = s3.blob(&object_key).fetch().await?; + let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?; // 1.a decrypt message key from headers //info!("Object metadata: {:?}", get_result.metadata); let key_encrypted_b64 = object - .get_meta(MESSAGE_KEY) + .meta + .get(MESSAGE_KEY) .ok_or(anyhow!("Missing key in metadata"))?; - let key_encrypted = base64::decode(key_encrypted_b64)?; + let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; let message_key = sodiumoxide::crypto::sealedbox::open( &key_encrypted, &user.creds.keys.public, @@ -186,28 +186,28 @@ async fn move_incoming_message( cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; // 1.b retrieve message body - let obj_body = object.content().ok_or(anyhow!("Missing object body"))?; + let obj_body = object.value; let plain_mail = cryptoblob::open(&obj_body, &message_key) .map_err(|_| anyhow!("Cannot decrypt email content"))?; // 2 parse mail and add to inbox let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; inbox - .append_from_s3(msg, id, object.to_ref(), message_key) + .append_from_s3(msg, id, object.blob_ref.clone(), message_key) .await?; // 3 delete from incoming - object.to_ref().rm().await?; + storage.blob_rm(&object.blob_ref).await?; Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- -fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver { +fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver { let (held_tx, held_rx) = watch::channel(false); - tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); + tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx)); held_rx } @@ -216,13 +216,12 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> enum LockState { Unknown, Empty, - Held(UniqueIdent, u64, storage::OrphanRowRef), + Held(UniqueIdent, u64, storage::RowRef), } async fn k2v_lock_loop_internal( - k2v: storage::RowStore, - pk: &'static str, - sk: &'static str, + storage: storage::Store, + row_ref: storage::RowRef, held_tx: watch::Sender, ) { let (state_tx, mut state_rx) = watch::channel::(LockState::Unknown); @@ -232,10 +231,10 @@ async fn k2v_lock_loop_internal( // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture> = async { - let mut ct = k2v.row(pk, sk); + let mut ct = row_ref.clone(); loop { info!("k2v watch lock loop iter: ct = {:?}", ct); - match ct.poll().await { + match storage.row_poll(&ct).await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", @@ -246,7 +245,7 @@ async fn k2v_lock_loop_internal( } Ok(cv) => { let mut lock_state = None; - for v in cv.content().iter() { + for v in cv.value.iter() { if let storage::Alternative::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); @@ -260,7 +259,7 @@ async fn k2v_lock_loop_internal( } } } - let new_ct = cv.to_ref(); + let new_ct = cv.row_ref; info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", @@ -268,7 +267,7 @@ async fn k2v_lock_loop_internal( ); state_tx.send( lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan())) + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone())) .unwrap_or(LockState::Empty), )?; ct = new_ct; @@ -358,10 +357,10 @@ async fn k2v_lock_loop_internal( )); lock[8..].copy_from_slice(&our_pid.0); let row = match ct { - Some(orphan) => k2v.from_orphan(orphan).expect("Source & target must be storage compatible"), - None => k2v.row(pk, sk), + Some(existing) => existing, + None => row_ref.clone(), }; - if let Err(e) = row.set_value(&lock).push().await { + if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -377,7 +376,7 @@ async fn k2v_lock_loop_internal( info!("lock loop exited, releasing"); if !held_tx.is_closed() { - warn!("wierd..."); + warn!("weird..."); let _ = held_tx.send(false); } @@ -387,8 +386,10 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let row = k2v.from_orphan(ct).expect("Incompatible source & target storage"); - let _ = row.rm().await; + match storage.row_rm(&storage::Selector::Single(&ct)).await { + Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), + Ok(_) => (), + }; } } @@ -410,30 +411,32 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.blob_store()?; - let k2v_client = creds.storage.row_store()?; + let storage = creds.storage.build()?; // Get causality token of previous watch key - let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK); - let watch_ct = match query.fetch().await { + let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await { Err(_) => query, - Ok(cv) => cv.to_ref(), + Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query), }; // Write mail to encrypted storage let encrypted_key = sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); - let key_header = base64::encode(&encrypted_key); + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key); - let mut send = s3_client - .blob(&format!("incoming/{}", gen_ident())) - .set_value(self.encrypted_body.clone().into()); - send.set_meta(MESSAGE_KEY, &key_header); - send.push().await?; + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("incoming/{}", gen_ident())), + self.encrypted_body.clone().into(), + ).with_meta(MESSAGE_KEY.to_string(), key_header); + storage.blob_insert(&blob_val).await?; // Update watch key to signal new mail - watch_ct.set_value(gen_ident().0.as_ref()).push().await?; - + let watch_val = storage::RowVal::new( + watch_ct.clone(), + gen_ident().0.to_vec(), + ); + storage.row_insert(vec![watch_val]).await?; Ok(()) } } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 060267a..b4afd5e 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -8,7 +8,7 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::storage::{RowStore, BlobStore, self}; +use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self}; use crate::timestamp::now_msec; pub struct Mailbox { @@ -44,8 +44,7 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, encryption_key: creds.keys.master.clone(), - k2v: creds.storage.row_store()?, - s3: creds.storage.blob_store()?, + storage: creds.storage.build()?, uid_index, mail_path, }); @@ -178,10 +177,7 @@ struct MailboxInternal { id: UniqueIdent, mail_path: String, encryption_key: Key, - - k2v: RowStore, - s3: BlobStore, - + storage: Store, uid_index: Bayou, } @@ -200,15 +196,15 @@ impl MailboxInternal { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { let ids = ids.iter().map(|x| x.to_string()).collect::>(); - let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::>(); - let res_vec = self.k2v.select(storage::Selector::List(ops)).await?; + let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::>(); + let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; for res in res_vec.into_iter() { let mut meta_opt = None; // Resolve conflicts - for v in res.content().iter() { + for v in res.value.iter() { match v { storage::Alternative::Tombstone => (), storage::Alternative::Value(v) => { @@ -227,7 +223,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", res.to_ref().key()); + bail!("No valid meta value in k2v for {:?}", res.row_ref); } } @@ -235,9 +231,9 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { - let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?; - let body = obj_res.content().ok_or(anyhow!("missing body"))?; - cryptoblob::open(body, message_key) + let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?; + let body = obj_res.value; + cryptoblob::open(&body, message_key) } // ---- Functions for changing the mailbox ---- @@ -270,7 +266,10 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?; + self.storage.blob_insert(&BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -282,7 +281,10 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -307,14 +309,14 @@ impl MailboxInternal { &mut self, mail: IMF<'a>, ident: UniqueIdent, - blob_ref: storage::BlobRef, + blob_src: storage::BlobRef, message_key: Key, ) -> Result<()> { futures::try_join!( async { // Copy mail body from previous location - let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident)); - blob_ref.copy(&dst).await?; + let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); + self.storage.blob_copy(&blob_src, &blob_dst).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -326,7 +328,10 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -350,13 +355,13 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?; + self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?; + self.storage.row_rm(&Selector::Single(&RowRef::new(&self.mail_path, &sk))).await?; Ok::<_, anyhow::Error>(()) } )?; @@ -402,15 +407,19 @@ impl MailboxInternal { futures::try_join!( async { - let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id)); - self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?; + let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); + let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); + self.storage.blob_copy(&src, &dst).await?; Ok::<_, anyhow::Error>(()) }, async { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), diff --git a/src/mail/user.rs b/src/mail/user.rs index bdfb30c..8413cbf 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -33,7 +33,7 @@ const MAILBOX_LIST_SK: &str = "list"; pub struct User { pub username: String, pub creds: Credentials, - pub k2v: storage::RowStore, + pub storage: storage::Store, pub mailboxes: std::sync::Mutex>>, tx_inbox_id: watch::Sender>, @@ -41,7 +41,7 @@ pub struct User { impl User { pub async fn new(username: String, creds: Credentials) -> Result> { - let cache_key = (username.clone(), creds.storage.clone()); + let cache_key = (username.clone(), creds.storage.unique()); { let cache = USER_CACHE.lock().unwrap(); @@ -81,11 +81,7 @@ impl User { let mb_uidvalidity = mb.current_uid_index().await.uidvalidity; if mb_uidvalidity > uidvalidity { list.update_uidvalidity(name, mb_uidvalidity); - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; } Ok(Some(mb)) } else { @@ -108,11 +104,7 @@ impl User { let (mut list, ct) = self.load_mailbox_list().await?; match list.create_mailbox(name) { CreatedMailbox::Created(_, _) => { - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; Ok(()) } CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)), @@ -129,11 +121,7 @@ impl User { if list.has_mailbox(name) { // TODO: actually delete mailbox contents list.set_mailbox(name, None); - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; Ok(()) } else { bail!("Mailbox {} does not exist", name); @@ -154,11 +142,7 @@ impl User { if old_name == INBOX { list.rename_mailbox(old_name, new_name)?; if !self.ensure_inbox_exists(&mut list, &ct).await? { - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; } } else { let names = list.existing_mailbox_names(); @@ -182,11 +166,7 @@ impl User { } } - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; } Ok(()) } @@ -194,14 +174,14 @@ impl User { // ---- Internal user & mailbox management ---- async fn open(username: String, creds: Credentials) -> Result> { - let k2v = creds.row_client()?; + let storage = creds.storage.build()?; let (tx_inbox_id, rx_inbox_id) = watch::channel(None); let user = Arc::new(Self { username, creds: creds.clone(), - k2v, + storage, tx_inbox_id, mailboxes: std::sync::Mutex::new(HashMap::new()), }); @@ -245,19 +225,25 @@ impl User { // ---- Mailbox list management ---- async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { - let (mut list, row) = match self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK).fetch().await { + let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); + let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await { Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), Ok(rv) => { let mut list = MailboxList::new(); - for v in rv.content() { + let (row_ref, row_vals) = match rv.into_iter().next() { + Some(row_val) => (row_val.row_ref, row_val.value), + None => (row_ref, vec![]), + }; + + for v in row_vals { if let storage::Alternative::Value(vbytes) = v { let list2 = open_deserialize::(&vbytes, &self.creds.keys.master)?; list.merge(list2); } } - (list, Some(rv.to_ref())) + (list, Some(row_ref)) } }; @@ -278,11 +264,7 @@ impl User { let saved; let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) { CreatedMailbox::Created(i, v) => { - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(list, orphan).await?; + self.save_mailbox_list(list, ct.clone()).await?; saved = true; (i, v) } @@ -302,14 +284,12 @@ impl User { async fn save_mailbox_list( &self, list: &MailboxList, - ct: Option, + ct: Option, ) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; - let rref = match ct { - Some(x) => self.k2v.from_orphan(x).expect("Source & target must be same storage"), - None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK), - }; - rref.set_value(&list_blob).push().await?; + let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK)); + let row_val = storage::RowVal::new(rref, list_blob); + self.storage.row_insert(vec![row_val]).await?; Ok(()) } } @@ -482,6 +462,6 @@ enum CreatedMailbox { // ---- User cache ---- lazy_static! { - static ref USER_CACHE: std::sync::Mutex>> = + static ref USER_CACHE: std::sync::Mutex>> = std::sync::Mutex::new(HashMap::new()); } -- cgit v1.2.3 From c75f2d91ff969dd791cb476031ee80870c6ad61a Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 19 Dec 2023 19:02:22 +0100 Subject: implemented an in memory storage --- src/mail/incoming.rs | 2 +- src/mail/mailbox.rs | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index f6b831d..2a6c947 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -386,7 +386,7 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - match storage.row_rm(&storage::Selector::Single(&ct)).await { + match storage.row_rm_single(&ct).await { Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), Ok(_) => (), }; diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index b4afd5e..65f44b1 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -361,7 +361,12 @@ impl MailboxInternal { async { // Delete mail meta from K2V let sk = ident.to_string(); - self.storage.row_rm(&Selector::Single(&RowRef::new(&self.mail_path, &sk))).await?; + let res = self.storage + .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk))) + .await?; + if let Some(row_val) = res.into_iter().next() { + self.storage.row_rm_single(&row_val.row_ref).await?; + } Ok::<_, anyhow::Error>(()) } )?; -- cgit v1.2.3 From 012c6ad6724b6a6e155ee717e6d558e1fe199e43 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 21 Dec 2023 21:54:36 +0100 Subject: initialize aws sdk with our info --- src/mail/incoming.rs | 6 +++--- src/mail/mailbox.rs | 4 ++-- src/mail/user.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 2a6c947..3eafac7 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -51,8 +51,8 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.storage.build()?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); - let storage = creds.storage.build()?; + let mut lock_held = k2v_lock_loop(creds.storage.build().await?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); + let storage = creds.storage.build().await?; let mut inbox: Option> = None; let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); @@ -411,7 +411,7 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { - let storage = creds.storage.build()?; + let storage = creds.storage.build().await?; // Get causality token of previous watch key let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 65f44b1..60a91dd 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -25,7 +25,7 @@ impl Mailbox { let index_path = format!("index/{}", id); let mail_path = format!("mail/{}", id); - let mut uid_index = Bayou::::new(creds, index_path)?; + let mut uid_index = Bayou::::new(creds, index_path).await?; uid_index.sync().await?; let uidvalidity = uid_index.state().uidvalidity; @@ -44,7 +44,7 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, encryption_key: creds.keys.master.clone(), - storage: creds.storage.build()?, + storage: creds.storage.build().await?, uid_index, mail_path, }); diff --git a/src/mail/user.rs b/src/mail/user.rs index 8413cbf..8d12c58 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -174,7 +174,7 @@ impl User { // ---- Internal user & mailbox management ---- async fn open(username: String, creds: Credentials) -> Result> { - let storage = creds.storage.build()?; + let storage = creds.storage.build().await?; let (tx_inbox_id, rx_inbox_id) = watch::channel(None); -- cgit v1.2.3 From 0f7764d9f05b3fccfa30ddebb52997200af13bf2 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 22 Dec 2023 19:32:07 +0100 Subject: s3 is now implemented --- src/mail/incoming.rs | 2 +- src/mail/mailbox.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 3eafac7..b17959a 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -429,7 +429,7 @@ impl EncryptedMessage { storage::BlobRef(format!("incoming/{}", gen_ident())), self.encrypted_body.clone().into(), ).with_meta(MESSAGE_KEY.to_string(), key_header); - storage.blob_insert(&blob_val).await?; + storage.blob_insert(blob_val).await?; // Update watch key to signal new mail let watch_val = storage::RowVal::new( diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 60a91dd..c925f39 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -266,7 +266,7 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.storage.blob_insert(&BlobVal::new( + self.storage.blob_insert(BlobVal::new( BlobRef(format!("{}/{}", self.mail_path, ident)), message_blob, )).await?; -- cgit v1.2.3 From 54c9736a247bb3534a285caa637c9afb052bc2dd Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Dec 2023 14:58:09 +0100 Subject: implemente garage storage --- src/mail/incoming.rs | 2 +- src/mail/mailbox.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index b17959a..7e33a9a 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -386,7 +386,7 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - match storage.row_rm_single(&ct).await { + match storage.row_rm(&storage::Selector::Single(&ct)).await { Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), Ok(_) => (), }; diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index c925f39..6fb7dea 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -365,7 +365,7 @@ impl MailboxInternal { .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk))) .await?; if let Some(row_val) = res.into_iter().next() { - self.storage.row_rm_single(&row_val.row_ref).await?; + self.storage.row_rm(&storage::Selector::Single(&row_val.row_ref)).await?; } Ok::<_, anyhow::Error>(()) } -- cgit v1.2.3 From 7ac24ad913fa081e1bd6f5b042b9da0173dad267 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Dec 2023 14:58:28 +0100 Subject: cargo format --- src/mail/incoming.rs | 21 +++++++++-------- src/mail/mailbox.rs | 64 +++++++++++++++++++++++++++++++++++----------------- src/mail/user.rs | 6 ++++- 3 files changed, 60 insertions(+), 31 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 7e33a9a..04d2ef1 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -51,7 +51,10 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.storage.build().await?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); + let mut lock_held = k2v_lock_loop( + creds.storage.build().await?, + storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK), + ); let storage = creds.storage.build().await?; let mut inbox: Option> = None; @@ -63,8 +66,7 @@ async fn incoming_mail_watch_process_internal( let wait_new_mail = async { loop { - match storage.row_poll(&incoming_key).await - { + match storage.row_poll(&incoming_key).await { Ok(row_val) => break row_val.row_ref, Err(e) => { error!("Error in wait_new_mail: {}", e); @@ -360,7 +362,10 @@ async fn k2v_lock_loop_internal( Some(existing) => existing, None => row_ref.clone(), }; - if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await { + if let Err(e) = storage + .row_insert(vec![storage::RowVal::new(row, lock)]) + .await + { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -428,14 +433,12 @@ impl EncryptedMessage { let blob_val = storage::BlobVal::new( storage::BlobRef(format!("incoming/{}", gen_ident())), self.encrypted_body.clone().into(), - ).with_meta(MESSAGE_KEY.to_string(), key_header); + ) + .with_meta(MESSAGE_KEY.to_string(), key_header); storage.blob_insert(blob_val).await?; // Update watch key to signal new mail - let watch_val = storage::RowVal::new( - watch_ct.clone(), - gen_ident().0.to_vec(), - ); + let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec()); storage.row_insert(vec![watch_val]).await?; Ok(()) } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 6fb7dea..e424ba3 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -8,7 +8,7 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self}; +use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; use crate::timestamp::now_msec; pub struct Mailbox { @@ -196,7 +196,10 @@ impl MailboxInternal { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { let ids = ids.iter().map(|x| x.to_string()).collect::>(); - let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::>(); + let ops = ids + .iter() + .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())) + .collect::>(); let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; @@ -231,7 +234,10 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { - let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?; + let obj_res = self + .storage + .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))) + .await?; let body = obj_res.value; cryptoblob::open(&body, message_key) } @@ -266,10 +272,12 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.storage.blob_insert(BlobVal::new( - BlobRef(format!("{}/{}", self.mail_path, ident)), - message_blob, - )).await?; + self.storage + .blob_insert(BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )) + .await?; Ok::<_, anyhow::Error>(()) }, async { @@ -281,10 +289,12 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.storage.row_insert(vec![RowVal::new( - RowRef::new(&self.mail_path, &ident.to_string()), - meta_blob, - )]).await?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) + .await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -328,10 +338,12 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.storage.row_insert(vec![RowVal::new( + self.storage + .row_insert(vec![RowVal::new( RowRef::new(&self.mail_path, &ident.to_string()), meta_blob, - )]).await?; + )]) + .await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -355,17 +367,25 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?; + self.storage + .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))) + .await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - let res = self.storage - .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk))) + let res = self + .storage + .row_fetch(&storage::Selector::Single(&RowRef::new( + &self.mail_path, + &sk, + ))) .await?; if let Some(row_val) = res.into_iter().next() { - self.storage.row_rm(&storage::Selector::Single(&row_val.row_ref)).await?; + self.storage + .row_rm(&storage::Selector::Single(&row_val.row_ref)) + .await?; } Ok::<_, anyhow::Error>(()) } @@ -421,10 +441,12 @@ impl MailboxInternal { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.storage.row_insert(vec![RowVal::new( - RowRef::new(&self.mail_path, &new_id.to_string()), - meta_blob, - )]).await?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]) + .await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), diff --git a/src/mail/user.rs b/src/mail/user.rs index 8d12c58..da0d509 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -226,7 +226,11 @@ impl User { async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); - let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await { + let (mut list, row) = match self + .storage + .row_fetch(&storage::Selector::Single(&row_ref)) + .await + { Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), Ok(rv) => { -- cgit v1.2.3