diff options
Diffstat (limited to 'src/mail/incoming.rs')
-rw-r--r-- | src/mail/incoming.rs | 28 |
1 files changed, 23 insertions, 5 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 86cbc07..3a1e4f9 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -8,7 +8,8 @@ use anyhow::{anyhow, bail, Result}; use futures::{future::BoxFuture, Future, FutureExt}; use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue}; use rusoto_s3::{ - GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, + DeleteObjectRequest, GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, + PutObjectRequest, S3Client, S3, }; use tokio::io::AsyncReadExt; use tokio::sync::watch; @@ -27,7 +28,7 @@ const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; const INCOMING_WATCH_SK: &str = "watch"; -const MESSAGE_KEY: &str = "Message-Key"; +const MESSAGE_KEY: &str = "message-key"; // When a lock is held, it is held for LOCK_DURATION (here 5 minutes) // It is renewed every LOCK_DURATION/3 @@ -183,6 +184,7 @@ async fn move_incoming_message( let get_result = s3.get_object(gor).await?; // 1.a decrypt message key from headers + info!("Object metadata: {:?}", get_result.metadata); let key_encrypted_b64 = get_result .metadata .as_ref() @@ -215,6 +217,12 @@ async fn move_incoming_message( .append_from_s3(msg, id, &object_key, message_key) .await?; + // 3 delete from incoming + let mut dor = DeleteObjectRequest::default(); + dor.bucket = user.creds.storage.bucket.clone(); + dor.key = object_key.clone(); + s3.delete_object(dor).await?; + Ok(()) } @@ -250,6 +258,7 @@ async fn k2v_lock_loop_internal( let watch_lock_loop: BoxFuture<Result<()>> = async { let mut ct = None; loop { + info!("k2v watch lock loop iter: ct = {:?}", ct); match k2v_wait_value_changed(&k2v, pk, sk, &ct).await { Err(e) => { error!( @@ -275,6 +284,10 @@ async fn k2v_lock_loop_internal( } } } + info!( + "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", + ct, cv.causality, lock_state + ); state_tx.send( lock_state .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone())) @@ -283,8 +296,8 @@ async fn k2v_lock_loop_internal( ct = Some(cv.causality); } } - info!("Stopping lock state watch"); } + info!("Stopping lock state watch"); } .boxed(); @@ -303,7 +316,10 @@ async fn k2v_lock_loop_internal( } _ => None, }; - held_tx.send(held_with_expiration_time.is_some())?; + let held = held_with_expiration_time.is_some(); + if held != *held_tx.borrow() { + held_tx.send(held)?; + } let await_expired = async { match held_with_expiration_time { @@ -360,7 +376,9 @@ async fn k2v_lock_loop_internal( // Acquire lock let mut lock = vec![0u8; 32]; - lock[..8].copy_from_slice(&u64::to_be_bytes(now_msec())); + lock[..8].copy_from_slice(&u64::to_be_bytes( + now_msec() + LOCK_DURATION.as_millis() as u64, + )); lock[8..].copy_from_slice(&our_pid.0); if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await { error!("Could not take lock: {}", e); |