aboutsummaryrefslogtreecommitdiff
path: root/src/bayou.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bayou.rs')
-rw-r--r--src/bayou.rs33
1 files changed, 17 insertions, 16 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 56203eb..c9ae67f 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -1,6 +1,7 @@
use std::time::{Duration, Instant};
use anyhow::{anyhow, bail, Result};
+use log::debug;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
@@ -76,14 +77,14 @@ impl<S: BayouState> Bayou<S> {
pub async fn sync(&mut self) -> Result<()> {
// 1. List checkpoints
let checkpoints = self.list_checkpoints().await?;
- eprintln!("(sync) listed checkpoints: {:?}", checkpoints);
+ debug!("(sync) listed checkpoints: {:?}", checkpoints);
// 2. Load last checkpoint if different from currently used one
let checkpoint = if let Some((ts, key)) = checkpoints.last() {
if *ts == self.checkpoint.0 {
(*ts, None)
} else {
- eprintln!("(sync) loading checkpoint: {}", key);
+ debug!("(sync) loading checkpoint: {}", key);
let mut gor = GetObjectRequest::default();
gor.bucket = self.bucket.clone();
@@ -94,7 +95,7 @@ impl<S: BayouState> Bayou<S> {
let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
obj_body.into_async_read().read_to_end(&mut buf).await?;
- eprintln!("(sync) checkpoint body length: {}", buf.len());
+ debug!("(sync) checkpoint body length: {}", buf.len());
let ck = open_deserialize::<S>(&buf, &self.key)?;
(*ts, Some(ck))
@@ -108,7 +109,7 @@ impl<S: BayouState> Bayou<S> {
}
if let Some(ck) = checkpoint.1 {
- eprintln!(
+ debug!(
"(sync) updating checkpoint to loaded state at {:?}",
checkpoint.0
);
@@ -123,7 +124,7 @@ impl<S: BayouState> Bayou<S> {
// 3. List all operations starting from checkpoint
let ts_ser = self.checkpoint.0.serialize();
- eprintln!("(sync) looking up operations starting at {}", ts_ser);
+ debug!("(sync) looking up operations starting at {}", ts_ser);
let ops_map = self
.k2v
.read_batch(&[BatchReadOp {
@@ -155,7 +156,7 @@ impl<S: BayouState> Bayou<S> {
match &val.value[0] {
K2vValue::Value(v) => {
let op = open_deserialize::<S::Op>(&v, &self.key)?;
- eprintln!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op);
+ debug!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op);
ops.push((ts, op));
}
K2vValue::Tombstone => {
@@ -164,7 +165,7 @@ impl<S: BayouState> Bayou<S> {
}
}
ops.sort_by_key(|(ts, _)| *ts);
- eprintln!("(sync) {} operations", ops.len());
+ debug!("(sync) {} operations", ops.len());
if ops.len() < self.history.len() {
bail!("Some operations have disappeared from storage!");
@@ -239,7 +240,7 @@ impl<S: BayouState> Bayou<S> {
pub async fn push(&mut self, op: S::Op) -> Result<()> {
self.check_recent_sync().await?;
- eprintln!("(push) add operation: {:?}", op);
+ debug!("(push) add operation: {:?}", op);
let ts = Timestamp::after(
self.history
@@ -302,18 +303,18 @@ impl<S: BayouState> Bayou<S> {
{
Some(i) => i,
None => {
- eprintln!("(cp) Oldest operation is too recent to trigger checkpoint");
+ debug!("(cp) Oldest operation is too recent to trigger checkpoint");
return Ok(());
}
};
if i_cp < CHECKPOINT_MIN_OPS {
- eprintln!("(cp) Not enough old operations to trigger checkpoint");
+ debug!("(cp) Not enough old operations to trigger checkpoint");
return Ok(());
}
let ts_cp = self.history[i_cp].0;
- eprintln!(
+ debug!(
"(cp) we could checkpoint at time {} (index {} in history)",
ts_cp.serialize(),
i_cp
@@ -321,13 +322,13 @@ impl<S: BayouState> Bayou<S> {
// Check existing checkpoints: if last one is too recent, don't checkpoint again.
let existing_checkpoints = self.list_checkpoints().await?;
- eprintln!("(cp) listed checkpoints: {:?}", existing_checkpoints);
+ debug!("(cp) listed checkpoints: {:?}", existing_checkpoints);
if let Some(last_cp) = existing_checkpoints.last() {
if (ts_cp.msec as i128 - last_cp.0.msec as i128)
< CHECKPOINT_INTERVAL.as_millis() as i128
{
- eprintln!(
+ debug!(
"(cp) last checkpoint is too recent: {}, not checkpointing",
last_cp.0.serialize()
);
@@ -335,7 +336,7 @@ impl<S: BayouState> Bayou<S> {
}
}
- eprintln!("(cp) saving checkpoint at {}", ts_cp.serialize());
+ debug!("(cp) saving checkpoint at {}", ts_cp.serialize());
// Calculate state at time of checkpoint
let mut last_known_state = (0, &self.checkpoint.1);
@@ -351,7 +352,7 @@ impl<S: BayouState> Bayou<S> {
// Serialize and save checkpoint
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
- eprintln!("(cp) checkpoint body length: {}", cryptoblob.len());
+ debug!("(cp) checkpoint body length: {}", cryptoblob.len());
let mut por = PutObjectRequest::default();
por.bucket = self.bucket.clone();
@@ -366,7 +367,7 @@ impl<S: BayouState> Bayou<S> {
// Delete blobs
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
- eprintln!("(cp) drop old checkpoint {}", key);
+ debug!("(cp) drop old checkpoint {}", key);
let mut dor = DeleteObjectRequest::default();
dor.bucket = self.bucket.clone();
dor.key = key.to_string();