aboutsummaryrefslogtreecommitdiff
path: root/src/bayou.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bayou.rs')
-rw-r--r--src/bayou.rs38
1 files changed, 23 insertions, 15 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index c9ae67f..7a76222 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -1,3 +1,4 @@
+use std::str::FromStr;
use std::time::{Duration, Instant};
use anyhow::{anyhow, bail, Result};
@@ -123,7 +124,7 @@ impl<S: BayouState> Bayou<S> {
.collect();
// 3. List all operations starting from checkpoint
- let ts_ser = self.checkpoint.0.serialize();
+ let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser);
let ops_map = self
.k2v
@@ -148,8 +149,9 @@ impl<S: BayouState> Bayou<S> {
let mut ops = vec![];
for (tsstr, val) in ops_map {
- let ts = Timestamp::parse(&tsstr)
- .ok_or(anyhow!("Invalid operation timestamp: {}", tsstr))?;
+ let ts = tsstr
+ .parse::<Timestamp>()
+ .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?;
if val.value.len() != 1 {
bail!("Invalid operation, has {} values", val.value.len());
}
@@ -251,7 +253,7 @@ impl<S: BayouState> Bayou<S> {
self.k2v
.insert_item(
&self.path,
- &ts.serialize(),
+ &ts.to_string(),
seal_serialize(&op, &self.key)?,
None,
)
@@ -316,7 +318,7 @@ impl<S: BayouState> Bayou<S> {
let ts_cp = self.history[i_cp].0;
debug!(
"(cp) we could checkpoint at time {} (index {} in history)",
- ts_cp.serialize(),
+ ts_cp.to_string(),
i_cp
);
@@ -330,13 +332,13 @@ impl<S: BayouState> Bayou<S> {
{
debug!(
"(cp) last checkpoint is too recent: {}, not checkpointing",
- last_cp.0.serialize()
+ last_cp.0.to_string()
);
return Ok(());
}
}
- debug!("(cp) saving checkpoint at {}", ts_cp.serialize());
+ debug!("(cp) saving checkpoint at {}", ts_cp.to_string());
// Calculate state at time of checkpoint
let mut last_known_state = (0, &self.checkpoint.1);
@@ -356,7 +358,7 @@ impl<S: BayouState> Bayou<S> {
let mut por = PutObjectRequest::default();
por.bucket = self.bucket.clone();
- por.key = format!("{}/checkpoint/{}", self.path, ts_cp.serialize());
+ por.key = format!("{}/checkpoint/{}", self.path, ts_cp.to_string());
por.body = Some(cryptoblob.into());
self.s3.put_object(por).await?;
@@ -375,7 +377,7 @@ impl<S: BayouState> Bayou<S> {
}
// Delete corresponding range of operations
- let ts_ser = existing_checkpoints[last_to_keep].0.serialize();
+ let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
self.k2v
.delete_batch(&[BatchDeleteOp {
partition_key: &self.path,
@@ -414,7 +416,7 @@ impl<S: BayouState> Bayou<S> {
for object in checkpoints_res.contents.unwrap_or_default() {
if let Some(key) = object.key {
if let Some(ckid) = key.strip_prefix(&prefix) {
- if let Some(ts) = Timestamp::parse(ckid) {
+ if let Ok(ts) = ckid.parse::<Timestamp>() {
checkpoints.push((ts, key));
}
}
@@ -451,20 +453,26 @@ impl Timestamp {
pub fn zero() -> Self {
Self { msec: 0, rand: 0 }
}
+}
- pub fn serialize(&self) -> String {
+impl ToString for Timestamp {
+ fn to_string(&self) -> String {
let mut bytes = [0u8; 16];
bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec));
bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand));
hex::encode(&bytes)
}
+}
- pub fn parse(v: &str) -> Option<Self> {
- let bytes = hex::decode(v).ok()?;
+impl FromStr for Timestamp {
+ type Err = &'static str;
+
+ fn from_str(s: &str) -> Result<Timestamp, &'static str> {
+ let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
if bytes.len() != 16 {
- return None;
+ return Err("bad length");
}
- Some(Self {
+ Ok(Self {
msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()),
rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
})