aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-13 12:30:35 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-13 12:30:35 +0200
commit9fa2e958b3b37538b80b7f26107b7df2238f335b (patch)
treeda34cfe10a8c5d6f9590e7e477635008f509efd4
parent33fa51021cce58e170ae0c943ab8c8b9fa94d6a9 (diff)
downloadaerogramme-9fa2e958b3b37538b80b7f26107b7df2238f335b.tar.gz
aerogramme-9fa2e958b3b37538b80b7f26107b7df2238f335b.zip
Begin add watch mechanism to Bayou
-rw-r--r--src/bayou.rs119
-rw-r--r--src/k2v_util.rs29
-rw-r--r--src/mail/incoming.rs29
-rw-r--r--src/main.rs1
4 files changed, 142 insertions, 36 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 7a76222..f6e0fb7 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -1,18 +1,21 @@
use std::str::FromStr;
+use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use anyhow::{anyhow, bail, Result};
-use log::debug;
+use log::{debug, error, info};
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
+use tokio::sync::{watch, Notify};
-use k2v_client::{BatchDeleteOp, BatchReadOp, Filter, K2vClient, K2vValue};
+use k2v_client::{BatchDeleteOp, BatchReadOp, CausalityToken, Filter, K2vClient, K2vValue};
use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
};
use crate::cryptoblob::*;
+use crate::k2v_util::k2v_wait_value_changed;
use crate::login::Credentials;
use crate::time::now_msec;
@@ -34,6 +37,8 @@ const CHECKPOINT_MIN_OPS: usize = 16;
// between processes doing .checkpoint() and those doing .sync()
const CHECKPOINTS_TO_KEEP: usize = 3;
+const WATCH_SK: &str = "watch";
+
pub trait BayouState:
Default + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
{
@@ -52,8 +57,12 @@ pub struct Bayou<S: BayouState> {
checkpoint: (Timestamp, S),
history: Vec<(Timestamp, S::Op, Option<S>)>,
+
last_sync: Option<Instant>,
last_try_checkpoint: Option<Instant>,
+
+ watch: Arc<K2vWatch>,
+ last_sync_watch_ct: Option<CausalityToken>,
}
impl<S: BayouState> Bayou<S> {
@@ -61,6 +70,8 @@ impl<S: BayouState> Bayou<S> {
let k2v_client = creds.k2v_client()?;
let s3_client = creds.s3_client()?;
+ let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
+
Ok(Self {
bucket: creds.bucket().to_string(),
path,
@@ -71,6 +82,8 @@ impl<S: BayouState> Bayou<S> {
history: vec![],
last_sync: None,
last_try_checkpoint: None,
+ watch,
+ last_sync_watch_ct: None,
})
}
@@ -106,7 +119,7 @@ impl<S: BayouState> Bayou<S> {
};
if self.checkpoint.0 > checkpoint.0 {
- bail!("Existing checkpoint is more recent than stored one");
+ bail!("Loaded checkpoint is more recent than stored one");
}
if let Some(ck) = checkpoint.1 {
@@ -132,7 +145,7 @@ impl<S: BayouState> Bayou<S> {
partition_key: &self.path,
filter: Filter {
start: Some(&ts_ser),
- end: None,
+ end: Some(WATCH_SK),
prefix: None,
limit: None,
reverse: false,
@@ -186,12 +199,9 @@ impl<S: BayouState> Bayou<S> {
let i0 = self
.history
.iter()
- .enumerate()
.zip(ops.iter())
- .skip_while(|((_, (ts1, _, _)), (ts2, _))| ts1 == ts2)
- .map(|((i, _), _)| i)
- .next()
- .unwrap_or(self.history.len());
+ .take_while(|((ts1, _, _), (ts2, _))| ts1 == ts2)
+ .count();
if ops.len() > i0 {
// Remove operations from first position where histories differ
@@ -259,6 +269,8 @@ impl<S: BayouState> Bayou<S> {
)
.await?;
+ self.watch.notify.notify_one();
+
let new_state = self.state().apply(&op);
self.history.push((ts, op, Some(new_state)));
@@ -427,6 +439,95 @@ impl<S: BayouState> Bayou<S> {
}
}
+// ---- Bayou watch in K2V ----
+
+struct K2vWatch {
+ pk: String,
+ sk: String,
+ rx: watch::Receiver<Option<CausalityToken>>,
+ notify: Notify,
+}
+
+impl K2vWatch {
+ /// Creates a new watch and launches subordinate threads.
+ /// These threads hold Weak pointers to the struct;
+ /// the exit when the Arc is dropped.
+ fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> {
+ let (tx, rx) = watch::channel::<Option<CausalityToken>>(None);
+ let notify = Notify::new();
+
+ let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
+
+ tokio::spawn(Self::watcher_task(
+ Arc::downgrade(&watch),
+ creds.k2v_client()?,
+ tx,
+ ));
+ tokio::spawn(Self::updater_task(
+ Arc::downgrade(&watch),
+ creds.k2v_client()?,
+ ));
+
+ Ok(watch)
+ }
+
+ async fn watcher_task(
+ self_weak: Weak<Self>,
+ k2v: K2vClient,
+ tx: watch::Sender<Option<CausalityToken>>,
+ ) {
+ let mut ct = None;
+ while let Some(this) = Weak::upgrade(&self_weak) {
+ info!("bayou k2v watch loop iter: ct = {:?}", ct);
+ let update = tokio::select!(
+ _ = tokio::time::sleep(Duration::from_secs(60)) => continue,
+ r = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => r,
+ );
+ match update {
+ Err(e) => {
+ error!("Error in bayou k2v wait value changed: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ Ok(cv) => {
+ if tx.send(Some(cv.causality.clone())).is_err() {
+ break;
+ }
+ ct = Some(cv.causality);
+ }
+ }
+ }
+ info!("bayou k2v watch loop exiting");
+ }
+
+ async fn updater_task(self_weak: Weak<Self>, k2v: K2vClient) {
+ while let Some(this) = Weak::upgrade(&self_weak) {
+ let ct: Option<CausalityToken> = this.rx.borrow().clone();
+ let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
+
+ tokio::select!(
+ _ = tokio::time::sleep(Duration::from_secs(60)) => (),
+ _ = this.notify.notified() => {
+ if let Err(e) = k2v
+ .insert_item(
+ &this.pk,
+ &this.sk,
+ rand,
+ ct,
+ )
+ .await
+ {
+ error!("Error in bayou k2v watch updater loop: {}", e);
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ }
+ }
+ )
+ }
+ info!("bayou k2v watch updater loop exiting");
+ }
+}
+
+// ---- TIMESTAMP CLASS ----
+
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct Timestamp {
pub msec: u64,
diff --git a/src/k2v_util.rs b/src/k2v_util.rs
new file mode 100644
index 0000000..9dadab4
--- /dev/null
+++ b/src/k2v_util.rs
@@ -0,0 +1,29 @@
+use anyhow::Result;
+
+use k2v_client::{CausalValue, CausalityToken, K2vClient};
+
+// ---- UTIL: function to wait for a value to have changed in K2V ----
+
+pub async fn k2v_wait_value_changed(
+ k2v: &K2vClient,
+ pk: &str,
+ sk: &str,
+ prev_ct: &Option<CausalityToken>,
+) -> Result<CausalValue> {
+ loop {
+ if let Some(ct) = prev_ct {
+ match k2v.poll_item(pk, sk, ct.clone(), None).await? {
+ None => continue,
+ Some(cv) => return Ok(cv),
+ }
+ } else {
+ match k2v.read_item(pk, sk).await {
+ Err(k2v_client::Error::NotFound) => {
+ k2v.insert_item(pk, sk, vec![0u8], None).await?;
+ }
+ Err(e) => return Err(e.into()),
+ Ok(cv) => return Ok(cv),
+ }
+ }
+ }
+}
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index 9643985..66513bf 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -6,7 +6,7 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Result};
use futures::{future::BoxFuture, FutureExt};
-use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
+use k2v_client::{CausalityToken, K2vClient, K2vValue};
use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
};
@@ -15,6 +15,7 @@ use tokio::sync::watch;
use tracing::{error, info, warn};
use crate::cryptoblob;
+use crate::k2v_util::k2v_wait_value_changed;
use crate::login::{Credentials, PublicCredentials};
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
@@ -408,32 +409,6 @@ async fn k2v_lock_loop_internal(
}
}
-// ---- UTIL: function to wait for a value to have changed in K2V ----
-
-async fn k2v_wait_value_changed<'a>(
- k2v: &'a K2vClient,
- pk: &'static str,
- sk: &'static str,
- prev_ct: &'a Option<CausalityToken>,
-) -> Result<CausalValue> {
- loop {
- if let Some(ct) = prev_ct {
- match k2v.poll_item(pk, sk, ct.clone(), None).await? {
- None => continue,
- Some(cv) => return Ok(cv),
- }
- } else {
- match k2v.read_item(pk, sk).await {
- Err(k2v_client::Error::NotFound) => {
- k2v.insert_item(pk, sk, vec![0u8], None).await?;
- }
- Err(e) => return Err(e.into()),
- Ok(cv) => return Ok(cv),
- }
- }
- }
-}
-
// ---- LMTP SIDE: storing messages encrypted with user's pubkey ----
pub struct EncryptedMessage {
diff --git a/src/main.rs b/src/main.rs
index b27c891..a4e22ff 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,6 +2,7 @@ mod bayou;
mod config;
mod cryptoblob;
mod imap;
+mod k2v_util;
mod lmtp;
mod login;
mod mail;