diff options
Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 41 |
1 files changed, 27 insertions, 14 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index f6e0fb7..4d33a8e 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -19,12 +19,12 @@ use crate::k2v_util::k2v_wait_value_changed; use crate::login::Credentials; use crate::time::now_msec; -const SAVE_STATE_EVERY: usize = 64; +const KEEP_STATE_EVERY: usize = 64; // Checkpointing interval constants: a checkpoint is not made earlier // than CHECKPOINT_INTERVAL time after the last one, and is not made // if there are less than CHECKPOINT_MIN_OPS new operations since last one. -const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(3600); +const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(6 * 3600); const CHECKPOINT_MIN_OPS: usize = 16; // HYPOTHESIS: processes are able to communicate in a synchronous // fashion in times that are small compared to CHECKPOINT_INTERVAL. @@ -89,6 +89,9 @@ impl<S: BayouState> Bayou<S> { /// Re-reads the state from persistent storage backend pub async fn sync(&mut self) -> Result<()> { + let new_last_sync = Some(Instant::now()); + let new_last_sync_watch_ct = self.watch.rx.borrow().clone(); + // 1. List checkpoints let checkpoints = self.list_checkpoints().await?; debug!("(sync) listed checkpoints: {:?}", checkpoints); @@ -225,7 +228,7 @@ impl<S: BayouState> Bayou<S> { // Now, apply all operations retrieved from storage after the common part for (ts, op) in ops.drain(i0..) { state = state.apply(&op); - if (self.history.len() + 1) % SAVE_STATE_EVERY == 0 { + if (self.history.len() + 1) % KEEP_STATE_EVERY == 0 { self.history.push((ts, op, Some(state.clone()))); } else { self.history.push((ts, op, None)); @@ -236,22 +239,32 @@ impl<S: BayouState> Bayou<S> { self.history.last_mut().unwrap().2 = Some(state); } - self.last_sync = Some(Instant::now()); + // Save info that sync has been done + self.last_sync = new_last_sync; + self.last_sync_watch_ct = new_last_sync_watch_ct; Ok(()) } - async fn check_recent_sync(&mut self) -> Result<()> { - match self.last_sync { - Some(t) if (Instant::now() - t) < CHECKPOINT_INTERVAL / 10 => Ok(()), - _ => self.sync().await, + /// Does a sync() if either of the two conditions is met: + /// - last sync was more than CHECKPOINT_INTERVAL/5 ago + /// - a change was detected + pub async fn opportunistic_sync(&mut self) -> Result<()> { + let too_old = match self.last_sync { + Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5), + _ => true, + }; + let changed = self.last_sync_watch_ct != *self.watch.rx.borrow(); + if too_old || changed { + self.sync().await?; } + Ok(()) } /// Applies a new operation on the state. Once this function returns, - /// the option has been safely persisted to storage backend + /// the operation has been safely persisted to storage backend. + /// Make sure to call `.opportunistic_sync()` before doing this, + /// and even before calculating the `op` argument given here. pub async fn push(&mut self, op: S::Op) -> Result<()> { - self.check_recent_sync().await?; - debug!("(push) add operation: {:?}", op); let ts = Timestamp::after( @@ -276,7 +289,7 @@ impl<S: BayouState> Bayou<S> { // Clear previously saved state in history if not required let hlen = self.history.len(); - if hlen >= 2 && (hlen - 1) % SAVE_STATE_EVERY != 0 { + if hlen >= 2 && (hlen - 1) % KEEP_STATE_EVERY != 0 { self.history[hlen - 2].2 = None; } @@ -288,7 +301,7 @@ impl<S: BayouState> Bayou<S> { /// Save a new checkpoint if previous checkpoint is too old pub async fn checkpoint(&mut self) -> Result<()> { match self.last_try_checkpoint { - Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 10 => Ok(()), + Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 5 => Ok(()), _ => { let res = self.checkpoint_internal().await; if res.is_ok() { @@ -300,7 +313,7 @@ impl<S: BayouState> Bayou<S> { } async fn checkpoint_internal(&mut self) -> Result<()> { - self.check_recent_sync().await?; + self.sync().await?; // Check what would be the possible time for a checkpoint in the history we have let now = now_msec() as i128; |