aboutsummaryrefslogtreecommitdiff
path: root/src/storage/in_memory.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/storage/in_memory.rs')
-rw-r--r--src/storage/in_memory.rs71
1 files changed, 45 insertions, 26 deletions
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
index ee7c9a6..3c3a94c 100644
--- a/src/storage/in_memory.rs
+++ b/src/storage/in_memory.rs
@@ -1,6 +1,6 @@
use crate::storage::*;
-use std::collections::{HashMap, BTreeMap};
-use std::ops::Bound::{Included, Unbounded, Excluded, self};
+use std::collections::{BTreeMap, HashMap};
+use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::sync::{Arc, RwLock};
use tokio::sync::Notify;
@@ -16,7 +16,7 @@ impl MemDb {
Self(tokio::sync::Mutex::new(HashMap::new()))
}
- pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
+ pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
let mut global_storage = self.0.lock().await;
global_storage
.entry(username.to_string())
@@ -60,8 +60,8 @@ impl InternalRowVal {
}
fn to_row_val(&self, row_ref: RowRef) -> RowVal {
- RowVal{
- row_ref: row_ref.with_causality(self.version.to_string()),
+ RowVal {
+ row_ref: row_ref.with_causality(self.version.to_string()),
value: self.concurrent_values(),
}
}
@@ -75,7 +75,7 @@ struct InternalBlobVal {
impl InternalBlobVal {
fn to_blob_val(&self, bref: &BlobRef) -> BlobVal {
BlobVal {
- blob_ref: bref.clone(),
+ blob_ref: bref.clone(),
meta: self.metadata.clone(),
value: self.data.clone(),
}
@@ -113,7 +113,7 @@ impl IBuilder for MemBuilder {
row: self.row.clone(),
blob: self.blob.clone(),
}))
- }
+ }
fn unique(&self) -> UnicityBuffer {
UnicityBuffer(self.unicity.clone())
@@ -170,24 +170,32 @@ impl IStore for MemStore {
let store = self.row.read().or(Err(StorageError::Internal))?;
match select {
- Selector::Range { shard, sort_begin, sort_end } => {
- Ok(store
- .get(*shard)
- .unwrap_or(&BTreeMap::new())
- .range((Included(sort_begin.to_string()), Excluded(sort_end.to_string())))
- .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
- .collect::<Vec<_>>())
- },
+ Selector::Range {
+ shard,
+ sort_begin,
+ sort_end,
+ } => Ok(store
+ .get(*shard)
+ .unwrap_or(&BTreeMap::new())
+ .range((
+ Included(sort_begin.to_string()),
+ Excluded(sort_end.to_string()),
+ ))
+ .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
+ .collect::<Vec<_>>()),
Selector::List(rlist) => {
let mut acc = vec![];
for row_ref in rlist {
- let maybe_intval = store.get(&row_ref.uid.shard).map(|v| v.get(&row_ref.uid.sort)).flatten();
+ let maybe_intval = store
+ .get(&row_ref.uid.shard)
+ .map(|v| v.get(&row_ref.uid.sort))
+ .flatten();
if let Some(intval) = maybe_intval {
acc.push(intval.to_row_val(row_ref.clone()));
}
}
Ok(acc)
- },
+ }
Selector::Prefix { shard, sort_prefix } => {
let last_bound = prefix_last_bound(sort_prefix);
@@ -197,13 +205,13 @@ impl IStore for MemStore {
.range((Included(sort_prefix.to_string()), last_bound))
.map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
.collect::<Vec<_>>())
- },
+ }
Selector::Single(row_ref) => {
let intval = store
- .get(&row_ref.uid.shard)
- .ok_or(StorageError::NotFound)?
- .get(&row_ref.uid.sort)
- .ok_or(StorageError::NotFound)?;
+ .get(&row_ref.uid.shard)
+ .ok_or(StorageError::NotFound)?
+ .get(&row_ref.uid.sort)
+ .ok_or(StorageError::NotFound)?;
Ok(vec![intval.to_row_val((*row_ref).clone())])
}
}
@@ -213,7 +221,12 @@ impl IStore for MemStore {
tracing::trace!(select=%select, command="row_rm");
let values = match select {
- Selector::Range { .. } | Selector::Prefix { .. } => self.row_fetch(select).await?.into_iter().map(|rv| rv.row_ref).collect::<Vec<_>>(),
+ Selector::Range { .. } | Selector::Prefix { .. } => self
+ .row_fetch(select)
+ .await?
+ .into_iter()
+ .map(|rv| rv.row_ref)
+ .collect::<Vec<_>>(),
Selector::List(rlist) => rlist.clone(),
Selector::Single(row_ref) => vec![(*row_ref).clone()],
};
@@ -282,7 +295,10 @@ impl IStore for MemStore {
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
tracing::trace!(entry=%blob_ref, command="blob_fetch");
let store = self.blob.read().or(Err(StorageError::Internal))?;
- store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref))
+ store
+ .get(&blob_ref.0)
+ .ok_or(StorageError::NotFound)
+ .map(|v| v.to_blob_val(blob_ref))
}
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
@@ -300,10 +316,13 @@ impl IStore for MemStore {
Ok(())
}
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
- tracing::trace!(prefix=prefix, command="blob_list");
+ tracing::trace!(prefix = prefix, command = "blob_list");
let store = self.blob.read().or(Err(StorageError::Internal))?;
let last_bound = prefix_last_bound(prefix);
- let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::<Vec<_>>();
+ let blist = store
+ .range((Included(prefix.to_string()), last_bound))
+ .map(|(k, _)| BlobRef(k.to_string()))
+ .collect::<Vec<_>>();
Ok(blist)
}
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {