aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/storage/garage.rs31
1 files changed, 30 insertions, 1 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index 665a0c6..fa6fbc1 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -191,7 +191,36 @@ impl IStore for GarageStore {
}
}
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
- unimplemented!();
+ loop {
+ if let Some(ct) = &value.causality {
+ match self.k2v.poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None).await {
+ Err(e) => {
+ tracing::error!("Unable to poll item: {}", e);
+ return Err(StorageError::Internal);
+ }
+ Ok(None) => continue,
+ Ok(Some(cv)) => return Ok(causal_to_row_val(value.clone(), cv)),
+ }
+ } else {
+ match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await {
+ Err(k2v_client::Error::NotFound) => {
+ self
+ .k2v
+ .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None)
+ .await
+ .map_err(|e| {
+ tracing::error!("Unable to insert item in polling logic: {}", e);
+ StorageError::Internal
+ })?;
+ }
+ Err(e) => {
+ tracing::error!("Unable to read item in polling logic: {}", e);
+ return Err(StorageError::Internal)
+ },
+ Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)),
+ }
+ }
+ }
}
async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {