diff options
Diffstat (limited to 'aero-user/src/storage/in_memory.rs')
-rw-r--r-- | aero-user/src/storage/in_memory.rs | 334 |
1 files changed, 334 insertions, 0 deletions
diff --git a/aero-user/src/storage/in_memory.rs b/aero-user/src/storage/in_memory.rs new file mode 100644 index 0000000..a676797 --- /dev/null +++ b/aero-user/src/storage/in_memory.rs @@ -0,0 +1,334 @@ +use crate::storage::*; +use std::collections::BTreeMap; +use std::ops::Bound::{self, Excluded, Included, Unbounded}; +use std::sync::RwLock; +use tokio::sync::Notify; + +/// This implementation is very inneficient, and not completely correct +/// Indeed, when the connector is dropped, the memory is freed. +/// It means that when a user disconnects, its data are lost. +/// It's intended only for basic debugging, do not use it for advanced tests... + +#[derive(Debug, Default)] +pub struct MemDb(tokio::sync::Mutex<HashMap<String, Arc<MemBuilder>>>); +impl MemDb { + pub fn new() -> Self { + Self(tokio::sync::Mutex::new(HashMap::new())) + } + + pub async fn builder(&self, username: &str) -> Arc<MemBuilder> { + let mut global_storage = self.0.lock().await; + global_storage + .entry(username.to_string()) + .or_insert(MemBuilder::new(username)) + .clone() + } +} + +#[derive(Debug, Clone)] +enum InternalData { + Tombstone, + Value(Vec<u8>), +} +impl InternalData { + fn to_alternative(&self) -> Alternative { + match self { + Self::Tombstone => Alternative::Tombstone, + Self::Value(x) => Alternative::Value(x.clone()), + } + } +} + +#[derive(Debug)] +struct InternalRowVal { + data: Vec<InternalData>, + version: u64, + change: Arc<Notify>, +} +impl std::default::Default for InternalRowVal { + fn default() -> Self { + Self { + data: vec![], + version: 1, + change: Arc::new(Notify::new()), + } + } +} +impl InternalRowVal { + fn concurrent_values(&self) -> Vec<Alternative> { + self.data.iter().map(InternalData::to_alternative).collect() + } + + fn to_row_val(&self, row_ref: RowRef) -> RowVal { + RowVal { + row_ref: row_ref.with_causality(self.version.to_string()), + value: self.concurrent_values(), + } + } +} + +#[derive(Debug, Default, Clone)] +struct InternalBlobVal { + data: Vec<u8>, + metadata: HashMap<String, String>, +} +impl InternalBlobVal { + fn to_blob_val(&self, bref: &BlobRef) -> BlobVal { + BlobVal { + blob_ref: bref.clone(), + meta: self.metadata.clone(), + value: self.data.clone(), + } + } +} + +type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>; +type ArcBlob = Arc<RwLock<BTreeMap<String, InternalBlobVal>>>; + +#[derive(Clone, Debug)] +pub struct MemBuilder { + unicity: Vec<u8>, + row: ArcRow, + blob: ArcBlob, +} + +impl MemBuilder { + pub fn new(user: &str) -> Arc<Self> { + tracing::debug!("initialize membuilder for {}", user); + let mut unicity: Vec<u8> = vec![]; + unicity.extend_from_slice(file!().as_bytes()); + unicity.extend_from_slice(user.as_bytes()); + Arc::new(Self { + unicity, + row: Arc::new(RwLock::new(HashMap::new())), + blob: Arc::new(RwLock::new(BTreeMap::new())), + }) + } +} + +#[async_trait] +impl IBuilder for MemBuilder { + async fn build(&self) -> Result<Store, StorageError> { + Ok(Box::new(MemStore { + row: self.row.clone(), + blob: self.blob.clone(), + })) + } + + fn unique(&self) -> UnicityBuffer { + UnicityBuffer(self.unicity.clone()) + } +} + +pub struct MemStore { + row: ArcRow, + blob: ArcBlob, +} + +fn prefix_last_bound(prefix: &str) -> Bound<String> { + let mut sort_end = prefix.to_string(); + match sort_end.pop() { + None => Unbounded, + Some(ch) => { + let nc = char::from_u32(ch as u32 + 1).unwrap(); + sort_end.push(nc); + Excluded(sort_end) + } + } +} + +impl MemStore { + fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { + tracing::trace!(entry=%entry, command="row_rm_single"); + let mut store = self.row.write().or(Err(StorageError::Internal))?; + let shard = &entry.uid.shard; + let sort = &entry.uid.sort; + + let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let bt = store.entry(shard.to_string()).or_default(); + let intval = bt.entry(sort.to_string()).or_default(); + + if cauz == intval.version { + intval.data.clear(); + } + intval.data.push(InternalData::Tombstone); + intval.version += 1; + intval.change.notify_waiters(); + + Ok(()) + } +} + +#[async_trait] +impl IStore for MemStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { + tracing::trace!(select=%select, command="row_fetch"); + let store = self.row.read().or(Err(StorageError::Internal))?; + + match select { + Selector::Range { + shard, + sort_begin, + sort_end, + } => Ok(store + .get(*shard) + .unwrap_or(&BTreeMap::new()) + .range(( + Included(sort_begin.to_string()), + Excluded(sort_end.to_string()), + )) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) + .collect::<Vec<_>>()), + Selector::List(rlist) => { + let mut acc = vec![]; + for row_ref in rlist { + let maybe_intval = store + .get(&row_ref.uid.shard) + .map(|v| v.get(&row_ref.uid.sort)) + .flatten(); + if let Some(intval) = maybe_intval { + acc.push(intval.to_row_val(row_ref.clone())); + } + } + Ok(acc) + } + Selector::Prefix { shard, sort_prefix } => { + let last_bound = prefix_last_bound(sort_prefix); + + Ok(store + .get(*shard) + .unwrap_or(&BTreeMap::new()) + .range((Included(sort_prefix.to_string()), last_bound)) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) + .collect::<Vec<_>>()) + } + Selector::Single(row_ref) => { + let intval = store + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)?; + Ok(vec![intval.to_row_val((*row_ref).clone())]) + } + } + } + + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { + tracing::trace!(select=%select, command="row_rm"); + + let values = match select { + Selector::Range { .. } | Selector::Prefix { .. } => self + .row_fetch(select) + .await? + .into_iter() + .map(|rv| rv.row_ref) + .collect::<Vec<_>>(), + Selector::List(rlist) => rlist.clone(), + Selector::Single(row_ref) => vec![(*row_ref).clone()], + }; + + for v in values.into_iter() { + self.row_rm_single(&v)?; + } + Ok(()) + } + + async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { + tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert"); + let mut store = self.row.write().or(Err(StorageError::Internal))?; + for v in values.into_iter() { + let shard = v.row_ref.uid.shard; + let sort = v.row_ref.uid.sort; + + let val = match v.value.into_iter().next() { + Some(Alternative::Value(x)) => x, + _ => vec![], + }; + + let cauz = match v.row_ref.causality.map(|v| v.parse::<u64>()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let bt = store.entry(shard).or_default(); + let intval = bt.entry(sort).or_default(); + + if cauz == intval.version { + intval.data.clear(); + } + intval.data.push(InternalData::Value(val)); + intval.version += 1; + intval.change.notify_waiters(); + } + Ok(()) + } + async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { + tracing::trace!(entry=%value, command="row_poll"); + let shard = &value.uid.shard; + let sort = &value.uid.sort; + let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let notify_me = { + let mut store = self.row.write().or(Err(StorageError::Internal))?; + let bt = store.entry(shard.to_string()).or_default(); + let intval = bt.entry(sort.to_string()).or_default(); + + if intval.version != cauz { + return Ok(intval.to_row_val(value.clone())); + } + intval.change.clone() + }; + + notify_me.notified().await; + + let res = self.row_fetch(&Selector::Single(value)).await?; + res.into_iter().next().ok_or(StorageError::NotFound) + } + + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_fetch"); + let store = self.blob.read().or(Err(StorageError::Internal))?; + store + .get(&blob_ref.0) + .ok_or(StorageError::NotFound) + .map(|v| v.to_blob_val(blob_ref)) + } + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + let entry = store.entry(blob_val.blob_ref.0.clone()).or_default(); + entry.data = blob_val.value.clone(); + entry.metadata = blob_val.meta.clone(); + Ok(()) + } + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(src=%src, dst=%dst, command="blob_copy"); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + let blob_src = store.entry(src.0.clone()).or_default().clone(); + store.insert(dst.0.clone(), blob_src); + Ok(()) + } + async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { + tracing::trace!(prefix = prefix, command = "blob_list"); + let store = self.blob.read().or(Err(StorageError::Internal))?; + let last_bound = prefix_last_bound(prefix); + let blist = store + .range((Included(prefix.to_string()), last_bound)) + .map(|(k, _)| BlobRef(k.to_string())) + .collect::<Vec<_>>(); + Ok(blist) + } + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_rm"); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + store.remove(&blob_ref.0); + Ok(()) + } +} |