aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mail/mailbox.rs106
-rw-r--r--src/storage/mod.rs14
2 files changed, 32 insertions, 88 deletions
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index 83039d5..e8111df 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -1,11 +1,5 @@
use anyhow::{anyhow, bail, Result};
-use k2v_client::K2vClient;
-use k2v_client::{BatchReadOp, Filter, K2vValue};
-use rusoto_s3::{
- CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3,
-};
use serde::{Deserialize, Serialize};
-use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use crate::bayou::Bayou;
@@ -206,35 +200,18 @@ impl MailboxInternal {
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
- let ops = ids
- .iter()
- .map(|id| BatchReadOp {
- partition_key: &self.mail_path,
- filter: Filter {
- start: Some(id),
- end: None,
- prefix: None,
- limit: None,
- reverse: false,
- },
- single_item: true,
- conflicts_only: false,
- tombstones: false,
- })
- .collect::<Vec<_>>();
- let res_vec = self.k2v.read_batch(&ops).await?;
+ let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>();
+ let res_vec = self.k2v.select(storage::Selector::List(ops)).await?;
let mut meta_vec = vec![];
- for (op, res) in ops.iter().zip(res_vec.into_iter()) {
- if res.items.len() != 1 {
- bail!("Expected 1 item, got {}", res.items.len());
- }
- let (_, cv) = res.items.iter().next().unwrap();
+ for res in res_vec.into_iter() {
let mut meta_opt = None;
- for v in cv.value.iter() {
+
+ // Resolve conflicts
+ for v in res.content().iter() {
match v {
- K2vValue::Tombstone => (),
- K2vValue::Value(v) => {
+ storage::Alternative::Tombstone => (),
+ storage::Alternative::Value(v) => {
let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
match meta_opt.as_mut() {
None => {
@@ -250,7 +227,7 @@ impl MailboxInternal {
if let Some(meta) = meta_opt {
meta_vec.push(meta);
} else {
- bail!("No valid meta value in k2v for {:?}", op.filter.start);
+ bail!("No valid meta value in k2v for {:?}", res.to_ref().sk());
}
}
@@ -258,19 +235,9 @@ impl MailboxInternal {
}
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
- let gor = GetObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, id),
- ..Default::default()
- };
-
- let obj_res = self.s3.get_object(gor).await?;
-
- let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
- let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
- obj_body.into_async_read().read_to_end(&mut buf).await?;
-
- cryptoblob::open(&buf, message_key)
+ let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?;
+ let body = obj_res.content().ok_or(anyhow!("missing body"))?;
+ cryptoblob::open(body, message_key)
}
// ---- Functions for changing the mailbox ----
@@ -303,13 +270,7 @@ impl MailboxInternal {
async {
// Encrypt and save mail body
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
- let por = PutObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, ident),
- body: Some(message_blob.into()),
- ..Default::default()
- };
- self.s3.put_object(por).await?;
+ self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?;
Ok::<_, anyhow::Error>(())
},
async {
@@ -321,9 +282,7 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.k2v
- .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
- .await?;
+ self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
@@ -354,8 +313,8 @@ impl MailboxInternal {
futures::try_join!(
async {
// Copy mail body from previous location
- let dst = self.s3.blob(format!("{}/{}", self.mail_path, ident));
- blob_ref.copy(dst).await?;
+ let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident));
+ blob_ref.copy(&dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
@@ -367,9 +326,7 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.k2v
- .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
- .await?;
+ self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
@@ -393,21 +350,13 @@ impl MailboxInternal {
futures::try_join!(
async {
// Delete mail body from S3
- let dor = DeleteObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, ident),
- ..Default::default()
- };
- self.s3.delete_object(dor).await?;
+ self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?;
Ok::<_, anyhow::Error>(())
},
async {
// Delete mail meta from K2V
let sk = ident.to_string();
- let v = self.k2v.read_item(&self.mail_path, &sk).await?;
- self.k2v
- .delete_item(&self.mail_path, &sk, v.causality)
- .await?;
+ self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?;
Ok::<_, anyhow::Error>(())
}
)?;
@@ -438,7 +387,7 @@ impl MailboxInternal {
source_id: UniqueIdent,
new_id: UniqueIdent,
) -> Result<()> {
- if self.bucket != from.bucket || self.encryption_key != from.encryption_key {
+ if self.encryption_key != from.encryption_key {
bail!("Message to be copied/moved does not belong to same account.");
}
@@ -453,24 +402,15 @@ impl MailboxInternal {
futures::try_join!(
async {
- // Copy mail body from S3
- let cor = CopyObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/{}", self.mail_path, new_id),
- copy_source: format!("{}/{}/{}", from.bucket, from.mail_path, source_id),
- ..Default::default()
- };
-
- self.s3.copy_object(cor).await?;
+ let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id));
+ self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Copy mail meta in K2V
let meta = &from.fetch_meta(&[source_id]).await?[0];
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
- self.k2v
- .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None)
- .await?;
+ self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(meta_blob).push().await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync(),
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index c5ed1f8..b687959 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -14,17 +14,18 @@ use futures::future::BoxFuture;
pub mod in_memory;
pub mod garage;
-pub enum Selector<'a> {
- Range{ begin: &'a str, end: &'a str },
- Filter(u64),
-}
-
pub enum Alternative {
Tombstone,
Value(Vec<u8>),
}
type ConcurrentValues = Vec<Alternative>;
+pub enum Selector<'a> {
+ Range { begin: &'a str, end: &'a str },
+ List (Vec<(&'a str, &'a str)>),
+ Prefix (&'a str),
+}
+
#[derive(Debug)]
pub enum StorageError {
NotFound,
@@ -78,12 +79,15 @@ impl Hash for Builders {
pub trait IRowStore
{
fn row(&self, partition: &str, sort: &str) -> RowRef;
+ fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>>;
}
pub type RowStore = Box<dyn IRowStore + Sync + Send>;
pub trait IRowRef
{
fn clone_boxed(&self) -> RowRef;
+ fn pk(&self) -> &str;
+ fn sk(&self) -> &str;
fn set_value(&self, content: Vec<u8>) -> RowValue;
fn fetch(&self) -> AsyncResult<RowValue>;
fn rm(&self) -> AsyncResult<()>;