aboutsummaryrefslogtreecommitdiff
path: root/src/mail
diff options
context:
space:
mode:
authorQuentin <quentin@dufour.io>2023-12-27 16:35:43 +0000
committerQuentin <quentin@dufour.io>2023-12-27 16:35:43 +0000
commit6ff3c6f71efd802da422a371e6168ae528fb2ddc (patch)
tree62b5d7d9bc7fd2bf3defd1a85ae1b3f34cd4b8ee /src/mail
parent609dde413972ebeeb8cd658a5ec9f62b34b5c402 (diff)
parentea4cd48bba96027882a637df08e313af92a3db46 (diff)
downloadaerogramme-6ff3c6f71efd802da422a371e6168ae528fb2ddc.tar.gz
aerogramme-6ff3c6f71efd802da422a371e6168ae528fb2ddc.zip
Add storage behind a trait
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/aerogramme/pulls/32
Diffstat (limited to 'src/mail')
-rw-r--r--src/mail/incoming.rs188
-rw-r--r--src/mail/mailbox.rs159
-rw-r--r--src/mail/unique_ident.rs2
-rw-r--r--src/mail/user.rs55
4 files changed, 177 insertions, 227 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index b7d2f48..04d2ef1 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -1,28 +1,25 @@
-use std::collections::HashMap;
+//use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::{anyhow, bail, Result};
+use base64::Engine;
use futures::{future::BoxFuture, FutureExt};
-use k2v_client::{CausalityToken, K2vClient, K2vValue};
-use rusoto_s3::{
- DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
-};
-use tokio::io::AsyncReadExt;
+//use tokio::io::AsyncReadExt;
use tokio::sync::watch;
use tracing::{error, info, warn};
use crate::cryptoblob;
-use crate::k2v_util::k2v_wait_value_changed;
use crate::login::{Credentials, PublicCredentials};
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::*;
use crate::mail::user::User;
use crate::mail::IMF;
-use crate::time::now_msec;
+use crate::storage;
+use crate::timestamp::now_msec;
const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock";
@@ -54,24 +51,23 @@ async fn incoming_mail_watch_process_internal(
creds: Credentials,
mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
) -> Result<()> {
- let mut lock_held = k2v_lock_loop(creds.k2v_client()?, INCOMING_PK, INCOMING_LOCK_SK);
-
- let k2v = creds.k2v_client()?;
- let s3 = creds.s3_client()?;
+ let mut lock_held = k2v_lock_loop(
+ creds.storage.build().await?,
+ storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK),
+ );
+ let storage = creds.storage.build().await?;
let mut inbox: Option<Arc<Mailbox>> = None;
- let mut prev_ct: Option<CausalityToken> = None;
+ let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
loop {
- let new_mail = if *lock_held.borrow() {
+ let maybe_updated_incoming_key = if *lock_held.borrow() {
info!("incoming lock held");
let wait_new_mail = async {
loop {
- match k2v_wait_value_changed(&k2v, INCOMING_PK, INCOMING_WATCH_SK, &prev_ct)
- .await
- {
- Ok(cv) => break cv,
+ match storage.row_poll(&incoming_key).await {
+ Ok(row_val) => break row_val.row_ref,
Err(e) => {
error!("Error in wait_new_mail: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
@@ -81,10 +77,10 @@ async fn incoming_mail_watch_process_internal(
};
tokio::select! {
- cv = wait_new_mail => Some(cv.causality),
- _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => prev_ct.clone(),
- _ = lock_held.changed() => None,
- _ = rx_inbox_id.changed() => None,
+ inc_k = wait_new_mail => Some(inc_k),
+ _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
+ _ = lock_held.changed() => None,
+ _ = rx_inbox_id.changed() => None,
}
} else {
info!("incoming lock not held");
@@ -123,10 +119,10 @@ async fn incoming_mail_watch_process_internal(
// If we were able to open INBOX, and we have mail,
// fetch new mail
- if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) {
- match handle_incoming_mail(&user, &s3, inbox, &lock_held).await {
+ if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
+ match handle_incoming_mail(&user, &storage, inbox, &lock_held).await {
Ok(()) => {
- prev_ct = Some(new_ct);
+ incoming_key = updated_incoming_key;
}
Err(e) => {
error!("Could not fetch incoming mail: {}", e);
@@ -141,27 +137,20 @@ async fn incoming_mail_watch_process_internal(
async fn handle_incoming_mail(
user: &Arc<User>,
- s3: &S3Client,
+ storage: &storage::Store,
inbox: &Arc<Mailbox>,
lock_held: &watch::Receiver<bool>,
) -> Result<()> {
- let lor = ListObjectsV2Request {
- bucket: user.creds.storage.bucket.clone(),
- max_keys: Some(1000),
- prefix: Some("incoming/".into()),
- ..Default::default()
- };
- let mails_res = s3.list_objects_v2(lor).await?;
+ let mails_res = storage.blob_list("incoming/").await?;
- for object in mails_res.contents.unwrap_or_default() {
+ for object in mails_res {
if !*lock_held.borrow() {
break;
}
- if let Some(key) = object.key {
- if let Some(mail_id) = key.strip_prefix("incoming/") {
- if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
- move_incoming_message(user, s3, inbox, mail_id).await?;
- }
+ let key = object.0;
+ if let Some(mail_id) = key.strip_prefix("incoming/") {
+ if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
+ move_incoming_message(user, storage, inbox, mail_id).await?;
}
}
}
@@ -171,7 +160,7 @@ async fn handle_incoming_mail(
async fn move_incoming_message(
user: &Arc<User>,
- s3: &S3Client,
+ storage: &storage::Store,
inbox: &Arc<Mailbox>,
id: UniqueIdent,
) -> Result<()> {
@@ -180,22 +169,15 @@ async fn move_incoming_message(
let object_key = format!("incoming/{}", id);
// 1. Fetch message from S3
- let gor = GetObjectRequest {
- bucket: user.creds.storage.bucket.clone(),
- key: object_key.clone(),
- ..Default::default()
- };
- let get_result = s3.get_object(gor).await?;
+ let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?;
// 1.a decrypt message key from headers
- info!("Object metadata: {:?}", get_result.metadata);
- let key_encrypted_b64 = get_result
- .metadata
- .as_ref()
- .ok_or(anyhow!("Missing key in metadata"))?
+ //info!("Object metadata: {:?}", get_result.metadata);
+ let key_encrypted_b64 = object
+ .meta
.get(MESSAGE_KEY)
.ok_or(anyhow!("Missing key in metadata"))?;
- let key_encrypted = base64::decode(key_encrypted_b64)?;
+ let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
let message_key = sodiumoxide::crypto::sealedbox::open(
&key_encrypted,
&user.creds.keys.public,
@@ -206,38 +188,28 @@ async fn move_incoming_message(
cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
// 1.b retrieve message body
- let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?;
- let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize);
- obj_body
- .into_async_read()
- .read_to_end(&mut mail_buf)
- .await?;
- let plain_mail = cryptoblob::open(&mail_buf, &message_key)
+ let obj_body = object.value;
+ let plain_mail = cryptoblob::open(&obj_body, &message_key)
.map_err(|_| anyhow!("Cannot decrypt email content"))?;
// 2 parse mail and add to inbox
let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
inbox
- .append_from_s3(msg, id, &object_key, message_key)
+ .append_from_s3(msg, id, object.blob_ref.clone(), message_key)
.await?;
// 3 delete from incoming
- let dor = DeleteObjectRequest {
- bucket: user.creds.storage.bucket.clone(),
- key: object_key.clone(),
- ..Default::default()
- };
- s3.delete_object(dor).await?;
+ storage.blob_rm(&object.blob_ref).await?;
Ok(())
}
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
-fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> {
+fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> {
let (held_tx, held_rx) = watch::channel(false);
- tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx));
+ tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx));
held_rx
}
@@ -246,13 +218,12 @@ fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::R
enum LockState {
Unknown,
Empty,
- Held(UniqueIdent, u64, CausalityToken),
+ Held(UniqueIdent, u64, storage::RowRef),
}
async fn k2v_lock_loop_internal(
- k2v: K2vClient,
- pk: &'static str,
- sk: &'static str,
+ storage: storage::Store,
+ row_ref: storage::RowRef,
held_tx: watch::Sender<bool>,
) {
let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
@@ -262,10 +233,10 @@ async fn k2v_lock_loop_internal(
// Loop 1: watch state of lock in K2V, save that in corresponding watch channel
let watch_lock_loop: BoxFuture<Result<()>> = async {
- let mut ct = None;
+ let mut ct = row_ref.clone();
loop {
info!("k2v watch lock loop iter: ct = {:?}", ct);
- match k2v_wait_value_changed(&k2v, pk, sk, &ct).await {
+ match storage.row_poll(&ct).await {
Err(e) => {
error!(
"Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
@@ -277,7 +248,7 @@ async fn k2v_lock_loop_internal(
Ok(cv) => {
let mut lock_state = None;
for v in cv.value.iter() {
- if let K2vValue::Value(vbytes) = v {
+ if let storage::Alternative::Value(vbytes) = v {
if vbytes.len() == 32 {
let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
@@ -290,16 +261,18 @@ async fn k2v_lock_loop_internal(
}
}
}
+ let new_ct = cv.row_ref;
+
info!(
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
- ct, cv.causality, lock_state
+ ct, new_ct, lock_state
);
state_tx.send(
lock_state
- .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone()))
+ .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone()))
.unwrap_or(LockState::Empty),
)?;
- ct = Some(cv.causality);
+ ct = new_ct;
}
}
}
@@ -385,7 +358,14 @@ async fn k2v_lock_loop_internal(
now_msec() + LOCK_DURATION.as_millis() as u64,
));
lock[8..].copy_from_slice(&our_pid.0);
- if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await {
+ let row = match ct {
+ Some(existing) => existing,
+ None => row_ref.clone(),
+ };
+ if let Err(e) = storage
+ .row_insert(vec![storage::RowVal::new(row, lock)])
+ .await
+ {
error!("Could not take lock: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
@@ -401,7 +381,7 @@ async fn k2v_lock_loop_internal(
info!("lock loop exited, releasing");
if !held_tx.is_closed() {
- warn!("wierd...");
+ warn!("weird...");
let _ = held_tx.send(false);
}
@@ -411,7 +391,10 @@ async fn k2v_lock_loop_internal(
_ => None,
};
if let Some(ct) = release {
- let _ = k2v.delete_item(pk, sk, ct.clone()).await;
+ match storage.row_rm(&storage::Selector::Single(&ct)).await {
+ Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
+ Ok(_) => (),
+ };
}
}
@@ -433,43 +416,30 @@ impl EncryptedMessage {
}
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
- let s3_client = creds.storage.s3_client()?;
- let k2v_client = creds.storage.k2v_client()?;
+ let storage = creds.storage.build().await?;
// Get causality token of previous watch key
- let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await {
- Err(_) => None,
- Ok(cv) => Some(cv.causality),
+ let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
+ let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await {
+ Err(_) => query,
+ Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query),
};
// Write mail to encrypted storage
let encrypted_key =
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
- let key_header = base64::encode(&encrypted_key);
-
- let por = PutObjectRequest {
- bucket: creds.storage.bucket.clone(),
- key: format!("incoming/{}", gen_ident()),
- metadata: Some(
- [(MESSAGE_KEY.to_string(), key_header)]
- .into_iter()
- .collect::<HashMap<_, _>>(),
- ),
- body: Some(self.encrypted_body.clone().into()),
- ..Default::default()
- };
- s3_client.put_object(por).await?;
+ let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key);
- // Update watch key to signal new mail
- k2v_client
- .insert_item(
- INCOMING_PK,
- INCOMING_WATCH_SK,
- gen_ident().0.to_vec(),
- watch_ct,
- )
- .await?;
+ let blob_val = storage::BlobVal::new(
+ storage::BlobRef(format!("incoming/{}", gen_ident())),
+ self.encrypted_body.clone().into(),
+ )
+ .with_meta(MESSAGE_KEY.to_string(), key_header);
+ storage.blob_insert(blob_val).await?;
+ // Update watch key to signal new mail
+ let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec());
+ storage.row_insert(vec![watch_val]).await?;
Ok(())
}
}
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index d92140d..e424ba3 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -1,11 +1,5 @@
use anyhow::{anyhow, bail, Result};
-use k2v_client::K2vClient;
-use k2v_client::{BatchReadOp, Filter, K2vValue};
-use rusoto_s3::{
- CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3,
-};
use serde::{Deserialize, Serialize};
-use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use crate::bayou::Bayou;
@@ -14,7 +8,8 @@ use crate::login::Credentials;
use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
use crate::mail::IMF;
-use crate::time::now_msec;
+use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
+use crate::timestamp::now_msec;
pub struct Mailbox {
pub(super) id: UniqueIdent,
@@ -30,7 +25,7 @@ impl Mailbox {
let index_path = format!("index/{}", id);
let mail_path = format!("mail/{}", id);
- let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?;
+ let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?;
uid_index.sync().await?;
let uidvalidity = uid_index.state().uidvalidity;
@@ -48,10 +43,8 @@ impl Mailbox {
let mbox = RwLock::new(MailboxInternal {
id,
- bucket: creds.bucket().to_string(),
encryption_key: creds.keys.master.clone(),
- k2v: creds.k2v_client()?,
- s3: creds.s3_client()?,
+ storage: creds.storage.build().await?,
uid_index,
mail_path,
});
@@ -121,13 +114,13 @@ impl Mailbox {
&self,
msg: IMF<'a>,
ident: UniqueIdent,
- s3_key: &str,
+ blob_ref: storage::BlobRef,
message_key: Key,
) -> Result<()> {
self.mbox
.write()
.await
- .append_from_s3(msg, ident, s3_key, message_key)
+ .append_from_s3(msg, ident, blob_ref, message_key)
.await
}
@@ -182,13 +175,9 @@ struct MailboxInternal {
// 2023-05-15 will probably be used later.
#[allow(dead_code)]
id: UniqueIdent,
- bucket: String,
mail_path: String,
encryption_key: Key,
-
- k2v: K2vClient,
- s3: S3Client,
-
+ storage: Store,
uid_index: Bayou<UidIndex>,
}
@@ -209,33 +198,19 @@ impl MailboxInternal {
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let ops = ids
.iter()
- .map(|id| BatchReadOp {
- partition_key: &self.mail_path,
- filter: Filter {
- start: Some(id),
- end: None,
- prefix: None,
- limit: None,
- reverse: false,
- },
- single_item: true,
- conflicts_only: false,
- tombstones: false,
- })
+ .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str()))
.collect::<Vec<_>>();
- let res_vec = self.k2v.read_batch(&ops).await?;
+ let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;
let mut meta_vec = vec![];
- for (op, res) in ops.iter().zip(res_vec.into_iter()) {
- if res.items.len() != 1 {
- bail!("Expected 1 item, got {}", res.items.len());
- }
- let (_, cv) = res.items.iter().next().unwrap();
+ for res in res_vec.into_iter() {
let mut meta_opt = None;
- for v in cv.value.iter() {
+
+ // Resolve conflicts
+ for v in res.value.iter() {
match v {
- K2vValue::Tombstone => (),
- K2vValue::Value(v) => {
+ storage::Alternative::Tombstone => (),
+ storage::Alternative::Value(v) => {
let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
match meta_opt.as_mut() {
None => {
@@ -251,7 +226,7 @@ impl MailboxInternal {
if let Some(meta) = meta_opt {
meta_vec.push(meta);
} else {
- bail!("No valid meta value in k2v for {:?}", op.filter.start);
+ bail!("No valid meta value in k2v for {:?}", res.row_ref);
}
}
@@ -259,19 +234,12 @@ impl MailboxInternal {
}
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
- let gor = GetObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, id),
- ..Default::default()
- };
-
- let obj_res = self.s3.get_object(gor).await?;
-
- let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
- let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
- obj_body.into_async_read().read_to_end(&mut buf).await?;
-
- cryptoblob::open(&buf, message_key)
+ let obj_res = self
+ .storage
+ .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id)))
+ .await?;
+ let body = obj_res.value;
+ cryptoblob::open(&body, message_key)
}
// ---- Functions for changing the mailbox ----
@@ -304,13 +272,12 @@ impl MailboxInternal {
async {
// Encrypt and save mail body
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
- let por = PutObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, ident),
- body: Some(message_blob.into()),
- ..Default::default()
- };
- self.s3.put_object(por).await?;
+ self.storage
+ .blob_insert(BlobVal::new(
+ BlobRef(format!("{}/{}", self.mail_path, ident)),
+ message_blob,
+ ))
+ .await?;
Ok::<_, anyhow::Error>(())
},
async {
@@ -322,8 +289,11 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.k2v
- .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )])
.await?;
Ok::<_, anyhow::Error>(())
},
@@ -349,20 +319,14 @@ impl MailboxInternal {
&mut self,
mail: IMF<'a>,
ident: UniqueIdent,
- s3_key: &str,
+ blob_src: storage::BlobRef,
message_key: Key,
) -> Result<()> {
futures::try_join!(
async {
// Copy mail body from previous location
- let cor = CopyObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, ident),
- copy_source: format!("{}/{}", self.bucket, s3_key),
- metadata_directive: Some("REPLACE".into()),
- ..Default::default()
- };
- self.s3.copy_object(cor).await?;
+ let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident));
+ self.storage.blob_copy(&blob_src, &blob_dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
@@ -374,8 +338,11 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.k2v
- .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )])
.await?;
Ok::<_, anyhow::Error>(())
},
@@ -400,21 +367,26 @@ impl MailboxInternal {
futures::try_join!(
async {
// Delete mail body from S3
- let dor = DeleteObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, ident),
- ..Default::default()
- };
- self.s3.delete_object(dor).await?;
+ self.storage
+ .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident)))
+ .await?;
Ok::<_, anyhow::Error>(())
},
async {
// Delete mail meta from K2V
let sk = ident.to_string();
- let v = self.k2v.read_item(&self.mail_path, &sk).await?;
- self.k2v
- .delete_item(&self.mail_path, &sk, v.causality)
+ let res = self
+ .storage
+ .row_fetch(&storage::Selector::Single(&RowRef::new(
+ &self.mail_path,
+ &sk,
+ )))
.await?;
+ if let Some(row_val) = res.into_iter().next() {
+ self.storage
+ .row_rm(&storage::Selector::Single(&row_val.row_ref))
+ .await?;
+ }
Ok::<_, anyhow::Error>(())
}
)?;
@@ -445,7 +417,7 @@ impl MailboxInternal {
source_id: UniqueIdent,
new_id: UniqueIdent,
) -> Result<()> {
- if self.bucket != from.bucket || self.encryption_key != from.encryption_key {
+ if self.encryption_key != from.encryption_key {
bail!("Message to be copied/moved does not belong to same account.");
}
@@ -460,23 +432,20 @@ impl MailboxInternal {
futures::try_join!(
async {
- // Copy mail body from S3
- let cor = CopyObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, new_id),
- copy_source: format!("{}/{}/{}", from.bucket, from.mail_path, source_id),
- ..Default::default()
- };
-
- self.s3.copy_object(cor).await?;
+ let dst = BlobRef(format!("{}/{}", self.mail_path, new_id));
+ let src = BlobRef(format!("{}/{}", from.mail_path, source_id));
+ self.storage.blob_copy(&src, &dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Copy mail meta in K2V
let meta = &from.fetch_meta(&[source_id]).await?[0];
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
- self.k2v
- .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None)
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &new_id.to_string()),
+ meta_blob,
+ )])
.await?;
Ok::<_, anyhow::Error>(())
},
diff --git a/src/mail/unique_ident.rs b/src/mail/unique_ident.rs
index 267f66e..0e629db 100644
--- a/src/mail/unique_ident.rs
+++ b/src/mail/unique_ident.rs
@@ -5,7 +5,7 @@ use lazy_static::lazy_static;
use rand::prelude::*;
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
-use crate::time::now_msec;
+use crate::timestamp::now_msec;
/// An internal Mail Identifier is composed of two components:
/// - a process identifier, 128 bits, itself composed of:
diff --git a/src/mail/user.rs b/src/mail/user.rs
index 5523c2a..da0d509 100644
--- a/src/mail/user.rs
+++ b/src/mail/user.rs
@@ -2,18 +2,18 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Weak};
use anyhow::{anyhow, bail, Result};
-use k2v_client::{CausalityToken, K2vClient, K2vValue};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use crate::cryptoblob::{open_deserialize, seal_serialize};
-use crate::login::{Credentials, StorageCredentials};
+use crate::login::Credentials;
use crate::mail::incoming::incoming_mail_watch_process;
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::{gen_ident, UniqueIdent};
-use crate::time::now_msec;
+use crate::storage;
+use crate::timestamp::now_msec;
pub const MAILBOX_HIERARCHY_DELIMITER: char = '.';
@@ -33,7 +33,7 @@ const MAILBOX_LIST_SK: &str = "list";
pub struct User {
pub username: String,
pub creds: Credentials,
- pub k2v: K2vClient,
+ pub storage: storage::Store,
pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>,
tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>,
@@ -41,7 +41,7 @@ pub struct User {
impl User {
pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> {
- let cache_key = (username.clone(), creds.storage.clone());
+ let cache_key = (username.clone(), creds.storage.unique());
{
let cache = USER_CACHE.lock().unwrap();
@@ -165,6 +165,7 @@ impl User {
list.rename_mailbox(name, &nnew)?;
}
}
+
self.save_mailbox_list(&list, ct).await?;
}
Ok(())
@@ -173,14 +174,14 @@ impl User {
// ---- Internal user & mailbox management ----
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
- let k2v = creds.k2v_client()?;
+ let storage = creds.storage.build().await?;
let (tx_inbox_id, rx_inbox_id) = watch::channel(None);
let user = Arc::new(Self {
username,
creds: creds.clone(),
- k2v,
+ storage,
tx_inbox_id,
mailboxes: std::sync::Mutex::new(HashMap::new()),
});
@@ -223,32 +224,42 @@ impl User {
// ---- Mailbox list management ----
- async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<CausalityToken>)> {
- let (mut list, ct) = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await {
- Err(k2v_client::Error::NotFound) => (MailboxList::new(), None),
+ async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> {
+ let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK);
+ let (mut list, row) = match self
+ .storage
+ .row_fetch(&storage::Selector::Single(&row_ref))
+ .await
+ {
+ Err(storage::StorageError::NotFound) => (MailboxList::new(), None),
Err(e) => return Err(e.into()),
- Ok(cv) => {
+ Ok(rv) => {
let mut list = MailboxList::new();
- for v in cv.value {
- if let K2vValue::Value(vbytes) = v {
+ let (row_ref, row_vals) = match rv.into_iter().next() {
+ Some(row_val) => (row_val.row_ref, row_val.value),
+ None => (row_ref, vec![]),
+ };
+
+ for v in row_vals {
+ if let storage::Alternative::Value(vbytes) = v {
let list2 =
open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
list.merge(list2);
}
}
- (list, Some(cv.causality))
+ (list, Some(row_ref))
}
};
- self.ensure_inbox_exists(&mut list, &ct).await?;
+ self.ensure_inbox_exists(&mut list, &row).await?;
- Ok((list, ct))
+ Ok((list, row))
}
async fn ensure_inbox_exists(
&self,
list: &mut MailboxList,
- ct: &Option<CausalityToken>,
+ ct: &Option<storage::RowRef>,
) -> Result<bool> {
// If INBOX doesn't exist, create a new mailbox with that name
// and save new mailbox list.
@@ -277,12 +288,12 @@ impl User {
async fn save_mailbox_list(
&self,
list: &MailboxList,
- ct: Option<CausalityToken>,
+ ct: Option<storage::RowRef>,
) -> Result<()> {
let list_blob = seal_serialize(list, &self.creds.keys.master)?;
- self.k2v
- .insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct)
- .await?;
+ let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK));
+ let row_val = storage::RowVal::new(rref, list_blob);
+ self.storage.row_insert(vec![row_val]).await?;
Ok(())
}
}
@@ -455,6 +466,6 @@ enum CreatedMailbox {
// ---- User cache ----
lazy_static! {
- static ref USER_CACHE: std::sync::Mutex<HashMap<(String, StorageCredentials), Weak<User>>> =
+ static ref USER_CACHE: std::sync::Mutex<HashMap<(String, storage::UnicityBuffer), Weak<User>>> =
std::sync::Mutex::new(HashMap::new());
}