From 684f4de225c44464abcb6a9cb2ef6dcae90537a8 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Sat, 16 Dec 2023 11:13:32 +0100 Subject: new new new storage interface --- src/storage/in_memory.rs | 177 ++++++++++++++++++++++++++++------------------- 1 file changed, 107 insertions(+), 70 deletions(-) (limited to 'src/storage/in_memory.rs') diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 5ba9461..09c6763 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -1,97 +1,134 @@ -use futures::FutureExt; use crate::storage::*; - -#[derive(Clone, Debug, Hash)] -pub struct FullMem {} -pub struct MemStore {} -pub struct MemRef {} -pub struct MemValue {} - -#[derive(Clone, Debug, PartialEq)] -pub struct MemOrphanRowRef {} - -impl IBuilders for FullMem { - fn row_store(&self) -> Result { - unimplemented!(); - } - - fn blob_store(&self) -> Result { - unimplemented!(); - } - - fn url(&self) -> &str { - return "mem://unimplemented;" - } +use std::collections::{HashMap, BTreeMap}; +use std::ops::Bound::{Included, Unbounded, Excluded}; +use std::sync::{Arc, RwLock}; + +/// This implementation is very inneficient, and not completely correct +/// Indeed, when the connector is dropped, the memory is freed. +/// It means that when a user disconnects, its data are lost. +/// It's intended only for basic debugging, do not use it for advanced tests... + +pub type ArcRow = Arc>>>>; +pub type ArcBlob = Arc>>>; + +#[derive(Clone, Debug)] +pub struct MemBuilder { + user: String, + url: String, + row: ArcRow, + blob: ArcBlob, } -impl IRowStore for MemStore { - fn row(&self, partition: &str, sort: &str) -> RowRef { - unimplemented!(); - } - - fn select(&self, selector: Selector) -> AsyncResult> { - unimplemented!() - } - - fn rm(&self, selector: Selector) -> AsyncResult<()> { - unimplemented!(); - } +impl IBuilder for MemBuilder { + fn build(&self) -> Box { + Box::new(MemStore { + row: self.row.clone(), + blob: self.blob.clone(), + }) + } +} - fn from_orphan(&self, orphan: OrphanRowRef) -> Result { - unimplemented!(); - } +pub struct MemStore { + row: ArcRow, + blob: ArcBlob, } -impl IRowRef for MemRef { - fn to_orphan(&self) -> OrphanRowRef { - unimplemented!() +impl MemStore { + fn inner_fetch(&self, row_ref: &RowRef) -> Result, StorageError> { + Ok(self.row + .read() + .or(Err(StorageError::Internal))? + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)? + .clone()) } +} - fn key(&self) -> (&str, &str) { +#[async_trait] +impl IStore for MemStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError> { + match select { + Selector::Range { shard, sort_begin, sort_end } => { + Ok(self.row + .read() + .or(Err(StorageError::Internal))? + .get(*shard) + .ok_or(StorageError::NotFound)? + .range((Included(sort_begin.to_string()), Included(sort_end.to_string()))) + .map(|(k, v)| RowVal { + row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) }, + value: vec![Alternative::Value(v.clone())], + }) + .collect::>()) + }, + Selector::List(rlist) => { + let mut acc = vec![]; + for row_ref in rlist { + let bytes = self.inner_fetch(row_ref)?; + let row_val = RowVal { + row_ref: row_ref.clone(), + value: vec![Alternative::Value(bytes)] + }; + acc.push(row_val); + } + Ok(acc) + }, + Selector::Prefix { shard, sort_prefix } => { + let mut sort_end = sort_prefix.to_string(); + let last_bound = match sort_end.pop() { + None => Unbounded, + Some(ch) => { + let nc = char::from_u32(ch as u32 + 1).unwrap(); + sort_end.push(nc); + Excluded(sort_end) + } + }; + + Ok(self.row + .read() + .or(Err(StorageError::Internal))? + .get(*shard) + .ok_or(StorageError::NotFound)? + .range((Included(sort_prefix.to_string()), last_bound)) + .map(|(k, v)| RowVal { + row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) }, + value: vec![Alternative::Value(v.clone())], + }) + .collect::>()) + }, + Selector::Single(row_ref) => { + let bytes = self.inner_fetch(row_ref)?; + Ok(vec![RowVal{ row_ref: row_ref.clone(), value: vec![Alternative::Value(bytes)]}]) + } + } + } + + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { unimplemented!(); } - /*fn clone_boxed(&self) -> RowRef { + async fn row_insert(&self, values: Vec) -> Result<(), StorageError> { unimplemented!(); - }*/ - fn set_value(&self, content: &[u8]) -> RowValue { - unimplemented!(); - } - fn fetch(&self) -> AsyncResult { - unimplemented!(); } - fn rm(&self) -> AsyncResult<()> { + async fn row_poll(&self, value: RowRef) -> Result { unimplemented!(); } - fn poll(&self) -> AsyncResult { - async { - let rv: RowValue = Box::new(MemValue{}); - Ok(rv) - }.boxed() - } -} -impl std::fmt::Debug for MemRef { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result { unimplemented!(); - } -} -impl IRowValue for MemValue { - fn to_ref(&self) -> RowRef { - unimplemented!(); } - fn content(&self) -> ConcurrentValues { + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result { unimplemented!(); + } - fn push(&self) -> AsyncResult<()> { + async fn blob_list(&self, prefix: &str) -> Result, StorageError> { unimplemented!(); } -} - -impl std::fmt::Debug for MemValue { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { unimplemented!(); } } -- cgit v1.2.3