aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mail/incoming.rs28
-rw-r--r--src/mail/mailbox.rs97
2 files changed, 86 insertions, 39 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index 86cbc07..3a1e4f9 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -8,7 +8,8 @@ use anyhow::{anyhow, bail, Result};
use futures::{future::BoxFuture, Future, FutureExt};
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
use rusoto_s3::{
- GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
+ DeleteObjectRequest, GetObjectRequest, HeadObjectRequest, ListObjectsV2Request,
+ PutObjectRequest, S3Client, S3,
};
use tokio::io::AsyncReadExt;
use tokio::sync::watch;
@@ -27,7 +28,7 @@ const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock";
const INCOMING_WATCH_SK: &str = "watch";
-const MESSAGE_KEY: &str = "Message-Key";
+const MESSAGE_KEY: &str = "message-key";
// When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
// It is renewed every LOCK_DURATION/3
@@ -183,6 +184,7 @@ async fn move_incoming_message(
let get_result = s3.get_object(gor).await?;
// 1.a decrypt message key from headers
+ info!("Object metadata: {:?}", get_result.metadata);
let key_encrypted_b64 = get_result
.metadata
.as_ref()
@@ -215,6 +217,12 @@ async fn move_incoming_message(
.append_from_s3(msg, id, &object_key, message_key)
.await?;
+ // 3 delete from incoming
+ let mut dor = DeleteObjectRequest::default();
+ dor.bucket = user.creds.storage.bucket.clone();
+ dor.key = object_key.clone();
+ s3.delete_object(dor).await?;
+
Ok(())
}
@@ -250,6 +258,7 @@ async fn k2v_lock_loop_internal(
let watch_lock_loop: BoxFuture<Result<()>> = async {
let mut ct = None;
loop {
+ info!("k2v watch lock loop iter: ct = {:?}", ct);
match k2v_wait_value_changed(&k2v, pk, sk, &ct).await {
Err(e) => {
error!(
@@ -275,6 +284,10 @@ async fn k2v_lock_loop_internal(
}
}
}
+ info!(
+ "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
+ ct, cv.causality, lock_state
+ );
state_tx.send(
lock_state
.map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone()))
@@ -283,8 +296,8 @@ async fn k2v_lock_loop_internal(
ct = Some(cv.causality);
}
}
- info!("Stopping lock state watch");
}
+ info!("Stopping lock state watch");
}
.boxed();
@@ -303,7 +316,10 @@ async fn k2v_lock_loop_internal(
}
_ => None,
};
- held_tx.send(held_with_expiration_time.is_some())?;
+ let held = held_with_expiration_time.is_some();
+ if held != *held_tx.borrow() {
+ held_tx.send(held)?;
+ }
let await_expired = async {
match held_with_expiration_time {
@@ -360,7 +376,9 @@ async fn k2v_lock_loop_internal(
// Acquire lock
let mut lock = vec![0u8; 32];
- lock[..8].copy_from_slice(&u64::to_be_bytes(now_msec()));
+ lock[..8].copy_from_slice(&u64::to_be_bytes(
+ now_msec() + LOCK_DURATION.as_millis() as u64,
+ ));
lock[8..].copy_from_slice(&our_pid.0);
if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await {
error!("Could not take lock: {}", e);
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index e63f712..7389173 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -207,21 +207,33 @@ impl MailboxInternal {
let res_vec = self.k2v.read_batch(&ops).await?;
let mut meta_vec = vec![];
- for res in res_vec {
+ for (op, res) in ops.iter().zip(res_vec.into_iter()) {
if res.items.len() != 1 {
bail!("Expected 1 item, got {}", res.items.len());
}
let (_, cv) = res.items.iter().next().unwrap();
- if cv.value.len() != 1 {
- bail!("Expected 1 value, got {}", cv.value.len());
- }
- match &cv.value[0] {
- K2vValue::Tombstone => bail!("Expected value, got tombstone"),
- K2vValue::Value(v) => {
- let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
- meta_vec.push(meta);
+ let mut meta_opt = None;
+ for v in cv.value.iter() {
+ match v {
+ K2vValue::Tombstone => (),
+ K2vValue::Value(v) => {
+ let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
+ match meta_opt.as_mut() {
+ None => {
+ meta_opt = Some(meta);
+ }
+ Some(prevmeta) => {
+ prevmeta.try_merge(meta)?;
+ }
+ }
+ }
}
}
+ if let Some(meta) = meta_opt {
+ meta_vec.push(meta);
+ } else {
+ bail!("No valid meta value in k2v for {:?}", op.filter.start);
+ }
}
Ok(meta_vec)
@@ -429,38 +441,42 @@ impl MailboxInternal {
// ----
async fn test(&mut self) -> Result<()> {
- self.uid_index.sync().await?;
-
- dump(&self.uid_index);
-
- let mail = br#"From: Garage team <garagehq@deuxfleurs.fr>
-Subject: Welcome to Aerogramme!!
+ Ok(())
-This is just a test email, feel free to ignore.
-"#;
- let mail = IMF::try_from(&mail[..]).unwrap();
- self.append(mail, None).await?;
+ /*
+ self.uid_index.sync().await?;
- dump(&self.uid_index);
+ dump(&self.uid_index);
- if self.uid_index.state().idx_by_uid.len() > 6 {
- for i in 0..2 {
- let (_, ident) = self
- .uid_index
- .state()
- .idx_by_uid
- .iter()
- .skip(3 + i)
- .next()
- .unwrap();
+ let mail = br#"From: Garage team <garagehq@deuxfleurs.fr>
+ Subject: Welcome to Aerogramme!!
- self.delete(*ident).await?;
+ This is just a test email, feel free to ignore.
+ "#;
+ let mail = IMF::try_from(&mail[..]).unwrap();
+ self.append(mail, None).await?;
dump(&self.uid_index);
- }
- }
- Ok(())
+ if self.uid_index.state().idx_by_uid.len() > 6 {
+ for i in 0..2 {
+ let (_, ident) = self
+ .uid_index
+ .state()
+ .idx_by_uid
+ .iter()
+ .skip(3 + i)
+ .next()
+ .unwrap();
+
+ self.delete(*ident).await?;
+
+ dump(&self.uid_index);
+ }
+ }
+
+ Ok(())
+ */
}
}
@@ -496,3 +512,16 @@ pub struct MailMeta {
/// RFC822 size
pub rfc822_size: usize,
}
+
+impl MailMeta {
+ fn try_merge(&mut self, other: Self) -> Result<()> {
+ if self.headers != other.headers
+ || self.message_key != other.message_key
+ || self.rfc822_size != other.rfc822_size
+ {
+ bail!("Conflicting MailMeta values.");
+ }
+ self.internaldate = std::cmp::max(self.internaldate, other.internaldate);
+ Ok(())
+ }
+}