diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-13 14:21:14 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-13 14:21:14 +0200 |
commit | cd59be3a007822637a8b48efe988fb052d58a756 (patch) | |
tree | 4f57f0720ed62e9c8704a6ca38a6a4c48b660f6d | |
parent | 9fa2e958b3b37538b80b7f26107b7df2238f335b (diff) | |
download | aerogramme-cd59be3a007822637a8b48efe988fb052d58a756.tar.gz aerogramme-cd59be3a007822637a8b48efe988fb052d58a756.zip |
Implement opportunistic sync based on watch value, and use it
-rw-r--r-- | src/bayou.rs | 41 | ||||
-rw-r--r-- | src/imap/command/examined.rs | 4 | ||||
-rw-r--r-- | src/imap/mailbox_view.rs | 18 | ||||
-rw-r--r-- | src/mail/mailbox.rs | 24 |
4 files changed, 57 insertions, 30 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index f6e0fb7..4d33a8e 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -19,12 +19,12 @@ use crate::k2v_util::k2v_wait_value_changed; use crate::login::Credentials; use crate::time::now_msec; -const SAVE_STATE_EVERY: usize = 64; +const KEEP_STATE_EVERY: usize = 64; // Checkpointing interval constants: a checkpoint is not made earlier // than CHECKPOINT_INTERVAL time after the last one, and is not made // if there are less than CHECKPOINT_MIN_OPS new operations since last one. -const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(3600); +const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(6 * 3600); const CHECKPOINT_MIN_OPS: usize = 16; // HYPOTHESIS: processes are able to communicate in a synchronous // fashion in times that are small compared to CHECKPOINT_INTERVAL. @@ -89,6 +89,9 @@ impl<S: BayouState> Bayou<S> { /// Re-reads the state from persistent storage backend pub async fn sync(&mut self) -> Result<()> { + let new_last_sync = Some(Instant::now()); + let new_last_sync_watch_ct = self.watch.rx.borrow().clone(); + // 1. List checkpoints let checkpoints = self.list_checkpoints().await?; debug!("(sync) listed checkpoints: {:?}", checkpoints); @@ -225,7 +228,7 @@ impl<S: BayouState> Bayou<S> { // Now, apply all operations retrieved from storage after the common part for (ts, op) in ops.drain(i0..) { state = state.apply(&op); - if (self.history.len() + 1) % SAVE_STATE_EVERY == 0 { + if (self.history.len() + 1) % KEEP_STATE_EVERY == 0 { self.history.push((ts, op, Some(state.clone()))); } else { self.history.push((ts, op, None)); @@ -236,22 +239,32 @@ impl<S: BayouState> Bayou<S> { self.history.last_mut().unwrap().2 = Some(state); } - self.last_sync = Some(Instant::now()); + // Save info that sync has been done + self.last_sync = new_last_sync; + self.last_sync_watch_ct = new_last_sync_watch_ct; Ok(()) } - async fn check_recent_sync(&mut self) -> Result<()> { - match self.last_sync { - Some(t) if (Instant::now() - t) < CHECKPOINT_INTERVAL / 10 => Ok(()), - _ => self.sync().await, + /// Does a sync() if either of the two conditions is met: + /// - last sync was more than CHECKPOINT_INTERVAL/5 ago + /// - a change was detected + pub async fn opportunistic_sync(&mut self) -> Result<()> { + let too_old = match self.last_sync { + Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5), + _ => true, + }; + let changed = self.last_sync_watch_ct != *self.watch.rx.borrow(); + if too_old || changed { + self.sync().await?; } + Ok(()) } /// Applies a new operation on the state. Once this function returns, - /// the option has been safely persisted to storage backend + /// the operation has been safely persisted to storage backend. + /// Make sure to call `.opportunistic_sync()` before doing this, + /// and even before calculating the `op` argument given here. pub async fn push(&mut self, op: S::Op) -> Result<()> { - self.check_recent_sync().await?; - debug!("(push) add operation: {:?}", op); let ts = Timestamp::after( @@ -276,7 +289,7 @@ impl<S: BayouState> Bayou<S> { // Clear previously saved state in history if not required let hlen = self.history.len(); - if hlen >= 2 && (hlen - 1) % SAVE_STATE_EVERY != 0 { + if hlen >= 2 && (hlen - 1) % KEEP_STATE_EVERY != 0 { self.history[hlen - 2].2 = None; } @@ -288,7 +301,7 @@ impl<S: BayouState> Bayou<S> { /// Save a new checkpoint if previous checkpoint is too old pub async fn checkpoint(&mut self) -> Result<()> { match self.last_try_checkpoint { - Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 10 => Ok(()), + Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 5 => Ok(()), _ => { let res = self.checkpoint_internal().await; if res.is_ok() { @@ -300,7 +313,7 @@ impl<S: BayouState> Bayou<S> { } async fn checkpoint_internal(&mut self) -> Result<()> { - self.check_recent_sync().await?; + self.sync().await?; // Check what would be the possible time for a checkpoint in the history we have let now = now_msec() as i128; diff --git a/src/imap/command/examined.rs b/src/imap/command/examined.rs index 94e8e88..8da68f8 100644 --- a/src/imap/command/examined.rs +++ b/src/imap/command/examined.rs @@ -87,7 +87,9 @@ impl<'a> ExaminedContext<'a> { } pub async fn noop(self) -> Result<(Response, flow::Transition)> { - let updates = self.mailbox.sync_update().await?; + self.mailbox.mailbox.force_sync().await?; + + let updates = self.mailbox.update().await?; Ok(( Response::ok("NOOP completed.")?.with_body(updates), flow::Transition::None, diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 5debcb9..9941428 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -68,16 +68,10 @@ impl MailboxView { Ok((new_view, data)) } - /// Looks up state changes in the mailbox and produces a set of IMAP - /// responses describing the changes. - pub async fn sync_update(&mut self) -> Result<Vec<Body>> { - self.mailbox.sync().await?; - - self.update().await - } - /// Produces a set of IMAP responses describing the change between /// what the client knows and what is actually in the mailbox. + /// This does NOT trigger a sync, it bases itself on what is currently + /// loaded in RAM by Bayou. pub async fn update(&mut self) -> Result<Vec<Body>> { let new_view = MailboxView { mailbox: self.mailbox.clone(), @@ -156,6 +150,8 @@ impl MailboxView { flags: &[Flag], uid: &bool, ) -> Result<Vec<Body>> { + self.mailbox.opportunistic_sync().await?; + if *uid { bail!("UID STORE not implemented"); } @@ -181,9 +177,11 @@ impl MailboxView { } pub async fn expunge(&mut self) -> Result<Vec<Body>> { + self.mailbox.opportunistic_sync().await?; + let deleted_flag = Flag::Deleted.to_string(); - let msgs = self - .known_state + let state = self.mailbox.current_uid_index().await; + let msgs = state .table .iter() .filter(|(_uuid, (_uid, flags))| flags.iter().any(|x| *x == deleted_flag)) diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 4b84cf2..19a95e0 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -60,8 +60,14 @@ impl Mailbox { } /// Sync data with backing store - pub async fn sync(&self) -> Result<()> { - self.mbox.write().await.sync().await + pub async fn force_sync(&self) -> Result<()> { + self.mbox.write().await.force_sync().await + } + + /// Sync data with backing store only if changes are detected + /// or last sync is too old + pub async fn opportunistic_sync(&self) -> Result<()> { + self.mbox.write().await.opportunistic_sync().await } // ---- Functions for reading the mailbox ---- @@ -184,11 +190,16 @@ struct MailboxInternal { } impl MailboxInternal { - async fn sync(&mut self) -> Result<()> { + async fn force_sync(&mut self) -> Result<()> { self.uid_index.sync().await?; Ok(()) } + async fn opportunistic_sync(&mut self) -> Result<()> { + self.uid_index.opportunistic_sync().await?; + Ok(()) + } + // ---- Functions for reading the mailbox ---- async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> { @@ -308,7 +319,8 @@ impl MailboxInternal { .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) .await?; Ok::<_, anyhow::Error>(()) - } + }, + self.uid_index.opportunistic_sync() )?; // Add mail to Bayou mail index @@ -357,7 +369,8 @@ impl MailboxInternal { .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) .await?; Ok::<_, anyhow::Error>(()) - } + }, + self.uid_index.opportunistic_sync() )?; // Add mail to Bayou mail index @@ -449,6 +462,7 @@ impl MailboxInternal { .await?; Ok::<_, anyhow::Error>(()) }, + self.uid_index.opportunistic_sync(), )?; // Add mail to Bayou mail index |