From 684f4de225c44464abcb6a9cb2ef6dcae90537a8 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Sat, 16 Dec 2023 11:13:32 +0100 Subject: new new new storage interface --- src/login/ldap_provider.rs | 4 +- src/login/static_provider.rs | 27 ++++--- src/storage/garage.rs | 118 ++++------------------------- src/storage/in_memory.rs | 177 ++++++++++++++++++++++++++----------------- src/storage/mod.rs | 95 ++++++++++++++++++----- 5 files changed, 219 insertions(+), 202 deletions(-) (limited to 'src') diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index 6e94061..4e3af3c 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -87,7 +87,9 @@ impl LdapLoginProvider { fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result { let storage: Builders = match &self.storage_specific { - StorageSpecific::InMemory => Box::new(storage::in_memory::FullMem {}), + StorageSpecific::InMemory => Box::new(storage::in_memory::FullMem::new( + &get_attr(user, &self.username_attr)? + )), StorageSpecific::Garage { from_config, bucket_source } => { let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?; let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?; diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index 4a8d484..788a4c5 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -11,10 +11,15 @@ use crate::config::*; use crate::login::*; use crate::storage; +pub struct ContextualUserEntry { + pub username: String, + pub config: UserEntry, +} + #[derive(Default)] pub struct UserDatabase { - users: HashMap>, - users_by_email: HashMap>, + users: HashMap>, + users_by_email: HashMap>, } pub struct StaticLoginProvider { @@ -35,12 +40,12 @@ pub async fn update_user_list(config: PathBuf, up: watch::Sender) let users = ulist .into_iter() - .map(|(k, v)| (k, Arc::new(v))) + .map(|(username, config)| (username.clone() , Arc::new(ContextualUserEntry { username, config }))) .collect::>(); let mut users_by_email = HashMap::new(); for (_, u) in users.iter() { - for m in u.email_addresses.iter() { + for m in u.config.email_addresses.iter() { if users_by_email.contains_key(m) { tracing::warn!("Several users have the same email address: {}", m); continue @@ -78,13 +83,13 @@ impl LoginProvider for StaticLoginProvider { }; tracing::debug!(user=%username, "verify password"); - if !verify_password(password, &user.password)? { + if !verify_password(password, &user.config.password)? { bail!("Wrong password"); } tracing::debug!(user=%username, "fetch keys"); - let storage: storage::Builders = match &user.storage { - StaticStorage::InMemory => Box::new(storage::in_memory::FullMem {}), + let storage: storage::Builders = match &user.config.storage { + StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(username)), StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds { region: grgconf.aws_region.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(), @@ -95,7 +100,7 @@ impl LoginProvider for StaticLoginProvider { }), }; - let cr = CryptoRoot(user.crypto_root.clone()); + let cr = CryptoRoot(user.config.crypto_root.clone()); let keys = cr.crypto_keys(password)?; tracing::debug!(user=%username, "logged"); @@ -109,8 +114,8 @@ impl LoginProvider for StaticLoginProvider { Some(u) => u, }; - let storage: storage::Builders = match &user.storage { - StaticStorage::InMemory => Box::new(storage::in_memory::FullMem {}), + let storage: storage::Builders = match &user.config.storage { + StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(&user.username)), StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds { region: grgconf.aws_region.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(), @@ -121,7 +126,7 @@ impl LoginProvider for StaticLoginProvider { }), }; - let cr = CryptoRoot(user.crypto_root.clone()); + let cr = CryptoRoot(user.config.crypto_root.clone()); let public_key = cr.public_key()?; Ok(PublicCredentials { diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 052e812..8276f70 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -1,7 +1,7 @@ use crate::storage::*; #[derive(Clone, Debug, Hash)] -pub struct GrgCreds { +pub struct GarageBuilder { pub region: String, pub s3_endpoint: String, pub k2v_endpoint: String, @@ -9,133 +9,47 @@ pub struct GrgCreds { pub aws_secret_access_key: String, pub bucket: String, } -pub struct GrgStore {} -pub struct GrgRef {} -pub struct GrgValue {} -#[derive(Clone, Debug, PartialEq)] -pub struct GrgOrphanRowRef {} - -impl IBuilders for GrgCreds { - fn row_store(&self) -> Result { - unimplemented!(); - } - - fn blob_store(&self) -> Result { +impl IBuilder for GarageBuilder { + fn build(&self) -> Box { unimplemented!(); } +} - fn url(&self) -> &str { - return "grg://unimplemented;" - } +pub struct GarageStore { + dummy: String, } -impl IRowStore for GrgStore { - fn row(&self, partition: &str, sort: &str) -> RowRef { +#[async_trait] +impl IStore for GarageStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError> { unimplemented!(); } - - fn select(&self, selector: Selector) -> AsyncResult> { + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { unimplemented!(); } - fn rm(&self, selector: Selector) -> AsyncResult<()> { + async fn row_insert(&self, values: Vec) -> Result<(), StorageError> { unimplemented!(); - } - fn from_orphan(&self, orphan: OrphanRowRef) -> Result { - unimplemented!(); } -} - -impl IRowRef for GrgRef { - /*fn clone_boxed(&self) -> RowRef { + async fn row_poll(&self, value: RowRef) -> Result { unimplemented!(); - }*/ - fn to_orphan(&self) -> OrphanRowRef { - unimplemented!() } - fn key(&self) -> (&str, &str) { + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result { unimplemented!(); - } - fn set_value(&self, content: &[u8]) -> RowValue { - unimplemented!(); - } - fn fetch(&self) -> AsyncResult { - unimplemented!(); } - fn rm(&self) -> AsyncResult<()> { + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result { unimplemented!(); - } - fn poll(&self) -> AsyncResult { - unimplemented!(); - } -} -impl std::fmt::Debug for GrgRef { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - unimplemented!(); } -} - -impl IRowValue for GrgValue { - fn to_ref(&self) -> RowRef { + async fn blob_list(&self, prefix: &str) -> Result, StorageError> { unimplemented!(); } - fn content(&self) -> ConcurrentValues { + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { unimplemented!(); } - fn push(&self) -> AsyncResult<()> { - unimplemented!(); - } -} - -impl std::fmt::Debug for GrgValue { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - unimplemented!(); - } -} - - -/* -/// A custom S3 region, composed of a region name and endpoint. -/// We use this instead of rusoto_signature::Region so that we can -/// derive Hash and Eq - - -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct Region { - pub name: String, - pub endpoint: String, -} - -impl Region { - pub fn as_rusoto_region(&self) -> rusoto_signature::Region { - rusoto_signature::Region::Custom { - name: self.name.clone(), - endpoint: self.endpoint.clone(), - } - } } -*/ - -/* -pub struct Garage { - pub s3_region: Region, - pub k2v_region: Region, - - pub aws_access_key_id: String, - pub aws_secret_access_key: String, - pub bucket: String, -} - -impl StoreBuilder<> for Garage { - fn row_store(&self) -> -} - -pub struct K2V {} -impl RowStore for K2V { -}*/ diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 5ba9461..09c6763 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -1,97 +1,134 @@ -use futures::FutureExt; use crate::storage::*; - -#[derive(Clone, Debug, Hash)] -pub struct FullMem {} -pub struct MemStore {} -pub struct MemRef {} -pub struct MemValue {} - -#[derive(Clone, Debug, PartialEq)] -pub struct MemOrphanRowRef {} - -impl IBuilders for FullMem { - fn row_store(&self) -> Result { - unimplemented!(); - } - - fn blob_store(&self) -> Result { - unimplemented!(); - } - - fn url(&self) -> &str { - return "mem://unimplemented;" - } +use std::collections::{HashMap, BTreeMap}; +use std::ops::Bound::{Included, Unbounded, Excluded}; +use std::sync::{Arc, RwLock}; + +/// This implementation is very inneficient, and not completely correct +/// Indeed, when the connector is dropped, the memory is freed. +/// It means that when a user disconnects, its data are lost. +/// It's intended only for basic debugging, do not use it for advanced tests... + +pub type ArcRow = Arc>>>>; +pub type ArcBlob = Arc>>>; + +#[derive(Clone, Debug)] +pub struct MemBuilder { + user: String, + url: String, + row: ArcRow, + blob: ArcBlob, } -impl IRowStore for MemStore { - fn row(&self, partition: &str, sort: &str) -> RowRef { - unimplemented!(); - } - - fn select(&self, selector: Selector) -> AsyncResult> { - unimplemented!() - } - - fn rm(&self, selector: Selector) -> AsyncResult<()> { - unimplemented!(); - } +impl IBuilder for MemBuilder { + fn build(&self) -> Box { + Box::new(MemStore { + row: self.row.clone(), + blob: self.blob.clone(), + }) + } +} - fn from_orphan(&self, orphan: OrphanRowRef) -> Result { - unimplemented!(); - } +pub struct MemStore { + row: ArcRow, + blob: ArcBlob, } -impl IRowRef for MemRef { - fn to_orphan(&self) -> OrphanRowRef { - unimplemented!() +impl MemStore { + fn inner_fetch(&self, row_ref: &RowRef) -> Result, StorageError> { + Ok(self.row + .read() + .or(Err(StorageError::Internal))? + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)? + .clone()) } +} - fn key(&self) -> (&str, &str) { +#[async_trait] +impl IStore for MemStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError> { + match select { + Selector::Range { shard, sort_begin, sort_end } => { + Ok(self.row + .read() + .or(Err(StorageError::Internal))? + .get(*shard) + .ok_or(StorageError::NotFound)? + .range((Included(sort_begin.to_string()), Included(sort_end.to_string()))) + .map(|(k, v)| RowVal { + row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) }, + value: vec![Alternative::Value(v.clone())], + }) + .collect::>()) + }, + Selector::List(rlist) => { + let mut acc = vec![]; + for row_ref in rlist { + let bytes = self.inner_fetch(row_ref)?; + let row_val = RowVal { + row_ref: row_ref.clone(), + value: vec![Alternative::Value(bytes)] + }; + acc.push(row_val); + } + Ok(acc) + }, + Selector::Prefix { shard, sort_prefix } => { + let mut sort_end = sort_prefix.to_string(); + let last_bound = match sort_end.pop() { + None => Unbounded, + Some(ch) => { + let nc = char::from_u32(ch as u32 + 1).unwrap(); + sort_end.push(nc); + Excluded(sort_end) + } + }; + + Ok(self.row + .read() + .or(Err(StorageError::Internal))? + .get(*shard) + .ok_or(StorageError::NotFound)? + .range((Included(sort_prefix.to_string()), last_bound)) + .map(|(k, v)| RowVal { + row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) }, + value: vec![Alternative::Value(v.clone())], + }) + .collect::>()) + }, + Selector::Single(row_ref) => { + let bytes = self.inner_fetch(row_ref)?; + Ok(vec![RowVal{ row_ref: row_ref.clone(), value: vec![Alternative::Value(bytes)]}]) + } + } + } + + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { unimplemented!(); } - /*fn clone_boxed(&self) -> RowRef { + async fn row_insert(&self, values: Vec) -> Result<(), StorageError> { unimplemented!(); - }*/ - fn set_value(&self, content: &[u8]) -> RowValue { - unimplemented!(); - } - fn fetch(&self) -> AsyncResult { - unimplemented!(); } - fn rm(&self) -> AsyncResult<()> { + async fn row_poll(&self, value: RowRef) -> Result { unimplemented!(); } - fn poll(&self) -> AsyncResult { - async { - let rv: RowValue = Box::new(MemValue{}); - Ok(rv) - }.boxed() - } -} -impl std::fmt::Debug for MemRef { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result { unimplemented!(); - } -} -impl IRowValue for MemValue { - fn to_ref(&self) -> RowRef { - unimplemented!(); } - fn content(&self) -> ConcurrentValues { + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result { unimplemented!(); + } - fn push(&self) -> AsyncResult<()> { + async fn blob_list(&self, prefix: &str) -> Result, StorageError> { unimplemented!(); } -} - -impl std::fmt::Debug for MemValue { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { unimplemented!(); } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c002278..cb66d58 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -8,39 +8,97 @@ * into the object system so it is not exposed. */ -use std::hash::{Hash, Hasher}; -use futures::future::BoxFuture; - pub mod in_memory; pub mod garage; +use std::hash::{Hash, Hasher}; +use std::collections::HashMap; +use futures::future::BoxFuture; +use async_trait::async_trait; + +#[derive(Debug, Clone)] pub enum Alternative { Tombstone, Value(Vec), } type ConcurrentValues = Vec; +#[derive(Debug)] +pub enum StorageError { + NotFound, + Internal, +} + +#[derive(Debug, Clone)] +pub struct RowUid { + shard: String, + sort: String, +} + +#[derive(Debug, Clone)] +pub struct RowRef { + uid: RowUid, + causality: Option, +} + +#[derive(Debug, Clone)] +pub struct RowVal { + row_ref: RowRef, + value: ConcurrentValues, +} + +#[derive(Debug, Clone)] +pub struct BlobRef(String); + +#[derive(Debug, Clone)] +pub struct BlobVal { + blob_ref: BlobRef, + meta: HashMap, + value: Vec, +} + +pub enum Selector<'a> { + Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str }, + List (Vec), // list of (shard_key, sort_key) + Prefix { shard: &'a str, sort_prefix: &'a str }, + Single(RowRef), +} + +#[async_trait] +pub trait IStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError>; + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>; + async fn row_insert(&self, values: Vec) -> Result<(), StorageError>; + async fn row_poll(&self, value: RowRef) -> Result; + + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result; + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result; + async fn blob_list(&self, prefix: &str) -> Result, StorageError>; + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; +} + +pub trait IBuilder { + fn build(&self) -> Box; +} + + + + + + +/* #[derive(Clone, Debug, PartialEq)] pub enum OrphanRowRef { Garage(garage::GrgOrphanRowRef), Memory(in_memory::MemOrphanRowRef), } -pub enum Selector<'a> { - Range { shard_key: &'a str, begin: &'a str, end: &'a str }, - List (Vec<(&'a str, &'a str)>), // list of (shard_key, sort_key) - Prefix { shard_key: &'a str, prefix: &'a str }, -} -#[derive(Debug)] -pub enum StorageError { - NotFound, - Internal, - IncompatibleOrphan, -} + + impl std::fmt::Display for StorageError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("Storage Error: "); + f.write_str("Storage Error: ")?; match self { Self::NotFound => f.write_str("Item not found"), Self::Internal => f.write_str("An internal error occured"), @@ -55,6 +113,7 @@ pub type AsyncResult<'a, T> = BoxFuture<'a, Result>; // ----- Builders pub trait IBuilders { + fn box_clone(&self) -> Builders; fn row_store(&self) -> Result; fn blob_store(&self) -> Result; fn url(&self) -> &str; @@ -62,8 +121,7 @@ pub trait IBuilders { pub type Builders = Box; impl Clone for Builders { fn clone(&self) -> Self { - // @FIXME write a real implementation with a box_clone function - Box::new(in_memory::FullMem{}) + self.box_clone() } } impl std::fmt::Debug for Builders { @@ -102,7 +160,7 @@ pub trait IRowRef: std::fmt::Debug fn rm(&self) -> AsyncResult<()>; fn poll(&self) -> AsyncResult; } -pub type RowRef = Box; +pub type RowRef<'a> = Box; pub trait IRowValue: std::fmt::Debug { @@ -138,3 +196,4 @@ pub trait IBlobValue { fn push(&self) -> AsyncResult<()>; } pub type BlobValue = Box; +*/ -- cgit v1.2.3