aboutsummaryrefslogtreecommitdiff
path: root/src/bayou.rs
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2023-11-16 18:27:24 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2023-11-16 18:27:24 +0100
commit6da8b815b694a37d39a2be04c8e1585aac17954a (patch)
treec9df6389e4272e91de9131a8fbf2b8fe6df3eaf0 /src/bayou.rs
parent916b27d87ec7f5bff41f9dd888914d50ae067fc0 (diff)
downloadaerogramme-6da8b815b694a37d39a2be04c8e1585aac17954a.tar.gz
aerogramme-6da8b815b694a37d39a2be04c8e1585aac17954a.zip
not very clear how we pass data across channel
Diffstat (limited to 'src/bayou.rs')
-rw-r--r--src/bayou.rs134
1 files changed, 63 insertions, 71 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 3201783..f95bd82 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -9,16 +9,12 @@ use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tokio::sync::{watch, Notify};
-use k2v_client::{BatchDeleteOp, BatchReadOp, CausalityToken, Filter, K2vClient, K2vValue};
-use rusoto_s3::{
- DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
-};
-
use crate::cryptoblob::*;
use crate::login::Credentials;
use crate::timestamp::*;
use crate::storage;
+
const KEEP_STATE_EVERY: usize = 64;
// Checkpointing interval constants: a checkpoint is not made earlier
@@ -61,7 +57,7 @@ pub struct Bayou<S: BayouState> {
last_try_checkpoint: Option<Instant>,
watch: Arc<K2vWatch>,
- last_sync_watch_ct: Option<CausalityToken>,
+ last_sync_watch_ct: storage::RowRef,
}
impl<S: BayouState> Bayou<S> {
@@ -69,6 +65,7 @@ impl<S: BayouState> Bayou<S> {
let k2v_client = creds.row_client()?;
let s3_client = creds.blob_client()?;
+ let target = k2v_client.row(&path, WATCH_SK);
let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
Ok(Self {
@@ -81,7 +78,7 @@ impl<S: BayouState> Bayou<S> {
last_sync: None,
last_try_checkpoint: None,
watch,
- last_sync_watch_ct: None,
+ last_sync_watch_ct: target,
})
}
@@ -134,7 +131,7 @@ impl<S: BayouState> Bayou<S> {
// 3. List all operations starting from checkpoint
let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser);
- let ops_map = self.k2v.select(storage::Selector::Range { begin: &ts_ser, end: WATCH_SK }).await?;
+ let ops_map = self.k2v.select(storage::Selector::Range { shard_key: &self.path, begin: &ts_ser, end: WATCH_SK }).await?;
/*let ops_map = self
.k2v
.read_batch(&[BatchReadOp {
@@ -158,18 +155,22 @@ impl<S: BayouState> Bayou<S> {
let mut ops = vec![];
for row_value in ops_map {
- let ts = row_value.timestamp();
- if val.value.len() != 1 {
- bail!("Invalid operation, has {} values", val.value.len());
+ let row = row_value.to_ref();
+ let sort_key = row.key().1;
+ let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
+
+ let val = row_value.content();
+ if val.len() != 1 {
+ bail!("Invalid operation, has {} values", row_value.content().len());
}
- match &val.value[0] {
- K2vValue::Value(v) => {
+ match &val[0] {
+ storage::Alternative::Value(v) => {
let op = open_deserialize::<S::Op>(v, &self.key)?;
- debug!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op);
+ debug!("(sync) operation {}: {} {:?}", sort_key, base64::encode(v), op);
ops.push((ts, op));
}
- K2vValue::Tombstone => {
- unreachable!();
+ storage::Alternative::Tombstone => {
+ continue;
}
}
}
@@ -372,13 +373,12 @@ impl<S: BayouState> Bayou<S> {
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
debug!("(cp) checkpoint body length: {}", cryptoblob.len());
- let por = PutObjectRequest {
- bucket: self.bucket.clone(),
- key: format!("{}/checkpoint/{}", self.path, ts_cp.to_string()),
- body: Some(cryptoblob.into()),
- ..Default::default()
- };
- self.s3.put_object(por).await?;
+ self.s3
+ .blob(format!("{}/checkpoint/{}", self.path, ts_cp.to_string()).as_str())
+ .set_value(cryptoblob.into())
+ .push()
+ .await?;
+
// Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them)
let ecp_len = existing_checkpoints.len();
@@ -388,25 +388,22 @@ impl<S: BayouState> Bayou<S> {
// Delete blobs
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
debug!("(cp) drop old checkpoint {}", key);
- let dor = DeleteObjectRequest {
- bucket: self.bucket.clone(),
- key: key.to_string(),
- ..Default::default()
- };
- self.s3.delete_object(dor).await?;
+ self.s3
+ .blob(key)
+ .rm()
+ .await?;
}
// Delete corresponding range of operations
let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
self.k2v
- .delete_batch(&[BatchDeleteOp {
- partition_key: &self.path,
- prefix: None,
- start: None,
- end: Some(&ts_ser),
- single_item: false,
- }])
+ .rm(storage::Selector::Range{
+ shard_key: &self.path,
+ begin: "",
+ end: &ts_ser
+ })
.await?;
+
}
Ok(())
@@ -425,22 +422,14 @@ impl<S: BayouState> Bayou<S> {
async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> {
let prefix = format!("{}/checkpoint/", self.path);
- let lor = ListObjectsV2Request {
- bucket: self.bucket.clone(),
- max_keys: Some(1000),
- prefix: Some(prefix.clone()),
- ..Default::default()
- };
-
- let checkpoints_res = self.s3.list_objects_v2(lor).await?;
+ let checkpoints_res = self.s3.list(&prefix).await?;
let mut checkpoints = vec![];
- for object in checkpoints_res.contents.unwrap_or_default() {
- if let Some(key) = object.key {
- if let Some(ckid) = key.strip_prefix(&prefix) {
- if let Ok(ts) = ckid.parse::<Timestamp>() {
- checkpoints.push((ts, key));
- }
+ for object in checkpoints_res {
+ let key = object.key();
+ if let Some(ckid) = key.strip_prefix(&prefix) {
+ if let Ok(ts) = ckid.parse::<Timestamp>() {
+ checkpoints.push((ts, key.into()));
}
}
}
@@ -454,23 +443,25 @@ impl<S: BayouState> Bayou<S> {
struct K2vWatch {
pk: String,
sk: String,
- rx: watch::Receiver<Option<CausalityToken>>,
+ rx: watch::Receiver<storage::RowRef>,
notify: Notify,
}
impl K2vWatch {
/// Creates a new watch and launches subordinate threads.
/// These threads hold Weak pointers to the struct;
- /// the exit when the Arc is dropped.
+ /// they exit when the Arc is dropped.
fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> {
- let (tx, rx) = watch::channel::<Option<CausalityToken>>(None);
+ let row_client = creds.row_client()?;
+
+ let (tx, rx) = watch::channel::<storage::RowRef>(row_client.row(&pk, &sk));
let notify = Notify::new();
let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
tokio::spawn(Self::background_task(
Arc::downgrade(&watch),
- creds.k2v_client()?,
+ row_client,
tx,
));
@@ -479,41 +470,42 @@ impl K2vWatch {
async fn background_task(
self_weak: Weak<Self>,
- k2v: K2vClient,
- tx: watch::Sender<Option<CausalityToken>>,
+ k2v: storage::RowStore,
+ tx: watch::Sender<storage::RowRef>,
) {
- let mut ct = None;
+ let mut row = match Weak::upgrade(&self_weak) {
+ Some(this) => k2v.row(&this.pk, &this.sk),
+ None => {
+ error!("can't start loop");
+ return
+ },
+ };
+
while let Some(this) = Weak::upgrade(&self_weak) {
debug!(
- "bayou k2v watch bg loop iter ({}, {}): ct = {:?}",
- this.pk, this.sk, ct
+ "bayou k2v watch bg loop iter ({}, {})",
+ this.pk, this.sk
);
tokio::select!(
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
- update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => {
+ update = row.poll() => {
+ //update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => {
match update {
Err(e) => {
error!("Error in bayou k2v wait value changed: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
- Ok(cv) => {
- if tx.send(Some(cv.causality.clone())).is_err() {
+ Ok(new_value) => {
+ row = new_value.to_ref();
+ if tx.send(XXX).is_err() {
break;
}
- ct = Some(cv.causality);
}
}
}
_ = this.notify.notified() => {
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
- if let Err(e) = k2v
- .insert_item(
- &this.pk,
- &this.sk,
- rand,
- ct.clone(),
- )
- .await
+ if let Err(e) = row.set_value(rand).push().await
{
error!("Error in bayou k2v watch updater loop: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;