Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 80
1 file changed, 52 insertions, 28 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index d77e9dc..9faff5a 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -2,7 +2,7 @@ use std::sync::{Arc, Weak};
 use std::time::{Duration, Instant};

 use anyhow::{anyhow, bail, Result};
-use log::{debug, error, info};
+use log::error;
 use rand::prelude::*;
 use serde::{Deserialize, Serialize};
 use tokio::sync::{watch, Notify};
@@ -84,21 +84,21 @@ impl<S: BayouState> Bayou<S> {

         // 1. List checkpoints
         let checkpoints = self.list_checkpoints().await?;
-        debug!("(sync) listed checkpoints: {:?}", checkpoints);
+        tracing::debug!("(sync) listed checkpoints: {:?}", checkpoints);

         // 2. Load last checkpoint if different from currently used one
         let checkpoint = if let Some((ts, key)) = checkpoints.last() {
             if *ts == self.checkpoint.0 {
                 (*ts, None)
             } else {
-                debug!("(sync) loading checkpoint: {}", key);
+                tracing::debug!("(sync) loading checkpoint: {}", key);

                 let buf = self
                     .storage
                     .blob_fetch(&storage::BlobRef(key.to_string()))
                     .await?
                     .value;
-                debug!("(sync) checkpoint body length: {}", buf.len());
+                tracing::debug!("(sync) checkpoint body length: {}", buf.len());

                 let ck = open_deserialize::<S>(&buf, &self.key)?;
                 (*ts, Some(ck))
@@ -112,7 +112,7 @@ impl<S: BayouState> Bayou<S> {
         }

         if let Some(ck) = checkpoint.1 {
-            debug!(
+            tracing::debug!(
                 "(sync) updating checkpoint to loaded state at {:?}",
                 checkpoint.0
             );
@@ -127,7 +127,7 @@ impl<S: BayouState> Bayou<S> {

         // 3. List all operations starting from checkpoint
         let ts_ser = self.checkpoint.0.to_string();
-        debug!("(sync) looking up operations starting at {}", ts_ser);
+        tracing::debug!("(sync) looking up operations starting at {}", ts_ser);
         let ops_map = self
             .storage
             .row_fetch(&storage::Selector::Range {
@@ -161,7 +161,7 @@ impl<S: BayouState> Bayou<S> {
             }
         }
         ops.sort_by_key(|(ts, _)| *ts);
-        debug!("(sync) {} operations", ops.len());
+        tracing::debug!("(sync) {} operations", ops.len());

         if ops.len() < self.history.len() {
             bail!("Some operations have disappeared from storage!");
@@ -238,12 +238,16 @@ impl<S: BayouState> Bayou<S> {
         Ok(())
     }

+    pub fn notifier(&self) -> std::sync::Weak<Notify> {
+        Arc::downgrade(&self.watch.learnt_remote_update)
+    }
+
     /// Applies a new operation on the state. Once this function returns,
     /// the operation has been safely persisted to storage backend.
     /// Make sure to call `.opportunistic_sync()` before doing this,
     /// and even before calculating the `op` argument given here.
     pub async fn push(&mut self, op: S::Op) -> Result<()> {
-        debug!("(push) add operation: {:?}", op);
+        tracing::debug!("(push) add operation: {:?}", op);

         let ts = Timestamp::after(
             self.history
@@ -257,7 +261,7 @@ impl<S: BayouState> Bayou<S> {
             seal_serialize(&op, &self.key)?,
         );
         self.storage.row_insert(vec![row_val]).await?;
-        self.watch.notify.notify_one();
+        self.watch.propagate_local_update.notify_one();

         let new_state = self.state().apply(&op);
         self.history.push((ts, op, Some(new_state)));
@@ -305,18 +309,18 @@ impl<S: BayouState> Bayou<S> {
         {
             Some(i) => i,
             None => {
-                debug!("(cp) Oldest operation is too recent to trigger checkpoint");
+                tracing::debug!("(cp) Oldest operation is too recent to trigger checkpoint");
                 return Ok(());
             }
         };

         if i_cp < CHECKPOINT_MIN_OPS {
-            debug!("(cp) Not enough old operations to trigger checkpoint");
+            tracing::debug!("(cp) Not enough old operations to trigger checkpoint");
             return Ok(());
         }

         let ts_cp = self.history[i_cp].0;
-        debug!(
+        tracing::debug!(
             "(cp) we could checkpoint at time {} (index {} in history)",
             ts_cp.to_string(),
             i_cp
@@ -324,13 +328,13 @@ impl<S: BayouState> Bayou<S> {

         // Check existing checkpoints: if last one is too recent, don't checkpoint again.
         let existing_checkpoints = self.list_checkpoints().await?;
-        debug!("(cp) listed checkpoints: {:?}", existing_checkpoints);
+        tracing::debug!("(cp) listed checkpoints: {:?}", existing_checkpoints);

         if let Some(last_cp) = existing_checkpoints.last() {
             if (ts_cp.msec as i128 - last_cp.0.msec as i128)
                 < CHECKPOINT_INTERVAL.as_millis() as i128
             {
-                debug!(
+                tracing::debug!(
                     "(cp) last checkpoint is too recent: {}, not checkpointing",
                     last_cp.0.to_string()
                 );
@@ -338,7 +342,7 @@ impl<S: BayouState> Bayou<S> {
             }
         }

-        debug!("(cp) saving checkpoint at {}", ts_cp.to_string());
+        tracing::debug!("(cp) saving checkpoint at {}", ts_cp.to_string());

         // Calculate state at time of checkpoint
         let mut last_known_state = (0, &self.checkpoint.1);
@@ -354,7 +358,7 @@ impl<S: BayouState> Bayou<S> {

         // Serialize and save checkpoint
         let cryptoblob = seal_serialize(&state_cp, &self.key)?;
-        debug!("(cp) checkpoint body length: {}", cryptoblob.len());
+        tracing::debug!("(cp) checkpoint body length: {}", cryptoblob.len());

         let blob_val = storage::BlobVal::new(
             storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
@@ -369,7 +373,7 @@ impl<S: BayouState> Bayou<S> {

         // Delete blobs
         for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
-            debug!("(cp) drop old checkpoint {}", key);
+            tracing::debug!("(cp) drop old checkpoint {}", key);
             self.storage
                 .blob_rm(&storage::BlobRef(key.to_string()))
                 .await?;
@@ -423,7 +427,8 @@ impl<S: BayouState> Bayou<S> {
 struct K2vWatch {
     target: storage::RowRef,
     rx: watch::Receiver<storage::RowRef>,
-    notify: Notify,
+    propagate_local_update: Notify,
+    learnt_remote_update: Arc<Notify>,
 }

 impl K2vWatch {
@@ -434,9 +439,15 @@ impl K2vWatch {
         let storage = creds.storage.build().await?;

         let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
-        let notify = Notify::new();
+        let propagate_local_update = Notify::new();
+        let learnt_remote_update = Arc::new(Notify::new());

-        let watch = Arc::new(K2vWatch { target, rx, notify });
+        let watch = Arc::new(K2vWatch {
+            target,
+            rx,
+            propagate_local_update,
+            learnt_remote_update,
+        });

         tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));

@@ -448,18 +459,24 @@ impl K2vWatch {
         storage: storage::Store,
         tx: watch::Sender<storage::RowRef>,
     ) {
-        let mut row = match Weak::upgrade(&self_weak) {
-            Some(this) => this.target.clone(),
+        let (mut row, remote_update) = match Weak::upgrade(&self_weak) {
+            Some(this) => (this.target.clone(), this.learnt_remote_update.clone()),
             None => return,
         };

         while let Some(this) = Weak::upgrade(&self_weak) {
-            debug!(
+            tracing::debug!(
                 "bayou k2v watch bg loop iter ({}, {})",
-                this.target.uid.shard, this.target.uid.sort
+                this.target.uid.shard,
+                this.target.uid.sort
             );
             tokio::select!(
+                // Needed to exit: will force a loop iteration every minutes,
+                // that will stop the loop if other Arc references have been dropped
+                // and free resources. Otherwise we would be blocked waiting forever...
                 _ = tokio::time::sleep(Duration::from_secs(60)) => continue,
+
+                // Watch if another instance has modified the log
                 update = storage.row_poll(&row) => {
                     match update {
                         Err(e) => {
@@ -468,23 +485,30 @@ impl K2vWatch {
                         }
                         Ok(new_value) => {
                             row = new_value.row_ref;
-                            if tx.send(row.clone()).is_err() {
+                            if let Err(e) = tx.send(row.clone()) {
+                                tracing::warn!(err=?e, "(watch) can't record the new log ref");
                                 break;
                             }
+                            tracing::debug!(row=?row, "(watch) learnt remote update");
+                            this.learnt_remote_update.notify_waiters();
                         }
                     }
                 }
-                _ = this.notify.notified() => {
+
+                // It appears we have modified the log, informing other people
+                _ = this.propagate_local_update.notified() => {
                     let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
                     let row_val = storage::RowVal::new(row.clone(), rand);
                     if let Err(e) = storage.row_insert(vec![row_val]).await {
-                        error!("Error in bayou k2v watch updater loop: {}", e);
+                        tracing::error!("Error in bayou k2v watch updater loop: {}", e);
                         tokio::time::sleep(Duration::from_secs(30)).await;
                     }
                 }
             );
         }

-        info!("bayou k2v watch bg loop exiting");
+        // unblock listeners
+        remote_update.notify_waiters();
+        tracing::info!("bayou k2v watch bg loop exiting");
     }
 }
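
The most visible API addition is notifier(), which exposes the remote-update signal as a std::sync::Weak<Notify>: subscribers can wait on it without keeping the K2vWatch and its background task alive. A minimal sketch of how a caller might consume it, assuming a tokio runtime; the wait_for_remote_updates function and the sync call inside it are illustrative and not part of this change:

use std::sync::Weak;
use tokio::sync::Notify;

// Hypothetical subscriber: react to remote updates reported by the watch.
async fn wait_for_remote_updates(notifier: Weak<Notify>) {
    // Re-upgrade on every iteration: once the K2vWatch is gone, upgrade()
    // returns None and the subscriber stops cleanly.
    while let Some(notify) = notifier.upgrade() {
        // Woken by notify_waiters() in the background task; the final
        // notify_waiters() emitted before the task exits unblocks us too.
        notify.notified().await;
        // ...trigger Bayou::sync() or opportunistic_sync() here...
        // A real caller would usually select!() this against its own
        // shutdown signal instead of relying on the wake-up alone.
    }
}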
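
The former notify field is also split by direction, and the two halves use different Notify semantics: propagate_local_update is fired with notify_one(), which stores a permit for its single consumer (the select! arm in the background task), while learnt_remote_update is fired with notify_waiters(), which wakes every task currently waiting and stores nothing. A standalone sketch of that difference in plain tokio, independent of the Bayou types:

use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());

    // notify_one() stores a permit when nobody is waiting, so a later
    // notified().await returns immediately: a single consumer cannot miss
    // the wake-up (the propagate_local_update case).
    notify.notify_one();
    notify.notified().await;

    // notify_waiters() stores no permit and only reaches futures that
    // already exist, so listeners must be registered before the event
    // (the learnt_remote_update case, where all waiters are woken at once).
    let registered = notify.notified();
    notify.notify_waiters();
    registered.await;
}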