aboutsummaryrefslogtreecommitdiff
path: root/src/storage
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2023-12-18 17:09:44 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2023-12-18 17:09:44 +0100
commit3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf (patch)
treefff5d16e266788b28e812c24669f50118831512b /src/storage
parent684f4de225c44464abcb6a9cb2ef6dcae90537a8 (diff)
downloadaerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.tar.gz
aerogramme-3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf.zip
Storage trait new implementation
Diffstat (limited to 'src/storage')
-rw-r--r--src/storage/garage.rs30
-rw-r--r--src/storage/in_memory.rs35
-rw-r--r--src/storage/mod.rs209
3 files changed, 130 insertions, 144 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index 8276f70..ff37287 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -1,7 +1,8 @@
use crate::storage::*;
+use serde::Serialize;
-#[derive(Clone, Debug, Hash)]
-pub struct GarageBuilder {
+#[derive(Clone, Debug, Serialize)]
+pub struct GarageConf {
pub region: String,
pub s3_endpoint: String,
pub k2v_endpoint: String,
@@ -10,10 +11,28 @@ pub struct GarageBuilder {
pub bucket: String,
}
+#[derive(Clone, Debug)]
+pub struct GarageBuilder {
+ conf: GarageConf,
+ unicity: Vec<u8>,
+}
+
+impl GarageBuilder {
+ pub fn new(conf: GarageConf) -> anyhow::Result<Arc<Self>> {
+ let mut unicity: Vec<u8> = vec![];
+ unicity.extend_from_slice(file!().as_bytes());
+ unicity.append(&mut rmp_serde::to_vec(&conf)?);
+ Ok(Arc::new(Self { conf, unicity }))
+ }
+}
+
impl IBuilder for GarageBuilder {
- fn build(&self) -> Box<dyn IStore> {
+ fn build(&self) -> Result<Store, StorageError> {
unimplemented!();
}
+ fn unique(&self) -> UnicityBuffer {
+ UnicityBuffer(self.unicity.clone())
+ }
}
pub struct GarageStore {
@@ -33,7 +52,7 @@ impl IStore for GarageStore {
unimplemented!();
}
- async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError> {
+ async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
unimplemented!();
}
@@ -41,6 +60,9 @@ impl IStore for GarageStore {
unimplemented!();
}
+ async fn blob_insert(&self, blob_val: &BlobVal) -> Result<BlobVal, StorageError> {
+ unimplemented!();
+ }
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
index 09c6763..6d0460f 100644
--- a/src/storage/in_memory.rs
+++ b/src/storage/in_memory.rs
@@ -14,18 +14,36 @@ pub type ArcBlob = Arc<RwLock<HashMap<String, Vec<u8>>>>;
#[derive(Clone, Debug)]
pub struct MemBuilder {
user: String,
- url: String,
+ unicity: Vec<u8>,
row: ArcRow,
blob: ArcBlob,
}
+impl MemBuilder {
+ pub fn new(user: &str) -> Arc<Self> {
+ let mut unicity: Vec<u8> = vec![];
+ unicity.extend_from_slice(file!().as_bytes());
+ unicity.extend_from_slice(user.as_bytes());
+ Arc::new(Self {
+ user: user.to_string(),
+ unicity,
+ row: Arc::new(RwLock::new(HashMap::new())),
+ blob: Arc::new(RwLock::new(HashMap::new())),
+ })
+ }
+}
+
impl IBuilder for MemBuilder {
- fn build(&self) -> Box<dyn IStore> {
- Box::new(MemStore {
+ fn build(&self) -> Result<Store, StorageError> {
+ Ok(Box::new(MemStore {
row: self.row.clone(),
blob: self.blob.clone(),
- })
+ }))
}
+
+ fn unique(&self) -> UnicityBuffer {
+ UnicityBuffer(self.unicity.clone())
+ }
}
pub struct MemStore {
@@ -56,7 +74,7 @@ impl IStore for MemStore {
.or(Err(StorageError::Internal))?
.get(*shard)
.ok_or(StorageError::NotFound)?
- .range((Included(sort_begin.to_string()), Included(sort_end.to_string())))
+ .range((Included(sort_begin.to_string()), Excluded(sort_end.to_string())))
.map(|(k, v)| RowVal {
row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) },
value: vec![Alternative::Value(v.clone())],
@@ -100,7 +118,7 @@ impl IStore for MemStore {
},
Selector::Single(row_ref) => {
let bytes = self.inner_fetch(row_ref)?;
- Ok(vec![RowVal{ row_ref: row_ref.clone(), value: vec![Alternative::Value(bytes)]}])
+ Ok(vec![RowVal{ row_ref: (*row_ref).clone(), value: vec![Alternative::Value(bytes)]}])
}
}
}
@@ -113,7 +131,7 @@ impl IStore for MemStore {
unimplemented!();
}
- async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError> {
+ async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
unimplemented!();
}
@@ -121,6 +139,9 @@ impl IStore for MemStore {
unimplemented!();
}
+ async fn blob_insert(&self, blob_val: &BlobVal) -> Result<BlobVal, StorageError> {
+ unimplemented!();
+ }
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index cb66d58..8004ac5 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -11,9 +11,9 @@
pub mod in_memory;
pub mod garage;
-use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+use std::hash::Hash;
use std::collections::HashMap;
-use futures::future::BoxFuture;
use async_trait::async_trait;
#[derive(Debug, Clone)]
@@ -23,45 +23,95 @@ pub enum Alternative {
}
type ConcurrentValues = Vec<Alternative>;
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum StorageError {
NotFound,
Internal,
}
+impl std::fmt::Display for StorageError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str("Storage Error: ")?;
+ match self {
+ Self::NotFound => f.write_str("Item not found"),
+ Self::Internal => f.write_str("An internal error occured"),
+ }
+ }
+}
+impl std::error::Error for StorageError {}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
pub struct RowUid {
- shard: String,
- sort: String,
+ pub shard: String,
+ pub sort: String,
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
pub struct RowRef {
- uid: RowUid,
- causality: Option<String>,
+ pub uid: RowUid,
+ pub causality: Option<String>,
+}
+
+impl RowRef {
+ pub fn new(shard: &str, sort: &str) -> Self {
+ Self {
+ uid: RowUid {
+ shard: shard.to_string(),
+ sort: sort.to_string(),
+ },
+ causality: None,
+ }
+ }
}
#[derive(Debug, Clone)]
pub struct RowVal {
- row_ref: RowRef,
- value: ConcurrentValues,
+ pub row_ref: RowRef,
+ pub value: ConcurrentValues,
}
+impl RowVal {
+ pub fn new(row_ref: RowRef, value: Vec<u8>) -> Self {
+ Self {
+ row_ref,
+ value: vec![Alternative::Value(value)],
+ }
+ }
+}
+
+
#[derive(Debug, Clone)]
-pub struct BlobRef(String);
+pub struct BlobRef(pub String);
+impl BlobRef {
+ pub fn new(key: &str) -> Self {
+ Self(key.to_string())
+ }
+}
#[derive(Debug, Clone)]
pub struct BlobVal {
- blob_ref: BlobRef,
- meta: HashMap<String, String>,
- value: Vec<u8>,
+ pub blob_ref: BlobRef,
+ pub meta: HashMap<String, String>,
+ pub value: Vec<u8>,
+}
+impl BlobVal {
+ pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self {
+ Self {
+ blob_ref, value,
+ meta: HashMap::new(),
+ }
+ }
+
+ pub fn with_meta(mut self, k: String, v: String) -> Self {
+ self.meta.insert(k, v);
+ self
+ }
}
pub enum Selector<'a> {
Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str },
List (Vec<RowRef>), // list of (shard_key, sort_key)
Prefix { shard: &'a str, sort_prefix: &'a str },
- Single(RowRef),
+ Single(&'a RowRef),
}
#[async_trait]
@@ -69,131 +119,24 @@ pub trait IStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
- async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError>;
+ async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
+ async fn blob_insert(&self, blob_val: &BlobVal) -> Result<BlobVal, StorageError>;
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError>;
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
}
-pub trait IBuilder {
- fn build(&self) -> Box<dyn IStore>;
-}
-
-
-
-
-
-
-/*
-#[derive(Clone, Debug, PartialEq)]
-pub enum OrphanRowRef {
- Garage(garage::GrgOrphanRowRef),
- Memory(in_memory::MemOrphanRowRef),
-}
-
+#[derive(Clone,Debug,PartialEq,Eq,Hash)]
+pub struct UnicityBuffer(Vec<u8>);
+pub trait IBuilder: std::fmt::Debug {
+ fn build(&self) -> Result<Store, StorageError>;
-
-impl std::fmt::Display for StorageError {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.write_str("Storage Error: ")?;
- match self {
- Self::NotFound => f.write_str("Item not found"),
- Self::Internal => f.write_str("An internal error occured"),
- Self::IncompatibleOrphan => f.write_str("Incompatible orphan"),
- }
- }
+ /// Returns an opaque buffer that uniquely identifies this builder
+ fn unique(&self) -> UnicityBuffer;
}
-impl std::error::Error for StorageError {}
-// Utils
-pub type AsyncResult<'a, T> = BoxFuture<'a, Result<T, StorageError>>;
-
-// ----- Builders
-pub trait IBuilders {
- fn box_clone(&self) -> Builders;
- fn row_store(&self) -> Result<RowStore, StorageError>;
- fn blob_store(&self) -> Result<BlobStore, StorageError>;
- fn url(&self) -> &str;
-}
-pub type Builders = Box<dyn IBuilders + Send + Sync>;
-impl Clone for Builders {
- fn clone(&self) -> Self {
- self.box_clone()
- }
-}
-impl std::fmt::Debug for Builders {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.write_str("aerogramme::storage::Builder")
- }
-}
-impl PartialEq for Builders {
- fn eq(&self, other: &Self) -> bool {
- self.url() == other.url()
- }
-}
-impl Eq for Builders {}
-impl Hash for Builders {
- fn hash<H: Hasher>(&self, state: &mut H) {
- self.url().hash(state);
- }
-}
-
-// ------ Row
-pub trait IRowStore
-{
- fn row(&self, partition: &str, sort: &str) -> RowRef;
- fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>>;
- fn rm(&self, selector: Selector) -> AsyncResult<()>;
- fn from_orphan(&self, orphan: OrphanRowRef) -> Result<RowRef, StorageError>;
-}
-pub type RowStore = Box<dyn IRowStore + Sync + Send>;
-
-pub trait IRowRef: std::fmt::Debug
-{
- fn to_orphan(&self) -> OrphanRowRef;
- fn key(&self) -> (&str, &str);
- fn set_value(&self, content: &[u8]) -> RowValue;
- fn fetch(&self) -> AsyncResult<RowValue>;
- fn rm(&self) -> AsyncResult<()>;
- fn poll(&self) -> AsyncResult<RowValue>;
-}
-pub type RowRef<'a> = Box<dyn IRowRef + Send + Sync + 'a>;
-
-pub trait IRowValue: std::fmt::Debug
-{
- fn to_ref(&self) -> RowRef;
- fn content(&self) -> ConcurrentValues;
- fn push(&self) -> AsyncResult<()>;
-}
-pub type RowValue = Box<dyn IRowValue + Send + Sync>;
-
-// ------- Blob
-pub trait IBlobStore
-{
- fn blob(&self, key: &str) -> BlobRef;
- fn list(&self, prefix: &str) -> AsyncResult<Vec<BlobRef>>;
-}
-pub type BlobStore = Box<dyn IBlobStore + Send + Sync>;
-
-pub trait IBlobRef
-{
- fn set_value(&self, content: Vec<u8>) -> BlobValue;
- fn key(&self) -> &str;
- fn fetch(&self) -> AsyncResult<BlobValue>;
- fn copy(&self, dst: &BlobRef) -> AsyncResult<()>;
- fn rm(&self) -> AsyncResult<()>;
-}
-pub type BlobRef = Box<dyn IBlobRef + Send + Sync>;
-
-pub trait IBlobValue {
- fn to_ref(&self) -> BlobRef;
- fn get_meta(&self, key: &str) -> Option<&[u8]>;
- fn set_meta(&mut self, key: &str, val: &str);
- fn content(&self) -> Option<&[u8]>;
- fn push(&self) -> AsyncResult<()>;
-}
-pub type BlobValue = Box<dyn IBlobValue + Send + Sync>;
-*/
+pub type Builder = Arc<dyn IBuilder + Send + Sync>;
+pub type Store = Box<dyn IStore + Send + Sync>;