From 3a1f68c6bf56b572c1513a8358970536d4555078 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 19 Dec 2023 21:41:35 +0100 Subject: better handle non existing keys --- src/storage/in_memory.rs | 24 ++++++++++++++++-------- src/storage/mod.rs | 21 +++++++++++++++++++++ 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 7d8d108..b1f0508 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -122,13 +122,14 @@ fn prefix_last_bound(prefix: &str) -> Bound { #[async_trait] impl IStore for MemStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError> { + tracing::trace!(select=%select, command="row_fetch"); let store = self.row.read().or(Err(StorageError::Internal))?; match select { Selector::Range { shard, sort_begin, sort_end } => { Ok(store .get(*shard) - .ok_or(StorageError::NotFound)? + .unwrap_or(&BTreeMap::new()) .range((Included(sort_begin.to_string()), Excluded(sort_end.to_string()))) .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) .collect::>()) @@ -136,12 +137,10 @@ impl IStore for MemStore { Selector::List(rlist) => { let mut acc = vec![]; for row_ref in rlist { - let intval = store - .get(&row_ref.uid.shard) - .ok_or(StorageError::NotFound)? - .get(&row_ref.uid.sort) - .ok_or(StorageError::NotFound)?; - acc.push(intval.to_row_val(row_ref.clone())); + let maybe_intval = store.get(&row_ref.uid.shard).map(|v| v.get(&row_ref.uid.sort)).flatten(); + if let Some(intval) = maybe_intval { + acc.push(intval.to_row_val(row_ref.clone())); + } } Ok(acc) }, @@ -150,7 +149,7 @@ impl IStore for MemStore { Ok(store .get(*shard) - .ok_or(StorageError::NotFound)? + .unwrap_or(&BTreeMap::new()) .range((Included(sort_prefix.to_string()), last_bound)) .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) .collect::>()) @@ -167,6 +166,7 @@ impl IStore for MemStore { } async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { + tracing::trace!(entry=%entry, command="row_rm_single"); let mut store = self.row.write().or(Err(StorageError::Internal))?; let shard = &entry.uid.shard; let sort = &entry.uid.sort; @@ -190,6 +190,7 @@ impl IStore for MemStore { } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { + tracing::trace!(select=%select, command="row_rm"); //@FIXME not efficient at all... let values = self.row_fetch(select).await?; @@ -200,6 +201,7 @@ impl IStore for MemStore { } async fn row_insert(&self, values: Vec) -> Result<(), StorageError> { + tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::>().join(","), command="row_insert"); let mut store = self.row.write().or(Err(StorageError::Internal))?; for v in values.into_iter() { let shard = v.row_ref.uid.shard; @@ -228,6 +230,7 @@ impl IStore for MemStore { Ok(()) } async fn row_poll(&self, value: &RowRef) -> Result { + tracing::trace!(entry=%value, command="row_poll"); let shard = &value.uid.shard; let sort = &value.uid.sort; let cauz = match value.causality.as_ref().map(|v| v.parse::()) { @@ -253,10 +256,12 @@ impl IStore for MemStore { } async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result { + tracing::trace!(entry=%blob_ref, command="blob_fetch"); let store = self.blob.read().or(Err(StorageError::Internal))?; store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref)) } async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); let mut store = self.blob.write().or(Err(StorageError::Internal))?; let entry = store.entry(blob_val.blob_ref.0.clone()).or_default(); entry.data = blob_val.value.clone(); @@ -264,18 +269,21 @@ impl IStore for MemStore { Ok(()) } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(src=%src, dst=%dst, command="blob_copy"); let mut store = self.blob.write().or(Err(StorageError::Internal))?; let blob_src = store.entry(src.0.clone()).or_default().clone(); store.insert(dst.0.clone(), blob_src); Ok(()) } async fn blob_list(&self, prefix: &str) -> Result, StorageError> { + tracing::trace!(prefix=prefix, command="blob_list"); let store = self.blob.read().or(Err(StorageError::Internal))?; let last_bound = prefix_last_bound(prefix); let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::>(); Ok(blist) } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_rm"); let mut store = self.blob.write().or(Err(StorageError::Internal))?; store.remove(&blob_ref.0); Ok(()) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a21e07d..0fedfab 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -50,6 +50,11 @@ pub struct RowRef { pub uid: RowUid, pub causality: Option, } +impl std::fmt::Display for RowRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "RowRef({}, {}, {:?})", self.uid.shard, self.uid.sort, self.causality) + } +} impl RowRef { pub fn new(shard: &str, sort: &str) -> Self { @@ -90,6 +95,11 @@ impl BlobRef { Self(key.to_string()) } } +impl std::fmt::Display for BlobRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BlobRef({})", self.0) + } +} #[derive(Debug, Clone)] pub struct BlobVal { @@ -111,12 +121,23 @@ impl BlobVal { } } +#[derive(Debug)] pub enum Selector<'a> { Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str }, List (Vec), // list of (shard_key, sort_key) Prefix { shard: &'a str, sort_prefix: &'a str }, Single(&'a RowRef), } +impl<'a> std::fmt::Display for Selector<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Range { shard, sort_begin, sort_end } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end), + Self::List(list) => write!(f, "List({:?})", list), + Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix), + Self::Single(row_ref) => write!(f, "Single({})", row_ref), + } + } +} #[async_trait] pub trait IStore { -- cgit v1.2.3