diff options
Diffstat (limited to 'src/mail')
-rw-r--r-- | src/mail/incoming.rs | 188 | ||||
-rw-r--r-- | src/mail/mailbox.rs | 159 | ||||
-rw-r--r-- | src/mail/unique_ident.rs | 2 | ||||
-rw-r--r-- | src/mail/user.rs | 55 |
4 files changed, 177 insertions, 227 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index b7d2f48..04d2ef1 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -1,28 +1,25 @@ -use std::collections::HashMap; +//use std::collections::HashMap; use std::convert::TryFrom; use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::{anyhow, bail, Result}; +use base64::Engine; use futures::{future::BoxFuture, FutureExt}; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; -use rusoto_s3::{ - DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, -}; -use tokio::io::AsyncReadExt; +//use tokio::io::AsyncReadExt; use tokio::sync::watch; use tracing::{error, info, warn}; use crate::cryptoblob; -use crate::k2v_util::k2v_wait_value_changed; use crate::login::{Credentials, PublicCredentials}; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::*; use crate::mail::user::User; use crate::mail::IMF; -use crate::time::now_msec; +use crate::storage; +use crate::timestamp::now_msec; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; @@ -54,24 +51,23 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.k2v_client()?, INCOMING_PK, INCOMING_LOCK_SK); - - let k2v = creds.k2v_client()?; - let s3 = creds.s3_client()?; + let mut lock_held = k2v_lock_loop( + creds.storage.build().await?, + storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK), + ); + let storage = creds.storage.build().await?; let mut inbox: Option<Arc<Mailbox>> = None; - let mut prev_ct: Option<CausalityToken> = None; + let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); loop { - let new_mail = if *lock_held.borrow() { + let maybe_updated_incoming_key = if *lock_held.borrow() { info!("incoming lock held"); let wait_new_mail = async { loop { - match k2v_wait_value_changed(&k2v, INCOMING_PK, INCOMING_WATCH_SK, &prev_ct) - .await - { - Ok(cv) => break cv, + match storage.row_poll(&incoming_key).await { + Ok(row_val) => break row_val.row_ref, Err(e) => { error!("Error in wait_new_mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; @@ -81,10 +77,10 @@ async fn incoming_mail_watch_process_internal( }; tokio::select! { - cv = wait_new_mail => Some(cv.causality), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => prev_ct.clone(), - _ = lock_held.changed() => None, - _ = rx_inbox_id.changed() => None, + inc_k = wait_new_mail => Some(inc_k), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, } } else { info!("incoming lock not held"); @@ -123,10 +119,10 @@ async fn incoming_mail_watch_process_internal( // If we were able to open INBOX, and we have mail, // fetch new mail - if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { - match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { + if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { + match handle_incoming_mail(&user, &storage, inbox, &lock_held).await { Ok(()) => { - prev_ct = Some(new_ct); + incoming_key = updated_incoming_key; } Err(e) => { error!("Could not fetch incoming mail: {}", e); @@ -141,27 +137,20 @@ async fn incoming_mail_watch_process_internal( async fn handle_incoming_mail( user: &Arc<User>, - s3: &S3Client, + storage: &storage::Store, inbox: &Arc<Mailbox>, lock_held: &watch::Receiver<bool>, ) -> Result<()> { - let lor = ListObjectsV2Request { - bucket: user.creds.storage.bucket.clone(), - max_keys: Some(1000), - prefix: Some("incoming/".into()), - ..Default::default() - }; - let mails_res = s3.list_objects_v2(lor).await?; + let mails_res = storage.blob_list("incoming/").await?; - for object in mails_res.contents.unwrap_or_default() { + for object in mails_res { if !*lock_held.borrow() { break; } - if let Some(key) = object.key { - if let Some(mail_id) = key.strip_prefix("incoming/") { - if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() { - move_incoming_message(user, s3, inbox, mail_id).await?; - } + let key = object.0; + if let Some(mail_id) = key.strip_prefix("incoming/") { + if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() { + move_incoming_message(user, storage, inbox, mail_id).await?; } } } @@ -171,7 +160,7 @@ async fn handle_incoming_mail( async fn move_incoming_message( user: &Arc<User>, - s3: &S3Client, + storage: &storage::Store, inbox: &Arc<Mailbox>, id: UniqueIdent, ) -> Result<()> { @@ -180,22 +169,15 @@ async fn move_incoming_message( let object_key = format!("incoming/{}", id); // 1. Fetch message from S3 - let gor = GetObjectRequest { - bucket: user.creds.storage.bucket.clone(), - key: object_key.clone(), - ..Default::default() - }; - let get_result = s3.get_object(gor).await?; + let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?; // 1.a decrypt message key from headers - info!("Object metadata: {:?}", get_result.metadata); - let key_encrypted_b64 = get_result - .metadata - .as_ref() - .ok_or(anyhow!("Missing key in metadata"))? + //info!("Object metadata: {:?}", get_result.metadata); + let key_encrypted_b64 = object + .meta .get(MESSAGE_KEY) .ok_or(anyhow!("Missing key in metadata"))?; - let key_encrypted = base64::decode(key_encrypted_b64)?; + let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; let message_key = sodiumoxide::crypto::sealedbox::open( &key_encrypted, &user.creds.keys.public, @@ -206,38 +188,28 @@ async fn move_incoming_message( cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; // 1.b retrieve message body - let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?; - let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize); - obj_body - .into_async_read() - .read_to_end(&mut mail_buf) - .await?; - let plain_mail = cryptoblob::open(&mail_buf, &message_key) + let obj_body = object.value; + let plain_mail = cryptoblob::open(&obj_body, &message_key) .map_err(|_| anyhow!("Cannot decrypt email content"))?; // 2 parse mail and add to inbox let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; inbox - .append_from_s3(msg, id, &object_key, message_key) + .append_from_s3(msg, id, object.blob_ref.clone(), message_key) .await?; // 3 delete from incoming - let dor = DeleteObjectRequest { - bucket: user.creds.storage.bucket.clone(), - key: object_key.clone(), - ..Default::default() - }; - s3.delete_object(dor).await?; + storage.blob_rm(&object.blob_ref).await?; Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- -fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> { +fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> { let (held_tx, held_rx) = watch::channel(false); - tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); + tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx)); held_rx } @@ -246,13 +218,12 @@ fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::R enum LockState { Unknown, Empty, - Held(UniqueIdent, u64, CausalityToken), + Held(UniqueIdent, u64, storage::RowRef), } async fn k2v_lock_loop_internal( - k2v: K2vClient, - pk: &'static str, - sk: &'static str, + storage: storage::Store, + row_ref: storage::RowRef, held_tx: watch::Sender<bool>, ) { let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown); @@ -262,10 +233,10 @@ async fn k2v_lock_loop_internal( // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture<Result<()>> = async { - let mut ct = None; + let mut ct = row_ref.clone(); loop { info!("k2v watch lock loop iter: ct = {:?}", ct); - match k2v_wait_value_changed(&k2v, pk, sk, &ct).await { + match storage.row_poll(&ct).await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", @@ -277,7 +248,7 @@ async fn k2v_lock_loop_internal( Ok(cv) => { let mut lock_state = None; for v in cv.value.iter() { - if let K2vValue::Value(vbytes) = v { + if let storage::Alternative::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); let pid = UniqueIdent(vbytes[8..].try_into().unwrap()); @@ -290,16 +261,18 @@ async fn k2v_lock_loop_internal( } } } + let new_ct = cv.row_ref; + info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", - ct, cv.causality, lock_state + ct, new_ct, lock_state ); state_tx.send( lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone())) + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone())) .unwrap_or(LockState::Empty), )?; - ct = Some(cv.causality); + ct = new_ct; } } } @@ -385,7 +358,14 @@ async fn k2v_lock_loop_internal( now_msec() + LOCK_DURATION.as_millis() as u64, )); lock[8..].copy_from_slice(&our_pid.0); - if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await { + let row = match ct { + Some(existing) => existing, + None => row_ref.clone(), + }; + if let Err(e) = storage + .row_insert(vec![storage::RowVal::new(row, lock)]) + .await + { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -401,7 +381,7 @@ async fn k2v_lock_loop_internal( info!("lock loop exited, releasing"); if !held_tx.is_closed() { - warn!("wierd..."); + warn!("weird..."); let _ = held_tx.send(false); } @@ -411,7 +391,10 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let _ = k2v.delete_item(pk, sk, ct.clone()).await; + match storage.row_rm(&storage::Selector::Single(&ct)).await { + Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), + Ok(_) => (), + }; } } @@ -433,43 +416,30 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.s3_client()?; - let k2v_client = creds.storage.k2v_client()?; + let storage = creds.storage.build().await?; // Get causality token of previous watch key - let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await { - Err(_) => None, - Ok(cv) => Some(cv.causality), + let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await { + Err(_) => query, + Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query), }; // Write mail to encrypted storage let encrypted_key = sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); - let key_header = base64::encode(&encrypted_key); - - let por = PutObjectRequest { - bucket: creds.storage.bucket.clone(), - key: format!("incoming/{}", gen_ident()), - metadata: Some( - [(MESSAGE_KEY.to_string(), key_header)] - .into_iter() - .collect::<HashMap<_, _>>(), - ), - body: Some(self.encrypted_body.clone().into()), - ..Default::default() - }; - s3_client.put_object(por).await?; + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key); - // Update watch key to signal new mail - k2v_client - .insert_item( - INCOMING_PK, - INCOMING_WATCH_SK, - gen_ident().0.to_vec(), - watch_ct, - ) - .await?; + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("incoming/{}", gen_ident())), + self.encrypted_body.clone().into(), + ) + .with_meta(MESSAGE_KEY.to_string(), key_header); + storage.blob_insert(blob_val).await?; + // Update watch key to signal new mail + let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec()); + storage.row_insert(vec![watch_val]).await?; Ok(()) } } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index d92140d..e424ba3 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -1,11 +1,5 @@ use anyhow::{anyhow, bail, Result}; -use k2v_client::K2vClient; -use k2v_client::{BatchReadOp, Filter, K2vValue}; -use rusoto_s3::{ - CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3, -}; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncReadExt; use tokio::sync::RwLock; use crate::bayou::Bayou; @@ -14,7 +8,8 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::time::now_msec; +use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; +use crate::timestamp::now_msec; pub struct Mailbox { pub(super) id: UniqueIdent, @@ -30,7 +25,7 @@ impl Mailbox { let index_path = format!("index/{}", id); let mail_path = format!("mail/{}", id); - let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?; + let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?; uid_index.sync().await?; let uidvalidity = uid_index.state().uidvalidity; @@ -48,10 +43,8 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, - bucket: creds.bucket().to_string(), encryption_key: creds.keys.master.clone(), - k2v: creds.k2v_client()?, - s3: creds.s3_client()?, + storage: creds.storage.build().await?, uid_index, mail_path, }); @@ -121,13 +114,13 @@ impl Mailbox { &self, msg: IMF<'a>, ident: UniqueIdent, - s3_key: &str, + blob_ref: storage::BlobRef, message_key: Key, ) -> Result<()> { self.mbox .write() .await - .append_from_s3(msg, ident, s3_key, message_key) + .append_from_s3(msg, ident, blob_ref, message_key) .await } @@ -182,13 +175,9 @@ struct MailboxInternal { // 2023-05-15 will probably be used later. #[allow(dead_code)] id: UniqueIdent, - bucket: String, mail_path: String, encryption_key: Key, - - k2v: K2vClient, - s3: S3Client, - + storage: Store, uid_index: Bayou<UidIndex>, } @@ -209,33 +198,19 @@ impl MailboxInternal { let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>(); let ops = ids .iter() - .map(|id| BatchReadOp { - partition_key: &self.mail_path, - filter: Filter { - start: Some(id), - end: None, - prefix: None, - limit: None, - reverse: false, - }, - single_item: true, - conflicts_only: false, - tombstones: false, - }) + .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())) .collect::<Vec<_>>(); - let res_vec = self.k2v.read_batch(&ops).await?; + let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; - for (op, res) in ops.iter().zip(res_vec.into_iter()) { - if res.items.len() != 1 { - bail!("Expected 1 item, got {}", res.items.len()); - } - let (_, cv) = res.items.iter().next().unwrap(); + for res in res_vec.into_iter() { let mut meta_opt = None; - for v in cv.value.iter() { + + // Resolve conflicts + for v in res.value.iter() { match v { - K2vValue::Tombstone => (), - K2vValue::Value(v) => { + storage::Alternative::Tombstone => (), + storage::Alternative::Value(v) => { let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?; match meta_opt.as_mut() { None => { @@ -251,7 +226,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", op.filter.start); + bail!("No valid meta value in k2v for {:?}", res.row_ref); } } @@ -259,19 +234,12 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> { - let gor = GetObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, id), - ..Default::default() - }; - - let obj_res = self.s3.get_object(gor).await?; - - let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; - let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); - obj_body.into_async_read().read_to_end(&mut buf).await?; - - cryptoblob::open(&buf, message_key) + let obj_res = self + .storage + .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))) + .await?; + let body = obj_res.value; + cryptoblob::open(&body, message_key) } // ---- Functions for changing the mailbox ---- @@ -304,13 +272,12 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - let por = PutObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - body: Some(message_blob.into()), - ..Default::default() - }; - self.s3.put_object(por).await?; + self.storage + .blob_insert(BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )) + .await?; Ok::<_, anyhow::Error>(()) }, async { @@ -322,8 +289,11 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) .await?; Ok::<_, anyhow::Error>(()) }, @@ -349,20 +319,14 @@ impl MailboxInternal { &mut self, mail: IMF<'a>, ident: UniqueIdent, - s3_key: &str, + blob_src: storage::BlobRef, message_key: Key, ) -> Result<()> { futures::try_join!( async { // Copy mail body from previous location - let cor = CopyObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - copy_source: format!("{}/{}", self.bucket, s3_key), - metadata_directive: Some("REPLACE".into()), - ..Default::default() - }; - self.s3.copy_object(cor).await?; + let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); + self.storage.blob_copy(&blob_src, &blob_dst).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -374,8 +338,11 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) .await?; Ok::<_, anyhow::Error>(()) }, @@ -400,21 +367,26 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - let dor = DeleteObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - ..Default::default() - }; - self.s3.delete_object(dor).await?; + self.storage + .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))) + .await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - let v = self.k2v.read_item(&self.mail_path, &sk).await?; - self.k2v - .delete_item(&self.mail_path, &sk, v.causality) + let res = self + .storage + .row_fetch(&storage::Selector::Single(&RowRef::new( + &self.mail_path, + &sk, + ))) .await?; + if let Some(row_val) = res.into_iter().next() { + self.storage + .row_rm(&storage::Selector::Single(&row_val.row_ref)) + .await?; + } Ok::<_, anyhow::Error>(()) } )?; @@ -445,7 +417,7 @@ impl MailboxInternal { source_id: UniqueIdent, new_id: UniqueIdent, ) -> Result<()> { - if self.bucket != from.bucket || self.encryption_key != from.encryption_key { + if self.encryption_key != from.encryption_key { bail!("Message to be copied/moved does not belong to same account."); } @@ -460,23 +432,20 @@ impl MailboxInternal { futures::try_join!( async { - // Copy mail body from S3 - let cor = CopyObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, new_id), - copy_source: format!("{}/{}/{}", from.bucket, from.mail_path, source_id), - ..Default::default() - }; - - self.s3.copy_object(cor).await?; + let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); + let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); + self.storage.blob_copy(&src, &dst).await?; Ok::<_, anyhow::Error>(()) }, async { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None) + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]) .await?; Ok::<_, anyhow::Error>(()) }, diff --git a/src/mail/unique_ident.rs b/src/mail/unique_ident.rs index 267f66e..0e629db 100644 --- a/src/mail/unique_ident.rs +++ b/src/mail/unique_ident.rs @@ -5,7 +5,7 @@ use lazy_static::lazy_static; use rand::prelude::*; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; -use crate::time::now_msec; +use crate::timestamp::now_msec; /// An internal Mail Identifier is composed of two components: /// - a process identifier, 128 bits, itself composed of: diff --git a/src/mail/user.rs b/src/mail/user.rs index 5523c2a..da0d509 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -2,18 +2,18 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Weak}; use anyhow::{anyhow, bail, Result}; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use crate::cryptoblob::{open_deserialize, seal_serialize}; -use crate::login::{Credentials, StorageCredentials}; +use crate::login::Credentials; use crate::mail::incoming::incoming_mail_watch_process; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::{gen_ident, UniqueIdent}; -use crate::time::now_msec; +use crate::storage; +use crate::timestamp::now_msec; pub const MAILBOX_HIERARCHY_DELIMITER: char = '.'; @@ -33,7 +33,7 @@ const MAILBOX_LIST_SK: &str = "list"; pub struct User { pub username: String, pub creds: Credentials, - pub k2v: K2vClient, + pub storage: storage::Store, pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>, tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>, @@ -41,7 +41,7 @@ pub struct User { impl User { pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> { - let cache_key = (username.clone(), creds.storage.clone()); + let cache_key = (username.clone(), creds.storage.unique()); { let cache = USER_CACHE.lock().unwrap(); @@ -165,6 +165,7 @@ impl User { list.rename_mailbox(name, &nnew)?; } } + self.save_mailbox_list(&list, ct).await?; } Ok(()) @@ -173,14 +174,14 @@ impl User { // ---- Internal user & mailbox management ---- async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> { - let k2v = creds.k2v_client()?; + let storage = creds.storage.build().await?; let (tx_inbox_id, rx_inbox_id) = watch::channel(None); let user = Arc::new(Self { username, creds: creds.clone(), - k2v, + storage, tx_inbox_id, mailboxes: std::sync::Mutex::new(HashMap::new()), }); @@ -223,32 +224,42 @@ impl User { // ---- Mailbox list management ---- - async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<CausalityToken>)> { - let (mut list, ct) = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await { - Err(k2v_client::Error::NotFound) => (MailboxList::new(), None), + async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> { + let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); + let (mut list, row) = match self + .storage + .row_fetch(&storage::Selector::Single(&row_ref)) + .await + { + Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), - Ok(cv) => { + Ok(rv) => { let mut list = MailboxList::new(); - for v in cv.value { - if let K2vValue::Value(vbytes) = v { + let (row_ref, row_vals) = match rv.into_iter().next() { + Some(row_val) => (row_val.row_ref, row_val.value), + None => (row_ref, vec![]), + }; + + for v in row_vals { + if let storage::Alternative::Value(vbytes) = v { let list2 = open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?; list.merge(list2); } } - (list, Some(cv.causality)) + (list, Some(row_ref)) } }; - self.ensure_inbox_exists(&mut list, &ct).await?; + self.ensure_inbox_exists(&mut list, &row).await?; - Ok((list, ct)) + Ok((list, row)) } async fn ensure_inbox_exists( &self, list: &mut MailboxList, - ct: &Option<CausalityToken>, + ct: &Option<storage::RowRef>, ) -> Result<bool> { // If INBOX doesn't exist, create a new mailbox with that name // and save new mailbox list. @@ -277,12 +288,12 @@ impl User { async fn save_mailbox_list( &self, list: &MailboxList, - ct: Option<CausalityToken>, + ct: Option<storage::RowRef>, ) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; - self.k2v - .insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct) - .await?; + let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK)); + let row_val = storage::RowVal::new(rref, list_blob); + self.storage.row_insert(vec![row_val]).await?; Ok(()) } } @@ -455,6 +466,6 @@ enum CreatedMailbox { // ---- User cache ---- lazy_static! { - static ref USER_CACHE: std::sync::Mutex<HashMap<(String, StorageCredentials), Weak<User>>> = + static ref USER_CACHE: std::sync::Mutex<HashMap<(String, storage::UnicityBuffer), Weak<User>>> = std::sync::Mutex::new(HashMap::new()); } |