diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-11-15 15:56:43 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-11-15 15:56:43 +0100 |
commit | 916b27d87ec7f5bff41f9dd888914d50ae067fc0 (patch) | |
tree | 68b94498bf0fed5b7abf6d4b7e4d1f2ddbbb80f4 /src/bayou.rs | |
parent | 652da6efd35f198289ba3de26b60eb2e228de73a (diff) | |
download | aerogramme-916b27d87ec7f5bff41f9dd888914d50ae067fc0.tar.gz aerogramme-916b27d87ec7f5bff41f9dd888914d50ae067fc0.zip |
WIP refactor storage (new timestamp.rs file)
Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 94 |
1 files changed, 13 insertions, 81 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index 9f70017..3201783 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -15,9 +15,9 @@ use rusoto_s3::{ }; use crate::cryptoblob::*; -use crate::k2v_util::k2v_wait_value_changed; use crate::login::Credentials; -use crate::time::now_msec; +use crate::timestamp::*; +use crate::storage; const KEEP_STATE_EVERY: usize = 64; @@ -48,12 +48,11 @@ pub trait BayouState: } pub struct Bayou<S: BayouState> { - bucket: String, path: String, key: Key, - k2v: K2vClient, - s3: S3Client, + k2v: storage::RowStore, + s3: storage::BlobStore, checkpoint: (Timestamp, S), history: Vec<(Timestamp, S::Op, Option<S>)>, @@ -67,13 +66,12 @@ pub struct Bayou<S: BayouState> { impl<S: BayouState> Bayou<S> { pub fn new(creds: &Credentials, path: String) -> Result<Self> { - let k2v_client = creds.k2v_client()?; - let s3_client = creds.s3_client()?; + let k2v_client = creds.row_client()?; + let s3_client = creds.blob_client()?; let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?; Ok(Self { - bucket: creds.bucket().to_string(), path, key: creds.keys.master.clone(), k2v: k2v_client, @@ -103,17 +101,8 @@ impl<S: BayouState> Bayou<S> { } else { debug!("(sync) loading checkpoint: {}", key); - let gor = GetObjectRequest { - bucket: self.bucket.clone(), - key: key.to_string(), - ..Default::default() - }; - - let obj_res = self.s3.get_object(gor).await?; - - let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; - let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); - obj_body.into_async_read().read_to_end(&mut buf).await?; + let obj_res = self.s3.blob(key).fetch().await?; + let buf = obj_res.content().ok_or(anyhow!("object can't be empty"))?; debug!("(sync) checkpoint body length: {}", buf.len()); @@ -145,7 +134,8 @@ impl<S: BayouState> Bayou<S> { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); - let ops_map = self + let ops_map = self.k2v.select(storage::Selector::Range { begin: &ts_ser, end: WATCH_SK }).await?; + /*let ops_map = self .k2v .read_batch(&[BatchReadOp { partition_key: &self.path, @@ -164,13 +154,11 @@ impl<S: BayouState> Bayou<S> { .into_iter() .next() .ok_or(anyhow!("Missing K2V result"))? - .items; + .items;*/ let mut ops = vec![]; - for (tsstr, val) in ops_map { - let ts = tsstr - .parse::<Timestamp>() - .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?; + for row_value in ops_map { + let ts = row_value.timestamp(); if val.value.len() != 1 { bail!("Invalid operation, has {} values", val.value.len()); } @@ -536,59 +524,3 @@ impl K2vWatch { info!("bayou k2v watch bg loop exiting"); } } - -// ---- TIMESTAMP CLASS ---- - -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] -pub struct Timestamp { - pub msec: u64, - pub rand: u64, -} - -impl Timestamp { - #[allow(dead_code)] - // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future. - pub fn now() -> Self { - let mut rng = thread_rng(); - Self { - msec: now_msec(), - rand: rng.gen::<u64>(), - } - } - - pub fn after(other: &Self) -> Self { - let mut rng = thread_rng(); - Self { - msec: std::cmp::max(now_msec(), other.msec + 1), - rand: rng.gen::<u64>(), - } - } - - pub fn zero() -> Self { - Self { msec: 0, rand: 0 } - } -} - -impl ToString for Timestamp { - fn to_string(&self) -> String { - let mut bytes = [0u8; 16]; - bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec)); - bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand)); - hex::encode(bytes) - } -} - -impl FromStr for Timestamp { - type Err = &'static str; - - fn from_str(s: &str) -> Result<Timestamp, &'static str> { - let bytes = hex::decode(s).map_err(|_| "invalid hex")?; - if bytes.len() != 16 { - return Err("bad length"); - } - Ok(Self { - msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()), - rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), - }) - } -} |