aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mail/incoming.rs177
1 files changed, 144 insertions, 33 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index 398f555..1d16d38 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -1,8 +1,10 @@
use std::collections::HashMap;
+use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;
-use anyhow::Result;
+use anyhow::{anyhow, bail, Result};
+use futures::{future::BoxFuture, Future, FutureExt};
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
use rusoto_s3::{PutObjectRequest, S3Client, S3};
use tokio::sync::watch;
@@ -20,6 +22,12 @@ const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock";
const INCOMING_WATCH_SK: &str = "watch";
+// When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
+// It is renewed every LOCK_DURATION/3
+// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we
+// lost the lock.
+const LOCK_DURATION: Duration = Duration::from_secs(300);
+
pub async fn incoming_mail_watch_process(
user: Weak<User>,
creds: Credentials,
@@ -124,55 +132,158 @@ async fn handle_incoming_mail(user: &Arc<User>, s3: &S3Client, inbox: &Arc<Mailb
fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> {
let (held_tx, held_rx) = watch::channel(false);
- tokio::spawn(async move {
- let _ = k2v_lock_loop_internal(k2v, pk, sk, held_tx).await;
- });
+ tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx));
held_rx
}
+#[derive(Clone, Debug)]
+enum LockState {
+ Unknown,
+ Empty,
+ Held(UniqueIdent, u64, CausalityToken),
+}
+
async fn k2v_lock_loop_internal(
k2v: K2vClient,
pk: &'static str,
sk: &'static str,
held_tx: watch::Sender<bool>,
-) -> std::result::Result<(), watch::error::SendError<bool>> {
- let pid = gen_ident();
-
- let mut state: Option<(UniqueIdent, u64, CausalityToken)> = None;
- loop {
- let held_until = match &state {
- None => None,
- Some((_holder, expiration_time, _ct)) => Some(expiration_time),
- };
-
- let now = now_msec();
- let wait_half_held_time = async {
- match held_until {
- None => futures::future::pending().await,
- Some(t) => tokio::time::sleep(Duration::from_millis((now_msec() - t) / 2)).await,
+) {
+ let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
+ let mut state_rx_2 = state_rx.clone();
+
+ let our_pid = gen_ident();
+
+ // Loop 1: watch state of lock in K2V, save that in corresponding watch channel
+ let watch_lock_loop: BoxFuture<Result<()>> = async {
+ let mut ct = None;
+ loop {
+ match k2v_wait_value_changed(&k2v, pk, sk, &ct).await {
+ Err(e) => {
+ error!(
+ "Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
+ e
+ );
+ state_tx.send(LockState::Unknown)?;
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ Ok(cv) => {
+ let mut lock_state = None;
+ for v in cv.value.iter() {
+ if let K2vValue::Value(vbytes) = v {
+ if vbytes.len() == 32 {
+ let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
+ let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
+ if lock_state
+ .map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2))
+ .unwrap_or(true)
+ {
+ lock_state = Some((pid, ts));
+ }
+ }
+ }
+ }
+ state_tx.send(
+ lock_state
+ .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone()))
+ .unwrap_or(LockState::Empty),
+ )?;
+ ct = Some(cv.causality);
+ }
}
- };
-
- unimplemented!();
+ info!("Stopping lock state watch");
+ }
+ }
+ .boxed();
+
+ // Loop 2: notify user whether we are holding the lock or not
+ let lock_notify_loop: BoxFuture<Result<()>> = async {
+ loop {
+ let now = now_msec();
+ let held_with_expiration_time = match &*state_rx.borrow_and_update() {
+ LockState::Held(pid, ts, _ct) if *pid == our_pid => {
+ let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64;
+ if now < expiration_time {
+ Some(expiration_time)
+ } else {
+ None
+ }
+ }
+ _ => None,
+ };
+ held_tx.send(held_with_expiration_time.is_some())?;
- /*
- tokio::select! {
- ret = k2v_wait_value_changed(&k2v, pk, sk, &state.as_ref().map(|(_, _, ct)| ct.clone())) => {
- match ret {
- Err(e) => {
- held_tx.send(false)?;
- tokio::time::sleep(Duration::from_secs(30)).await;
- continue;
+ let await_expired = async {
+ match held_with_expiration_time {
+ None => futures::future::pending().await,
+ Some(expiration_time) => {
+ tokio::time::sleep(Duration::from_millis(expiration_time - now)).await
}
- Ok(cv) => {
- unimplemented!();
+ };
+ };
+
+ tokio::select!(
+ r = state_rx.changed() => {
+ r?;
+ }
+ _ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"),
+ _ = await_expired => continue,
+ );
+ }
+ }
+ .boxed();
+
+ // Loop 3: acquire lock when relevant
+ let take_lock_loop: BoxFuture<Result<()>> = async {
+ loop {
+ let now = now_msec();
+ let state: LockState = state_rx_2.borrow_and_update().clone();
+ let (acquire_at, ct) = match state {
+ LockState::Unknown => {
+ // If state of the lock is unknown, don't try to acquire
+ state_rx_2.changed().await?;
+ continue;
+ }
+ LockState::Empty => (now, None),
+ LockState::Held(pid, ts, ct) => {
+ if pid == our_pid {
+ (ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct))
+ } else {
+ (ts, Some(ct))
}
}
+ };
+
+ // Wait until it is time to acquire lock
+ if acquire_at > now {
+ tokio::select!(
+ r = state_rx_2.changed() => {
+ // If lock state changed in the meantime, don't acquire and loop around
+ r?;
+ continue;
+ }
+ _ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => ()
+ );
}
+
+ // Acquire lock
+ let mut lock = vec![0u8; 32];
+ lock[..8].copy_from_slice(&u64::to_be_bytes(now_msec()));
+ lock[8..].copy_from_slice(&our_pid.0);
+ if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await {
+ error!("Could not take lock: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+
+ // Wait for new information to loop back
+ state_rx_2.changed().await?;
}
- */
}
+ .boxed();
+
+ let res = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop);
+ info!("lock loop exited: {:?}", res);
}
async fn k2v_wait_value_changed<'a>(