aboutsummaryrefslogtreecommitdiff
path: root/src/mail
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2023-12-18 17:09:44 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2023-12-18 17:09:44 +0100
commit3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf (patch)
treefff5d16e266788b28e812c24669f50118831512b /src/mail
parent684f4de225c44464abcb6a9cb2ef6dcae90537a8 (diff)
downloadaerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.tar.gz
aerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.zip
Storage trait new implementation
Diffstat (limited to 'src/mail')
-rw-r--r--src/mail/incoming.rs103
-rw-r--r--src/mail/mailbox.rs59
-rw-r--r--src/mail/user.rs68
3 files changed, 111 insertions, 119 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index e3c729f..f6b831d 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -5,6 +5,7 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::{anyhow, bail, Result};
+use base64::Engine;
use futures::{future::BoxFuture, FutureExt};
//use tokio::io::AsyncReadExt;
use tokio::sync::watch;
@@ -50,13 +51,11 @@ async fn incoming_mail_watch_process_internal(
creds: Credentials,
mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
) -> Result<()> {
- let mut lock_held = k2v_lock_loop(creds.row_client()?, INCOMING_PK, INCOMING_LOCK_SK);
-
- let k2v = creds.row_client()?;
- let s3 = creds.blob_client()?;
+ let mut lock_held = k2v_lock_loop(creds.storage.build()?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK));
+ let storage = creds.storage.build()?;
let mut inbox: Option<Arc<Mailbox>> = None;
- let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK);
+ let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
loop {
let maybe_updated_incoming_key = if *lock_held.borrow() {
@@ -64,9 +63,9 @@ async fn incoming_mail_watch_process_internal(
let wait_new_mail = async {
loop {
- match incoming_key.poll().await
+ match storage.row_poll(&incoming_key).await
{
- Ok(row_val) => break row_val.to_ref(),
+ Ok(row_val) => break row_val.row_ref,
Err(e) => {
error!("Error in wait_new_mail: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
@@ -77,7 +76,7 @@ async fn incoming_mail_watch_process_internal(
tokio::select! {
inc_k = wait_new_mail => Some(inc_k),
- _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan()).expect("Incompatible source & target storage")),
+ _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
_ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None,
}
@@ -119,7 +118,7 @@ async fn incoming_mail_watch_process_internal(
// If we were able to open INBOX, and we have mail,
// fetch new mail
if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
- match handle_incoming_mail(&user, &s3, inbox, &lock_held).await {
+ match handle_incoming_mail(&user, &storage, inbox, &lock_held).await {
Ok(()) => {
incoming_key = updated_incoming_key;
}
@@ -136,20 +135,20 @@ async fn incoming_mail_watch_process_internal(
async fn handle_incoming_mail(
user: &Arc<User>,
- blobs: &storage::BlobStore,
+ storage: &storage::Store,
inbox: &Arc<Mailbox>,
lock_held: &watch::Receiver<bool>,
) -> Result<()> {
- let mails_res = blobs.list("incoming/").await?;
+ let mails_res = storage.blob_list("incoming/").await?;
for object in mails_res {
if !*lock_held.borrow() {
break;
}
- let key = object.key();
+ let key = object.0;
if let Some(mail_id) = key.strip_prefix("incoming/") {
if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
- move_incoming_message(user, blobs, inbox, mail_id).await?;
+ move_incoming_message(user, storage, inbox, mail_id).await?;
}
}
}
@@ -159,7 +158,7 @@ async fn handle_incoming_mail(
async fn move_incoming_message(
user: &Arc<User>,
- s3: &storage::BlobStore,
+ storage: &storage::Store,
inbox: &Arc<Mailbox>,
id: UniqueIdent,
) -> Result<()> {
@@ -168,14 +167,15 @@ async fn move_incoming_message(
let object_key = format!("incoming/{}", id);
// 1. Fetch message from S3
- let object = s3.blob(&object_key).fetch().await?;
+ let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?;
// 1.a decrypt message key from headers
//info!("Object metadata: {:?}", get_result.metadata);
let key_encrypted_b64 = object
- .get_meta(MESSAGE_KEY)
+ .meta
+ .get(MESSAGE_KEY)
.ok_or(anyhow!("Missing key in metadata"))?;
- let key_encrypted = base64::decode(key_encrypted_b64)?;
+ let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
let message_key = sodiumoxide::crypto::sealedbox::open(
&key_encrypted,
&user.creds.keys.public,
@@ -186,28 +186,28 @@ async fn move_incoming_message(
cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
// 1.b retrieve message body
- let obj_body = object.content().ok_or(anyhow!("Missing object body"))?;
+ let obj_body = object.value;
let plain_mail = cryptoblob::open(&obj_body, &message_key)
.map_err(|_| anyhow!("Cannot decrypt email content"))?;
// 2 parse mail and add to inbox
let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
inbox
- .append_from_s3(msg, id, object.to_ref(), message_key)
+ .append_from_s3(msg, id, object.blob_ref.clone(), message_key)
.await?;
// 3 delete from incoming
- object.to_ref().rm().await?;
+ storage.blob_rm(&object.blob_ref).await?;
Ok(())
}
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
-fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> {
+fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> {
let (held_tx, held_rx) = watch::channel(false);
- tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx));
+ tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx));
held_rx
}
@@ -216,13 +216,12 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) ->
enum LockState {
Unknown,
Empty,
- Held(UniqueIdent, u64, storage::OrphanRowRef),
+ Held(UniqueIdent, u64, storage::RowRef),
}
async fn k2v_lock_loop_internal(
- k2v: storage::RowStore,
- pk: &'static str,
- sk: &'static str,
+ storage: storage::Store,
+ row_ref: storage::RowRef,
held_tx: watch::Sender<bool>,
) {
let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
@@ -232,10 +231,10 @@ async fn k2v_lock_loop_internal(
// Loop 1: watch state of lock in K2V, save that in corresponding watch channel
let watch_lock_loop: BoxFuture<Result<()>> = async {
- let mut ct = k2v.row(pk, sk);
+ let mut ct = row_ref.clone();
loop {
info!("k2v watch lock loop iter: ct = {:?}", ct);
- match ct.poll().await {
+ match storage.row_poll(&ct).await {
Err(e) => {
error!(
"Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
@@ -246,7 +245,7 @@ async fn k2v_lock_loop_internal(
}
Ok(cv) => {
let mut lock_state = None;
- for v in cv.content().iter() {
+ for v in cv.value.iter() {
if let storage::Alternative::Value(vbytes) = v {
if vbytes.len() == 32 {
let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
@@ -260,7 +259,7 @@ async fn k2v_lock_loop_internal(
}
}
}
- let new_ct = cv.to_ref();
+ let new_ct = cv.row_ref;
info!(
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
@@ -268,7 +267,7 @@ async fn k2v_lock_loop_internal(
);
state_tx.send(
lock_state
- .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan()))
+ .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone()))
.unwrap_or(LockState::Empty),
)?;
ct = new_ct;
@@ -358,10 +357,10 @@ async fn k2v_lock_loop_internal(
));
lock[8..].copy_from_slice(&our_pid.0);
let row = match ct {
- Some(orphan) => k2v.from_orphan(orphan).expect("Source & target must be storage compatible"),
- None => k2v.row(pk, sk),
+ Some(existing) => existing,
+ None => row_ref.clone(),
};
- if let Err(e) = row.set_value(&lock).push().await {
+ if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await {
error!("Could not take lock: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
@@ -377,7 +376,7 @@ async fn k2v_lock_loop_internal(
info!("lock loop exited, releasing");
if !held_tx.is_closed() {
- warn!("wierd...");
+ warn!("weird...");
let _ = held_tx.send(false);
}
@@ -387,8 +386,10 @@ async fn k2v_lock_loop_internal(
_ => None,
};
if let Some(ct) = release {
- let row = k2v.from_orphan(ct).expect("Incompatible source & target storage");
- let _ = row.rm().await;
+ match storage.row_rm(&storage::Selector::Single(&ct)).await {
+ Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
+ Ok(_) => (),
+ };
}
}
@@ -410,30 +411,32 @@ impl EncryptedMessage {
}
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
- let s3_client = creds.storage.blob_store()?;
- let k2v_client = creds.storage.row_store()?;
+ let storage = creds.storage.build()?;
// Get causality token of previous watch key
- let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK);
- let watch_ct = match query.fetch().await {
+ let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
+ let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await {
Err(_) => query,
- Ok(cv) => cv.to_ref(),
+ Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query),
};
// Write mail to encrypted storage
let encrypted_key =
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
- let key_header = base64::encode(&encrypted_key);
+ let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key);
- let mut send = s3_client
- .blob(&format!("incoming/{}", gen_ident()))
- .set_value(self.encrypted_body.clone().into());
- send.set_meta(MESSAGE_KEY, &key_header);
- send.push().await?;
+ let blob_val = storage::BlobVal::new(
+ storage::BlobRef(format!("incoming/{}", gen_ident())),
+ self.encrypted_body.clone().into(),
+ ).with_meta(MESSAGE_KEY.to_string(), key_header);
+ storage.blob_insert(&blob_val).await?;
// Update watch key to signal new mail
- watch_ct.set_value(gen_ident().0.as_ref()).push().await?;
-
+ let watch_val = storage::RowVal::new(
+ watch_ct.clone(),
+ gen_ident().0.to_vec(),
+ );
+ storage.row_insert(vec![watch_val]).await?;
Ok(())
}
}
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index 060267a..b4afd5e 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -8,7 +8,7 @@ use crate::login::Credentials;
use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
use crate::mail::IMF;
-use crate::storage::{RowStore, BlobStore, self};
+use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self};
use crate::timestamp::now_msec;
pub struct Mailbox {
@@ -44,8 +44,7 @@ impl Mailbox {
let mbox = RwLock::new(MailboxInternal {
id,
encryption_key: creds.keys.master.clone(),
- k2v: creds.storage.row_store()?,
- s3: creds.storage.blob_store()?,
+ storage: creds.storage.build()?,
uid_index,
mail_path,
});
@@ -178,10 +177,7 @@ struct MailboxInternal {
id: UniqueIdent,
mail_path: String,
encryption_key: Key,
-
- k2v: RowStore,
- s3: BlobStore,
-
+ storage: Store,
uid_index: Bayou<UidIndex>,
}
@@ -200,15 +196,15 @@ impl MailboxInternal {
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
- let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>();
- let res_vec = self.k2v.select(storage::Selector::List(ops)).await?;
+ let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>();
+ let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;
let mut meta_vec = vec![];
for res in res_vec.into_iter() {
let mut meta_opt = None;
// Resolve conflicts
- for v in res.content().iter() {
+ for v in res.value.iter() {
match v {
storage::Alternative::Tombstone => (),
storage::Alternative::Value(v) => {
@@ -227,7 +223,7 @@ impl MailboxInternal {
if let Some(meta) = meta_opt {
meta_vec.push(meta);
} else {
- bail!("No valid meta value in k2v for {:?}", res.to_ref().key());
+ bail!("No valid meta value in k2v for {:?}", res.row_ref);
}
}
@@ -235,9 +231,9 @@ impl MailboxInternal {
}
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
- let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?;
- let body = obj_res.content().ok_or(anyhow!("missing body"))?;
- cryptoblob::open(body, message_key)
+ let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?;
+ let body = obj_res.value;
+ cryptoblob::open(&body, message_key)
}
// ---- Functions for changing the mailbox ----
@@ -270,7 +266,10 @@ impl MailboxInternal {
async {
// Encrypt and save mail body
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
- self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?;
+ self.storage.blob_insert(&BlobVal::new(
+ BlobRef(format!("{}/{}", self.mail_path, ident)),
+ message_blob,
+ )).await?;
Ok::<_, anyhow::Error>(())
},
async {
@@ -282,7 +281,10 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?;
+ self.storage.row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )]).await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
@@ -307,14 +309,14 @@ impl MailboxInternal {
&mut self,
mail: IMF<'a>,
ident: UniqueIdent,
- blob_ref: storage::BlobRef,
+ blob_src: storage::BlobRef,
message_key: Key,
) -> Result<()> {
futures::try_join!(
async {
// Copy mail body from previous location
- let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident));
- blob_ref.copy(&dst).await?;
+ let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident));
+ self.storage.blob_copy(&blob_src, &blob_dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
@@ -326,7 +328,10 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?;
+ self.storage.row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )]).await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
@@ -350,13 +355,13 @@ impl MailboxInternal {
futures::try_join!(
async {
// Delete mail body from S3
- self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?;
+ self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Delete mail meta from K2V
let sk = ident.to_string();
- self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?;
+ self.storage.row_rm(&Selector::Single(&RowRef::new(&self.mail_path, &sk))).await?;
Ok::<_, anyhow::Error>(())
}
)?;
@@ -402,15 +407,19 @@ impl MailboxInternal {
futures::try_join!(
async {
- let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id));
- self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?;
+ let dst = BlobRef(format!("{}/{}", self.mail_path, new_id));
+ let src = BlobRef(format!("{}/{}", from.mail_path, source_id));
+ self.storage.blob_copy(&src, &dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Copy mail meta in K2V
let meta = &from.fetch_meta(&[source_id]).await?[0];
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
- self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(&meta_blob).push().await?;
+ self.storage.row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &new_id.to_string()),
+ meta_blob,
+ )]).await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync(),
diff --git a/src/mail/user.rs b/src/mail/user.rs
index bdfb30c..8413cbf 100644
--- a/src/mail/user.rs
+++ b/src/mail/user.rs
@@ -33,7 +33,7 @@ const MAILBOX_LIST_SK: &str = "list";
pub struct User {
pub username: String,
pub creds: Credentials,
- pub k2v: storage::RowStore,
+ pub storage: storage::Store,
pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>,
tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>,
@@ -41,7 +41,7 @@ pub struct User {
impl User {
pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> {
- let cache_key = (username.clone(), creds.storage.clone());
+ let cache_key = (username.clone(), creds.storage.unique());
{
let cache = USER_CACHE.lock().unwrap();
@@ -81,11 +81,7 @@ impl User {
let mb_uidvalidity = mb.current_uid_index().await.uidvalidity;
if mb_uidvalidity > uidvalidity {
list.update_uidvalidity(name, mb_uidvalidity);
- let orphan = match ct {
- Some(x) => Some(x.to_orphan()),
- None => None,
- };
- self.save_mailbox_list(&list, orphan).await?;
+ self.save_mailbox_list(&list, ct).await?;
}
Ok(Some(mb))
} else {
@@ -108,11 +104,7 @@ impl User {
let (mut list, ct) = self.load_mailbox_list().await?;
match list.create_mailbox(name) {
CreatedMailbox::Created(_, _) => {
- let orphan = match ct {
- Some(x) => Some(x.to_orphan()),
- None => None,
- };
- self.save_mailbox_list(&list, orphan).await?;
+ self.save_mailbox_list(&list, ct).await?;
Ok(())
}
CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)),
@@ -129,11 +121,7 @@ impl User {
if list.has_mailbox(name) {
// TODO: actually delete mailbox contents
list.set_mailbox(name, None);
- let orphan = match ct {
- Some(x) => Some(x.to_orphan()),
- None => None,
- };
- self.save_mailbox_list(&list, orphan).await?;
+ self.save_mailbox_list(&list, ct).await?;
Ok(())
} else {
bail!("Mailbox {} does not exist", name);
@@ -154,11 +142,7 @@ impl User {
if old_name == INBOX {
list.rename_mailbox(old_name, new_name)?;
if !self.ensure_inbox_exists(&mut list, &ct).await? {
- let orphan = match ct {
- Some(x) => Some(x.to_orphan()),
- None => None,
- };
- self.save_mailbox_list(&list, orphan).await?;
+ self.save_mailbox_list(&list, ct).await?;
}
} else {
let names = list.existing_mailbox_names();
@@ -182,11 +166,7 @@ impl User {
}
}
- let orphan = match ct {
- Some(x) => Some(x.to_orphan()),
- None => None,
- };
- self.save_mailbox_list(&list, orphan).await?;
+ self.save_mailbox_list(&list, ct).await?;
}
Ok(())
}
@@ -194,14 +174,14 @@ impl User {
// ---- Internal user & mailbox management ----
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
- let k2v = creds.row_client()?;
+ let storage = creds.storage.build()?;
let (tx_inbox_id, rx_inbox_id) = watch::channel(None);
let user = Arc::new(Self {
username,
creds: creds.clone(),
- k2v,
+ storage,
tx_inbox_id,
mailboxes: std::sync::Mutex::new(HashMap::new()),
});
@@ -245,19 +225,25 @@ impl User {
// ---- Mailbox list management ----
async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> {
- let (mut list, row) = match self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK).fetch().await {
+ let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK);
+ let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await {
Err(storage::StorageError::NotFound) => (MailboxList::new(), None),
Err(e) => return Err(e.into()),
Ok(rv) => {
let mut list = MailboxList::new();
- for v in rv.content() {
+ let (row_ref, row_vals) = match rv.into_iter().next() {
+ Some(row_val) => (row_val.row_ref, row_val.value),
+ None => (row_ref, vec![]),
+ };
+
+ for v in row_vals {
if let storage::Alternative::Value(vbytes) = v {
let list2 =
open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
list.merge(list2);
}
}
- (list, Some(rv.to_ref()))
+ (list, Some(row_ref))
}
};
@@ -278,11 +264,7 @@ impl User {
let saved;
let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) {
CreatedMailbox::Created(i, v) => {
- let orphan = match ct {
- Some(x) => Some(x.to_orphan()),
- None => None,
- };
- self.save_mailbox_list(list, orphan).await?;
+ self.save_mailbox_list(list, ct.clone()).await?;
saved = true;
(i, v)
}
@@ -302,14 +284,12 @@ impl User {
async fn save_mailbox_list(
&self,
list: &MailboxList,
- ct: Option<storage::OrphanRowRef>,
+ ct: Option<storage::RowRef>,
) -> Result<()> {
let list_blob = seal_serialize(list, &self.creds.keys.master)?;
- let rref = match ct {
- Some(x) => self.k2v.from_orphan(x).expect("Source & target must be same storage"),
- None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK),
- };
- rref.set_value(&list_blob).push().await?;
+ let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK));
+ let row_val = storage::RowVal::new(rref, list_blob);
+ self.storage.row_insert(vec![row_val]).await?;
Ok(())
}
}
@@ -482,6 +462,6 @@ enum CreatedMailbox {
// ---- User cache ----
lazy_static! {
- static ref USER_CACHE: std::sync::Mutex<HashMap<(String, storage::Builders), Weak<User>>> =
+ static ref USER_CACHE: std::sync::Mutex<HashMap<(String, storage::UnicityBuffer), Weak<User>>> =
std::sync::Mutex::new(HashMap::new());
}