aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2023-12-16 11:13:32 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2023-12-16 11:13:32 +0100
commit684f4de225c44464abcb6a9cb2ef6dcae90537a8 (patch)
treecbbc9ba773ed3d9f8f011637d58791af79672774
parent1b5f2eb695d658c57ba9c4264e76ca13bd82a958 (diff)
downloadaerogramme-684f4de225c44464abcb6a9cb2ef6dcae90537a8.tar.gz
aerogramme-684f4de225c44464abcb6a9cb2ef6dcae90537a8.zip
new new new storage interface
-rw-r--r--src/login/ldap_provider.rs4
-rw-r--r--src/login/static_provider.rs27
-rw-r--r--src/storage/garage.rs118
-rw-r--r--src/storage/in_memory.rs177
-rw-r--r--src/storage/mod.rs95
5 files changed, 219 insertions, 202 deletions
diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs
index 6e94061..4e3af3c 100644
--- a/src/login/ldap_provider.rs
+++ b/src/login/ldap_provider.rs
@@ -87,7 +87,9 @@ impl LdapLoginProvider {
fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builders> {
let storage: Builders = match &self.storage_specific {
- StorageSpecific::InMemory => Box::new(storage::in_memory::FullMem {}),
+ StorageSpecific::InMemory => Box::new(storage::in_memory::FullMem::new(
+ &get_attr(user, &self.username_attr)?
+ )),
StorageSpecific::Garage { from_config, bucket_source } => {
let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?;
let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?;
diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs
index 4a8d484..788a4c5 100644
--- a/src/login/static_provider.rs
+++ b/src/login/static_provider.rs
@@ -11,10 +11,15 @@ use crate::config::*;
use crate::login::*;
use crate::storage;
+pub struct ContextualUserEntry {
+ pub username: String,
+ pub config: UserEntry,
+}
+
#[derive(Default)]
pub struct UserDatabase {
- users: HashMap<String, Arc<UserEntry>>,
- users_by_email: HashMap<String, Arc<UserEntry>>,
+ users: HashMap<String, Arc<ContextualUserEntry>>,
+ users_by_email: HashMap<String, Arc<ContextualUserEntry>>,
}
pub struct StaticLoginProvider {
@@ -35,12 +40,12 @@ pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>)
let users = ulist
.into_iter()
- .map(|(k, v)| (k, Arc::new(v)))
+ .map(|(username, config)| (username.clone() , Arc::new(ContextualUserEntry { username, config })))
.collect::<HashMap<_, _>>();
let mut users_by_email = HashMap::new();
for (_, u) in users.iter() {
- for m in u.email_addresses.iter() {
+ for m in u.config.email_addresses.iter() {
if users_by_email.contains_key(m) {
tracing::warn!("Several users have the same email address: {}", m);
continue
@@ -78,13 +83,13 @@ impl LoginProvider for StaticLoginProvider {
};
tracing::debug!(user=%username, "verify password");
- if !verify_password(password, &user.password)? {
+ if !verify_password(password, &user.config.password)? {
bail!("Wrong password");
}
tracing::debug!(user=%username, "fetch keys");
- let storage: storage::Builders = match &user.storage {
- StaticStorage::InMemory => Box::new(storage::in_memory::FullMem {}),
+ let storage: storage::Builders = match &user.config.storage {
+ StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(username)),
StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds {
region: grgconf.aws_region.clone(),
k2v_endpoint: grgconf.k2v_endpoint.clone(),
@@ -95,7 +100,7 @@ impl LoginProvider for StaticLoginProvider {
}),
};
- let cr = CryptoRoot(user.crypto_root.clone());
+ let cr = CryptoRoot(user.config.crypto_root.clone());
let keys = cr.crypto_keys(password)?;
tracing::debug!(user=%username, "logged");
@@ -109,8 +114,8 @@ impl LoginProvider for StaticLoginProvider {
Some(u) => u,
};
- let storage: storage::Builders = match &user.storage {
- StaticStorage::InMemory => Box::new(storage::in_memory::FullMem {}),
+ let storage: storage::Builders = match &user.config.storage {
+ StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(&user.username)),
StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds {
region: grgconf.aws_region.clone(),
k2v_endpoint: grgconf.k2v_endpoint.clone(),
@@ -121,7 +126,7 @@ impl LoginProvider for StaticLoginProvider {
}),
};
- let cr = CryptoRoot(user.crypto_root.clone());
+ let cr = CryptoRoot(user.config.crypto_root.clone());
let public_key = cr.public_key()?;
Ok(PublicCredentials {
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index 052e812..8276f70 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -1,7 +1,7 @@
use crate::storage::*;
#[derive(Clone, Debug, Hash)]
-pub struct GrgCreds {
+pub struct GarageBuilder {
pub region: String,
pub s3_endpoint: String,
pub k2v_endpoint: String,
@@ -9,133 +9,47 @@ pub struct GrgCreds {
pub aws_secret_access_key: String,
pub bucket: String,
}
-pub struct GrgStore {}
-pub struct GrgRef {}
-pub struct GrgValue {}
-#[derive(Clone, Debug, PartialEq)]
-pub struct GrgOrphanRowRef {}
-
-impl IBuilders for GrgCreds {
- fn row_store(&self) -> Result<RowStore, StorageError> {
- unimplemented!();
- }
-
- fn blob_store(&self) -> Result<BlobStore, StorageError> {
+impl IBuilder for GarageBuilder {
+ fn build(&self) -> Box<dyn IStore> {
unimplemented!();
}
+}
- fn url(&self) -> &str {
- return "grg://unimplemented;"
- }
+pub struct GarageStore {
+ dummy: String,
}
-impl IRowStore for GrgStore {
- fn row(&self, partition: &str, sort: &str) -> RowRef {
+#[async_trait]
+impl IStore for GarageStore {
+ async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
unimplemented!();
}
-
- fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> {
+ async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
unimplemented!();
}
- fn rm(&self, selector: Selector) -> AsyncResult<()> {
+ async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
unimplemented!();
- }
- fn from_orphan(&self, orphan: OrphanRowRef) -> Result<RowRef, StorageError> {
- unimplemented!();
}
-}
-
-impl IRowRef for GrgRef {
- /*fn clone_boxed(&self) -> RowRef {
+ async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError> {
unimplemented!();
- }*/
- fn to_orphan(&self) -> OrphanRowRef {
- unimplemented!()
}
- fn key(&self) -> (&str, &str) {
+ async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
- }
- fn set_value(&self, content: &[u8]) -> RowValue {
- unimplemented!();
- }
- fn fetch(&self) -> AsyncResult<RowValue> {
- unimplemented!();
}
- fn rm(&self) -> AsyncResult<()> {
+ async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
- }
- fn poll(&self) -> AsyncResult<RowValue> {
- unimplemented!();
- }
-}
-impl std::fmt::Debug for GrgRef {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- unimplemented!();
}
-}
-
-impl IRowValue for GrgValue {
- fn to_ref(&self) -> RowRef {
+ async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
unimplemented!();
}
- fn content(&self) -> ConcurrentValues {
+ async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
unimplemented!();
}
- fn push(&self) -> AsyncResult<()> {
- unimplemented!();
- }
-}
-
-impl std::fmt::Debug for GrgValue {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- unimplemented!();
- }
-}
-
-
-/*
-/// A custom S3 region, composed of a region name and endpoint.
-/// We use this instead of rusoto_signature::Region so that we can
-/// derive Hash and Eq
-
-
-#[derive(Clone, Debug, Hash, PartialEq, Eq)]
-pub struct Region {
- pub name: String,
- pub endpoint: String,
-}
-
-impl Region {
- pub fn as_rusoto_region(&self) -> rusoto_signature::Region {
- rusoto_signature::Region::Custom {
- name: self.name.clone(),
- endpoint: self.endpoint.clone(),
- }
- }
}
-*/
-
-/*
-pub struct Garage {
- pub s3_region: Region,
- pub k2v_region: Region,
-
- pub aws_access_key_id: String,
- pub aws_secret_access_key: String,
- pub bucket: String,
-}
-
-impl StoreBuilder<> for Garage {
- fn row_store(&self) ->
-}
-
-pub struct K2V {}
-impl RowStore for K2V {
-}*/
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
index 5ba9461..09c6763 100644
--- a/src/storage/in_memory.rs
+++ b/src/storage/in_memory.rs
@@ -1,97 +1,134 @@
-use futures::FutureExt;
use crate::storage::*;
-
-#[derive(Clone, Debug, Hash)]
-pub struct FullMem {}
-pub struct MemStore {}
-pub struct MemRef {}
-pub struct MemValue {}
-
-#[derive(Clone, Debug, PartialEq)]
-pub struct MemOrphanRowRef {}
-
-impl IBuilders for FullMem {
- fn row_store(&self) -> Result<RowStore, StorageError> {
- unimplemented!();
- }
-
- fn blob_store(&self) -> Result<BlobStore, StorageError> {
- unimplemented!();
- }
-
- fn url(&self) -> &str {
- return "mem://unimplemented;"
- }
+use std::collections::{HashMap, BTreeMap};
+use std::ops::Bound::{Included, Unbounded, Excluded};
+use std::sync::{Arc, RwLock};
+
+/// This implementation is very inneficient, and not completely correct
+/// Indeed, when the connector is dropped, the memory is freed.
+/// It means that when a user disconnects, its data are lost.
+/// It's intended only for basic debugging, do not use it for advanced tests...
+
+pub type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, Vec<u8>>>>>;
+pub type ArcBlob = Arc<RwLock<HashMap<String, Vec<u8>>>>;
+
+#[derive(Clone, Debug)]
+pub struct MemBuilder {
+ user: String,
+ url: String,
+ row: ArcRow,
+ blob: ArcBlob,
}
-impl IRowStore for MemStore {
- fn row(&self, partition: &str, sort: &str) -> RowRef {
- unimplemented!();
- }
-
- fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> {
- unimplemented!()
- }
-
- fn rm(&self, selector: Selector) -> AsyncResult<()> {
- unimplemented!();
- }
+impl IBuilder for MemBuilder {
+ fn build(&self) -> Box<dyn IStore> {
+ Box::new(MemStore {
+ row: self.row.clone(),
+ blob: self.blob.clone(),
+ })
+ }
+}
- fn from_orphan(&self, orphan: OrphanRowRef) -> Result<RowRef, StorageError> {
- unimplemented!();
- }
+pub struct MemStore {
+ row: ArcRow,
+ blob: ArcBlob,
}
-impl IRowRef for MemRef {
- fn to_orphan(&self) -> OrphanRowRef {
- unimplemented!()
+impl MemStore {
+ fn inner_fetch(&self, row_ref: &RowRef) -> Result<Vec<u8>, StorageError> {
+ Ok(self.row
+ .read()
+ .or(Err(StorageError::Internal))?
+ .get(&row_ref.uid.shard)
+ .ok_or(StorageError::NotFound)?
+ .get(&row_ref.uid.sort)
+ .ok_or(StorageError::NotFound)?
+ .clone())
}
+}
- fn key(&self) -> (&str, &str) {
+#[async_trait]
+impl IStore for MemStore {
+ async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
+ match select {
+ Selector::Range { shard, sort_begin, sort_end } => {
+ Ok(self.row
+ .read()
+ .or(Err(StorageError::Internal))?
+ .get(*shard)
+ .ok_or(StorageError::NotFound)?
+ .range((Included(sort_begin.to_string()), Included(sort_end.to_string())))
+ .map(|(k, v)| RowVal {
+ row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) },
+ value: vec![Alternative::Value(v.clone())],
+ })
+ .collect::<Vec<_>>())
+ },
+ Selector::List(rlist) => {
+ let mut acc = vec![];
+ for row_ref in rlist {
+ let bytes = self.inner_fetch(row_ref)?;
+ let row_val = RowVal {
+ row_ref: row_ref.clone(),
+ value: vec![Alternative::Value(bytes)]
+ };
+ acc.push(row_val);
+ }
+ Ok(acc)
+ },
+ Selector::Prefix { shard, sort_prefix } => {
+ let mut sort_end = sort_prefix.to_string();
+ let last_bound = match sort_end.pop() {
+ None => Unbounded,
+ Some(ch) => {
+ let nc = char::from_u32(ch as u32 + 1).unwrap();
+ sort_end.push(nc);
+ Excluded(sort_end)
+ }
+ };
+
+ Ok(self.row
+ .read()
+ .or(Err(StorageError::Internal))?
+ .get(*shard)
+ .ok_or(StorageError::NotFound)?
+ .range((Included(sort_prefix.to_string()), last_bound))
+ .map(|(k, v)| RowVal {
+ row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) },
+ value: vec![Alternative::Value(v.clone())],
+ })
+ .collect::<Vec<_>>())
+ },
+ Selector::Single(row_ref) => {
+ let bytes = self.inner_fetch(row_ref)?;
+ Ok(vec![RowVal{ row_ref: row_ref.clone(), value: vec![Alternative::Value(bytes)]}])
+ }
+ }
+ }
+
+ async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
unimplemented!();
}
- /*fn clone_boxed(&self) -> RowRef {
+ async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
unimplemented!();
- }*/
- fn set_value(&self, content: &[u8]) -> RowValue {
- unimplemented!();
- }
- fn fetch(&self) -> AsyncResult<RowValue> {
- unimplemented!();
}
- fn rm(&self) -> AsyncResult<()> {
+ async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError> {
unimplemented!();
}
- fn poll(&self) -> AsyncResult<RowValue> {
- async {
- let rv: RowValue = Box::new(MemValue{});
- Ok(rv)
- }.boxed()
- }
-}
-impl std::fmt::Debug for MemRef {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
- }
-}
-impl IRowValue for MemValue {
- fn to_ref(&self) -> RowRef {
- unimplemented!();
}
- fn content(&self) -> ConcurrentValues {
+ async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
+
}
- fn push(&self) -> AsyncResult<()> {
+ async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
unimplemented!();
}
-}
-
-impl std::fmt::Debug for MemValue {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
unimplemented!();
}
}
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index c002278..cb66d58 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -8,39 +8,97 @@
* into the object system so it is not exposed.
*/
-use std::hash::{Hash, Hasher};
-use futures::future::BoxFuture;
-
pub mod in_memory;
pub mod garage;
+use std::hash::{Hash, Hasher};
+use std::collections::HashMap;
+use futures::future::BoxFuture;
+use async_trait::async_trait;
+
+#[derive(Debug, Clone)]
pub enum Alternative {
Tombstone,
Value(Vec<u8>),
}
type ConcurrentValues = Vec<Alternative>;
+#[derive(Debug)]
+pub enum StorageError {
+ NotFound,
+ Internal,
+}
+
+#[derive(Debug, Clone)]
+pub struct RowUid {
+ shard: String,
+ sort: String,
+}
+
+#[derive(Debug, Clone)]
+pub struct RowRef {
+ uid: RowUid,
+ causality: Option<String>,
+}
+
+#[derive(Debug, Clone)]
+pub struct RowVal {
+ row_ref: RowRef,
+ value: ConcurrentValues,
+}
+
+#[derive(Debug, Clone)]
+pub struct BlobRef(String);
+
+#[derive(Debug, Clone)]
+pub struct BlobVal {
+ blob_ref: BlobRef,
+ meta: HashMap<String, String>,
+ value: Vec<u8>,
+}
+
+pub enum Selector<'a> {
+ Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str },
+ List (Vec<RowRef>), // list of (shard_key, sort_key)
+ Prefix { shard: &'a str, sort_prefix: &'a str },
+ Single(RowRef),
+}
+
+#[async_trait]
+pub trait IStore {
+ async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
+ async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
+ async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
+ async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError>;
+
+ async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
+ async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError>;
+ async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
+ async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
+}
+
+pub trait IBuilder {
+ fn build(&self) -> Box<dyn IStore>;
+}
+
+
+
+
+
+
+/*
#[derive(Clone, Debug, PartialEq)]
pub enum OrphanRowRef {
Garage(garage::GrgOrphanRowRef),
Memory(in_memory::MemOrphanRowRef),
}
-pub enum Selector<'a> {
- Range { shard_key: &'a str, begin: &'a str, end: &'a str },
- List (Vec<(&'a str, &'a str)>), // list of (shard_key, sort_key)
- Prefix { shard_key: &'a str, prefix: &'a str },
-}
-#[derive(Debug)]
-pub enum StorageError {
- NotFound,
- Internal,
- IncompatibleOrphan,
-}
+
+
impl std::fmt::Display for StorageError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.write_str("Storage Error: ");
+ f.write_str("Storage Error: ")?;
match self {
Self::NotFound => f.write_str("Item not found"),
Self::Internal => f.write_str("An internal error occured"),
@@ -55,6 +113,7 @@ pub type AsyncResult<'a, T> = BoxFuture<'a, Result<T, StorageError>>;
// ----- Builders
pub trait IBuilders {
+ fn box_clone(&self) -> Builders;
fn row_store(&self) -> Result<RowStore, StorageError>;
fn blob_store(&self) -> Result<BlobStore, StorageError>;
fn url(&self) -> &str;
@@ -62,8 +121,7 @@ pub trait IBuilders {
pub type Builders = Box<dyn IBuilders + Send + Sync>;
impl Clone for Builders {
fn clone(&self) -> Self {
- // @FIXME write a real implementation with a box_clone function
- Box::new(in_memory::FullMem{})
+ self.box_clone()
}
}
impl std::fmt::Debug for Builders {
@@ -102,7 +160,7 @@ pub trait IRowRef: std::fmt::Debug
fn rm(&self) -> AsyncResult<()>;
fn poll(&self) -> AsyncResult<RowValue>;
}
-pub type RowRef = Box<dyn IRowRef + Send + Sync>;
+pub type RowRef<'a> = Box<dyn IRowRef + Send + Sync + 'a>;
pub trait IRowValue: std::fmt::Debug
{
@@ -138,3 +196,4 @@ pub trait IBlobValue {
fn push(&self) -> AsyncResult<()>;
}
pub type BlobValue = Box<dyn IBlobValue + Send + Sync>;
+*/