aboutsummaryrefslogtreecommitdiff
path: root/src/mail/incoming.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/mail/incoming.rs')
-rw-r--r--src/mail/incoming.rs74
1 files changed, 32 insertions, 42 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index 9899ae8..db22f3e 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+//use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::{Arc, Weak};
@@ -6,11 +6,7 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Result};
use futures::{future::BoxFuture, FutureExt};
-use k2v_client::{CausalityToken, K2vClient, K2vValue};
-use rusoto_s3::{
- DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
-};
-use tokio::io::AsyncReadExt;
+//use tokio::io::AsyncReadExt;
use tokio::sync::watch;
use tracing::{error, info, warn};
@@ -81,7 +77,7 @@ async fn incoming_mail_watch_process_internal(
tokio::select! {
inc_k = wait_new_mail => Some(inc_k),
- _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key),
+ _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan())),
_ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None,
}
@@ -220,7 +216,7 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) ->
enum LockState {
Unknown,
Empty,
- Held(UniqueIdent, u64, CausalityToken),
+ Held(UniqueIdent, u64, storage::OrphanRowRef),
}
async fn k2v_lock_loop_internal(
@@ -236,10 +232,10 @@ async fn k2v_lock_loop_internal(
// Loop 1: watch state of lock in K2V, save that in corresponding watch channel
let watch_lock_loop: BoxFuture<Result<()>> = async {
- let mut ct = None;
+ let mut ct = k2v.row(pk, sk);
loop {
info!("k2v watch lock loop iter: ct = {:?}", ct);
- match k2v_wait_value_changed(&k2v, pk, sk, &ct).await {
+ match ct.poll().await {
Err(e) => {
error!(
"Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
@@ -250,8 +246,8 @@ async fn k2v_lock_loop_internal(
}
Ok(cv) => {
let mut lock_state = None;
- for v in cv.value.iter() {
- if let K2vValue::Value(vbytes) = v {
+ for v in cv.content().iter() {
+ if let storage::Alternative::Value(vbytes) = v {
if vbytes.len() == 32 {
let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
@@ -264,16 +260,18 @@ async fn k2v_lock_loop_internal(
}
}
}
+ let new_ct = cv.to_ref();
+
info!(
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
- ct, cv.causality, lock_state
+ ct, new_ct, lock_state
);
state_tx.send(
lock_state
- .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone()))
+ .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan()))
.unwrap_or(LockState::Empty),
)?;
- ct = Some(cv.causality);
+ ct = new_ct;
}
}
}
@@ -359,7 +357,11 @@ async fn k2v_lock_loop_internal(
now_msec() + LOCK_DURATION.as_millis() as u64,
));
lock[8..].copy_from_slice(&our_pid.0);
- if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await {
+ let row = match ct {
+ Some(orphan) => k2v.from_orphan(orphan),
+ None => k2v.row(pk, sk),
+ };
+ if let Err(e) = row.set_value(lock).push().await {
error!("Could not take lock: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
@@ -385,7 +387,8 @@ async fn k2v_lock_loop_internal(
_ => None,
};
if let Some(ct) = release {
- let _ = k2v.delete_item(pk, sk, ct.clone()).await;
+ let row = k2v.from_orphan(ct);
+ let _ = row.rm().await;
}
}
@@ -407,13 +410,14 @@ impl EncryptedMessage {
}
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
- let s3_client = creds.storage.s3_client()?;
- let k2v_client = creds.storage.k2v_client()?;
+ let s3_client = creds.storage.blob_store()?;
+ let k2v_client = creds.storage.row_store()?;
// Get causality token of previous watch key
- let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await {
- Err(_) => None,
- Ok(cv) => Some(cv.causality),
+ let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK);
+ let watch_ct = match query.fetch().await {
+ Err(_) => query,
+ Ok(cv) => cv.to_ref(),
};
// Write mail to encrypted storage
@@ -421,28 +425,14 @@ impl EncryptedMessage {
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
let key_header = base64::encode(&encrypted_key);
- let por = PutObjectRequest {
- bucket: creds.storage.bucket.clone(),
- key: format!("incoming/{}", gen_ident()),
- metadata: Some(
- [(MESSAGE_KEY.to_string(), key_header)]
- .into_iter()
- .collect::<HashMap<_, _>>(),
- ),
- body: Some(self.encrypted_body.clone().into()),
- ..Default::default()
- };
- s3_client.put_object(por).await?;
+ let mut send = s3_client
+ .blob(&format!("incoming/{}", gen_ident()))
+ .set_value(self.encrypted_body.clone().into());
+ send.set_meta(MESSAGE_KEY, &key_header);
+ send.push().await?;
// Update watch key to signal new mail
- k2v_client
- .insert_item(
- INCOMING_PK,
- INCOMING_WATCH_SK,
- gen_ident().0.to_vec(),
- watch_ct,
- )
- .await?;
+ watch_ct.set_value(gen_ident().0.to_vec()).push().await?;
Ok(())
}