diff options
Diffstat (limited to 'src/storage/in_memory.rs')
-rw-r--r-- | src/storage/in_memory.rs | 71 |
1 files changed, 45 insertions, 26 deletions
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index ee7c9a6..3c3a94c 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -1,6 +1,6 @@ use crate::storage::*; -use std::collections::{HashMap, BTreeMap}; -use std::ops::Bound::{Included, Unbounded, Excluded, self}; +use std::collections::{BTreeMap, HashMap}; +use std::ops::Bound::{self, Excluded, Included, Unbounded}; use std::sync::{Arc, RwLock}; use tokio::sync::Notify; @@ -16,7 +16,7 @@ impl MemDb { Self(tokio::sync::Mutex::new(HashMap::new())) } - pub async fn builder(&self, username: &str) -> Arc<MemBuilder> { + pub async fn builder(&self, username: &str) -> Arc<MemBuilder> { let mut global_storage = self.0.lock().await; global_storage .entry(username.to_string()) @@ -60,8 +60,8 @@ impl InternalRowVal { } fn to_row_val(&self, row_ref: RowRef) -> RowVal { - RowVal{ - row_ref: row_ref.with_causality(self.version.to_string()), + RowVal { + row_ref: row_ref.with_causality(self.version.to_string()), value: self.concurrent_values(), } } @@ -75,7 +75,7 @@ struct InternalBlobVal { impl InternalBlobVal { fn to_blob_val(&self, bref: &BlobRef) -> BlobVal { BlobVal { - blob_ref: bref.clone(), + blob_ref: bref.clone(), meta: self.metadata.clone(), value: self.data.clone(), } @@ -113,7 +113,7 @@ impl IBuilder for MemBuilder { row: self.row.clone(), blob: self.blob.clone(), })) - } + } fn unique(&self) -> UnicityBuffer { UnicityBuffer(self.unicity.clone()) @@ -170,24 +170,32 @@ impl IStore for MemStore { let store = self.row.read().or(Err(StorageError::Internal))?; match select { - Selector::Range { shard, sort_begin, sort_end } => { - Ok(store - .get(*shard) - .unwrap_or(&BTreeMap::new()) - .range((Included(sort_begin.to_string()), Excluded(sort_end.to_string()))) - .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) - .collect::<Vec<_>>()) - }, + Selector::Range { + shard, + sort_begin, + sort_end, + } => Ok(store + .get(*shard) + .unwrap_or(&BTreeMap::new()) + .range(( + Included(sort_begin.to_string()), + Excluded(sort_end.to_string()), + )) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) + .collect::<Vec<_>>()), Selector::List(rlist) => { let mut acc = vec![]; for row_ref in rlist { - let maybe_intval = store.get(&row_ref.uid.shard).map(|v| v.get(&row_ref.uid.sort)).flatten(); + let maybe_intval = store + .get(&row_ref.uid.shard) + .map(|v| v.get(&row_ref.uid.sort)) + .flatten(); if let Some(intval) = maybe_intval { acc.push(intval.to_row_val(row_ref.clone())); } } Ok(acc) - }, + } Selector::Prefix { shard, sort_prefix } => { let last_bound = prefix_last_bound(sort_prefix); @@ -197,13 +205,13 @@ impl IStore for MemStore { .range((Included(sort_prefix.to_string()), last_bound)) .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) .collect::<Vec<_>>()) - }, + } Selector::Single(row_ref) => { let intval = store - .get(&row_ref.uid.shard) - .ok_or(StorageError::NotFound)? - .get(&row_ref.uid.sort) - .ok_or(StorageError::NotFound)?; + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)?; Ok(vec![intval.to_row_val((*row_ref).clone())]) } } @@ -213,7 +221,12 @@ impl IStore for MemStore { tracing::trace!(select=%select, command="row_rm"); let values = match select { - Selector::Range { .. } | Selector::Prefix { .. } => self.row_fetch(select).await?.into_iter().map(|rv| rv.row_ref).collect::<Vec<_>>(), + Selector::Range { .. } | Selector::Prefix { .. } => self + .row_fetch(select) + .await? + .into_iter() + .map(|rv| rv.row_ref) + .collect::<Vec<_>>(), Selector::List(rlist) => rlist.clone(), Selector::Single(row_ref) => vec![(*row_ref).clone()], }; @@ -282,7 +295,10 @@ impl IStore for MemStore { async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { tracing::trace!(entry=%blob_ref, command="blob_fetch"); let store = self.blob.read().or(Err(StorageError::Internal))?; - store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref)) + store + .get(&blob_ref.0) + .ok_or(StorageError::NotFound) + .map(|v| v.to_blob_val(blob_ref)) } async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); @@ -300,10 +316,13 @@ impl IStore for MemStore { Ok(()) } async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { - tracing::trace!(prefix=prefix, command="blob_list"); + tracing::trace!(prefix = prefix, command = "blob_list"); let store = self.blob.read().or(Err(StorageError::Internal))?; let last_bound = prefix_last_bound(prefix); - let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::<Vec<_>>(); + let blist = store + .range((Included(prefix.to_string()), last_bound)) + .map(|(k, _)| BlobRef(k.to_string())) + .collect::<Vec<_>>(); Ok(blist) } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { |