Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 281 |
1 file changed, 90 insertions, 191 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 9f70017..7253a30 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -1,4 +1,3 @@
-use std::str::FromStr;
 use std::sync::{Arc, Weak};
 use std::time::{Duration, Instant};
 
@@ -6,18 +5,12 @@ use anyhow::{anyhow, bail, Result};
 use log::{debug, error, info};
 use rand::prelude::*;
 use serde::{Deserialize, Serialize};
-use tokio::io::AsyncReadExt;
 use tokio::sync::{watch, Notify};
 
-use k2v_client::{BatchDeleteOp, BatchReadOp, CausalityToken, Filter, K2vClient, K2vValue};
-use rusoto_s3::{
-    DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
-};
-
 use crate::cryptoblob::*;
-use crate::k2v_util::k2v_wait_value_changed;
 use crate::login::Credentials;
-use crate::time::now_msec;
+use crate::storage;
+use crate::timestamp::*;
 
 const KEEP_STATE_EVERY: usize = 64;
 
@@ -48,12 +41,10 @@ pub trait BayouState:
 }
 
 pub struct Bayou<S: BayouState> {
-    bucket: String,
     path: String,
     key: Key,
 
-    k2v: K2vClient,
-    s3: S3Client,
+    storage: storage::Store,
 
     checkpoint: (Timestamp, S),
     history: Vec<(Timestamp, S::Op, Option<S>)>,
@@ -62,28 +53,27 @@
     last_try_checkpoint: Option<Instant>,
 
     watch: Arc<K2vWatch>,
-    last_sync_watch_ct: Option<CausalityToken>,
+    last_sync_watch_ct: storage::RowRef,
 }
 
 impl<S: BayouState> Bayou<S> {
-    pub fn new(creds: &Credentials, path: String) -> Result<Self> {
-        let k2v_client = creds.k2v_client()?;
-        let s3_client = creds.s3_client()?;
+    pub async fn new(creds: &Credentials, path: String) -> Result<Self> {
+        let storage = creds.storage.build().await?;
 
-        let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
+        //let target = k2v_client.row(&path, WATCH_SK);
+        let target = storage::RowRef::new(&path, WATCH_SK);
+        let watch = K2vWatch::new(creds, target.clone()).await?;
 
         Ok(Self {
-            bucket: creds.bucket().to_string(),
             path,
+            storage,
             key: creds.keys.master.clone(),
-            k2v: k2v_client,
-            s3: s3_client,
             checkpoint: (Timestamp::zero(), S::default()),
             history: vec![],
             last_sync: None,
             last_try_checkpoint: None,
             watch,
-            last_sync_watch_ct: None,
+            last_sync_watch_ct: target,
         })
     }
 
@@ -103,18 +93,11 @@ impl<S: BayouState> Bayou<S> {
             } else {
                 debug!("(sync) loading checkpoint: {}", key);
 
-                let gor = GetObjectRequest {
-                    bucket: self.bucket.clone(),
-                    key: key.to_string(),
-                    ..Default::default()
-                };
-
-                let obj_res = self.s3.get_object(gor).await?;
-
-                let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
-                let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
-                obj_body.into_async_read().read_to_end(&mut buf).await?;
-
+                let buf = self
+                    .storage
+                    .blob_fetch(&storage::BlobRef(key.to_string()))
+                    .await?
+                    .value;
                 debug!("(sync) checkpoint body length: {}", buf.len());
 
                 let ck = open_deserialize::<S>(&buf, &self.key)?;
@@ -146,42 +129,34 @@
         let ts_ser = self.checkpoint.0.to_string();
         debug!("(sync) looking up operations starting at {}", ts_ser);
         let ops_map = self
-            .k2v
-            .read_batch(&[BatchReadOp {
-                partition_key: &self.path,
-                filter: Filter {
-                    start: Some(&ts_ser),
-                    end: Some(WATCH_SK),
-                    prefix: None,
-                    limit: None,
-                    reverse: false,
-                },
-                single_item: false,
-                conflicts_only: false,
-                tombstones: false,
-            }])
-            .await?
-            .into_iter()
-            .next()
-            .ok_or(anyhow!("Missing K2V result"))?
-            .items;
+            .storage
+            .row_fetch(&storage::Selector::Range {
+                shard: &self.path,
+                sort_begin: &ts_ser,
+                sort_end: WATCH_SK,
+            })
+            .await?;
 
         let mut ops = vec![];
-        for (tsstr, val) in ops_map {
-            let ts = tsstr
+        for row_value in ops_map {
+            let row = row_value.row_ref;
+            let sort_key = row.uid.sort;
+            let ts = sort_key
                 .parse::<Timestamp>()
-                .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?;
-            if val.value.len() != 1 {
-                bail!("Invalid operation, has {} values", val.value.len());
+                .map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
+
+            let val = row_value.value;
+            if val.len() != 1 {
+                bail!("Invalid operation, has {} values", val.len());
             }
-            match &val.value[0] {
-                K2vValue::Value(v) => {
+            match &val[0] {
+                storage::Alternative::Value(v) => {
                     let op = open_deserialize::<S::Op>(v, &self.key)?;
-                    debug!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op);
+                    debug!("(sync) operation {}: {:?}", sort_key, op);
                     ops.push((ts, op));
                 }
-                K2vValue::Tombstone => {
-                    unreachable!();
+                storage::Alternative::Tombstone => {
+                    continue;
                 }
             }
         }
@@ -276,15 +251,12 @@
                 .map(|(ts, _, _)| ts)
                 .unwrap_or(&self.checkpoint.0),
         );
-        self.k2v
-            .insert_item(
-                &self.path,
-                &ts.to_string(),
-                seal_serialize(&op, &self.key)?,
-                None,
-            )
-            .await?;
+        let row_val = storage::RowVal::new(
+            storage::RowRef::new(&self.path, &ts.to_string()),
+            seal_serialize(&op, &self.key)?,
+        );
+        self.storage.row_insert(vec![row_val]).await?;
 
         self.watch.notify.notify_one();
 
         let new_state = self.state().apply(&op);
@@ -384,13 +356,11 @@
         let cryptoblob = seal_serialize(&state_cp, &self.key)?;
         debug!("(cp) checkpoint body length: {}", cryptoblob.len());
 
-        let por = PutObjectRequest {
-            bucket: self.bucket.clone(),
-            key: format!("{}/checkpoint/{}", self.path, ts_cp.to_string()),
-            body: Some(cryptoblob.into()),
-            ..Default::default()
-        };
-        self.s3.put_object(por).await?;
+        let blob_val = storage::BlobVal::new(
+            storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
+            cryptoblob.into(),
+        );
+        self.storage.blob_insert(blob_val).await?;
 
         // Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them)
         let ecp_len = existing_checkpoints.len();
@@ -400,25 +370,20 @@
             // Delete blobs
             for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
                 debug!("(cp) drop old checkpoint {}", key);
-                let dor = DeleteObjectRequest {
-                    bucket: self.bucket.clone(),
-                    key: key.to_string(),
-                    ..Default::default()
-                };
-                self.s3.delete_object(dor).await?;
+                self.storage
+                    .blob_rm(&storage::BlobRef(key.to_string()))
+                    .await?;
             }
 
             // Delete corresponding range of operations
            let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
-            self.k2v
-                .delete_batch(&[BatchDeleteOp {
-                    partition_key: &self.path,
-                    prefix: None,
-                    start: None,
-                    end: Some(&ts_ser),
-                    single_item: false,
-                }])
-                .await?;
+            self.storage
+                .row_rm(&storage::Selector::Range {
+                    shard: &self.path,
+                    sort_begin: "",
+                    sort_end: &ts_ser,
+                })
+                .await?
         }
 
         Ok(())
@@ -437,22 +402,14 @@ impl<S: BayouState> Bayou<S> {
     async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> {
         let prefix = format!("{}/checkpoint/", self.path);
 
-        let lor = ListObjectsV2Request {
-            bucket: self.bucket.clone(),
-            max_keys: Some(1000),
-            prefix: Some(prefix.clone()),
-            ..Default::default()
-        };
-
-        let checkpoints_res = self.s3.list_objects_v2(lor).await?;
+        let checkpoints_res = self.storage.blob_list(&prefix).await?;
 
         let mut checkpoints = vec![];
-        for object in checkpoints_res.contents.unwrap_or_default() {
-            if let Some(key) = object.key {
-                if let Some(ckid) = key.strip_prefix(&prefix) {
-                    if let Ok(ts) = ckid.parse::<Timestamp>() {
-                        checkpoints.push((ts, key));
-                    }
+        for object in checkpoints_res {
+            let key = object.0;
+            if let Some(ckid) = key.strip_prefix(&prefix) {
+                if let Ok(ts) = ckid.parse::<Timestamp>() {
+                    checkpoints.push((ts, key.into()));
                 }
             }
         }
@@ -464,68 +421,66 @@
 // ---- Bayou watch in K2V ----
 
 struct K2vWatch {
-    pk: String,
-    sk: String,
-    rx: watch::Receiver<Option<CausalityToken>>,
+    target: storage::RowRef,
+    rx: watch::Receiver<storage::RowRef>,
     notify: Notify,
 }
 
 impl K2vWatch {
     /// Creates a new watch and launches subordinate threads.
     /// These threads hold Weak pointers to the struct;
-    /// the exit when the Arc is dropped.
-    fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> {
-        let (tx, rx) = watch::channel::<Option<CausalityToken>>(None);
+    /// they exit when the Arc is dropped.
+    async fn new(creds: &Credentials, target: storage::RowRef) -> Result<Arc<Self>> {
+        let storage = creds.storage.build().await?;
+
+        let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
         let notify = Notify::new();
 
-        let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
+        let watch = Arc::new(K2vWatch { target, rx, notify });
 
-        tokio::spawn(Self::background_task(
-            Arc::downgrade(&watch),
-            creds.k2v_client()?,
-            tx,
-        ));
+        tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));
 
         Ok(watch)
     }
 
     async fn background_task(
         self_weak: Weak<Self>,
-        k2v: K2vClient,
-        tx: watch::Sender<Option<CausalityToken>>,
+        storage: storage::Store,
+        tx: watch::Sender<storage::RowRef>,
     ) {
-        let mut ct = None;
+        let mut row = match Weak::upgrade(&self_weak) {
+            Some(this) => this.target.clone(),
+            None => {
+                error!("can't start loop");
+                return;
+            }
+        };
+
         while let Some(this) = Weak::upgrade(&self_weak) {
             debug!(
-                "bayou k2v watch bg loop iter ({}, {}): ct = {:?}",
-                this.pk, this.sk, ct
+                "bayou k2v watch bg loop iter ({}, {})",
+                this.target.uid.shard, this.target.uid.sort
             );
             tokio::select!(
                 _ = tokio::time::sleep(Duration::from_secs(60)) => continue,
-                update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => {
+                update = storage.row_poll(&row) => {
                     match update {
                         Err(e) => {
                             error!("Error in bayou k2v wait value changed: {}", e);
                             tokio::time::sleep(Duration::from_secs(30)).await;
                         }
-                        Ok(cv) => {
-                            if tx.send(Some(cv.causality.clone())).is_err() {
+                        Ok(new_value) => {
+                            row = new_value.row_ref;
+                            if tx.send(row.clone()).is_err() {
                                 break;
                             }
-                            ct = Some(cv.causality);
                         }
                     }
                 }
                 _ = this.notify.notified() => {
                     let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
-                    if let Err(e) = k2v
-                        .insert_item(
-                            &this.pk,
-                            &this.sk,
-                            rand,
-                            ct.clone(),
-                        )
-                        .await
+                    let row_val = storage::RowVal::new(row.clone(), rand);
+                    if let Err(e) = storage.row_insert(vec![row_val]).await
                     {
                         error!("Error in bayou k2v watch updater loop: {}", e);
                         tokio::time::sleep(Duration::from_secs(30)).await;
@@ -536,59 +491,3 @@ impl K2vWatch {
         info!("bayou k2v watch bg loop exiting");
     }
 }
-
-// ---- TIMESTAMP CLASS ----
-
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
-pub struct Timestamp {
-    pub msec: u64,
-    pub rand: u64,
-}
-
-impl Timestamp {
-    #[allow(dead_code)]
-    // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future.
-    pub fn now() -> Self {
-        let mut rng = thread_rng();
-        Self {
-            msec: now_msec(),
-            rand: rng.gen::<u64>(),
-        }
-    }
-
-    pub fn after(other: &Self) -> Self {
-        let mut rng = thread_rng();
-        Self {
-            msec: std::cmp::max(now_msec(), other.msec + 1),
-            rand: rng.gen::<u64>(),
-        }
-    }
-
-    pub fn zero() -> Self {
-        Self { msec: 0, rand: 0 }
-    }
-}
-
-impl ToString for Timestamp {
-    fn to_string(&self) -> String {
-        let mut bytes = [0u8; 16];
-        bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec));
-        bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand));
-        hex::encode(bytes)
-    }
-}
-
-impl FromStr for Timestamp {
-    type Err = &'static str;
-
-    fn from_str(s: &str) -> Result<Timestamp, &'static str> {
-        let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
-        if bytes.len() != 16 {
-            return Err("bad length");
-        }
-        Ok(Self {
-            msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()),
-            rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
-        })
-    }
-}