aboutsummaryrefslogtreecommitdiff
path: root/src/bayou.rs
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2023-12-18 17:09:44 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2023-12-18 17:09:44 +0100
commit3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf (patch)
treefff5d16e266788b28e812c24669f50118831512b /src/bayou.rs
parent684f4de225c44464abcb6a9cb2ef6dcae90537a8 (diff)
downloadaerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.tar.gz
aerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.zip
Storage trait new implementation
Diffstat (limited to 'src/bayou.rs')
-rw-r--r--src/bayou.rs134
1 files changed, 53 insertions, 81 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 3042f94..afe3c75 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -45,8 +45,7 @@ pub struct Bayou<S: BayouState> {
path: String,
key: Key,
- k2v: storage::RowStore,
- s3: storage::BlobStore,
+ storage: storage::Store,
checkpoint: (Timestamp, S),
history: Vec<(Timestamp, S::Op, Option<S>)>,
@@ -60,17 +59,16 @@ pub struct Bayou<S: BayouState> {
impl<S: BayouState> Bayou<S> {
pub fn new(creds: &Credentials, path: String) -> Result<Self> {
- let k2v_client = creds.row_client()?;
- let s3_client = creds.blob_client()?;
+ let storage = creds.storage.build()?;
- let target = k2v_client.row(&path, WATCH_SK);
- let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
+ //let target = k2v_client.row(&path, WATCH_SK);
+ let target = storage::RowRef::new(&path, WATCH_SK);
+ let watch = K2vWatch::new(creds, target.clone())?;
Ok(Self {
path,
+ storage,
key: creds.keys.master.clone(),
- k2v: k2v_client,
- s3: s3_client,
checkpoint: (Timestamp::zero(), S::default()),
history: vec![],
last_sync: None,
@@ -96,9 +94,7 @@ impl<S: BayouState> Bayou<S> {
} else {
debug!("(sync) loading checkpoint: {}", key);
- let obj_res = self.s3.blob(key).fetch().await?;
- let buf = obj_res.content().ok_or(anyhow!("object can't be empty"))?;
-
+ let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value;
debug!("(sync) checkpoint body length: {}", buf.len());
let ck = open_deserialize::<S>(&buf, &self.key)?;
@@ -129,42 +125,26 @@ impl<S: BayouState> Bayou<S> {
// 3. List all operations starting from checkpoint
let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser);
- let ops_map = self.k2v.select(storage::Selector::Range { shard_key: &self.path, begin: &ts_ser, end: WATCH_SK }).await?;
- /*let ops_map = self
- .k2v
- .read_batch(&[BatchReadOp {
- partition_key: &self.path,
- filter: Filter {
- start: Some(&ts_ser),
- end: Some(WATCH_SK),
- prefix: None,
- limit: None,
- reverse: false,
- },
- single_item: false,
- conflicts_only: false,
- tombstones: false,
- }])
- .await?
- .into_iter()
- .next()
- .ok_or(anyhow!("Missing K2V result"))?
- .items;*/
+ let ops_map = self.storage.row_fetch(&storage::Selector::Range {
+ shard: &self.path,
+ sort_begin: &ts_ser,
+ sort_end: WATCH_SK
+ }).await?;
let mut ops = vec![];
for row_value in ops_map {
- let row = row_value.to_ref();
- let sort_key = row.key().1;
+ let row = row_value.row_ref;
+ let sort_key = row.uid.sort;
let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
- let val = row_value.content();
+ let val = row_value.value;
if val.len() != 1 {
- bail!("Invalid operation, has {} values", row_value.content().len());
+ bail!("Invalid operation, has {} values", val.len());
}
match &val[0] {
storage::Alternative::Value(v) => {
let op = open_deserialize::<S::Op>(v, &self.key)?;
- debug!("(sync) operation {}: {} {:?}", sort_key, base64::encode(v), op);
+ debug!("(sync) operation {}: {:?}", sort_key, op);
ops.push((ts, op));
}
storage::Alternative::Tombstone => {
@@ -231,7 +211,7 @@ impl<S: BayouState> Bayou<S> {
// Save info that sync has been done
self.last_sync = new_last_sync;
- self.last_sync_watch_ct = self.k2v.from_orphan(new_last_sync_watch_ct).expect("Source & target storage must be compatible");
+ self.last_sync_watch_ct = new_last_sync_watch_ct;
Ok(())
}
@@ -243,7 +223,7 @@ impl<S: BayouState> Bayou<S> {
Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5),
_ => true,
};
- let changed = self.last_sync_watch_ct.to_orphan() != *self.watch.rx.borrow();
+ let changed = self.last_sync_watch_ct != *self.watch.rx.borrow();
if too_old || changed {
self.sync().await?;
}
@@ -263,12 +243,12 @@ impl<S: BayouState> Bayou<S> {
.map(|(ts, _, _)| ts)
.unwrap_or(&self.checkpoint.0),
);
- self.k2v
- .row(&self.path, &ts.to_string())
- .set_value(&seal_serialize(&op, &self.key)?)
- .push()
- .await?;
+ let row_val = storage::RowVal::new(
+ storage::RowRef::new(&self.path, &ts.to_string()),
+ seal_serialize(&op, &self.key)?,
+ );
+ self.storage.row_insert(vec![row_val]).await?;
self.watch.notify.notify_one();
let new_state = self.state().apply(&op);
@@ -368,12 +348,11 @@ impl<S: BayouState> Bayou<S> {
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
debug!("(cp) checkpoint body length: {}", cryptoblob.len());
- self.s3
- .blob(format!("{}/checkpoint/{}", self.path, ts_cp.to_string()).as_str())
- .set_value(cryptoblob.into())
- .push()
- .await?;
-
+ let blob_val = storage::BlobVal::new(
+ storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
+ cryptoblob.into(),
+ );
+ self.storage.blob_insert(&blob_val).await?;
// Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them)
let ecp_len = existing_checkpoints.len();
@@ -383,22 +362,16 @@ impl<S: BayouState> Bayou<S> {
// Delete blobs
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
debug!("(cp) drop old checkpoint {}", key);
- self.s3
- .blob(key)
- .rm()
- .await?;
+ self.storage.blob_rm(&storage::BlobRef(key.to_string())).await?;
}
// Delete corresponding range of operations
let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
- self.k2v
- .rm(storage::Selector::Range{
- shard_key: &self.path,
- begin: "",
- end: &ts_ser
- })
- .await?;
-
+ self.storage.row_rm(&storage::Selector::Range {
+ shard: &self.path,
+ sort_begin: "",
+ sort_end: &ts_ser
+ }).await?
}
Ok(())
@@ -417,11 +390,11 @@ impl<S: BayouState> Bayou<S> {
async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> {
let prefix = format!("{}/checkpoint/", self.path);
- let checkpoints_res = self.s3.list(&prefix).await?;
+ let checkpoints_res = self.storage.blob_list(&prefix).await?;
let mut checkpoints = vec![];
for object in checkpoints_res {
- let key = object.key();
+ let key = object.0;
if let Some(ckid) = key.strip_prefix(&prefix) {
if let Ok(ts) = ckid.parse::<Timestamp>() {
checkpoints.push((ts, key.into()));
@@ -436,9 +409,8 @@ impl<S: BayouState> Bayou<S> {
// ---- Bayou watch in K2V ----
struct K2vWatch {
- pk: String,
- sk: String,
- rx: watch::Receiver<storage::OrphanRowRef>,
+ target: storage::RowRef,
+ rx: watch::Receiver<storage::RowRef>,
notify: Notify,
}
@@ -446,17 +418,17 @@ impl K2vWatch {
/// Creates a new watch and launches subordinate threads.
/// These threads hold Weak pointers to the struct;
/// they exit when the Arc is dropped.
- fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> {
- let row_client = creds.row_client()?;
+ fn new(creds: &Credentials, target: storage::RowRef) -> Result<Arc<Self>> {
+ let storage = creds.storage.build()?;
- let (tx, rx) = watch::channel::<storage::OrphanRowRef>(row_client.row(&pk, &sk).to_orphan());
+ let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
let notify = Notify::new();
- let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
+ let watch = Arc::new(K2vWatch { target, rx, notify });
tokio::spawn(Self::background_task(
Arc::downgrade(&watch),
- row_client,
+ storage,
tx,
));
@@ -465,11 +437,11 @@ impl K2vWatch {
async fn background_task(
self_weak: Weak<Self>,
- k2v: storage::RowStore,
- tx: watch::Sender<storage::OrphanRowRef>,
+ storage: storage::Store,
+ tx: watch::Sender<storage::RowRef>,
) {
let mut row = match Weak::upgrade(&self_weak) {
- Some(this) => k2v.row(&this.pk, &this.sk),
+ Some(this) => this.target.clone(),
None => {
error!("can't start loop");
return
@@ -479,20 +451,19 @@ impl K2vWatch {
while let Some(this) = Weak::upgrade(&self_weak) {
debug!(
"bayou k2v watch bg loop iter ({}, {})",
- this.pk, this.sk
+ this.target.uid.shard, this.target.uid.sort
);
tokio::select!(
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
- update = row.poll() => {
- //update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => {
+ update = storage.row_poll(&row) => {
match update {
Err(e) => {
error!("Error in bayou k2v wait value changed: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
Ok(new_value) => {
- row = new_value.to_ref();
- if tx.send(row.to_orphan()).is_err() {
+ row = new_value.row_ref;
+ if tx.send(row.clone()).is_err() {
break;
}
}
@@ -500,7 +471,8 @@ impl K2vWatch {
}
_ = this.notify.notified() => {
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
- if let Err(e) = row.set_value(&rand).push().await
+ let row_val = storage::RowVal::new(row.clone(), rand);
+ if let Err(e) = storage.row_insert(vec![row_val]).await
{
error!("Error in bayou k2v watch updater loop: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;