aboutsummaryrefslogtreecommitdiff
path: root/src/bayou.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bayou.rs')
-rw-r--r--src/bayou.rs94
1 files changed, 13 insertions, 81 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 9f70017..3201783 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -15,9 +15,9 @@ use rusoto_s3::{
};
use crate::cryptoblob::*;
-use crate::k2v_util::k2v_wait_value_changed;
use crate::login::Credentials;
-use crate::time::now_msec;
+use crate::timestamp::*;
+use crate::storage;
const KEEP_STATE_EVERY: usize = 64;
@@ -48,12 +48,11 @@ pub trait BayouState:
}
pub struct Bayou<S: BayouState> {
- bucket: String,
path: String,
key: Key,
- k2v: K2vClient,
- s3: S3Client,
+ k2v: storage::RowStore,
+ s3: storage::BlobStore,
checkpoint: (Timestamp, S),
history: Vec<(Timestamp, S::Op, Option<S>)>,
@@ -67,13 +66,12 @@ pub struct Bayou<S: BayouState> {
impl<S: BayouState> Bayou<S> {
pub fn new(creds: &Credentials, path: String) -> Result<Self> {
- let k2v_client = creds.k2v_client()?;
- let s3_client = creds.s3_client()?;
+ let k2v_client = creds.row_client()?;
+ let s3_client = creds.blob_client()?;
let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
Ok(Self {
- bucket: creds.bucket().to_string(),
path,
key: creds.keys.master.clone(),
k2v: k2v_client,
@@ -103,17 +101,8 @@ impl<S: BayouState> Bayou<S> {
} else {
debug!("(sync) loading checkpoint: {}", key);
- let gor = GetObjectRequest {
- bucket: self.bucket.clone(),
- key: key.to_string(),
- ..Default::default()
- };
-
- let obj_res = self.s3.get_object(gor).await?;
-
- let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
- let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
- obj_body.into_async_read().read_to_end(&mut buf).await?;
+ let obj_res = self.s3.blob(key).fetch().await?;
+ let buf = obj_res.content().ok_or(anyhow!("object can't be empty"))?;
debug!("(sync) checkpoint body length: {}", buf.len());
@@ -145,7 +134,8 @@ impl<S: BayouState> Bayou<S> {
// 3. List all operations starting from checkpoint
let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser);
- let ops_map = self
+ let ops_map = self.k2v.select(storage::Selector::Range { begin: &ts_ser, end: WATCH_SK }).await?;
+ /*let ops_map = self
.k2v
.read_batch(&[BatchReadOp {
partition_key: &self.path,
@@ -164,13 +154,11 @@ impl<S: BayouState> Bayou<S> {
.into_iter()
.next()
.ok_or(anyhow!("Missing K2V result"))?
- .items;
+ .items;*/
let mut ops = vec![];
- for (tsstr, val) in ops_map {
- let ts = tsstr
- .parse::<Timestamp>()
- .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?;
+ for row_value in ops_map {
+ let ts = row_value.timestamp();
if val.value.len() != 1 {
bail!("Invalid operation, has {} values", val.value.len());
}
@@ -536,59 +524,3 @@ impl K2vWatch {
info!("bayou k2v watch bg loop exiting");
}
}
-
-// ---- TIMESTAMP CLASS ----
-
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
-pub struct Timestamp {
- pub msec: u64,
- pub rand: u64,
-}
-
-impl Timestamp {
- #[allow(dead_code)]
- // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future.
- pub fn now() -> Self {
- let mut rng = thread_rng();
- Self {
- msec: now_msec(),
- rand: rng.gen::<u64>(),
- }
- }
-
- pub fn after(other: &Self) -> Self {
- let mut rng = thread_rng();
- Self {
- msec: std::cmp::max(now_msec(), other.msec + 1),
- rand: rng.gen::<u64>(),
- }
- }
-
- pub fn zero() -> Self {
- Self { msec: 0, rand: 0 }
- }
-}
-
-impl ToString for Timestamp {
- fn to_string(&self) -> String {
- let mut bytes = [0u8; 16];
- bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec));
- bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand));
- hex::encode(bytes)
- }
-}
-
-impl FromStr for Timestamp {
- type Err = &'static str;
-
- fn from_str(s: &str) -> Result<Timestamp, &'static str> {
- let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
- if bytes.len() != 16 {
- return Err("bad length");
- }
- Ok(Self {
- msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()),
- rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
- })
- }
-}