diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-11-17 12:15:44 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-11-17 12:15:44 +0100 |
commit | 4a33ac2265dae0e8fd1f7fbaec54ab7120334cbe (patch) | |
tree | 6b9bbbda40d64305c6e452af38cba4daed254105 /src/mail/incoming.rs | |
parent | 7eb690e49dd995663e8ea35b1a1f5b14584b4509 (diff) | |
download | aerogramme-4a33ac2265dae0e8fd1f7fbaec54ab7120334cbe.tar.gz aerogramme-4a33ac2265dae0e8fd1f7fbaec54ab7120334cbe.zip |
incoming has been fully ported
Diffstat (limited to 'src/mail/incoming.rs')
-rw-r--r-- | src/mail/incoming.rs | 74 |
1 files changed, 32 insertions, 42 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 9899ae8..db22f3e 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +//use std::collections::HashMap; use std::convert::TryFrom; use std::sync::{Arc, Weak}; @@ -6,11 +6,7 @@ use std::time::Duration; use anyhow::{anyhow, bail, Result}; use futures::{future::BoxFuture, FutureExt}; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; -use rusoto_s3::{ - DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, -}; -use tokio::io::AsyncReadExt; +//use tokio::io::AsyncReadExt; use tokio::sync::watch; use tracing::{error, info, warn}; @@ -81,7 +77,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan())), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } @@ -220,7 +216,7 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> enum LockState { Unknown, Empty, - Held(UniqueIdent, u64, CausalityToken), + Held(UniqueIdent, u64, storage::OrphanRowRef), } async fn k2v_lock_loop_internal( @@ -236,10 +232,10 @@ async fn k2v_lock_loop_internal( // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture<Result<()>> = async { - let mut ct = None; + let mut ct = k2v.row(pk, sk); loop { info!("k2v watch lock loop iter: ct = {:?}", ct); - match k2v_wait_value_changed(&k2v, pk, sk, &ct).await { + match ct.poll().await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", @@ -250,8 +246,8 @@ async fn k2v_lock_loop_internal( } Ok(cv) => { let mut lock_state = None; - for v in cv.value.iter() { - if let K2vValue::Value(vbytes) = v { + for v in cv.content().iter() { + if let storage::Alternative::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); let pid = UniqueIdent(vbytes[8..].try_into().unwrap()); @@ -264,16 +260,18 @@ async fn k2v_lock_loop_internal( } } } + let new_ct = cv.to_ref(); + info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", - ct, cv.causality, lock_state + ct, new_ct, lock_state ); state_tx.send( lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone())) + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan())) .unwrap_or(LockState::Empty), )?; - ct = Some(cv.causality); + ct = new_ct; } } } @@ -359,7 +357,11 @@ async fn k2v_lock_loop_internal( now_msec() + LOCK_DURATION.as_millis() as u64, )); lock[8..].copy_from_slice(&our_pid.0); - if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await { + let row = match ct { + Some(orphan) => k2v.from_orphan(orphan), + None => k2v.row(pk, sk), + }; + if let Err(e) = row.set_value(lock).push().await { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -385,7 +387,8 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let _ = k2v.delete_item(pk, sk, ct.clone()).await; + let row = k2v.from_orphan(ct); + let _ = row.rm().await; } } @@ -407,13 +410,14 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.s3_client()?; - let k2v_client = creds.storage.k2v_client()?; + let s3_client = creds.storage.blob_store()?; + let k2v_client = creds.storage.row_store()?; // Get causality token of previous watch key - let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await { - Err(_) => None, - Ok(cv) => Some(cv.causality), + let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match query.fetch().await { + Err(_) => query, + Ok(cv) => cv.to_ref(), }; // Write mail to encrypted storage @@ -421,28 +425,14 @@ impl EncryptedMessage { sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); let key_header = base64::encode(&encrypted_key); - let por = PutObjectRequest { - bucket: creds.storage.bucket.clone(), - key: format!("incoming/{}", gen_ident()), - metadata: Some( - [(MESSAGE_KEY.to_string(), key_header)] - .into_iter() - .collect::<HashMap<_, _>>(), - ), - body: Some(self.encrypted_body.clone().into()), - ..Default::default() - }; - s3_client.put_object(por).await?; + let mut send = s3_client + .blob(&format!("incoming/{}", gen_ident())) + .set_value(self.encrypted_body.clone().into()); + send.set_meta(MESSAGE_KEY, &key_header); + send.push().await?; // Update watch key to signal new mail - k2v_client - .insert_item( - INCOMING_PK, - INCOMING_WATCH_SK, - gen_ident().0.to_vec(), - watch_ct, - ) - .await?; + watch_ct.set_value(gen_ident().0.to_vec()).push().await?; Ok(()) } |