From 3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 18 Dec 2023 17:09:44 +0100 Subject: Storage trait new implementation --- src/mail/incoming.rs | 103 ++++++++++++++++++++++++++------------------------- src/mail/mailbox.rs | 59 ++++++++++++++++------------- src/mail/user.rs | 68 ++++++++++++---------------------- 3 files changed, 111 insertions(+), 119 deletions(-) (limited to 'src/mail') diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index e3c729f..f6b831d 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::{anyhow, bail, Result}; +use base64::Engine; use futures::{future::BoxFuture, FutureExt}; //use tokio::io::AsyncReadExt; use tokio::sync::watch; @@ -50,13 +51,11 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.row_client()?, INCOMING_PK, INCOMING_LOCK_SK); - - let k2v = creds.row_client()?; - let s3 = creds.blob_client()?; + let mut lock_held = k2v_lock_loop(creds.storage.build()?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); + let storage = creds.storage.build()?; let mut inbox: Option> = None; - let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK); + let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); loop { let maybe_updated_incoming_key = if *lock_held.borrow() { @@ -64,9 +63,9 @@ async fn incoming_mail_watch_process_internal( let wait_new_mail = async { loop { - match incoming_key.poll().await + match storage.row_poll(&incoming_key).await { - Ok(row_val) => break row_val.to_ref(), + Ok(row_val) => break row_val.row_ref, Err(e) => { error!("Error in wait_new_mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; @@ -77,7 +76,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan()).expect("Incompatible source & target storage")), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } @@ -119,7 +118,7 @@ async fn incoming_mail_watch_process_internal( // If we were able to open INBOX, and we have mail, // fetch new mail if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { - match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { + match handle_incoming_mail(&user, &storage, inbox, &lock_held).await { Ok(()) => { incoming_key = updated_incoming_key; } @@ -136,20 +135,20 @@ async fn incoming_mail_watch_process_internal( async fn handle_incoming_mail( user: &Arc, - blobs: &storage::BlobStore, + storage: &storage::Store, inbox: &Arc, lock_held: &watch::Receiver, ) -> Result<()> { - let mails_res = blobs.list("incoming/").await?; + let mails_res = storage.blob_list("incoming/").await?; for object in mails_res { if !*lock_held.borrow() { break; } - let key = object.key(); + let key = object.0; if let Some(mail_id) = key.strip_prefix("incoming/") { if let Ok(mail_id) = mail_id.parse::() { - move_incoming_message(user, blobs, inbox, mail_id).await?; + move_incoming_message(user, storage, inbox, mail_id).await?; } } } @@ -159,7 +158,7 @@ async fn handle_incoming_mail( async fn move_incoming_message( user: &Arc, - s3: &storage::BlobStore, + storage: &storage::Store, inbox: &Arc, id: UniqueIdent, ) -> Result<()> { @@ -168,14 +167,15 @@ async fn move_incoming_message( let object_key = format!("incoming/{}", id); // 1. Fetch message from S3 - let object = s3.blob(&object_key).fetch().await?; + let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?; // 1.a decrypt message key from headers //info!("Object metadata: {:?}", get_result.metadata); let key_encrypted_b64 = object - .get_meta(MESSAGE_KEY) + .meta + .get(MESSAGE_KEY) .ok_or(anyhow!("Missing key in metadata"))?; - let key_encrypted = base64::decode(key_encrypted_b64)?; + let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; let message_key = sodiumoxide::crypto::sealedbox::open( &key_encrypted, &user.creds.keys.public, @@ -186,28 +186,28 @@ async fn move_incoming_message( cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; // 1.b retrieve message body - let obj_body = object.content().ok_or(anyhow!("Missing object body"))?; + let obj_body = object.value; let plain_mail = cryptoblob::open(&obj_body, &message_key) .map_err(|_| anyhow!("Cannot decrypt email content"))?; // 2 parse mail and add to inbox let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; inbox - .append_from_s3(msg, id, object.to_ref(), message_key) + .append_from_s3(msg, id, object.blob_ref.clone(), message_key) .await?; // 3 delete from incoming - object.to_ref().rm().await?; + storage.blob_rm(&object.blob_ref).await?; Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- -fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver { +fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver { let (held_tx, held_rx) = watch::channel(false); - tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); + tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx)); held_rx } @@ -216,13 +216,12 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> enum LockState { Unknown, Empty, - Held(UniqueIdent, u64, storage::OrphanRowRef), + Held(UniqueIdent, u64, storage::RowRef), } async fn k2v_lock_loop_internal( - k2v: storage::RowStore, - pk: &'static str, - sk: &'static str, + storage: storage::Store, + row_ref: storage::RowRef, held_tx: watch::Sender, ) { let (state_tx, mut state_rx) = watch::channel::(LockState::Unknown); @@ -232,10 +231,10 @@ async fn k2v_lock_loop_internal( // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture> = async { - let mut ct = k2v.row(pk, sk); + let mut ct = row_ref.clone(); loop { info!("k2v watch lock loop iter: ct = {:?}", ct); - match ct.poll().await { + match storage.row_poll(&ct).await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", @@ -246,7 +245,7 @@ async fn k2v_lock_loop_internal( } Ok(cv) => { let mut lock_state = None; - for v in cv.content().iter() { + for v in cv.value.iter() { if let storage::Alternative::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); @@ -260,7 +259,7 @@ async fn k2v_lock_loop_internal( } } } - let new_ct = cv.to_ref(); + let new_ct = cv.row_ref; info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", @@ -268,7 +267,7 @@ async fn k2v_lock_loop_internal( ); state_tx.send( lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan())) + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone())) .unwrap_or(LockState::Empty), )?; ct = new_ct; @@ -358,10 +357,10 @@ async fn k2v_lock_loop_internal( )); lock[8..].copy_from_slice(&our_pid.0); let row = match ct { - Some(orphan) => k2v.from_orphan(orphan).expect("Source & target must be storage compatible"), - None => k2v.row(pk, sk), + Some(existing) => existing, + None => row_ref.clone(), }; - if let Err(e) = row.set_value(&lock).push().await { + if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -377,7 +376,7 @@ async fn k2v_lock_loop_internal( info!("lock loop exited, releasing"); if !held_tx.is_closed() { - warn!("wierd..."); + warn!("weird..."); let _ = held_tx.send(false); } @@ -387,8 +386,10 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let row = k2v.from_orphan(ct).expect("Incompatible source & target storage"); - let _ = row.rm().await; + match storage.row_rm(&storage::Selector::Single(&ct)).await { + Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), + Ok(_) => (), + }; } } @@ -410,30 +411,32 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.blob_store()?; - let k2v_client = creds.storage.row_store()?; + let storage = creds.storage.build()?; // Get causality token of previous watch key - let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK); - let watch_ct = match query.fetch().await { + let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await { Err(_) => query, - Ok(cv) => cv.to_ref(), + Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query), }; // Write mail to encrypted storage let encrypted_key = sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); - let key_header = base64::encode(&encrypted_key); + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key); - let mut send = s3_client - .blob(&format!("incoming/{}", gen_ident())) - .set_value(self.encrypted_body.clone().into()); - send.set_meta(MESSAGE_KEY, &key_header); - send.push().await?; + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("incoming/{}", gen_ident())), + self.encrypted_body.clone().into(), + ).with_meta(MESSAGE_KEY.to_string(), key_header); + storage.blob_insert(&blob_val).await?; // Update watch key to signal new mail - watch_ct.set_value(gen_ident().0.as_ref()).push().await?; - + let watch_val = storage::RowVal::new( + watch_ct.clone(), + gen_ident().0.to_vec(), + ); + storage.row_insert(vec![watch_val]).await?; Ok(()) } } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 060267a..b4afd5e 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -8,7 +8,7 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::storage::{RowStore, BlobStore, self}; +use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self}; use crate::timestamp::now_msec; pub struct Mailbox { @@ -44,8 +44,7 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, encryption_key: creds.keys.master.clone(), - k2v: creds.storage.row_store()?, - s3: creds.storage.blob_store()?, + storage: creds.storage.build()?, uid_index, mail_path, }); @@ -178,10 +177,7 @@ struct MailboxInternal { id: UniqueIdent, mail_path: String, encryption_key: Key, - - k2v: RowStore, - s3: BlobStore, - + storage: Store, uid_index: Bayou, } @@ -200,15 +196,15 @@ impl MailboxInternal { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { let ids = ids.iter().map(|x| x.to_string()).collect::>(); - let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::>(); - let res_vec = self.k2v.select(storage::Selector::List(ops)).await?; + let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::>(); + let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; for res in res_vec.into_iter() { let mut meta_opt = None; // Resolve conflicts - for v in res.content().iter() { + for v in res.value.iter() { match v { storage::Alternative::Tombstone => (), storage::Alternative::Value(v) => { @@ -227,7 +223,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", res.to_ref().key()); + bail!("No valid meta value in k2v for {:?}", res.row_ref); } } @@ -235,9 +231,9 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { - let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?; - let body = obj_res.content().ok_or(anyhow!("missing body"))?; - cryptoblob::open(body, message_key) + let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?; + let body = obj_res.value; + cryptoblob::open(&body, message_key) } // ---- Functions for changing the mailbox ---- @@ -270,7 +266,10 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?; + self.storage.blob_insert(&BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -282,7 +281,10 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -307,14 +309,14 @@ impl MailboxInternal { &mut self, mail: IMF<'a>, ident: UniqueIdent, - blob_ref: storage::BlobRef, + blob_src: storage::BlobRef, message_key: Key, ) -> Result<()> { futures::try_join!( async { // Copy mail body from previous location - let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident)); - blob_ref.copy(&dst).await?; + let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); + self.storage.blob_copy(&blob_src, &blob_dst).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -326,7 +328,10 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -350,13 +355,13 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?; + self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?; + self.storage.row_rm(&Selector::Single(&RowRef::new(&self.mail_path, &sk))).await?; Ok::<_, anyhow::Error>(()) } )?; @@ -402,15 +407,19 @@ impl MailboxInternal { futures::try_join!( async { - let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id)); - self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?; + let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); + let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); + self.storage.blob_copy(&src, &dst).await?; Ok::<_, anyhow::Error>(()) }, async { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), diff --git a/src/mail/user.rs b/src/mail/user.rs index bdfb30c..8413cbf 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -33,7 +33,7 @@ const MAILBOX_LIST_SK: &str = "list"; pub struct User { pub username: String, pub creds: Credentials, - pub k2v: storage::RowStore, + pub storage: storage::Store, pub mailboxes: std::sync::Mutex>>, tx_inbox_id: watch::Sender>, @@ -41,7 +41,7 @@ pub struct User { impl User { pub async fn new(username: String, creds: Credentials) -> Result> { - let cache_key = (username.clone(), creds.storage.clone()); + let cache_key = (username.clone(), creds.storage.unique()); { let cache = USER_CACHE.lock().unwrap(); @@ -81,11 +81,7 @@ impl User { let mb_uidvalidity = mb.current_uid_index().await.uidvalidity; if mb_uidvalidity > uidvalidity { list.update_uidvalidity(name, mb_uidvalidity); - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; } Ok(Some(mb)) } else { @@ -108,11 +104,7 @@ impl User { let (mut list, ct) = self.load_mailbox_list().await?; match list.create_mailbox(name) { CreatedMailbox::Created(_, _) => { - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; Ok(()) } CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)), @@ -129,11 +121,7 @@ impl User { if list.has_mailbox(name) { // TODO: actually delete mailbox contents list.set_mailbox(name, None); - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; Ok(()) } else { bail!("Mailbox {} does not exist", name); @@ -154,11 +142,7 @@ impl User { if old_name == INBOX { list.rename_mailbox(old_name, new_name)?; if !self.ensure_inbox_exists(&mut list, &ct).await? { - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; } } else { let names = list.existing_mailbox_names(); @@ -182,11 +166,7 @@ impl User { } } - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; } Ok(()) } @@ -194,14 +174,14 @@ impl User { // ---- Internal user & mailbox management ---- async fn open(username: String, creds: Credentials) -> Result> { - let k2v = creds.row_client()?; + let storage = creds.storage.build()?; let (tx_inbox_id, rx_inbox_id) = watch::channel(None); let user = Arc::new(Self { username, creds: creds.clone(), - k2v, + storage, tx_inbox_id, mailboxes: std::sync::Mutex::new(HashMap::new()), }); @@ -245,19 +225,25 @@ impl User { // ---- Mailbox list management ---- async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { - let (mut list, row) = match self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK).fetch().await { + let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); + let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await { Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), Ok(rv) => { let mut list = MailboxList::new(); - for v in rv.content() { + let (row_ref, row_vals) = match rv.into_iter().next() { + Some(row_val) => (row_val.row_ref, row_val.value), + None => (row_ref, vec![]), + }; + + for v in row_vals { if let storage::Alternative::Value(vbytes) = v { let list2 = open_deserialize::(&vbytes, &self.creds.keys.master)?; list.merge(list2); } } - (list, Some(rv.to_ref())) + (list, Some(row_ref)) } }; @@ -278,11 +264,7 @@ impl User { let saved; let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) { CreatedMailbox::Created(i, v) => { - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(list, orphan).await?; + self.save_mailbox_list(list, ct.clone()).await?; saved = true; (i, v) } @@ -302,14 +284,12 @@ impl User { async fn save_mailbox_list( &self, list: &MailboxList, - ct: Option, + ct: Option, ) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; - let rref = match ct { - Some(x) => self.k2v.from_orphan(x).expect("Source & target must be same storage"), - None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK), - }; - rref.set_value(&list_blob).push().await?; + let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK)); + let row_val = storage::RowVal::new(rref, list_blob); + self.storage.row_insert(vec![row_val]).await?; Ok(()) } } @@ -482,6 +462,6 @@ enum CreatedMailbox { // ---- User cache ---- lazy_static! { - static ref USER_CACHE: std::sync::Mutex>> = + static ref USER_CACHE: std::sync::Mutex>> = std::sync::Mutex::new(HashMap::new()); } -- cgit v1.2.3