aboutsummaryrefslogtreecommitdiff
path: root/src/mail/mailbox.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/mail/mailbox.rs')
-rw-r--r--src/mail/mailbox.rs159
1 files changed, 64 insertions, 95 deletions
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index d92140d..e424ba3 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -1,11 +1,5 @@
use anyhow::{anyhow, bail, Result};
-use k2v_client::K2vClient;
-use k2v_client::{BatchReadOp, Filter, K2vValue};
-use rusoto_s3::{
- CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3,
-};
use serde::{Deserialize, Serialize};
-use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use crate::bayou::Bayou;
@@ -14,7 +8,8 @@ use crate::login::Credentials;
use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
use crate::mail::IMF;
-use crate::time::now_msec;
+use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
+use crate::timestamp::now_msec;
pub struct Mailbox {
pub(super) id: UniqueIdent,
@@ -30,7 +25,7 @@ impl Mailbox {
let index_path = format!("index/{}", id);
let mail_path = format!("mail/{}", id);
- let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?;
+ let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?;
uid_index.sync().await?;
let uidvalidity = uid_index.state().uidvalidity;
@@ -48,10 +43,8 @@ impl Mailbox {
let mbox = RwLock::new(MailboxInternal {
id,
- bucket: creds.bucket().to_string(),
encryption_key: creds.keys.master.clone(),
- k2v: creds.k2v_client()?,
- s3: creds.s3_client()?,
+ storage: creds.storage.build().await?,
uid_index,
mail_path,
});
@@ -121,13 +114,13 @@ impl Mailbox {
&self,
msg: IMF<'a>,
ident: UniqueIdent,
- s3_key: &str,
+ blob_ref: storage::BlobRef,
message_key: Key,
) -> Result<()> {
self.mbox
.write()
.await
- .append_from_s3(msg, ident, s3_key, message_key)
+ .append_from_s3(msg, ident, blob_ref, message_key)
.await
}
@@ -182,13 +175,9 @@ struct MailboxInternal {
// 2023-05-15 will probably be used later.
#[allow(dead_code)]
id: UniqueIdent,
- bucket: String,
mail_path: String,
encryption_key: Key,
-
- k2v: K2vClient,
- s3: S3Client,
-
+ storage: Store,
uid_index: Bayou<UidIndex>,
}
@@ -209,33 +198,19 @@ impl MailboxInternal {
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let ops = ids
.iter()
- .map(|id| BatchReadOp {
- partition_key: &self.mail_path,
- filter: Filter {
- start: Some(id),
- end: None,
- prefix: None,
- limit: None,
- reverse: false,
- },
- single_item: true,
- conflicts_only: false,
- tombstones: false,
- })
+ .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str()))
.collect::<Vec<_>>();
- let res_vec = self.k2v.read_batch(&ops).await?;
+ let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;
let mut meta_vec = vec![];
- for (op, res) in ops.iter().zip(res_vec.into_iter()) {
- if res.items.len() != 1 {
- bail!("Expected 1 item, got {}", res.items.len());
- }
- let (_, cv) = res.items.iter().next().unwrap();
+ for res in res_vec.into_iter() {
let mut meta_opt = None;
- for v in cv.value.iter() {
+
+ // Resolve conflicts
+ for v in res.value.iter() {
match v {
- K2vValue::Tombstone => (),
- K2vValue::Value(v) => {
+ storage::Alternative::Tombstone => (),
+ storage::Alternative::Value(v) => {
let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
match meta_opt.as_mut() {
None => {
@@ -251,7 +226,7 @@ impl MailboxInternal {
if let Some(meta) = meta_opt {
meta_vec.push(meta);
} else {
- bail!("No valid meta value in k2v for {:?}", op.filter.start);
+ bail!("No valid meta value in k2v for {:?}", res.row_ref);
}
}
@@ -259,19 +234,12 @@ impl MailboxInternal {
}
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
- let gor = GetObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, id),
- ..Default::default()
- };
-
- let obj_res = self.s3.get_object(gor).await?;
-
- let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
- let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
- obj_body.into_async_read().read_to_end(&mut buf).await?;
-
- cryptoblob::open(&buf, message_key)
+ let obj_res = self
+ .storage
+ .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id)))
+ .await?;
+ let body = obj_res.value;
+ cryptoblob::open(&body, message_key)
}
// ---- Functions for changing the mailbox ----
@@ -304,13 +272,12 @@ impl MailboxInternal {
async {
// Encrypt and save mail body
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
- let por = PutObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, ident),
- body: Some(message_blob.into()),
- ..Default::default()
- };
- self.s3.put_object(por).await?;
+ self.storage
+ .blob_insert(BlobVal::new(
+ BlobRef(format!("{}/{}", self.mail_path, ident)),
+ message_blob,
+ ))
+ .await?;
Ok::<_, anyhow::Error>(())
},
async {
@@ -322,8 +289,11 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.k2v
- .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )])
.await?;
Ok::<_, anyhow::Error>(())
},
@@ -349,20 +319,14 @@ impl MailboxInternal {
&mut self,
mail: IMF<'a>,
ident: UniqueIdent,
- s3_key: &str,
+ blob_src: storage::BlobRef,
message_key: Key,
) -> Result<()> {
futures::try_join!(
async {
// Copy mail body from previous location
- let cor = CopyObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, ident),
- copy_source: format!("{}/{}", self.bucket, s3_key),
- metadata_directive: Some("REPLACE".into()),
- ..Default::default()
- };
- self.s3.copy_object(cor).await?;
+ let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident));
+ self.storage.blob_copy(&blob_src, &blob_dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
@@ -374,8 +338,11 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.k2v
- .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )])
.await?;
Ok::<_, anyhow::Error>(())
},
@@ -400,21 +367,26 @@ impl MailboxInternal {
futures::try_join!(
async {
// Delete mail body from S3
- let dor = DeleteObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, ident),
- ..Default::default()
- };
- self.s3.delete_object(dor).await?;
+ self.storage
+ .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident)))
+ .await?;
Ok::<_, anyhow::Error>(())
},
async {
// Delete mail meta from K2V
let sk = ident.to_string();
- let v = self.k2v.read_item(&self.mail_path, &sk).await?;
- self.k2v
- .delete_item(&self.mail_path, &sk, v.causality)
+ let res = self
+ .storage
+ .row_fetch(&storage::Selector::Single(&RowRef::new(
+ &self.mail_path,
+ &sk,
+ )))
.await?;
+ if let Some(row_val) = res.into_iter().next() {
+ self.storage
+ .row_rm(&storage::Selector::Single(&row_val.row_ref))
+ .await?;
+ }
Ok::<_, anyhow::Error>(())
}
)?;
@@ -445,7 +417,7 @@ impl MailboxInternal {
source_id: UniqueIdent,
new_id: UniqueIdent,
) -> Result<()> {
- if self.bucket != from.bucket || self.encryption_key != from.encryption_key {
+ if self.encryption_key != from.encryption_key {
bail!("Message to be copied/moved does not belong to same account.");
}
@@ -460,23 +432,20 @@ impl MailboxInternal {
futures::try_join!(
async {
- // Copy mail body from S3
- let cor = CopyObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, new_id),
- copy_source: format!("{}/{}/{}", from.bucket, from.mail_path, source_id),
- ..Default::default()
- };
-
- self.s3.copy_object(cor).await?;
+ let dst = BlobRef(format!("{}/{}", self.mail_path, new_id));
+ let src = BlobRef(format!("{}/{}", from.mail_path, source_id));
+ self.storage.blob_copy(&src, &dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Copy mail meta in K2V
let meta = &from.fetch_meta(&[source_id]).await?[0];
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
- self.k2v
- .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None)
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &new_id.to_string()),
+ meta_blob,
+ )])
.await?;
Ok::<_, anyhow::Error>(())
},