diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-18 17:09:44 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-18 17:09:44 +0100 |
commit | 3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf (patch) | |
tree | fff5d16e266788b28e812c24669f50118831512b /src/bayou.rs | |
parent | 684f4de225c44464abcb6a9cb2ef6dcae90537a8 (diff) | |
download | aerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.tar.gz aerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.zip |
Storage trait new implementation
Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 134 |
1 files changed, 53 insertions, 81 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index 3042f94..afe3c75 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -45,8 +45,7 @@ pub struct Bayou<S: BayouState> { path: String, key: Key, - k2v: storage::RowStore, - s3: storage::BlobStore, + storage: storage::Store, checkpoint: (Timestamp, S), history: Vec<(Timestamp, S::Op, Option<S>)>, @@ -60,17 +59,16 @@ pub struct Bayou<S: BayouState> { impl<S: BayouState> Bayou<S> { pub fn new(creds: &Credentials, path: String) -> Result<Self> { - let k2v_client = creds.row_client()?; - let s3_client = creds.blob_client()?; + let storage = creds.storage.build()?; - let target = k2v_client.row(&path, WATCH_SK); - let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?; + //let target = k2v_client.row(&path, WATCH_SK); + let target = storage::RowRef::new(&path, WATCH_SK); + let watch = K2vWatch::new(creds, target.clone())?; Ok(Self { path, + storage, key: creds.keys.master.clone(), - k2v: k2v_client, - s3: s3_client, checkpoint: (Timestamp::zero(), S::default()), history: vec![], last_sync: None, @@ -96,9 +94,7 @@ impl<S: BayouState> Bayou<S> { } else { debug!("(sync) loading checkpoint: {}", key); - let obj_res = self.s3.blob(key).fetch().await?; - let buf = obj_res.content().ok_or(anyhow!("object can't be empty"))?; - + let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value; debug!("(sync) checkpoint body length: {}", buf.len()); let ck = open_deserialize::<S>(&buf, &self.key)?; @@ -129,42 +125,26 @@ impl<S: BayouState> Bayou<S> { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); - let ops_map = self.k2v.select(storage::Selector::Range { shard_key: &self.path, begin: &ts_ser, end: WATCH_SK }).await?; - /*let ops_map = self - .k2v - .read_batch(&[BatchReadOp { - partition_key: &self.path, - filter: Filter { - start: Some(&ts_ser), - end: Some(WATCH_SK), - prefix: None, - limit: None, - reverse: false, - }, - single_item: false, - conflicts_only: false, - tombstones: false, - }]) - .await? - .into_iter() - .next() - .ok_or(anyhow!("Missing K2V result"))? - .items;*/ + let ops_map = self.storage.row_fetch(&storage::Selector::Range { + shard: &self.path, + sort_begin: &ts_ser, + sort_end: WATCH_SK + }).await?; let mut ops = vec![]; for row_value in ops_map { - let row = row_value.to_ref(); - let sort_key = row.key().1; + let row = row_value.row_ref; + let sort_key = row.uid.sort; let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?; - let val = row_value.content(); + let val = row_value.value; if val.len() != 1 { - bail!("Invalid operation, has {} values", row_value.content().len()); + bail!("Invalid operation, has {} values", val.len()); } match &val[0] { storage::Alternative::Value(v) => { let op = open_deserialize::<S::Op>(v, &self.key)?; - debug!("(sync) operation {}: {} {:?}", sort_key, base64::encode(v), op); + debug!("(sync) operation {}: {:?}", sort_key, op); ops.push((ts, op)); } storage::Alternative::Tombstone => { @@ -231,7 +211,7 @@ impl<S: BayouState> Bayou<S> { // Save info that sync has been done self.last_sync = new_last_sync; - self.last_sync_watch_ct = self.k2v.from_orphan(new_last_sync_watch_ct).expect("Source & target storage must be compatible"); + self.last_sync_watch_ct = new_last_sync_watch_ct; Ok(()) } @@ -243,7 +223,7 @@ impl<S: BayouState> Bayou<S> { Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5), _ => true, }; - let changed = self.last_sync_watch_ct.to_orphan() != *self.watch.rx.borrow(); + let changed = self.last_sync_watch_ct != *self.watch.rx.borrow(); if too_old || changed { self.sync().await?; } @@ -263,12 +243,12 @@ impl<S: BayouState> Bayou<S> { .map(|(ts, _, _)| ts) .unwrap_or(&self.checkpoint.0), ); - self.k2v - .row(&self.path, &ts.to_string()) - .set_value(&seal_serialize(&op, &self.key)?) - .push() - .await?; + let row_val = storage::RowVal::new( + storage::RowRef::new(&self.path, &ts.to_string()), + seal_serialize(&op, &self.key)?, + ); + self.storage.row_insert(vec![row_val]).await?; self.watch.notify.notify_one(); let new_state = self.state().apply(&op); @@ -368,12 +348,11 @@ impl<S: BayouState> Bayou<S> { let cryptoblob = seal_serialize(&state_cp, &self.key)?; debug!("(cp) checkpoint body length: {}", cryptoblob.len()); - self.s3 - .blob(format!("{}/checkpoint/{}", self.path, ts_cp.to_string()).as_str()) - .set_value(cryptoblob.into()) - .push() - .await?; - + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())), + cryptoblob.into(), + ); + self.storage.blob_insert(&blob_val).await?; // Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them) let ecp_len = existing_checkpoints.len(); @@ -383,22 +362,16 @@ impl<S: BayouState> Bayou<S> { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { debug!("(cp) drop old checkpoint {}", key); - self.s3 - .blob(key) - .rm() - .await?; + self.storage.blob_rm(&storage::BlobRef(key.to_string())).await?; } // Delete corresponding range of operations let ts_ser = existing_checkpoints[last_to_keep].0.to_string(); - self.k2v - .rm(storage::Selector::Range{ - shard_key: &self.path, - begin: "", - end: &ts_ser - }) - .await?; - + self.storage.row_rm(&storage::Selector::Range { + shard: &self.path, + sort_begin: "", + sort_end: &ts_ser + }).await? } Ok(()) @@ -417,11 +390,11 @@ impl<S: BayouState> Bayou<S> { async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> { let prefix = format!("{}/checkpoint/", self.path); - let checkpoints_res = self.s3.list(&prefix).await?; + let checkpoints_res = self.storage.blob_list(&prefix).await?; let mut checkpoints = vec![]; for object in checkpoints_res { - let key = object.key(); + let key = object.0; if let Some(ckid) = key.strip_prefix(&prefix) { if let Ok(ts) = ckid.parse::<Timestamp>() { checkpoints.push((ts, key.into())); @@ -436,9 +409,8 @@ impl<S: BayouState> Bayou<S> { // ---- Bayou watch in K2V ---- struct K2vWatch { - pk: String, - sk: String, - rx: watch::Receiver<storage::OrphanRowRef>, + target: storage::RowRef, + rx: watch::Receiver<storage::RowRef>, notify: Notify, } @@ -446,17 +418,17 @@ impl K2vWatch { /// Creates a new watch and launches subordinate threads. /// These threads hold Weak pointers to the struct; /// they exit when the Arc is dropped. - fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> { - let row_client = creds.row_client()?; + fn new(creds: &Credentials, target: storage::RowRef) -> Result<Arc<Self>> { + let storage = creds.storage.build()?; - let (tx, rx) = watch::channel::<storage::OrphanRowRef>(row_client.row(&pk, &sk).to_orphan()); + let (tx, rx) = watch::channel::<storage::RowRef>(target.clone()); let notify = Notify::new(); - let watch = Arc::new(K2vWatch { pk, sk, rx, notify }); + let watch = Arc::new(K2vWatch { target, rx, notify }); tokio::spawn(Self::background_task( Arc::downgrade(&watch), - row_client, + storage, tx, )); @@ -465,11 +437,11 @@ impl K2vWatch { async fn background_task( self_weak: Weak<Self>, - k2v: storage::RowStore, - tx: watch::Sender<storage::OrphanRowRef>, + storage: storage::Store, + tx: watch::Sender<storage::RowRef>, ) { let mut row = match Weak::upgrade(&self_weak) { - Some(this) => k2v.row(&this.pk, &this.sk), + Some(this) => this.target.clone(), None => { error!("can't start loop"); return @@ -479,20 +451,19 @@ impl K2vWatch { while let Some(this) = Weak::upgrade(&self_weak) { debug!( "bayou k2v watch bg loop iter ({}, {})", - this.pk, this.sk + this.target.uid.shard, this.target.uid.sort ); tokio::select!( _ = tokio::time::sleep(Duration::from_secs(60)) => continue, - update = row.poll() => { - //update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => { + update = storage.row_poll(&row) => { match update { Err(e) => { error!("Error in bayou k2v wait value changed: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } Ok(new_value) => { - row = new_value.to_ref(); - if tx.send(row.to_orphan()).is_err() { + row = new_value.row_ref; + if tx.send(row.clone()).is_err() { break; } } @@ -500,7 +471,8 @@ impl K2vWatch { } _ = this.notify.notified() => { let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); - if let Err(e) = row.set_value(&rand).push().await + let row_val = storage::RowVal::new(row.clone(), rand); + if let Err(e) = storage.row_insert(vec![row_val]).await { error!("Error in bayou k2v watch updater loop: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; |