diff options
Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 33 |
1 files changed, 17 insertions, 16 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index 56203eb..c9ae67f 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -1,6 +1,7 @@ use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Result}; +use log::debug; use rand::prelude::*; use serde::{Deserialize, Serialize}; use tokio::io::AsyncReadExt; @@ -76,14 +77,14 @@ impl<S: BayouState> Bayou<S> { pub async fn sync(&mut self) -> Result<()> { // 1. List checkpoints let checkpoints = self.list_checkpoints().await?; - eprintln!("(sync) listed checkpoints: {:?}", checkpoints); + debug!("(sync) listed checkpoints: {:?}", checkpoints); // 2. Load last checkpoint if different from currently used one let checkpoint = if let Some((ts, key)) = checkpoints.last() { if *ts == self.checkpoint.0 { (*ts, None) } else { - eprintln!("(sync) loading checkpoint: {}", key); + debug!("(sync) loading checkpoint: {}", key); let mut gor = GetObjectRequest::default(); gor.bucket = self.bucket.clone(); @@ -94,7 +95,7 @@ impl<S: BayouState> Bayou<S> { let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); obj_body.into_async_read().read_to_end(&mut buf).await?; - eprintln!("(sync) checkpoint body length: {}", buf.len()); + debug!("(sync) checkpoint body length: {}", buf.len()); let ck = open_deserialize::<S>(&buf, &self.key)?; (*ts, Some(ck)) @@ -108,7 +109,7 @@ impl<S: BayouState> Bayou<S> { } if let Some(ck) = checkpoint.1 { - eprintln!( + debug!( "(sync) updating checkpoint to loaded state at {:?}", checkpoint.0 ); @@ -123,7 +124,7 @@ impl<S: BayouState> Bayou<S> { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.serialize(); - eprintln!("(sync) looking up operations starting at {}", ts_ser); + debug!("(sync) looking up operations starting at {}", ts_ser); let ops_map = self .k2v .read_batch(&[BatchReadOp { @@ -155,7 +156,7 @@ impl<S: BayouState> Bayou<S> { match &val.value[0] { K2vValue::Value(v) => { let op = open_deserialize::<S::Op>(&v, &self.key)?; - eprintln!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op); + debug!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op); ops.push((ts, op)); } K2vValue::Tombstone => { @@ -164,7 +165,7 @@ impl<S: BayouState> Bayou<S> { } } ops.sort_by_key(|(ts, _)| *ts); - eprintln!("(sync) {} operations", ops.len()); + debug!("(sync) {} operations", ops.len()); if ops.len() < self.history.len() { bail!("Some operations have disappeared from storage!"); @@ -239,7 +240,7 @@ impl<S: BayouState> Bayou<S> { pub async fn push(&mut self, op: S::Op) -> Result<()> { self.check_recent_sync().await?; - eprintln!("(push) add operation: {:?}", op); + debug!("(push) add operation: {:?}", op); let ts = Timestamp::after( self.history @@ -302,18 +303,18 @@ impl<S: BayouState> Bayou<S> { { Some(i) => i, None => { - eprintln!("(cp) Oldest operation is too recent to trigger checkpoint"); + debug!("(cp) Oldest operation is too recent to trigger checkpoint"); return Ok(()); } }; if i_cp < CHECKPOINT_MIN_OPS { - eprintln!("(cp) Not enough old operations to trigger checkpoint"); + debug!("(cp) Not enough old operations to trigger checkpoint"); return Ok(()); } let ts_cp = self.history[i_cp].0; - eprintln!( + debug!( "(cp) we could checkpoint at time {} (index {} in history)", ts_cp.serialize(), i_cp @@ -321,13 +322,13 @@ impl<S: BayouState> Bayou<S> { // Check existing checkpoints: if last one is too recent, don't checkpoint again. let existing_checkpoints = self.list_checkpoints().await?; - eprintln!("(cp) listed checkpoints: {:?}", existing_checkpoints); + debug!("(cp) listed checkpoints: {:?}", existing_checkpoints); if let Some(last_cp) = existing_checkpoints.last() { if (ts_cp.msec as i128 - last_cp.0.msec as i128) < CHECKPOINT_INTERVAL.as_millis() as i128 { - eprintln!( + debug!( "(cp) last checkpoint is too recent: {}, not checkpointing", last_cp.0.serialize() ); @@ -335,7 +336,7 @@ impl<S: BayouState> Bayou<S> { } } - eprintln!("(cp) saving checkpoint at {}", ts_cp.serialize()); + debug!("(cp) saving checkpoint at {}", ts_cp.serialize()); // Calculate state at time of checkpoint let mut last_known_state = (0, &self.checkpoint.1); @@ -351,7 +352,7 @@ impl<S: BayouState> Bayou<S> { // Serialize and save checkpoint let cryptoblob = seal_serialize(&state_cp, &self.key)?; - eprintln!("(cp) checkpoint body length: {}", cryptoblob.len()); + debug!("(cp) checkpoint body length: {}", cryptoblob.len()); let mut por = PutObjectRequest::default(); por.bucket = self.bucket.clone(); @@ -366,7 +367,7 @@ impl<S: BayouState> Bayou<S> { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { - eprintln!("(cp) drop old checkpoint {}", key); + debug!("(cp) drop old checkpoint {}", key); let mut dor = DeleteObjectRequest::default(); dor.bucket = self.bucket.clone(); dor.key = key.to_string(); |