aboutsummaryrefslogtreecommitdiff
path: root/src/storage/in_memory.rs
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-03-08 08:17:03 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-03-08 08:17:03 +0100
commit1a43ce5ac7033c148f64a033f2b1d335e95e11d5 (patch)
tree60b234604170fe207248458a9c4cdd3f4b7c36f2 /src/storage/in_memory.rs
parentbb9cb386b65834c44cae86bd100f800883022062 (diff)
downloadaerogramme-1a43ce5ac7033c148f64a033f2b1d335e95e11d5.tar.gz
aerogramme-1a43ce5ac7033c148f64a033f2b1d335e95e11d5.zip
WIP refactor
Diffstat (limited to 'src/storage/in_memory.rs')
-rw-r--r--src/storage/in_memory.rs334
1 files changed, 0 insertions, 334 deletions
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
deleted file mode 100644
index 3c3a94c..0000000
--- a/src/storage/in_memory.rs
+++ /dev/null
@@ -1,334 +0,0 @@
-use crate::storage::*;
-use std::collections::{BTreeMap, HashMap};
-use std::ops::Bound::{self, Excluded, Included, Unbounded};
-use std::sync::{Arc, RwLock};
-use tokio::sync::Notify;
-
-/// This implementation is very inneficient, and not completely correct
-/// Indeed, when the connector is dropped, the memory is freed.
-/// It means that when a user disconnects, its data are lost.
-/// It's intended only for basic debugging, do not use it for advanced tests...
-
-#[derive(Debug, Default)]
-pub struct MemDb(tokio::sync::Mutex<HashMap<String, Arc<MemBuilder>>>);
-impl MemDb {
- pub fn new() -> Self {
- Self(tokio::sync::Mutex::new(HashMap::new()))
- }
-
- pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
- let mut global_storage = self.0.lock().await;
- global_storage
- .entry(username.to_string())
- .or_insert(MemBuilder::new(username))
- .clone()
- }
-}
-
-#[derive(Debug, Clone)]
-enum InternalData {
- Tombstone,
- Value(Vec<u8>),
-}
-impl InternalData {
- fn to_alternative(&self) -> Alternative {
- match self {
- Self::Tombstone => Alternative::Tombstone,
- Self::Value(x) => Alternative::Value(x.clone()),
- }
- }
-}
-
-#[derive(Debug)]
-struct InternalRowVal {
- data: Vec<InternalData>,
- version: u64,
- change: Arc<Notify>,
-}
-impl std::default::Default for InternalRowVal {
- fn default() -> Self {
- Self {
- data: vec![],
- version: 1,
- change: Arc::new(Notify::new()),
- }
- }
-}
-impl InternalRowVal {
- fn concurrent_values(&self) -> Vec<Alternative> {
- self.data.iter().map(InternalData::to_alternative).collect()
- }
-
- fn to_row_val(&self, row_ref: RowRef) -> RowVal {
- RowVal {
- row_ref: row_ref.with_causality(self.version.to_string()),
- value: self.concurrent_values(),
- }
- }
-}
-
-#[derive(Debug, Default, Clone)]
-struct InternalBlobVal {
- data: Vec<u8>,
- metadata: HashMap<String, String>,
-}
-impl InternalBlobVal {
- fn to_blob_val(&self, bref: &BlobRef) -> BlobVal {
- BlobVal {
- blob_ref: bref.clone(),
- meta: self.metadata.clone(),
- value: self.data.clone(),
- }
- }
-}
-
-type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>;
-type ArcBlob = Arc<RwLock<BTreeMap<String, InternalBlobVal>>>;
-
-#[derive(Clone, Debug)]
-pub struct MemBuilder {
- unicity: Vec<u8>,
- row: ArcRow,
- blob: ArcBlob,
-}
-
-impl MemBuilder {
- pub fn new(user: &str) -> Arc<Self> {
- tracing::debug!("initialize membuilder for {}", user);
- let mut unicity: Vec<u8> = vec![];
- unicity.extend_from_slice(file!().as_bytes());
- unicity.extend_from_slice(user.as_bytes());
- Arc::new(Self {
- unicity,
- row: Arc::new(RwLock::new(HashMap::new())),
- blob: Arc::new(RwLock::new(BTreeMap::new())),
- })
- }
-}
-
-#[async_trait]
-impl IBuilder for MemBuilder {
- async fn build(&self) -> Result<Store, StorageError> {
- Ok(Box::new(MemStore {
- row: self.row.clone(),
- blob: self.blob.clone(),
- }))
- }
-
- fn unique(&self) -> UnicityBuffer {
- UnicityBuffer(self.unicity.clone())
- }
-}
-
-pub struct MemStore {
- row: ArcRow,
- blob: ArcBlob,
-}
-
-fn prefix_last_bound(prefix: &str) -> Bound<String> {
- let mut sort_end = prefix.to_string();
- match sort_end.pop() {
- None => Unbounded,
- Some(ch) => {
- let nc = char::from_u32(ch as u32 + 1).unwrap();
- sort_end.push(nc);
- Excluded(sort_end)
- }
- }
-}
-
-impl MemStore {
- fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
- tracing::trace!(entry=%entry, command="row_rm_single");
- let mut store = self.row.write().or(Err(StorageError::Internal))?;
- let shard = &entry.uid.shard;
- let sort = &entry.uid.sort;
-
- let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
- Some(Ok(v)) => v,
- _ => 0,
- };
-
- let bt = store.entry(shard.to_string()).or_default();
- let intval = bt.entry(sort.to_string()).or_default();
-
- if cauz == intval.version {
- intval.data.clear();
- }
- intval.data.push(InternalData::Tombstone);
- intval.version += 1;
- intval.change.notify_waiters();
-
- Ok(())
- }
-}
-
-#[async_trait]
-impl IStore for MemStore {
- async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
- tracing::trace!(select=%select, command="row_fetch");
- let store = self.row.read().or(Err(StorageError::Internal))?;
-
- match select {
- Selector::Range {
- shard,
- sort_begin,
- sort_end,
- } => Ok(store
- .get(*shard)
- .unwrap_or(&BTreeMap::new())
- .range((
- Included(sort_begin.to_string()),
- Excluded(sort_end.to_string()),
- ))
- .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
- .collect::<Vec<_>>()),
- Selector::List(rlist) => {
- let mut acc = vec![];
- for row_ref in rlist {
- let maybe_intval = store
- .get(&row_ref.uid.shard)
- .map(|v| v.get(&row_ref.uid.sort))
- .flatten();
- if let Some(intval) = maybe_intval {
- acc.push(intval.to_row_val(row_ref.clone()));
- }
- }
- Ok(acc)
- }
- Selector::Prefix { shard, sort_prefix } => {
- let last_bound = prefix_last_bound(sort_prefix);
-
- Ok(store
- .get(*shard)
- .unwrap_or(&BTreeMap::new())
- .range((Included(sort_prefix.to_string()), last_bound))
- .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
- .collect::<Vec<_>>())
- }
- Selector::Single(row_ref) => {
- let intval = store
- .get(&row_ref.uid.shard)
- .ok_or(StorageError::NotFound)?
- .get(&row_ref.uid.sort)
- .ok_or(StorageError::NotFound)?;
- Ok(vec![intval.to_row_val((*row_ref).clone())])
- }
- }
- }
-
- async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
- tracing::trace!(select=%select, command="row_rm");
-
- let values = match select {
- Selector::Range { .. } | Selector::Prefix { .. } => self
- .row_fetch(select)
- .await?
- .into_iter()
- .map(|rv| rv.row_ref)
- .collect::<Vec<_>>(),
- Selector::List(rlist) => rlist.clone(),
- Selector::Single(row_ref) => vec![(*row_ref).clone()],
- };
-
- for v in values.into_iter() {
- self.row_rm_single(&v)?;
- }
- Ok(())
- }
-
- async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
- tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
- let mut store = self.row.write().or(Err(StorageError::Internal))?;
- for v in values.into_iter() {
- let shard = v.row_ref.uid.shard;
- let sort = v.row_ref.uid.sort;
-
- let val = match v.value.into_iter().next() {
- Some(Alternative::Value(x)) => x,
- _ => vec![],
- };
-
- let cauz = match v.row_ref.causality.map(|v| v.parse::<u64>()) {
- Some(Ok(v)) => v,
- _ => 0,
- };
-
- let bt = store.entry(shard).or_default();
- let intval = bt.entry(sort).or_default();
-
- if cauz == intval.version {
- intval.data.clear();
- }
- intval.data.push(InternalData::Value(val));
- intval.version += 1;
- intval.change.notify_waiters();
- }
- Ok(())
- }
- async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
- tracing::trace!(entry=%value, command="row_poll");
- let shard = &value.uid.shard;
- let sort = &value.uid.sort;
- let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) {
- Some(Ok(v)) => v,
- _ => 0,
- };
-
- let notify_me = {
- let mut store = self.row.write().or(Err(StorageError::Internal))?;
- let bt = store.entry(shard.to_string()).or_default();
- let intval = bt.entry(sort.to_string()).or_default();
-
- if intval.version != cauz {
- return Ok(intval.to_row_val(value.clone()));
- }
- intval.change.clone()
- };
-
- notify_me.notified().await;
-
- let res = self.row_fetch(&Selector::Single(value)).await?;
- res.into_iter().next().ok_or(StorageError::NotFound)
- }
-
- async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
- tracing::trace!(entry=%blob_ref, command="blob_fetch");
- let store = self.blob.read().or(Err(StorageError::Internal))?;
- store
- .get(&blob_ref.0)
- .ok_or(StorageError::NotFound)
- .map(|v| v.to_blob_val(blob_ref))
- }
- async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
- tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
- let mut store = self.blob.write().or(Err(StorageError::Internal))?;
- let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
- entry.data = blob_val.value.clone();
- entry.metadata = blob_val.meta.clone();
- Ok(())
- }
- async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
- tracing::trace!(src=%src, dst=%dst, command="blob_copy");
- let mut store = self.blob.write().or(Err(StorageError::Internal))?;
- let blob_src = store.entry(src.0.clone()).or_default().clone();
- store.insert(dst.0.clone(), blob_src);
- Ok(())
- }
- async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
- tracing::trace!(prefix = prefix, command = "blob_list");
- let store = self.blob.read().or(Err(StorageError::Internal))?;
- let last_bound = prefix_last_bound(prefix);
- let blist = store
- .range((Included(prefix.to_string()), last_bound))
- .map(|(k, _)| BlobRef(k.to_string()))
- .collect::<Vec<_>>();
- Ok(blist)
- }
- async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
- tracing::trace!(entry=%blob_ref, command="blob_rm");
- let mut store = self.blob.write().or(Err(StorageError::Internal))?;
- store.remove(&blob_ref.0);
- Ok(())
- }
-}