diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-26 20:02:13 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-26 20:02:13 +0100 |
commit | 477a784e45d07d414fea77cf5b49ee241dc01f65 (patch) | |
tree | 24ae8419e8a35f30ab6f47aadc0ab02a25912b82 /src/storage/garage.rs | |
parent | 18bba784eee2331e7f27fe31b89e7c674e24ded0 (diff) | |
download | aerogramme-477a784e45d07d414fea77cf5b49ee241dc01f65.tar.gz aerogramme-477a784e45d07d414fea77cf5b49ee241dc01f65.zip |
implement poll
Diffstat (limited to 'src/storage/garage.rs')
-rw-r--r-- | src/storage/garage.rs | 31 |
1 files changed, 30 insertions, 1 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 665a0c6..fa6fbc1 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -191,7 +191,36 @@ impl IStore for GarageStore { } } async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { - unimplemented!(); + loop { + if let Some(ct) = &value.causality { + match self.k2v.poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None).await { + Err(e) => { + tracing::error!("Unable to poll item: {}", e); + return Err(StorageError::Internal); + } + Ok(None) => continue, + Ok(Some(cv)) => return Ok(causal_to_row_val(value.clone(), cv)), + } + } else { + match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await { + Err(k2v_client::Error::NotFound) => { + self + .k2v + .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None) + .await + .map_err(|e| { + tracing::error!("Unable to insert item in polling logic: {}", e); + StorageError::Internal + })?; + } + Err(e) => { + tracing::error!("Unable to read item in polling logic: {}", e); + return Err(StorageError::Internal) + }, + Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)), + } + } + } } async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { |