diff options
Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 36 |
1 files changed, 31 insertions, 5 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index d77e9dc..14f9728 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -238,6 +238,22 @@ impl<S: BayouState> Bayou<S> { Ok(()) } + pub async fn idle_sync(&mut self) -> Result<()> { + tracing::debug!("start idle_sync"); + loop { + tracing::trace!("idle_sync loop"); + let fut_notif = self.watch.learnt_remote_update.notified(); + + if self.last_sync_watch_ct != *self.watch.rx.borrow() { + break + } + fut_notif.await; + } + tracing::trace!("idle_sync done"); + self.sync().await?; + Ok(()) + } + /// Applies a new operation on the state. Once this function returns, /// the operation has been safely persisted to storage backend. /// Make sure to call `.opportunistic_sync()` before doing this, @@ -257,7 +273,7 @@ impl<S: BayouState> Bayou<S> { seal_serialize(&op, &self.key)?, ); self.storage.row_insert(vec![row_val]).await?; - self.watch.notify.notify_one(); + self.watch.propagate_local_update.notify_one(); let new_state = self.state().apply(&op); self.history.push((ts, op, Some(new_state))); @@ -423,7 +439,8 @@ impl<S: BayouState> Bayou<S> { struct K2vWatch { target: storage::RowRef, rx: watch::Receiver<storage::RowRef>, - notify: Notify, + propagate_local_update: Notify, + learnt_remote_update: Notify, } impl K2vWatch { @@ -434,9 +451,10 @@ impl K2vWatch { let storage = creds.storage.build().await?; let (tx, rx) = watch::channel::<storage::RowRef>(target.clone()); - let notify = Notify::new(); + let propagate_local_update = Notify::new(); + let learnt_remote_update = Notify::new(); - let watch = Arc::new(K2vWatch { target, rx, notify }); + let watch = Arc::new(K2vWatch { target, rx, propagate_local_update, learnt_remote_update }); tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx)); @@ -459,7 +477,12 @@ impl K2vWatch { this.target.uid.shard, this.target.uid.sort ); tokio::select!( + // Needed to exit: will force a loop iteration every minutes, + // that will stop the loop if other Arc references have been dropped + // and free resources. Otherwise we would be blocked waiting forever... _ = tokio::time::sleep(Duration::from_secs(60)) => continue, + + // Watch if another instance has modified the log update = storage.row_poll(&row) => { match update { Err(e) => { @@ -471,10 +494,13 @@ impl K2vWatch { if tx.send(row.clone()).is_err() { break; } + this.learnt_remote_update.notify_waiters(); } } } - _ = this.notify.notified() => { + + // It appears we have modified the log, informing other people + _ = this.propagate_local_update.notified() => { let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); let row_val = storage::RowVal::new(row.clone(), rand); if let Err(e) = storage.row_insert(vec![row_val]).await |