From 185033c462b92854117bc57258bf33b3579a7ca5 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 18 Jan 2024 17:33:57 +0100 Subject: idling works!!! --- src/bayou.rs | 68 +++++++++++++++++++------------------------- src/imap/command/selected.rs | 2 +- src/imap/flow.rs | 11 +++---- src/imap/mailbox_view.rs | 6 ++++ src/imap/session.rs | 38 ++++++++++++++++--------- src/mail/mailbox.rs | 9 +++--- 6 files changed, 72 insertions(+), 62 deletions(-) diff --git a/src/bayou.rs b/src/bayou.rs index 14f9728..1c157e6 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Result}; -use log::{debug, error, info}; +use log::error; use rand::prelude::*; use serde::{Deserialize, Serialize}; use tokio::sync::{watch, Notify}; @@ -84,21 +84,21 @@ impl Bayou { // 1. List checkpoints let checkpoints = self.list_checkpoints().await?; - debug!("(sync) listed checkpoints: {:?}", checkpoints); + tracing::debug!("(sync) listed checkpoints: {:?}", checkpoints); // 2. Load last checkpoint if different from currently used one let checkpoint = if let Some((ts, key)) = checkpoints.last() { if *ts == self.checkpoint.0 { (*ts, None) } else { - debug!("(sync) loading checkpoint: {}", key); + tracing::debug!("(sync) loading checkpoint: {}", key); let buf = self .storage .blob_fetch(&storage::BlobRef(key.to_string())) .await? .value; - debug!("(sync) checkpoint body length: {}", buf.len()); + tracing::debug!("(sync) checkpoint body length: {}", buf.len()); let ck = open_deserialize::(&buf, &self.key)?; (*ts, Some(ck)) @@ -112,7 +112,7 @@ impl Bayou { } if let Some(ck) = checkpoint.1 { - debug!( + tracing::debug!( "(sync) updating checkpoint to loaded state at {:?}", checkpoint.0 ); @@ -127,7 +127,7 @@ impl Bayou { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); - debug!("(sync) looking up operations starting at {}", ts_ser); + tracing::debug!("(sync) looking up operations starting at {}", ts_ser); let ops_map = self .storage .row_fetch(&storage::Selector::Range { @@ -161,7 +161,7 @@ impl Bayou { } } ops.sort_by_key(|(ts, _)| *ts); - debug!("(sync) {} operations", ops.len()); + tracing::debug!("(sync) {} operations", ops.len()); if ops.len() < self.history.len() { bail!("Some operations have disappeared from storage!"); @@ -238,20 +238,8 @@ impl Bayou { Ok(()) } - pub async fn idle_sync(&mut self) -> Result<()> { - tracing::debug!("start idle_sync"); - loop { - tracing::trace!("idle_sync loop"); - let fut_notif = self.watch.learnt_remote_update.notified(); - - if self.last_sync_watch_ct != *self.watch.rx.borrow() { - break - } - fut_notif.await; - } - tracing::trace!("idle_sync done"); - self.sync().await?; - Ok(()) + pub fn notifier(&self) -> std::sync::Weak { + Arc::downgrade(&self.watch.learnt_remote_update) } /// Applies a new operation on the state. Once this function returns, @@ -259,7 +247,7 @@ impl Bayou { /// Make sure to call `.opportunistic_sync()` before doing this, /// and even before calculating the `op` argument given here. pub async fn push(&mut self, op: S::Op) -> Result<()> { - debug!("(push) add operation: {:?}", op); + tracing::debug!("(push) add operation: {:?}", op); let ts = Timestamp::after( self.history @@ -321,18 +309,18 @@ impl Bayou { { Some(i) => i, None => { - debug!("(cp) Oldest operation is too recent to trigger checkpoint"); + tracing::debug!("(cp) Oldest operation is too recent to trigger checkpoint"); return Ok(()); } }; if i_cp < CHECKPOINT_MIN_OPS { - debug!("(cp) Not enough old operations to trigger checkpoint"); + tracing::debug!("(cp) Not enough old operations to trigger checkpoint"); return Ok(()); } let ts_cp = self.history[i_cp].0; - debug!( + tracing::debug!( "(cp) we could checkpoint at time {} (index {} in history)", ts_cp.to_string(), i_cp @@ -340,13 +328,13 @@ impl Bayou { // Check existing checkpoints: if last one is too recent, don't checkpoint again. let existing_checkpoints = self.list_checkpoints().await?; - debug!("(cp) listed checkpoints: {:?}", existing_checkpoints); + tracing::debug!("(cp) listed checkpoints: {:?}", existing_checkpoints); if let Some(last_cp) = existing_checkpoints.last() { if (ts_cp.msec as i128 - last_cp.0.msec as i128) < CHECKPOINT_INTERVAL.as_millis() as i128 { - debug!( + tracing::debug!( "(cp) last checkpoint is too recent: {}, not checkpointing", last_cp.0.to_string() ); @@ -354,7 +342,7 @@ impl Bayou { } } - debug!("(cp) saving checkpoint at {}", ts_cp.to_string()); + tracing::debug!("(cp) saving checkpoint at {}", ts_cp.to_string()); // Calculate state at time of checkpoint let mut last_known_state = (0, &self.checkpoint.1); @@ -370,7 +358,7 @@ impl Bayou { // Serialize and save checkpoint let cryptoblob = seal_serialize(&state_cp, &self.key)?; - debug!("(cp) checkpoint body length: {}", cryptoblob.len()); + tracing::debug!("(cp) checkpoint body length: {}", cryptoblob.len()); let blob_val = storage::BlobVal::new( storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())), @@ -385,7 +373,7 @@ impl Bayou { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { - debug!("(cp) drop old checkpoint {}", key); + tracing::debug!("(cp) drop old checkpoint {}", key); self.storage .blob_rm(&storage::BlobRef(key.to_string())) .await?; @@ -440,7 +428,7 @@ struct K2vWatch { target: storage::RowRef, rx: watch::Receiver, propagate_local_update: Notify, - learnt_remote_update: Notify, + learnt_remote_update: Arc, } impl K2vWatch { @@ -452,7 +440,7 @@ impl K2vWatch { let (tx, rx) = watch::channel::(target.clone()); let propagate_local_update = Notify::new(); - let learnt_remote_update = Notify::new(); + let learnt_remote_update = Arc::new(Notify::new()); let watch = Arc::new(K2vWatch { target, rx, propagate_local_update, learnt_remote_update }); @@ -466,13 +454,13 @@ impl K2vWatch { storage: storage::Store, tx: watch::Sender, ) { - let mut row = match Weak::upgrade(&self_weak) { - Some(this) => this.target.clone(), + let (mut row, remote_update) = match Weak::upgrade(&self_weak) { + Some(this) => (this.target.clone(), this.learnt_remote_update.clone()), None => return, }; while let Some(this) = Weak::upgrade(&self_weak) { - debug!( + tracing::debug!( "bayou k2v watch bg loop iter ({}, {})", this.target.uid.shard, this.target.uid.sort ); @@ -491,9 +479,11 @@ impl K2vWatch { } Ok(new_value) => { row = new_value.row_ref; - if tx.send(row.clone()).is_err() { + if let Err(e) = tx.send(row.clone()) { + tracing::warn!(err=?e, "(watch) can't record the new log ref"); break; } + tracing::debug!(row=?row, "(watch) learnt remote update"); this.learnt_remote_update.notify_waiters(); } } @@ -505,12 +495,14 @@ impl K2vWatch { let row_val = storage::RowVal::new(row.clone(), rand); if let Err(e) = storage.row_insert(vec![row_val]).await { - error!("Error in bayou k2v watch updater loop: {}", e); + tracing::error!("Error in bayou k2v watch updater loop: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } } ); } - info!("bayou k2v watch bg loop exiting"); + // unblock listeners + remote_update.notify_waiters(); + tracing::info!("bayou k2v watch bg loop exiting"); } } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index 4eb4e61..ca2e268 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -83,7 +83,7 @@ pub async fn dispatch<'a>( CommandBody::Idle => { Ok(( Response::build().to_req(ctx.req).message("DUMMY command due to anti-pattern in the code").ok()?, - flow::Transition::Idle(tokio::sync::Notify::new()), + flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()), )) } diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 37f225b..72d9e8e 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -3,6 +3,7 @@ use std::fmt; use std::sync::Arc; use tokio::sync::Notify; +use imap_codec::imap_types::core::Tag; use crate::imap::mailbox_view::MailboxView; use crate::mail::user::User; @@ -21,7 +22,7 @@ pub enum State { NotAuthenticated, Authenticated(Arc), Selected(Arc, MailboxView, MailboxPerm), - Idle(Arc, MailboxView, MailboxPerm, Arc), + Idle(Arc, MailboxView, MailboxPerm, Tag<'static>, Arc), Logout, } @@ -35,7 +36,7 @@ pub enum Transition { None, Authenticate(Arc), Select(MailboxView, MailboxPerm), - Idle(Notify), + Idle(Tag<'static>, Notify), UnIdle, Unselect, Logout, @@ -55,10 +56,10 @@ impl State { (State::Selected(u, _, _) , Transition::Unselect) => { State::Authenticated(u.clone()) } - (State::Selected(u, m, p), Transition::Idle(s)) => { - State::Idle(u, m, p, Arc::new(s)) + (State::Selected(u, m, p), Transition::Idle(t, s)) => { + State::Idle(u, m, p, t, Arc::new(s)) }, - (State::Idle(u, m, p, _), Transition::UnIdle) => { + (State::Idle(u, m, p, _, _), Transition::UnIdle) => { State::Selected(u, m, p) }, (_, Transition::Logout) => State::Logout, diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 07fa3ad..85a4961 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -224,6 +224,12 @@ impl MailboxView { Ok((summary, conflict_id_or_uid)) } + pub async fn idle_sync(&mut self) -> Result>> { + self.internal.mailbox.notify().await.upgrade().ok_or(anyhow!("test"))?.notified().await; + self.internal.mailbox.opportunistic_sync().await?; + self.update(UpdateParameters::default()).await + } + pub async fn expunge(&mut self) -> Result>> { self.internal.sync().await?; let state = self.internal.peek().await; diff --git a/src/imap/session.rs b/src/imap/session.rs index 1d473ed..f4e3d0f 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; +use anyhow::{Result, anyhow, bail}; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anonymous, authenticated, selected}; use crate::imap::flow; @@ -27,28 +27,40 @@ impl Instance { pub async fn request(&mut self, req: Request) -> ResponseOrIdle { match req { - Request::Idle => ResponseOrIdle::Response(self.idle().await), + Request::Idle => self.idle().await, Request::ImapCommand(cmd) => self.command(cmd).await, } } - pub async fn idle(&mut self) -> Response<'static> { - let (user, mbx, perm, stop) = match &mut self.state { - flow::State::Idle(ref user, ref mut mailbox, ref perm, ref stop) => (user, mailbox, perm, stop), - _ => unreachable!(), + pub async fn idle(&mut self) -> ResponseOrIdle { + match self.idle_happy().await { + Ok(r) => r, + Err(e) => { + tracing::error!(err=?e, "something bad happened in idle"); + ResponseOrIdle::Response(Response::bye().unwrap()) + } + } + } + + pub async fn idle_happy(&mut self) -> Result { + let (mbx, tag, stop) = match &mut self.state { + flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()), + _ => bail!("Invalid session state, can't idle"), }; tokio::select! { _ = stop.notified() => { - return Response::build() - .tag(imap_codec::imap_types::core::Tag::try_from("FIXME").unwrap()) + self.state.apply(flow::Transition::UnIdle)?; + return Ok(ResponseOrIdle::Response(Response::build() + .tag(tag.clone()) .message("IDLE completed") - .ok() - .unwrap() + .ok()?)) + }, + change = mbx.idle_sync() => { + tracing::debug!("idle event"); + return Ok(ResponseOrIdle::IdleEvent(change?)); } } - - unimplemented!(); } @@ -119,7 +131,7 @@ impl Instance { } match &self.state { - flow::State::Idle(_, _, _, n) => ResponseOrIdle::StartIdle(n.clone()), + flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()), _ => ResponseOrIdle::Response(resp), } } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 4310a73..c20d815 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -68,8 +68,8 @@ impl Mailbox { } /// Block until a sync has been done (due to changes in the event log) - pub async fn idle_sync(&self) -> Result<()> { - self.mbox.write().await.idle_sync().await + pub async fn notify(&self) -> std::sync::Weak { + self.mbox.read().await.notifier() } // ---- Functions for reading the mailbox ---- @@ -204,9 +204,8 @@ impl MailboxInternal { Ok(()) } - async fn idle_sync(&mut self) -> Result<()> { - self.uid_index.idle_sync().await?; - Ok(()) + fn notifier(&self) -> std::sync::Weak { + self.uid_index.notifier() } // ---- Functions for reading the mailbox ---- -- cgit v1.2.3