From 4a33ac2265dae0e8fd1f7fbaec54ab7120334cbe Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 17 Nov 2023 12:15:44 +0100 Subject: incoming has been fully ported --- src/login/mod.rs | 2 +- src/mail/incoming.rs | 74 +++++++++++++++++++++--------------------------- src/storage/garage.rs | 6 ++++ src/storage/in_memory.rs | 6 ++++ src/storage/mod.rs | 3 +- 5 files changed, 47 insertions(+), 44 deletions(-) (limited to 'src') diff --git a/src/login/mod.rs b/src/login/mod.rs index f4bf4d2..a150829 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -47,7 +47,7 @@ pub struct Credentials { #[derive(Clone, Debug)] pub struct PublicCredentials { /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V) - pub storage: StorageCredentials, + pub storage: Builders, pub public_key: PublicKey, } diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 9899ae8..db22f3e 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +//use std::collections::HashMap; use std::convert::TryFrom; use std::sync::{Arc, Weak}; @@ -6,11 +6,7 @@ use std::time::Duration; use anyhow::{anyhow, bail, Result}; use futures::{future::BoxFuture, FutureExt}; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; -use rusoto_s3::{ - DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, -}; -use tokio::io::AsyncReadExt; +//use tokio::io::AsyncReadExt; use tokio::sync::watch; use tracing::{error, info, warn}; @@ -81,7 +77,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan())), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } @@ -220,7 +216,7 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> enum LockState { Unknown, Empty, - Held(UniqueIdent, u64, CausalityToken), + Held(UniqueIdent, u64, storage::OrphanRowRef), } async fn k2v_lock_loop_internal( @@ -236,10 +232,10 @@ async fn k2v_lock_loop_internal( // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture> = async { - let mut ct = None; + let mut ct = k2v.row(pk, sk); loop { info!("k2v watch lock loop iter: ct = {:?}", ct); - match k2v_wait_value_changed(&k2v, pk, sk, &ct).await { + match ct.poll().await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", @@ -250,8 +246,8 @@ async fn k2v_lock_loop_internal( } Ok(cv) => { let mut lock_state = None; - for v in cv.value.iter() { - if let K2vValue::Value(vbytes) = v { + for v in cv.content().iter() { + if let storage::Alternative::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); let pid = UniqueIdent(vbytes[8..].try_into().unwrap()); @@ -264,16 +260,18 @@ async fn k2v_lock_loop_internal( } } } + let new_ct = cv.to_ref(); + info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", - ct, cv.causality, lock_state + ct, new_ct, lock_state ); state_tx.send( lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone())) + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan())) .unwrap_or(LockState::Empty), )?; - ct = Some(cv.causality); + ct = new_ct; } } } @@ -359,7 +357,11 @@ async fn k2v_lock_loop_internal( now_msec() + LOCK_DURATION.as_millis() as u64, )); lock[8..].copy_from_slice(&our_pid.0); - if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await { + let row = match ct { + Some(orphan) => k2v.from_orphan(orphan), + None => k2v.row(pk, sk), + }; + if let Err(e) = row.set_value(lock).push().await { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -385,7 +387,8 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let _ = k2v.delete_item(pk, sk, ct.clone()).await; + let row = k2v.from_orphan(ct); + let _ = row.rm().await; } } @@ -407,13 +410,14 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.s3_client()?; - let k2v_client = creds.storage.k2v_client()?; + let s3_client = creds.storage.blob_store()?; + let k2v_client = creds.storage.row_store()?; // Get causality token of previous watch key - let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await { - Err(_) => None, - Ok(cv) => Some(cv.causality), + let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match query.fetch().await { + Err(_) => query, + Ok(cv) => cv.to_ref(), }; // Write mail to encrypted storage @@ -421,28 +425,14 @@ impl EncryptedMessage { sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); let key_header = base64::encode(&encrypted_key); - let por = PutObjectRequest { - bucket: creds.storage.bucket.clone(), - key: format!("incoming/{}", gen_ident()), - metadata: Some( - [(MESSAGE_KEY.to_string(), key_header)] - .into_iter() - .collect::>(), - ), - body: Some(self.encrypted_body.clone().into()), - ..Default::default() - }; - s3_client.put_object(por).await?; + let mut send = s3_client + .blob(&format!("incoming/{}", gen_ident())) + .set_value(self.encrypted_body.clone().into()); + send.set_meta(MESSAGE_KEY, &key_header); + send.push().await?; // Update watch key to signal new mail - k2v_client - .insert_item( - INCOMING_PK, - INCOMING_WATCH_SK, - gen_ident().0.to_vec(), - watch_ct, - ) - .await?; + watch_ct.set_value(gen_ident().0.to_vec()).push().await?; Ok(()) } diff --git a/src/storage/garage.rs b/src/storage/garage.rs index d6ac7ac..f458aeb 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -67,6 +67,12 @@ impl IRowRef for GrgRef { } } +impl std::fmt::Debug for GrgRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + unimplemented!(); + } +} + impl IRowValue for GrgValue { fn to_ref(&self) -> RowRef { unimplemented!(); diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 0bdf9b1..20f96a4 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -72,6 +72,12 @@ impl IRowRef for MemRef { } } +impl std::fmt::Debug for MemRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + unimplemented!(); + } +} + impl IRowValue for MemValue { fn to_ref(&self) -> RowRef { unimplemented!(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c9a49c5..08ccfec 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -91,7 +91,7 @@ pub trait IRowStore } pub type RowStore = Box; -pub trait IRowRef +pub trait IRowRef: std::fmt::Debug { /*fn clone_boxed(&self) -> RowRef;*/ fn to_orphan(&self) -> OrphanRowRef; @@ -138,6 +138,7 @@ pub type BlobRef = Box; pub trait IBlobValue { fn to_ref(&self) -> BlobRef; fn get_meta(&self, key: &str) -> Option<&[u8]>; + fn set_meta(&mut self, key: &str, val: &str); fn content(&self) -> Option<&[u8]>; fn push(&self) -> AsyncResult<()>; } -- cgit v1.2.3