aboutsummaryrefslogtreecommitdiff
path: root/src/bayou.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bayou.rs')
-rw-r--r--src/bayou.rs41
1 files changed, 27 insertions, 14 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index f6e0fb7..4d33a8e 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -19,12 +19,12 @@ use crate::k2v_util::k2v_wait_value_changed;
use crate::login::Credentials;
use crate::time::now_msec;
-const SAVE_STATE_EVERY: usize = 64;
+const KEEP_STATE_EVERY: usize = 64;
// Checkpointing interval constants: a checkpoint is not made earlier
// than CHECKPOINT_INTERVAL time after the last one, and is not made
// if there are less than CHECKPOINT_MIN_OPS new operations since last one.
-const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(3600);
+const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(6 * 3600);
const CHECKPOINT_MIN_OPS: usize = 16;
// HYPOTHESIS: processes are able to communicate in a synchronous
// fashion in times that are small compared to CHECKPOINT_INTERVAL.
@@ -89,6 +89,9 @@ impl<S: BayouState> Bayou<S> {
/// Re-reads the state from persistent storage backend
pub async fn sync(&mut self) -> Result<()> {
+ let new_last_sync = Some(Instant::now());
+ let new_last_sync_watch_ct = self.watch.rx.borrow().clone();
+
// 1. List checkpoints
let checkpoints = self.list_checkpoints().await?;
debug!("(sync) listed checkpoints: {:?}", checkpoints);
@@ -225,7 +228,7 @@ impl<S: BayouState> Bayou<S> {
// Now, apply all operations retrieved from storage after the common part
for (ts, op) in ops.drain(i0..) {
state = state.apply(&op);
- if (self.history.len() + 1) % SAVE_STATE_EVERY == 0 {
+ if (self.history.len() + 1) % KEEP_STATE_EVERY == 0 {
self.history.push((ts, op, Some(state.clone())));
} else {
self.history.push((ts, op, None));
@@ -236,22 +239,32 @@ impl<S: BayouState> Bayou<S> {
self.history.last_mut().unwrap().2 = Some(state);
}
- self.last_sync = Some(Instant::now());
+ // Save info that sync has been done
+ self.last_sync = new_last_sync;
+ self.last_sync_watch_ct = new_last_sync_watch_ct;
Ok(())
}
- async fn check_recent_sync(&mut self) -> Result<()> {
- match self.last_sync {
- Some(t) if (Instant::now() - t) < CHECKPOINT_INTERVAL / 10 => Ok(()),
- _ => self.sync().await,
+ /// Does a sync() if either of the two conditions is met:
+ /// - last sync was more than CHECKPOINT_INTERVAL/5 ago
+ /// - a change was detected
+ pub async fn opportunistic_sync(&mut self) -> Result<()> {
+ let too_old = match self.last_sync {
+ Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5),
+ _ => true,
+ };
+ let changed = self.last_sync_watch_ct != *self.watch.rx.borrow();
+ if too_old || changed {
+ self.sync().await?;
}
+ Ok(())
}
/// Applies a new operation on the state. Once this function returns,
- /// the option has been safely persisted to storage backend
+ /// the operation has been safely persisted to storage backend.
+ /// Make sure to call `.opportunistic_sync()` before doing this,
+ /// and even before calculating the `op` argument given here.
pub async fn push(&mut self, op: S::Op) -> Result<()> {
- self.check_recent_sync().await?;
-
debug!("(push) add operation: {:?}", op);
let ts = Timestamp::after(
@@ -276,7 +289,7 @@ impl<S: BayouState> Bayou<S> {
// Clear previously saved state in history if not required
let hlen = self.history.len();
- if hlen >= 2 && (hlen - 1) % SAVE_STATE_EVERY != 0 {
+ if hlen >= 2 && (hlen - 1) % KEEP_STATE_EVERY != 0 {
self.history[hlen - 2].2 = None;
}
@@ -288,7 +301,7 @@ impl<S: BayouState> Bayou<S> {
/// Save a new checkpoint if previous checkpoint is too old
pub async fn checkpoint(&mut self) -> Result<()> {
match self.last_try_checkpoint {
- Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 10 => Ok(()),
+ Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 5 => Ok(()),
_ => {
let res = self.checkpoint_internal().await;
if res.is_ok() {
@@ -300,7 +313,7 @@ impl<S: BayouState> Bayou<S> {
}
async fn checkpoint_internal(&mut self) -> Result<()> {
- self.check_recent_sync().await?;
+ self.sync().await?;
// Check what would be the possible time for a checkpoint in the history we have
let now = now_msec() as i128;