aboutsummaryrefslogtreecommitdiff
path: root/src/mail/incoming.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-04 12:44:48 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-04 12:44:48 +0200
commit1abfb60b8e643b1bf5ad04d8e111c83a7ed6bc03 (patch)
treec87a3f2ee323345efeb3c48fe1a4fbd4a2addb9f /src/mail/incoming.rs
parent9b4fcf58df2032ae8cd4622cfd92b42e393db610 (diff)
downloadaerogramme-1abfb60b8e643b1bf5ad04d8e111c83a7ed6bc03.tar.gz
aerogramme-1abfb60b8e643b1bf5ad04d8e111c83a7ed6bc03.zip
Handling of incoming messages from LMTP seems to work!
Diffstat (limited to 'src/mail/incoming.rs')
-rw-r--r--src/mail/incoming.rs28
1 files changed, 23 insertions, 5 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index 86cbc07..3a1e4f9 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -8,7 +8,8 @@ use anyhow::{anyhow, bail, Result};
use futures::{future::BoxFuture, Future, FutureExt};
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
use rusoto_s3::{
- GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
+ DeleteObjectRequest, GetObjectRequest, HeadObjectRequest, ListObjectsV2Request,
+ PutObjectRequest, S3Client, S3,
};
use tokio::io::AsyncReadExt;
use tokio::sync::watch;
@@ -27,7 +28,7 @@ const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock";
const INCOMING_WATCH_SK: &str = "watch";
-const MESSAGE_KEY: &str = "Message-Key";
+const MESSAGE_KEY: &str = "message-key";
// When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
// It is renewed every LOCK_DURATION/3
@@ -183,6 +184,7 @@ async fn move_incoming_message(
let get_result = s3.get_object(gor).await?;
// 1.a decrypt message key from headers
+ info!("Object metadata: {:?}", get_result.metadata);
let key_encrypted_b64 = get_result
.metadata
.as_ref()
@@ -215,6 +217,12 @@ async fn move_incoming_message(
.append_from_s3(msg, id, &object_key, message_key)
.await?;
+ // 3 delete from incoming
+ let mut dor = DeleteObjectRequest::default();
+ dor.bucket = user.creds.storage.bucket.clone();
+ dor.key = object_key.clone();
+ s3.delete_object(dor).await?;
+
Ok(())
}
@@ -250,6 +258,7 @@ async fn k2v_lock_loop_internal(
let watch_lock_loop: BoxFuture<Result<()>> = async {
let mut ct = None;
loop {
+ info!("k2v watch lock loop iter: ct = {:?}", ct);
match k2v_wait_value_changed(&k2v, pk, sk, &ct).await {
Err(e) => {
error!(
@@ -275,6 +284,10 @@ async fn k2v_lock_loop_internal(
}
}
}
+ info!(
+ "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
+ ct, cv.causality, lock_state
+ );
state_tx.send(
lock_state
.map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone()))
@@ -283,8 +296,8 @@ async fn k2v_lock_loop_internal(
ct = Some(cv.causality);
}
}
- info!("Stopping lock state watch");
}
+ info!("Stopping lock state watch");
}
.boxed();
@@ -303,7 +316,10 @@ async fn k2v_lock_loop_internal(
}
_ => None,
};
- held_tx.send(held_with_expiration_time.is_some())?;
+ let held = held_with_expiration_time.is_some();
+ if held != *held_tx.borrow() {
+ held_tx.send(held)?;
+ }
let await_expired = async {
match held_with_expiration_time {
@@ -360,7 +376,9 @@ async fn k2v_lock_loop_internal(
// Acquire lock
let mut lock = vec![0u8; 32];
- lock[..8].copy_from_slice(&u64::to_be_bytes(now_msec()));
+ lock[..8].copy_from_slice(&u64::to_be_bytes(
+ now_msec() + LOCK_DURATION.as_millis() as u64,
+ ));
lock[8..].copy_from_slice(&our_pid.0);
if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await {
error!("Could not take lock: {}", e);