aboutsummaryrefslogtreecommitdiff
path: root/src/bayou.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bayou.rs')
-rw-r--r--src/bayou.rs281
1 files changed, 90 insertions, 191 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 9f70017..7253a30 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -1,4 +1,3 @@
-use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
@@ -6,18 +5,12 @@ use anyhow::{anyhow, bail, Result};
use log::{debug, error, info};
use rand::prelude::*;
use serde::{Deserialize, Serialize};
-use tokio::io::AsyncReadExt;
use tokio::sync::{watch, Notify};
-use k2v_client::{BatchDeleteOp, BatchReadOp, CausalityToken, Filter, K2vClient, K2vValue};
-use rusoto_s3::{
- DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
-};
-
use crate::cryptoblob::*;
-use crate::k2v_util::k2v_wait_value_changed;
use crate::login::Credentials;
-use crate::time::now_msec;
+use crate::storage;
+use crate::timestamp::*;
const KEEP_STATE_EVERY: usize = 64;
@@ -48,12 +41,10 @@ pub trait BayouState:
}
pub struct Bayou<S: BayouState> {
- bucket: String,
path: String,
key: Key,
- k2v: K2vClient,
- s3: S3Client,
+ storage: storage::Store,
checkpoint: (Timestamp, S),
history: Vec<(Timestamp, S::Op, Option<S>)>,
@@ -62,28 +53,27 @@ pub struct Bayou<S: BayouState> {
last_try_checkpoint: Option<Instant>,
watch: Arc<K2vWatch>,
- last_sync_watch_ct: Option<CausalityToken>,
+ last_sync_watch_ct: storage::RowRef,
}
impl<S: BayouState> Bayou<S> {
- pub fn new(creds: &Credentials, path: String) -> Result<Self> {
- let k2v_client = creds.k2v_client()?;
- let s3_client = creds.s3_client()?;
+ pub async fn new(creds: &Credentials, path: String) -> Result<Self> {
+ let storage = creds.storage.build().await?;
- let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
+ //let target = k2v_client.row(&path, WATCH_SK);
+ let target = storage::RowRef::new(&path, WATCH_SK);
+ let watch = K2vWatch::new(creds, target.clone()).await?;
Ok(Self {
- bucket: creds.bucket().to_string(),
path,
+ storage,
key: creds.keys.master.clone(),
- k2v: k2v_client,
- s3: s3_client,
checkpoint: (Timestamp::zero(), S::default()),
history: vec![],
last_sync: None,
last_try_checkpoint: None,
watch,
- last_sync_watch_ct: None,
+ last_sync_watch_ct: target,
})
}
@@ -103,18 +93,11 @@ impl<S: BayouState> Bayou<S> {
} else {
debug!("(sync) loading checkpoint: {}", key);
- let gor = GetObjectRequest {
- bucket: self.bucket.clone(),
- key: key.to_string(),
- ..Default::default()
- };
-
- let obj_res = self.s3.get_object(gor).await?;
-
- let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
- let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
- obj_body.into_async_read().read_to_end(&mut buf).await?;
-
+ let buf = self
+ .storage
+ .blob_fetch(&storage::BlobRef(key.to_string()))
+ .await?
+ .value;
debug!("(sync) checkpoint body length: {}", buf.len());
let ck = open_deserialize::<S>(&buf, &self.key)?;
@@ -146,42 +129,34 @@ impl<S: BayouState> Bayou<S> {
let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser);
let ops_map = self
- .k2v
- .read_batch(&[BatchReadOp {
- partition_key: &self.path,
- filter: Filter {
- start: Some(&ts_ser),
- end: Some(WATCH_SK),
- prefix: None,
- limit: None,
- reverse: false,
- },
- single_item: false,
- conflicts_only: false,
- tombstones: false,
- }])
- .await?
- .into_iter()
- .next()
- .ok_or(anyhow!("Missing K2V result"))?
- .items;
+ .storage
+ .row_fetch(&storage::Selector::Range {
+ shard: &self.path,
+ sort_begin: &ts_ser,
+ sort_end: WATCH_SK,
+ })
+ .await?;
let mut ops = vec![];
- for (tsstr, val) in ops_map {
- let ts = tsstr
+ for row_value in ops_map {
+ let row = row_value.row_ref;
+ let sort_key = row.uid.sort;
+ let ts = sort_key
.parse::<Timestamp>()
- .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?;
- if val.value.len() != 1 {
- bail!("Invalid operation, has {} values", val.value.len());
+ .map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
+
+ let val = row_value.value;
+ if val.len() != 1 {
+ bail!("Invalid operation, has {} values", val.len());
}
- match &val.value[0] {
- K2vValue::Value(v) => {
+ match &val[0] {
+ storage::Alternative::Value(v) => {
let op = open_deserialize::<S::Op>(v, &self.key)?;
- debug!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op);
+ debug!("(sync) operation {}: {:?}", sort_key, op);
ops.push((ts, op));
}
- K2vValue::Tombstone => {
- unreachable!();
+ storage::Alternative::Tombstone => {
+ continue;
}
}
}
@@ -276,15 +251,12 @@ impl<S: BayouState> Bayou<S> {
.map(|(ts, _, _)| ts)
.unwrap_or(&self.checkpoint.0),
);
- self.k2v
- .insert_item(
- &self.path,
- &ts.to_string(),
- seal_serialize(&op, &self.key)?,
- None,
- )
- .await?;
+ let row_val = storage::RowVal::new(
+ storage::RowRef::new(&self.path, &ts.to_string()),
+ seal_serialize(&op, &self.key)?,
+ );
+ self.storage.row_insert(vec![row_val]).await?;
self.watch.notify.notify_one();
let new_state = self.state().apply(&op);
@@ -384,13 +356,11 @@ impl<S: BayouState> Bayou<S> {
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
debug!("(cp) checkpoint body length: {}", cryptoblob.len());
- let por = PutObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/checkpoint/{}", self.path, ts_cp.to_string()),
- body: Some(cryptoblob.into()),
- ..Default::default()
- };
- self.s3.put_object(por).await?;
+ let blob_val = storage::BlobVal::new(
+ storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
+ cryptoblob.into(),
+ );
+ self.storage.blob_insert(blob_val).await?;
// Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them)
let ecp_len = existing_checkpoints.len();
@@ -400,25 +370,20 @@ impl<S: BayouState> Bayou<S> {
// Delete blobs
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
debug!("(cp) drop old checkpoint {}", key);
- let dor = DeleteObjectRequest {
- bucket: self.bucket.clone(),
- key: key.to_string(),
- ..Default::default()
- };
- self.s3.delete_object(dor).await?;
+ self.storage
+ .blob_rm(&storage::BlobRef(key.to_string()))
+ .await?;
}
// Delete corresponding range of operations
let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
- self.k2v
- .delete_batch(&[BatchDeleteOp {
- partition_key: &self.path,
- prefix: None,
- start: None,
- end: Some(&ts_ser),
- single_item: false,
- }])
- .await?;
+ self.storage
+ .row_rm(&storage::Selector::Range {
+ shard: &self.path,
+ sort_begin: "",
+ sort_end: &ts_ser,
+ })
+ .await?
}
Ok(())
@@ -437,22 +402,14 @@ impl<S: BayouState> Bayou<S> {
async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> {
let prefix = format!("{}/checkpoint/", self.path);
- let lor = ListObjectsV2Request {
- bucket: self.bucket.clone(),
- max_keys: Some(1000),
- prefix: Some(prefix.clone()),
- ..Default::default()
- };
-
- let checkpoints_res = self.s3.list_objects_v2(lor).await?;
+ let checkpoints_res = self.storage.blob_list(&prefix).await?;
let mut checkpoints = vec![];
- for object in checkpoints_res.contents.unwrap_or_default() {
- if let Some(key) = object.key {
- if let Some(ckid) = key.strip_prefix(&prefix) {
- if let Ok(ts) = ckid.parse::<Timestamp>() {
- checkpoints.push((ts, key));
- }
+ for object in checkpoints_res {
+ let key = object.0;
+ if let Some(ckid) = key.strip_prefix(&prefix) {
+ if let Ok(ts) = ckid.parse::<Timestamp>() {
+ checkpoints.push((ts, key.into()));
}
}
}
@@ -464,68 +421,66 @@ impl<S: BayouState> Bayou<S> {
// ---- Bayou watch in K2V ----
struct K2vWatch {
- pk: String,
- sk: String,
- rx: watch::Receiver<Option<CausalityToken>>,
+ target: storage::RowRef,
+ rx: watch::Receiver<storage::RowRef>,
notify: Notify,
}
impl K2vWatch {
/// Creates a new watch and launches subordinate threads.
/// These threads hold Weak pointers to the struct;
- /// the exit when the Arc is dropped.
- fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> {
- let (tx, rx) = watch::channel::<Option<CausalityToken>>(None);
+ /// they exit when the Arc is dropped.
+ async fn new(creds: &Credentials, target: storage::RowRef) -> Result<Arc<Self>> {
+ let storage = creds.storage.build().await?;
+
+ let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
let notify = Notify::new();
- let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
+ let watch = Arc::new(K2vWatch { target, rx, notify });
- tokio::spawn(Self::background_task(
- Arc::downgrade(&watch),
- creds.k2v_client()?,
- tx,
- ));
+ tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));
Ok(watch)
}
async fn background_task(
self_weak: Weak<Self>,
- k2v: K2vClient,
- tx: watch::Sender<Option<CausalityToken>>,
+ storage: storage::Store,
+ tx: watch::Sender<storage::RowRef>,
) {
- let mut ct = None;
+ let mut row = match Weak::upgrade(&self_weak) {
+ Some(this) => this.target.clone(),
+ None => {
+ error!("can't start loop");
+ return;
+ }
+ };
+
while let Some(this) = Weak::upgrade(&self_weak) {
debug!(
- "bayou k2v watch bg loop iter ({}, {}): ct = {:?}",
- this.pk, this.sk, ct
+ "bayou k2v watch bg loop iter ({}, {})",
+ this.target.uid.shard, this.target.uid.sort
);
tokio::select!(
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
- update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => {
+ update = storage.row_poll(&row) => {
match update {
Err(e) => {
error!("Error in bayou k2v wait value changed: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
- Ok(cv) => {
- if tx.send(Some(cv.causality.clone())).is_err() {
+ Ok(new_value) => {
+ row = new_value.row_ref;
+ if tx.send(row.clone()).is_err() {
break;
}
- ct = Some(cv.causality);
}
}
}
_ = this.notify.notified() => {
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
- if let Err(e) = k2v
- .insert_item(
- &this.pk,
- &this.sk,
- rand,
- ct.clone(),
- )
- .await
+ let row_val = storage::RowVal::new(row.clone(), rand);
+ if let Err(e) = storage.row_insert(vec![row_val]).await
{
error!("Error in bayou k2v watch updater loop: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
@@ -536,59 +491,3 @@ impl K2vWatch {
info!("bayou k2v watch bg loop exiting");
}
}
-
-// ---- TIMESTAMP CLASS ----
-
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
-pub struct Timestamp {
- pub msec: u64,
- pub rand: u64,
-}
-
-impl Timestamp {
- #[allow(dead_code)]
- // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future.
- pub fn now() -> Self {
- let mut rng = thread_rng();
- Self {
- msec: now_msec(),
- rand: rng.gen::<u64>(),
- }
- }
-
- pub fn after(other: &Self) -> Self {
- let mut rng = thread_rng();
- Self {
- msec: std::cmp::max(now_msec(), other.msec + 1),
- rand: rng.gen::<u64>(),
- }
- }
-
- pub fn zero() -> Self {
- Self { msec: 0, rand: 0 }
- }
-}
-
-impl ToString for Timestamp {
- fn to_string(&self) -> String {
- let mut bytes = [0u8; 16];
- bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec));
- bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand));
- hex::encode(bytes)
- }
-}
-
-impl FromStr for Timestamp {
- type Err = &'static str;
-
- fn from_str(s: &str) -> Result<Timestamp, &'static str> {
- let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
- if bytes.len() != 16 {
- return Err("bad length");
- }
- Ok(Self {
- msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()),
- rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
- })
- }
-}