aboutsummaryrefslogblamecommitdiff
path: root/aero-user/src/storage/in_memory.rs
blob: a676797681bacf49b9c48d9cf6481b7927b84864 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
                      
                               
                                                           
                      
                        





                                                                               






                                                                       
                                                                    







                                                     













                                                            
                




                            








                                               





                                                                    

                                                                      












                                                      
                                   







                                                                             


                       
                     

                  

 

                                         
                                                              



                                                      

                                                       
                                                         



          
              
                              
                                                          
                              

                                    
           
     



                                           
 
 


                     

 








                                                            
     
 
 

























                                                                             


                                                                                               
                                                             

                                                                     
                      












                                                                  


                                      



                                                          


                                                                     

                       
             
                                                        


                                                                
                                
                                                
                                                                           
                                                                      
                                         
             
                                          
                                  



                                                    
                                                               




                                                                                   
                                                          

                                   





                                                                    


                                                                  

                                     
                                    

              

     
                                                                                 
                                                                                                                                   













                                                                            
 










                                                       
     
                                                                              
                                                          







                                                                             


                                                                              










                                                                  
     
 
                                                                                     
                                                               
                                                                      



                                             
     
                                                                                
                                                                         




                                                                           
     
                                                                                         
                                                                 



                                                                           
     
                                                                                   
                                                                

                                                                      



                                                              
                 
     
                                                                             
                                                            


                                                                           

     
use crate::storage::*;
use std::collections::BTreeMap;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::sync::RwLock;
use tokio::sync::Notify;

/// This implementation is very inneficient, and not completely correct
/// Indeed, when the connector is dropped, the memory is freed.
/// It means that when a user disconnects, its data are lost.
/// It's intended only for basic debugging, do not use it for advanced tests...

#[derive(Debug, Default)]
pub struct MemDb(tokio::sync::Mutex<HashMap<String, Arc<MemBuilder>>>);
impl MemDb {
    pub fn new() -> Self {
        Self(tokio::sync::Mutex::new(HashMap::new()))
    }

    pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
        let mut global_storage = self.0.lock().await;
        global_storage
            .entry(username.to_string())
            .or_insert(MemBuilder::new(username))
            .clone()
    }
}

#[derive(Debug, Clone)]
enum InternalData {
    Tombstone,
    Value(Vec<u8>),
}
impl InternalData {
    fn to_alternative(&self) -> Alternative {
        match self {
            Self::Tombstone => Alternative::Tombstone,
            Self::Value(x) => Alternative::Value(x.clone()),
        }
    }
}

#[derive(Debug)]
struct InternalRowVal {
    data: Vec<InternalData>,
    version: u64,
    change: Arc<Notify>,
}
impl std::default::Default for InternalRowVal {
    fn default() -> Self {
        Self {
            data: vec![],
            version: 1,
            change: Arc::new(Notify::new()),
        }
    }
}
impl InternalRowVal {
    fn concurrent_values(&self) -> Vec<Alternative> {
        self.data.iter().map(InternalData::to_alternative).collect()
    }

    fn to_row_val(&self, row_ref: RowRef) -> RowVal {
        RowVal {
            row_ref: row_ref.with_causality(self.version.to_string()),
            value: self.concurrent_values(),
        }
    }
}

#[derive(Debug, Default, Clone)]
struct InternalBlobVal {
    data: Vec<u8>,
    metadata: HashMap<String, String>,
}
impl InternalBlobVal {
    fn to_blob_val(&self, bref: &BlobRef) -> BlobVal {
        BlobVal {
            blob_ref: bref.clone(),
            meta: self.metadata.clone(),
            value: self.data.clone(),
        }
    }
}

type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>;
type ArcBlob = Arc<RwLock<BTreeMap<String, InternalBlobVal>>>;

#[derive(Clone, Debug)]
pub struct MemBuilder {
    unicity: Vec<u8>,
    row: ArcRow,
    blob: ArcBlob,
}

impl MemBuilder {
    pub fn new(user: &str) -> Arc<Self> {
        tracing::debug!("initialize membuilder for {}", user);
        let mut unicity: Vec<u8> = vec![];
        unicity.extend_from_slice(file!().as_bytes());
        unicity.extend_from_slice(user.as_bytes());
        Arc::new(Self {
            unicity,
            row: Arc::new(RwLock::new(HashMap::new())),
            blob: Arc::new(RwLock::new(BTreeMap::new())),
        })
    }
}

#[async_trait]
impl IBuilder for MemBuilder {
    async fn build(&self) -> Result<Store, StorageError> {
        Ok(Box::new(MemStore {
            row: self.row.clone(),
            blob: self.blob.clone(),
        }))
    }

    fn unique(&self) -> UnicityBuffer {
        UnicityBuffer(self.unicity.clone())
    }
}

pub struct MemStore {
    row: ArcRow,
    blob: ArcBlob,
}

fn prefix_last_bound(prefix: &str) -> Bound<String> {
    let mut sort_end = prefix.to_string();
    match sort_end.pop() {
        None => Unbounded,
        Some(ch) => {
            let nc = char::from_u32(ch as u32 + 1).unwrap();
            sort_end.push(nc);
            Excluded(sort_end)
        }
    }
}

impl MemStore {
    fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
        tracing::trace!(entry=%entry, command="row_rm_single");
        let mut store = self.row.write().or(Err(StorageError::Internal))?;
        let shard = &entry.uid.shard;
        let sort = &entry.uid.sort;

        let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
            Some(Ok(v)) => v,
            _ => 0,
        };

        let bt = store.entry(shard.to_string()).or_default();
        let intval = bt.entry(sort.to_string()).or_default();

        if cauz == intval.version {
            intval.data.clear();
        }
        intval.data.push(InternalData::Tombstone);
        intval.version += 1;
        intval.change.notify_waiters();

        Ok(())
    }
}

#[async_trait]
impl IStore for MemStore {
    async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
        tracing::trace!(select=%select, command="row_fetch");
        let store = self.row.read().or(Err(StorageError::Internal))?;

        match select {
            Selector::Range {
                shard,
                sort_begin,
                sort_end,
            } => Ok(store
                .get(*shard)
                .unwrap_or(&BTreeMap::new())
                .range((
                    Included(sort_begin.to_string()),
                    Excluded(sort_end.to_string()),
                ))
                .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
                .collect::<Vec<_>>()),
            Selector::List(rlist) => {
                let mut acc = vec![];
                for row_ref in rlist {
                    let maybe_intval = store
                        .get(&row_ref.uid.shard)
                        .map(|v| v.get(&row_ref.uid.sort))
                        .flatten();
                    if let Some(intval) = maybe_intval {
                        acc.push(intval.to_row_val(row_ref.clone()));
                    }
                }
                Ok(acc)
            }
            Selector::Prefix { shard, sort_prefix } => {
                let last_bound = prefix_last_bound(sort_prefix);

                Ok(store
                    .get(*shard)
                    .unwrap_or(&BTreeMap::new())
                    .range((Included(sort_prefix.to_string()), last_bound))
                    .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
                    .collect::<Vec<_>>())
            }
            Selector::Single(row_ref) => {
                let intval = store
                    .get(&row_ref.uid.shard)
                    .ok_or(StorageError::NotFound)?
                    .get(&row_ref.uid.sort)
                    .ok_or(StorageError::NotFound)?;
                Ok(vec![intval.to_row_val((*row_ref).clone())])
            }
        }
    }

    async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
        tracing::trace!(select=%select, command="row_rm");

        let values = match select {
            Selector::Range { .. } | Selector::Prefix { .. } => self
                .row_fetch(select)
                .await?
                .into_iter()
                .map(|rv| rv.row_ref)
                .collect::<Vec<_>>(),
            Selector::List(rlist) => rlist.clone(),
            Selector::Single(row_ref) => vec![(*row_ref).clone()],
        };

        for v in values.into_iter() {
            self.row_rm_single(&v)?;
        }
        Ok(())
    }

    async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
        tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
        let mut store = self.row.write().or(Err(StorageError::Internal))?;
        for v in values.into_iter() {
            let shard = v.row_ref.uid.shard;
            let sort = v.row_ref.uid.sort;

            let val = match v.value.into_iter().next() {
                Some(Alternative::Value(x)) => x,
                _ => vec![],
            };

            let cauz = match v.row_ref.causality.map(|v| v.parse::<u64>()) {
                Some(Ok(v)) => v,
                _ => 0,
            };

            let bt = store.entry(shard).or_default();
            let intval = bt.entry(sort).or_default();

            if cauz == intval.version {
                intval.data.clear();
            }
            intval.data.push(InternalData::Value(val));
            intval.version += 1;
            intval.change.notify_waiters();
        }
        Ok(())
    }
    async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
        tracing::trace!(entry=%value, command="row_poll");
        let shard = &value.uid.shard;
        let sort = &value.uid.sort;
        let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) {
            Some(Ok(v)) => v,
            _ => 0,
        };

        let notify_me = {
            let mut store = self.row.write().or(Err(StorageError::Internal))?;
            let bt = store.entry(shard.to_string()).or_default();
            let intval = bt.entry(sort.to_string()).or_default();

            if intval.version != cauz {
                return Ok(intval.to_row_val(value.clone()));
            }
            intval.change.clone()
        };

        notify_me.notified().await;

        let res = self.row_fetch(&Selector::Single(value)).await?;
        res.into_iter().next().ok_or(StorageError::NotFound)
    }

    async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
        tracing::trace!(entry=%blob_ref, command="blob_fetch");
        let store = self.blob.read().or(Err(StorageError::Internal))?;
        store
            .get(&blob_ref.0)
            .ok_or(StorageError::NotFound)
            .map(|v| v.to_blob_val(blob_ref))
    }
    async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
        tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
        let mut store = self.blob.write().or(Err(StorageError::Internal))?;
        let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
        entry.data = blob_val.value.clone();
        entry.metadata = blob_val.meta.clone();
        Ok(())
    }
    async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
        tracing::trace!(src=%src, dst=%dst, command="blob_copy");
        let mut store = self.blob.write().or(Err(StorageError::Internal))?;
        let blob_src = store.entry(src.0.clone()).or_default().clone();
        store.insert(dst.0.clone(), blob_src);
        Ok(())
    }
    async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
        tracing::trace!(prefix = prefix, command = "blob_list");
        let store = self.blob.read().or(Err(StorageError::Internal))?;
        let last_bound = prefix_last_bound(prefix);
        let blist = store
            .range((Included(prefix.to_string()), last_bound))
            .map(|(k, _)| BlobRef(k.to_string()))
            .collect::<Vec<_>>();
        Ok(blist)
    }
    async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
        tracing::trace!(entry=%blob_ref, command="blob_rm");
        let mut store = self.blob.write().or(Err(StorageError::Internal))?;
        store.remove(&blob_ref.0);
        Ok(())
    }
}