aboutsummaryrefslogtreecommitdiff
path: root/src/bayou.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bayou.rs')
-rw-r--r--src/bayou.rs36
1 files changed, 31 insertions, 5 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index d77e9dc..14f9728 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -238,6 +238,22 @@ impl<S: BayouState> Bayou<S> {
Ok(())
}
+ pub async fn idle_sync(&mut self) -> Result<()> {
+ tracing::debug!("start idle_sync");
+ loop {
+ tracing::trace!("idle_sync loop");
+ let fut_notif = self.watch.learnt_remote_update.notified();
+
+ if self.last_sync_watch_ct != *self.watch.rx.borrow() {
+ break
+ }
+ fut_notif.await;
+ }
+ tracing::trace!("idle_sync done");
+ self.sync().await?;
+ Ok(())
+ }
+
/// Applies a new operation on the state. Once this function returns,
/// the operation has been safely persisted to storage backend.
/// Make sure to call `.opportunistic_sync()` before doing this,
@@ -257,7 +273,7 @@ impl<S: BayouState> Bayou<S> {
seal_serialize(&op, &self.key)?,
);
self.storage.row_insert(vec![row_val]).await?;
- self.watch.notify.notify_one();
+ self.watch.propagate_local_update.notify_one();
let new_state = self.state().apply(&op);
self.history.push((ts, op, Some(new_state)));
@@ -423,7 +439,8 @@ impl<S: BayouState> Bayou<S> {
struct K2vWatch {
target: storage::RowRef,
rx: watch::Receiver<storage::RowRef>,
- notify: Notify,
+ propagate_local_update: Notify,
+ learnt_remote_update: Notify,
}
impl K2vWatch {
@@ -434,9 +451,10 @@ impl K2vWatch {
let storage = creds.storage.build().await?;
let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
- let notify = Notify::new();
+ let propagate_local_update = Notify::new();
+ let learnt_remote_update = Notify::new();
- let watch = Arc::new(K2vWatch { target, rx, notify });
+ let watch = Arc::new(K2vWatch { target, rx, propagate_local_update, learnt_remote_update });
tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));
@@ -459,7 +477,12 @@ impl K2vWatch {
this.target.uid.shard, this.target.uid.sort
);
tokio::select!(
+ // Needed to exit: will force a loop iteration every minutes,
+ // that will stop the loop if other Arc references have been dropped
+ // and free resources. Otherwise we would be blocked waiting forever...
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
+
+ // Watch if another instance has modified the log
update = storage.row_poll(&row) => {
match update {
Err(e) => {
@@ -471,10 +494,13 @@ impl K2vWatch {
if tx.send(row.clone()).is_err() {
break;
}
+ this.learnt_remote_update.notify_waiters();
}
}
}
- _ = this.notify.notified() => {
+
+ // It appears we have modified the log, informing other people
+ _ = this.propagate_local_update.notified() => {
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
let row_val = storage::RowVal::new(row.clone(), rand);
if let Err(e) = storage.row_insert(vec![row_val]).await