diff options
Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 68 |
1 files changed, 30 insertions, 38 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index 14f9728..1c157e6 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Result}; -use log::{debug, error, info}; +use log::error; use rand::prelude::*; use serde::{Deserialize, Serialize}; use tokio::sync::{watch, Notify}; @@ -84,21 +84,21 @@ impl<S: BayouState> Bayou<S> { // 1. List checkpoints let checkpoints = self.list_checkpoints().await?; - debug!("(sync) listed checkpoints: {:?}", checkpoints); + tracing::debug!("(sync) listed checkpoints: {:?}", checkpoints); // 2. Load last checkpoint if different from currently used one let checkpoint = if let Some((ts, key)) = checkpoints.last() { if *ts == self.checkpoint.0 { (*ts, None) } else { - debug!("(sync) loading checkpoint: {}", key); + tracing::debug!("(sync) loading checkpoint: {}", key); let buf = self .storage .blob_fetch(&storage::BlobRef(key.to_string())) .await? .value; - debug!("(sync) checkpoint body length: {}", buf.len()); + tracing::debug!("(sync) checkpoint body length: {}", buf.len()); let ck = open_deserialize::<S>(&buf, &self.key)?; (*ts, Some(ck)) @@ -112,7 +112,7 @@ impl<S: BayouState> Bayou<S> { } if let Some(ck) = checkpoint.1 { - debug!( + tracing::debug!( "(sync) updating checkpoint to loaded state at {:?}", checkpoint.0 ); @@ -127,7 +127,7 @@ impl<S: BayouState> Bayou<S> { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); - debug!("(sync) looking up operations starting at {}", ts_ser); + tracing::debug!("(sync) looking up operations starting at {}", ts_ser); let ops_map = self .storage .row_fetch(&storage::Selector::Range { @@ -161,7 +161,7 @@ impl<S: BayouState> Bayou<S> { } } ops.sort_by_key(|(ts, _)| *ts); - debug!("(sync) {} operations", ops.len()); + tracing::debug!("(sync) {} operations", ops.len()); if ops.len() < self.history.len() { bail!("Some operations have disappeared from storage!"); @@ -238,20 +238,8 @@ impl<S: BayouState> Bayou<S> { Ok(()) } - pub async fn idle_sync(&mut self) -> Result<()> { - tracing::debug!("start idle_sync"); - loop { - tracing::trace!("idle_sync loop"); - let fut_notif = self.watch.learnt_remote_update.notified(); - - if self.last_sync_watch_ct != *self.watch.rx.borrow() { - break - } - fut_notif.await; - } - tracing::trace!("idle_sync done"); - self.sync().await?; - Ok(()) + pub fn notifier(&self) -> std::sync::Weak<Notify> { + Arc::downgrade(&self.watch.learnt_remote_update) } /// Applies a new operation on the state. Once this function returns, @@ -259,7 +247,7 @@ impl<S: BayouState> Bayou<S> { /// Make sure to call `.opportunistic_sync()` before doing this, /// and even before calculating the `op` argument given here. pub async fn push(&mut self, op: S::Op) -> Result<()> { - debug!("(push) add operation: {:?}", op); + tracing::debug!("(push) add operation: {:?}", op); let ts = Timestamp::after( self.history @@ -321,18 +309,18 @@ impl<S: BayouState> Bayou<S> { { Some(i) => i, None => { - debug!("(cp) Oldest operation is too recent to trigger checkpoint"); + tracing::debug!("(cp) Oldest operation is too recent to trigger checkpoint"); return Ok(()); } }; if i_cp < CHECKPOINT_MIN_OPS { - debug!("(cp) Not enough old operations to trigger checkpoint"); + tracing::debug!("(cp) Not enough old operations to trigger checkpoint"); return Ok(()); } let ts_cp = self.history[i_cp].0; - debug!( + tracing::debug!( "(cp) we could checkpoint at time {} (index {} in history)", ts_cp.to_string(), i_cp @@ -340,13 +328,13 @@ impl<S: BayouState> Bayou<S> { // Check existing checkpoints: if last one is too recent, don't checkpoint again. let existing_checkpoints = self.list_checkpoints().await?; - debug!("(cp) listed checkpoints: {:?}", existing_checkpoints); + tracing::debug!("(cp) listed checkpoints: {:?}", existing_checkpoints); if let Some(last_cp) = existing_checkpoints.last() { if (ts_cp.msec as i128 - last_cp.0.msec as i128) < CHECKPOINT_INTERVAL.as_millis() as i128 { - debug!( + tracing::debug!( "(cp) last checkpoint is too recent: {}, not checkpointing", last_cp.0.to_string() ); @@ -354,7 +342,7 @@ impl<S: BayouState> Bayou<S> { } } - debug!("(cp) saving checkpoint at {}", ts_cp.to_string()); + tracing::debug!("(cp) saving checkpoint at {}", ts_cp.to_string()); // Calculate state at time of checkpoint let mut last_known_state = (0, &self.checkpoint.1); @@ -370,7 +358,7 @@ impl<S: BayouState> Bayou<S> { // Serialize and save checkpoint let cryptoblob = seal_serialize(&state_cp, &self.key)?; - debug!("(cp) checkpoint body length: {}", cryptoblob.len()); + tracing::debug!("(cp) checkpoint body length: {}", cryptoblob.len()); let blob_val = storage::BlobVal::new( storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())), @@ -385,7 +373,7 @@ impl<S: BayouState> Bayou<S> { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { - debug!("(cp) drop old checkpoint {}", key); + tracing::debug!("(cp) drop old checkpoint {}", key); self.storage .blob_rm(&storage::BlobRef(key.to_string())) .await?; @@ -440,7 +428,7 @@ struct K2vWatch { target: storage::RowRef, rx: watch::Receiver<storage::RowRef>, propagate_local_update: Notify, - learnt_remote_update: Notify, + learnt_remote_update: Arc<Notify>, } impl K2vWatch { @@ -452,7 +440,7 @@ impl K2vWatch { let (tx, rx) = watch::channel::<storage::RowRef>(target.clone()); let propagate_local_update = Notify::new(); - let learnt_remote_update = Notify::new(); + let learnt_remote_update = Arc::new(Notify::new()); let watch = Arc::new(K2vWatch { target, rx, propagate_local_update, learnt_remote_update }); @@ -466,13 +454,13 @@ impl K2vWatch { storage: storage::Store, tx: watch::Sender<storage::RowRef>, ) { - let mut row = match Weak::upgrade(&self_weak) { - Some(this) => this.target.clone(), + let (mut row, remote_update) = match Weak::upgrade(&self_weak) { + Some(this) => (this.target.clone(), this.learnt_remote_update.clone()), None => return, }; while let Some(this) = Weak::upgrade(&self_weak) { - debug!( + tracing::debug!( "bayou k2v watch bg loop iter ({}, {})", this.target.uid.shard, this.target.uid.sort ); @@ -491,9 +479,11 @@ impl K2vWatch { } Ok(new_value) => { row = new_value.row_ref; - if tx.send(row.clone()).is_err() { + if let Err(e) = tx.send(row.clone()) { + tracing::warn!(err=?e, "(watch) can't record the new log ref"); break; } + tracing::debug!(row=?row, "(watch) learnt remote update"); this.learnt_remote_update.notify_waiters(); } } @@ -505,12 +495,14 @@ impl K2vWatch { let row_val = storage::RowVal::new(row.clone(), rand); if let Err(e) = storage.row_insert(vec![row_val]).await { - error!("Error in bayou k2v watch updater loop: {}", e); + tracing::error!("Error in bayou k2v watch updater loop: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } } ); } - info!("bayou k2v watch bg loop exiting"); + // unblock listeners + remote_update.notify_waiters(); + tracing::info!("bayou k2v watch bg loop exiting"); } } |