aboutsummaryrefslogtreecommitdiff
path: root/aero-user
diff options
context:
space:
mode:
Diffstat (limited to 'aero-user')
-rw-r--r--aero-user/Cargo.toml30
-rw-r--r--aero-user/src/config.rs198
-rw-r--r--aero-user/src/cryptoblob.rs67
-rw-r--r--aero-user/src/lib.rs9
-rw-r--r--aero-user/src/login/demo_provider.rs51
-rw-r--r--aero-user/src/login/ldap_provider.rs264
-rw-r--r--aero-user/src/login/mod.rs245
-rw-r--r--aero-user/src/login/static_provider.rs188
-rw-r--r--aero-user/src/storage/garage.rs542
-rw-r--r--aero-user/src/storage/in_memory.rs344
-rw-r--r--aero-user/src/storage/mod.rs180
11 files changed, 2118 insertions, 0 deletions
diff --git a/aero-user/Cargo.toml b/aero-user/Cargo.toml
new file mode 100644
index 0000000..fc851e2
--- /dev/null
+++ b/aero-user/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "aero-user"
+version = "0.3.0"
+authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"]
+edition = "2021"
+license = "EUPL-1.2"
+description = "Represent an encrypted user profile"
+
+[dependencies]
+anyhow.workspace = true
+serde.workspace = true
+zstd.workspace = true
+sodiumoxide.workspace = true
+log.workspace = true
+async-trait.workspace = true
+ldap3.workspace = true
+base64.workspace = true
+rand.workspace = true
+tokio.workspace = true
+aws-config.workspace = true
+aws-sdk-s3.workspace = true
+aws-smithy-runtime.workspace = true
+aws-smithy-runtime-api.workspace = true
+hyper-rustls.workspace = true
+hyper-util.workspace = true
+k2v-client.workspace = true
+rmp-serde.workspace = true
+toml.workspace = true
+tracing.workspace = true
+argon2.workspace = true
diff --git a/aero-user/src/config.rs b/aero-user/src/config.rs
new file mode 100644
index 0000000..cea4520
--- /dev/null
+++ b/aero-user/src/config.rs
@@ -0,0 +1,198 @@
+use std::collections::HashMap;
+use std::io::{Read, Write};
+use std::net::SocketAddr;
+use std::path::PathBuf;
+
+use anyhow::Result;
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct CompanionConfig {
+ pub pid: Option<PathBuf>,
+ pub imap: ImapUnsecureConfig,
+ // @FIXME Add DAV
+ #[serde(flatten)]
+ pub users: LoginStaticConfig,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ProviderConfig {
+ pub pid: Option<PathBuf>,
+ pub imap: Option<ImapConfig>,
+ pub imap_unsecure: Option<ImapUnsecureConfig>,
+ pub lmtp: Option<LmtpConfig>,
+ pub auth: Option<AuthConfig>,
+ pub dav: Option<DavConfig>,
+ pub dav_unsecure: Option<DavUnsecureConfig>,
+ pub users: UserManagement,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(tag = "user_driver")]
+pub enum UserManagement {
+ Demo,
+ Static(LoginStaticConfig),
+ Ldap(LoginLdapConfig),
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct AuthConfig {
+ pub bind_addr: SocketAddr,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct LmtpConfig {
+ pub bind_addr: SocketAddr,
+ pub hostname: String,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ImapConfig {
+ pub bind_addr: SocketAddr,
+ pub certs: PathBuf,
+ pub key: PathBuf,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct DavUnsecureConfig {
+ pub bind_addr: SocketAddr,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct DavConfig {
+ pub bind_addr: SocketAddr,
+ pub certs: PathBuf,
+ pub key: PathBuf,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ImapUnsecureConfig {
+ pub bind_addr: SocketAddr,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct LoginStaticConfig {
+ pub user_list: PathBuf,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(tag = "storage_driver")]
+pub enum LdapStorage {
+ Garage(LdapGarageConfig),
+ InMemory,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct LdapGarageConfig {
+ pub s3_endpoint: String,
+ pub k2v_endpoint: String,
+ pub aws_region: String,
+
+ pub aws_access_key_id_attr: String,
+ pub aws_secret_access_key_attr: String,
+ pub bucket_attr: Option<String>,
+ pub default_bucket: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct LoginLdapConfig {
+ // LDAP connection info
+ pub ldap_server: String,
+ #[serde(default)]
+ pub pre_bind_on_login: bool,
+ pub bind_dn: Option<String>,
+ pub bind_password: Option<String>,
+ pub search_base: String,
+
+ // Schema-like info required for Aerogramme's logic
+ pub username_attr: String,
+ #[serde(default = "default_mail_attr")]
+ pub mail_attr: String,
+
+ // The field that will contain the crypto root thingy
+ pub crypto_root_attr: String,
+
+ // Storage related thing
+ #[serde(flatten)]
+ pub storage: LdapStorage,
+}
+
+// ----
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(tag = "storage_driver")]
+pub enum StaticStorage {
+ Garage(StaticGarageConfig),
+ InMemory,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct StaticGarageConfig {
+ pub s3_endpoint: String,
+ pub k2v_endpoint: String,
+ pub aws_region: String,
+
+ pub aws_access_key_id: String,
+ pub aws_secret_access_key: String,
+ pub bucket: String,
+}
+
+pub type UserList = HashMap<String, UserEntry>;
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct UserEntry {
+ #[serde(default)]
+ pub email_addresses: Vec<String>,
+ pub password: String,
+ pub crypto_root: String,
+
+ #[serde(flatten)]
+ pub storage: StaticStorage,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct SetupEntry {
+ #[serde(default)]
+ pub email_addresses: Vec<String>,
+
+ #[serde(default)]
+ pub clear_password: Option<String>,
+
+ #[serde(flatten)]
+ pub storage: StaticStorage,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(tag = "role")]
+pub enum AnyConfig {
+ Companion(CompanionConfig),
+ Provider(ProviderConfig),
+}
+
+// ---
+pub fn read_config<T: serde::de::DeserializeOwned>(config_file: PathBuf) -> Result<T> {
+ let mut file = std::fs::OpenOptions::new()
+ .read(true)
+ .open(config_file.as_path())?;
+
+ let mut config = String::new();
+ file.read_to_string(&mut config)?;
+
+ Ok(toml::from_str(&config)?)
+}
+
+pub fn write_config<T: Serialize>(config_file: PathBuf, config: &T) -> Result<()> {
+ let mut file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .open(config_file.as_path())?;
+
+ file.write_all(toml::to_string(config)?.as_bytes())?;
+
+ Ok(())
+}
+
+fn default_mail_attr() -> String {
+ "mail".into()
+}
diff --git a/aero-user/src/cryptoblob.rs b/aero-user/src/cryptoblob.rs
new file mode 100644
index 0000000..327a642
--- /dev/null
+++ b/aero-user/src/cryptoblob.rs
@@ -0,0 +1,67 @@
+//! Helper functions for secret-key encrypted blobs
+//! that contain Zstd encrypted data
+
+use anyhow::{anyhow, Result};
+use serde::{Deserialize, Serialize};
+use zstd::stream::{decode_all as zstd_decode, encode_all as zstd_encode};
+
+//use sodiumoxide::crypto::box_ as publicbox;
+use sodiumoxide::crypto::secretbox::xsalsa20poly1305 as secretbox;
+
+pub use sodiumoxide::crypto::box_::{
+ gen_keypair, PublicKey, SecretKey, PUBLICKEYBYTES, SECRETKEYBYTES,
+};
+pub use sodiumoxide::crypto::secretbox::xsalsa20poly1305::{gen_key, Key, KEYBYTES};
+
+pub fn open(cryptoblob: &[u8], key: &Key) -> Result<Vec<u8>> {
+ use secretbox::{Nonce, NONCEBYTES};
+
+ if cryptoblob.len() < NONCEBYTES {
+ return Err(anyhow!("Cyphertext too short"));
+ }
+
+ // Decrypt -> get Zstd data
+ let nonce = Nonce::from_slice(&cryptoblob[..NONCEBYTES]).unwrap();
+ let zstdblob = secretbox::open(&cryptoblob[NONCEBYTES..], &nonce, key)
+ .map_err(|_| anyhow!("Could not decrypt blob"))?;
+
+ // Decompress zstd data
+ let mut reader = &zstdblob[..];
+ let data = zstd_decode(&mut reader)?;
+
+ Ok(data)
+}
+
+pub fn seal(plainblob: &[u8], key: &Key) -> Result<Vec<u8>> {
+ use secretbox::{gen_nonce, NONCEBYTES};
+
+ // Compress data using zstd
+ let mut reader = plainblob;
+ let zstdblob = zstd_encode(&mut reader, 0)?;
+
+ // Encrypt
+ let nonce = gen_nonce();
+ let cryptoblob = secretbox::seal(&zstdblob, &nonce, key);
+
+ let mut res = Vec::with_capacity(NONCEBYTES + cryptoblob.len());
+ res.extend(nonce.as_ref());
+ res.extend(cryptoblob);
+
+ Ok(res)
+}
+
+pub fn open_deserialize<T: for<'de> Deserialize<'de>>(cryptoblob: &[u8], key: &Key) -> Result<T> {
+ let blob = open(cryptoblob, key)?;
+
+ Ok(rmp_serde::decode::from_read_ref::<_, T>(&blob)?)
+}
+
+pub fn seal_serialize<T: Serialize>(obj: T, key: &Key) -> Result<Vec<u8>> {
+ let mut wr = Vec::with_capacity(128);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ obj.serialize(&mut se)?;
+
+ seal(&wr, key)
+}
diff --git a/aero-user/src/lib.rs b/aero-user/src/lib.rs
new file mode 100644
index 0000000..9b08fe2
--- /dev/null
+++ b/aero-user/src/lib.rs
@@ -0,0 +1,9 @@
+pub mod config;
+pub mod cryptoblob;
+pub mod login;
+pub mod storage;
+
+// A user is composed of 3 things:
+// - An identity (login)
+// - A storage profile (storage)
+// - Some cryptography data (cryptoblob)
diff --git a/aero-user/src/login/demo_provider.rs b/aero-user/src/login/demo_provider.rs
new file mode 100644
index 0000000..11c7d54
--- /dev/null
+++ b/aero-user/src/login/demo_provider.rs
@@ -0,0 +1,51 @@
+use crate::login::*;
+use crate::storage::*;
+
+pub struct DemoLoginProvider {
+ keys: CryptoKeys,
+ in_memory_store: in_memory::MemDb,
+}
+
+impl DemoLoginProvider {
+ pub fn new() -> Self {
+ Self {
+ keys: CryptoKeys::init(),
+ in_memory_store: in_memory::MemDb::new(),
+ }
+ }
+}
+
+#[async_trait]
+impl LoginProvider for DemoLoginProvider {
+ async fn login(&self, username: &str, password: &str) -> Result<Credentials> {
+ tracing::debug!(user=%username, "login");
+
+ if username != "alice" {
+ bail!("user does not exist");
+ }
+
+ if password != "hunter2" {
+ bail!("wrong password");
+ }
+
+ let storage = self.in_memory_store.builder("alice").await;
+ let keys = self.keys.clone();
+
+ Ok(Credentials { storage, keys })
+ }
+
+ async fn public_login(&self, email: &str) -> Result<PublicCredentials> {
+ tracing::debug!(user=%email, "public_login");
+ if email != "alice@example.tld" {
+ bail!("invalid email address");
+ }
+
+ let storage = self.in_memory_store.builder("alice").await;
+ let public_key = self.keys.public.clone();
+
+ Ok(PublicCredentials {
+ storage,
+ public_key,
+ })
+ }
+}
diff --git a/aero-user/src/login/ldap_provider.rs b/aero-user/src/login/ldap_provider.rs
new file mode 100644
index 0000000..22b301e
--- /dev/null
+++ b/aero-user/src/login/ldap_provider.rs
@@ -0,0 +1,264 @@
+use async_trait::async_trait;
+use ldap3::{LdapConnAsync, Scope, SearchEntry};
+use log::debug;
+
+use super::*;
+use crate::config::*;
+use crate::storage;
+
+pub struct LdapLoginProvider {
+ ldap_server: String,
+
+ pre_bind_on_login: bool,
+ bind_dn_and_pw: Option<(String, String)>,
+
+ search_base: String,
+ attrs_to_retrieve: Vec<String>,
+ username_attr: String,
+ mail_attr: String,
+ crypto_root_attr: String,
+
+ storage_specific: StorageSpecific,
+ in_memory_store: storage::in_memory::MemDb,
+ garage_store: storage::garage::GarageRoot,
+}
+
+enum BucketSource {
+ Constant(String),
+ Attr(String),
+}
+
+enum StorageSpecific {
+ InMemory,
+ Garage {
+ from_config: LdapGarageConfig,
+ bucket_source: BucketSource,
+ },
+}
+
+impl LdapLoginProvider {
+ pub fn new(config: LoginLdapConfig) -> Result<Self> {
+ let bind_dn_and_pw = match (config.bind_dn, config.bind_password) {
+ (Some(dn), Some(pw)) => Some((dn, pw)),
+ (None, None) => None,
+ _ => bail!(
+ "If either of `bind_dn` or `bind_password` is set, the other must be set as well."
+ ),
+ };
+
+ if config.pre_bind_on_login && bind_dn_and_pw.is_none() {
+ bail!("Cannot use `pre_bind_on_login` without setting `bind_dn` and `bind_password`");
+ }
+
+ let mut attrs_to_retrieve = vec![
+ config.username_attr.clone(),
+ config.mail_attr.clone(),
+ config.crypto_root_attr.clone(),
+ ];
+
+ // storage specific
+ let specific = match config.storage {
+ LdapStorage::InMemory => StorageSpecific::InMemory,
+ LdapStorage::Garage(grgconf) => {
+ attrs_to_retrieve.push(grgconf.aws_access_key_id_attr.clone());
+ attrs_to_retrieve.push(grgconf.aws_secret_access_key_attr.clone());
+
+ let bucket_source =
+ match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) {
+ (Some(b), None) => BucketSource::Constant(b),
+ (None, Some(a)) => BucketSource::Attr(a),
+ _ => bail!("Must set `bucket` or `bucket_attr`, but not both"),
+ };
+
+ if let BucketSource::Attr(a) = &bucket_source {
+ attrs_to_retrieve.push(a.clone());
+ }
+
+ StorageSpecific::Garage {
+ from_config: grgconf,
+ bucket_source,
+ }
+ }
+ };
+
+ Ok(Self {
+ ldap_server: config.ldap_server,
+ pre_bind_on_login: config.pre_bind_on_login,
+ bind_dn_and_pw,
+ search_base: config.search_base,
+ attrs_to_retrieve,
+ username_attr: config.username_attr,
+ mail_attr: config.mail_attr,
+ crypto_root_attr: config.crypto_root_attr,
+ storage_specific: specific,
+ //@FIXME should be created outside of the login provider
+ //Login provider should return only a cryptoroot + a storage URI
+ //storage URI that should be resolved outside...
+ in_memory_store: storage::in_memory::MemDb::new(),
+ garage_store: storage::garage::GarageRoot::new()?,
+ })
+ }
+
+ async fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builder> {
+ let storage: Builder = match &self.storage_specific {
+ StorageSpecific::InMemory => {
+ self.in_memory_store
+ .builder(&get_attr(user, &self.username_attr)?)
+ .await
+ }
+ StorageSpecific::Garage {
+ from_config,
+ bucket_source,
+ } => {
+ let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?;
+ let aws_secret_access_key =
+ get_attr(user, &from_config.aws_secret_access_key_attr)?;
+ let bucket = match bucket_source {
+ BucketSource::Constant(b) => b.clone(),
+ BucketSource::Attr(a) => get_attr(user, &a)?,
+ };
+
+ self.garage_store.user(storage::garage::GarageConf {
+ region: from_config.aws_region.clone(),
+ s3_endpoint: from_config.s3_endpoint.clone(),
+ k2v_endpoint: from_config.k2v_endpoint.clone(),
+ aws_access_key_id,
+ aws_secret_access_key,
+ bucket,
+ })?
+ }
+ };
+
+ Ok(storage)
+ }
+}
+
+#[async_trait]
+impl LoginProvider for LdapLoginProvider {
+ async fn login(&self, username: &str, password: &str) -> Result<Credentials> {
+ check_identifier(username)?;
+
+ let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?;
+ ldap3::drive!(conn);
+
+ if self.pre_bind_on_login {
+ let (dn, pw) = self.bind_dn_and_pw.as_ref().unwrap();
+ ldap.simple_bind(dn, pw).await?.success()?;
+ }
+
+ let (matches, _res) = ldap
+ .search(
+ &self.search_base,
+ Scope::Subtree,
+ &format!(
+ "(&(objectClass=inetOrgPerson)({}={}))",
+ self.username_attr, username
+ ),
+ &self.attrs_to_retrieve,
+ )
+ .await?
+ .success()?;
+
+ if matches.is_empty() {
+ bail!("Invalid username");
+ }
+ if matches.len() > 1 {
+ bail!("Invalid username (multiple matching accounts)");
+ }
+ let user = SearchEntry::construct(matches.into_iter().next().unwrap());
+ debug!(
+ "Found matching LDAP user for username {}: {}",
+ username, user.dn
+ );
+
+ // Try to login against LDAP server with provided password
+ // to check user's password
+ ldap.simple_bind(&user.dn, password)
+ .await?
+ .success()
+ .context("Invalid password")?;
+ debug!("Ldap login with user name {} successfull", username);
+
+ // cryptography
+ let crstr = get_attr(&user, &self.crypto_root_attr)?;
+ let cr = CryptoRoot(crstr);
+ let keys = cr.crypto_keys(password)?;
+
+ // storage
+ let storage = self.storage_creds_from_ldap_user(&user).await?;
+
+ drop(ldap);
+
+ Ok(Credentials { storage, keys })
+ }
+
+ async fn public_login(&self, email: &str) -> Result<PublicCredentials> {
+ check_identifier(email)?;
+
+ let (dn, pw) = match self.bind_dn_and_pw.as_ref() {
+ Some(x) => x,
+ None => bail!("Missing bind_dn and bind_password in LDAP login provider config"),
+ };
+
+ let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?;
+ ldap3::drive!(conn);
+ ldap.simple_bind(dn, pw).await?.success()?;
+
+ let (matches, _res) = ldap
+ .search(
+ &self.search_base,
+ Scope::Subtree,
+ &format!(
+ "(&(objectClass=inetOrgPerson)({}={}))",
+ self.mail_attr, email
+ ),
+ &self.attrs_to_retrieve,
+ )
+ .await?
+ .success()?;
+
+ if matches.is_empty() {
+ bail!("No such user account");
+ }
+ if matches.len() > 1 {
+ bail!("Multiple matching user accounts");
+ }
+ let user = SearchEntry::construct(matches.into_iter().next().unwrap());
+ debug!("Found matching LDAP user for email {}: {}", email, user.dn);
+
+ // cryptography
+ let crstr = get_attr(&user, &self.crypto_root_attr)?;
+ let cr = CryptoRoot(crstr);
+ let public_key = cr.public_key()?;
+
+ // storage
+ let storage = self.storage_creds_from_ldap_user(&user).await?;
+ drop(ldap);
+
+ Ok(PublicCredentials {
+ storage,
+ public_key,
+ })
+ }
+}
+
+fn get_attr(user: &SearchEntry, attr: &str) -> Result<String> {
+ Ok(user
+ .attrs
+ .get(attr)
+ .ok_or(anyhow!("Missing attr: {}", attr))?
+ .iter()
+ .next()
+ .ok_or(anyhow!("No value for attr: {}", attr))?
+ .clone())
+}
+
+fn check_identifier(id: &str) -> Result<()> {
+ let is_ok = id
+ .chars()
+ .all(|c| c.is_alphanumeric() || "-+_.@".contains(c));
+ if !is_ok {
+ bail!("Invalid username/email address, must contain only a-z A-Z 0-9 - + _ . @");
+ }
+ Ok(())
+}
diff --git a/aero-user/src/login/mod.rs b/aero-user/src/login/mod.rs
new file mode 100644
index 0000000..5e54b4a
--- /dev/null
+++ b/aero-user/src/login/mod.rs
@@ -0,0 +1,245 @@
+pub mod demo_provider;
+pub mod ldap_provider;
+pub mod static_provider;
+
+use std::sync::Arc;
+
+use anyhow::{anyhow, bail, Context, Result};
+use async_trait::async_trait;
+use base64::Engine;
+use rand::prelude::*;
+
+use crate::cryptoblob::*;
+use crate::storage::*;
+
+/// The trait LoginProvider defines the interface for a login provider that allows
+/// to retrieve storage and cryptographic credentials for access to a user account
+/// from their username and password.
+#[async_trait]
+pub trait LoginProvider {
+ /// The login method takes an account's password as an input to decypher
+ /// decryption keys and obtain full access to the user's account.
+ async fn login(&self, username: &str, password: &str) -> Result<Credentials>;
+ /// The public_login method takes an account's email address and returns
+ /// public credentials for adding mails to the user's inbox.
+ async fn public_login(&self, email: &str) -> Result<PublicCredentials>;
+}
+
+/// ArcLoginProvider is simply an alias on a structure that is used
+/// in many places in the code
+pub type ArcLoginProvider = Arc<dyn LoginProvider + Send + Sync>;
+
+/// The struct Credentials represent all of the necessary information to interact
+/// with a user account's data after they are logged in.
+#[derive(Clone, Debug)]
+pub struct Credentials {
+ /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V)
+ pub storage: Builder,
+ /// The cryptographic keys are used to encrypt and decrypt data stored in S3 and K2V
+ pub keys: CryptoKeys,
+}
+
+#[derive(Clone, Debug)]
+pub struct PublicCredentials {
+ /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V)
+ pub storage: Builder,
+ pub public_key: PublicKey,
+}
+
+use serde::{Deserialize, Serialize};
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct CryptoRoot(pub String);
+
+impl CryptoRoot {
+ pub fn create_pass(password: &str, k: &CryptoKeys) -> Result<Self> {
+ let bytes = k.password_seal(password)?;
+ let b64 = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes);
+ let cr = format!("aero:cryptoroot:pass:{}", b64);
+ Ok(Self(cr))
+ }
+
+ pub fn create_cleartext(k: &CryptoKeys) -> Self {
+ let bytes = k.serialize();
+ let b64 = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes);
+ let cr = format!("aero:cryptoroot:cleartext:{}", b64);
+ Self(cr)
+ }
+
+ pub fn create_incoming(pk: &PublicKey) -> Self {
+ let bytes: &[u8] = &pk[..];
+ let b64 = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes);
+ let cr = format!("aero:cryptoroot:incoming:{}", b64);
+ Self(cr)
+ }
+
+ pub fn public_key(&self) -> Result<PublicKey> {
+ match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] {
+ ["aero", "cryptoroot", "pass", b64blob] => {
+ let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
+ if blob.len() < 32 {
+ bail!(
+ "Decoded data is {} bytes long, expect at least 32 bytes",
+ blob.len()
+ );
+ }
+ PublicKey::from_slice(&blob[..32]).context("must be a valid public key")
+ }
+ ["aero", "cryptoroot", "cleartext", b64blob] => {
+ let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
+ Ok(CryptoKeys::deserialize(&blob)?.public)
+ }
+ ["aero", "cryptoroot", "incoming", b64blob] => {
+ let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
+ if blob.len() < 32 {
+ bail!(
+ "Decoded data is {} bytes long, expect at least 32 bytes",
+ blob.len()
+ );
+ }
+ PublicKey::from_slice(&blob[..32]).context("must be a valid public key")
+ }
+ ["aero", "cryptoroot", "keyring", _] => {
+ bail!("keyring is not yet implemented!")
+ }
+ _ => bail!(format!(
+ "passed string '{}' is not a valid cryptoroot",
+ self.0
+ )),
+ }
+ }
+ pub fn crypto_keys(&self, password: &str) -> Result<CryptoKeys> {
+ match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] {
+ ["aero", "cryptoroot", "pass", b64blob] => {
+ let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
+ CryptoKeys::password_open(password, &blob)
+ }
+ ["aero", "cryptoroot", "cleartext", b64blob] => {
+ let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
+ CryptoKeys::deserialize(&blob)
+ }
+ ["aero", "cryptoroot", "incoming", _] => {
+ bail!("incoming cryptoroot does not contain a crypto key!")
+ }
+ ["aero", "cryptoroot", "keyring", _] => {
+ bail!("keyring is not yet implemented!")
+ }
+ _ => bail!(format!(
+ "passed string '{}' is not a valid cryptoroot",
+ self.0
+ )),
+ }
+ }
+}
+
+/// The struct CryptoKeys contains the cryptographic keys used to encrypt and decrypt
+/// data in a user's mailbox.
+#[derive(Clone, Debug)]
+pub struct CryptoKeys {
+ /// Master key for symmetric encryption of mailbox data
+ pub master: Key,
+ /// Public/private keypair for encryption of incomming emails (secret part)
+ pub secret: SecretKey,
+ /// Public/private keypair for encryption of incomming emails (public part)
+ pub public: PublicKey,
+}
+
+// ----
+
+impl CryptoKeys {
+ /// Initialize a new cryptography root
+ pub fn init() -> Self {
+ let (public, secret) = gen_keypair();
+ let master = gen_key();
+ CryptoKeys {
+ master,
+ secret,
+ public,
+ }
+ }
+
+ // Clear text serialize/deserialize
+ /// Serialize the root as bytes without encryption
+ fn serialize(&self) -> [u8; 64] {
+ let mut res = [0u8; 64];
+ res[..32].copy_from_slice(self.master.as_ref());
+ res[32..].copy_from_slice(self.secret.as_ref());
+ res
+ }
+
+ /// Deserialize a clear text crypto root without encryption
+ fn deserialize(bytes: &[u8]) -> Result<Self> {
+ if bytes.len() != 64 {
+ bail!("Invalid length: {}, expected 64", bytes.len());
+ }
+ let master = Key::from_slice(&bytes[..32]).unwrap();
+ let secret = SecretKey::from_slice(&bytes[32..]).unwrap();
+ let public = secret.public_key();
+ Ok(Self {
+ master,
+ secret,
+ public,
+ })
+ }
+
+ // Password sealed keys serialize/deserialize
+ pub fn password_open(password: &str, blob: &[u8]) -> Result<Self> {
+ let _pubkey = &blob[0..32];
+ let kdf_salt = &blob[32..64];
+ let password_openned = try_open_encrypted_keys(kdf_salt, password, &blob[64..])?;
+
+ let keys = Self::deserialize(&password_openned)?;
+ Ok(keys)
+ }
+
+ pub fn password_seal(&self, password: &str) -> Result<Vec<u8>> {
+ let mut kdf_salt = [0u8; 32];
+ thread_rng().fill(&mut kdf_salt);
+
+ // Calculate key for password secret box
+ let password_key = derive_password_key(&kdf_salt, password)?;
+
+ // Seal a secret box that contains our crypto keys
+ let password_sealed = seal(&self.serialize(), &password_key)?;
+
+ // Create blob
+ let password_blob = [&self.public[..], &kdf_salt[..], &password_sealed].concat();
+
+ Ok(password_blob)
+ }
+}
+
+fn derive_password_key(kdf_salt: &[u8], password: &str) -> Result<Key> {
+ Ok(Key::from_slice(&argon2_kdf(kdf_salt, password.as_bytes(), 32)?).unwrap())
+}
+
+fn try_open_encrypted_keys(
+ kdf_salt: &[u8],
+ password: &str,
+ encrypted_keys: &[u8],
+) -> Result<Vec<u8>> {
+ let password_key = derive_password_key(kdf_salt, password)?;
+ open(encrypted_keys, &password_key)
+}
+
+// ---- UTIL ----
+
+pub fn argon2_kdf(salt: &[u8], password: &[u8], output_len: usize) -> Result<Vec<u8>> {
+ use argon2::{password_hash, Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version};
+
+ let params = ParamsBuilder::new()
+ .output_len(output_len)
+ .build()
+ .map_err(|e| anyhow!("Invalid argon2 params: {}", e))?;
+ let argon2 = Argon2::new(Algorithm::default(), Version::default(), params);
+
+ let b64_salt = base64::engine::general_purpose::STANDARD_NO_PAD.encode(salt);
+ let valid_salt = password_hash::Salt::from_b64(&b64_salt)
+ .map_err(|e| anyhow!("Invalid salt, error {}", e))?;
+ let hash = argon2
+ .hash_password(password, valid_salt)
+ .map_err(|e| anyhow!("Unable to hash: {}", e))?;
+
+ let hash = hash.hash.ok_or(anyhow!("Missing output"))?;
+ assert!(hash.len() == output_len);
+ Ok(hash.as_bytes().to_vec())
+}
diff --git a/aero-user/src/login/static_provider.rs b/aero-user/src/login/static_provider.rs
new file mode 100644
index 0000000..ed39343
--- /dev/null
+++ b/aero-user/src/login/static_provider.rs
@@ -0,0 +1,188 @@
+use std::collections::HashMap;
+use std::path::PathBuf;
+
+use anyhow::{anyhow, bail};
+use async_trait::async_trait;
+use tokio::signal::unix::{signal, SignalKind};
+use tokio::sync::watch;
+
+use crate::config::*;
+use crate::login::*;
+use crate::storage;
+
+pub struct ContextualUserEntry {
+ pub username: String,
+ pub config: UserEntry,
+}
+
+#[derive(Default)]
+pub struct UserDatabase {
+ users: HashMap<String, Arc<ContextualUserEntry>>,
+ users_by_email: HashMap<String, Arc<ContextualUserEntry>>,
+}
+
+pub struct StaticLoginProvider {
+ user_db: watch::Receiver<UserDatabase>,
+ in_memory_store: storage::in_memory::MemDb,
+ garage_store: storage::garage::GarageRoot,
+}
+
+pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) -> Result<()> {
+ let mut stream = signal(SignalKind::user_defined1())
+ .expect("failed to install SIGUSR1 signal hander for reload");
+
+ loop {
+ let ulist: UserList = match read_config(config.clone()) {
+ Ok(x) => x,
+ Err(e) => {
+ tracing::warn!(path=%config.as_path().to_string_lossy(), error=%e, "Unable to load config");
+ stream.recv().await;
+ continue;
+ }
+ };
+
+ let users = ulist
+ .into_iter()
+ .map(|(username, config)| {
+ (
+ username.clone(),
+ Arc::new(ContextualUserEntry { username, config }),
+ )
+ })
+ .collect::<HashMap<_, _>>();
+
+ let mut users_by_email = HashMap::new();
+ for (_, u) in users.iter() {
+ for m in u.config.email_addresses.iter() {
+ if users_by_email.contains_key(m) {
+ tracing::warn!("Several users have the same email address: {}", m);
+ stream.recv().await;
+ continue;
+ }
+ users_by_email.insert(m.clone(), u.clone());
+ }
+ }
+
+ tracing::info!("{} users loaded", users.len());
+ up.send(UserDatabase {
+ users,
+ users_by_email,
+ })
+ .context("update user db config")?;
+ stream.recv().await;
+ tracing::info!("Received SIGUSR1, reloading");
+ }
+}
+
+impl StaticLoginProvider {
+ pub async fn new(config: LoginStaticConfig) -> Result<Self> {
+ let (tx, mut rx) = watch::channel(UserDatabase::default());
+
+ tokio::spawn(update_user_list(config.user_list, tx));
+ rx.changed().await?;
+
+ Ok(Self {
+ user_db: rx,
+ in_memory_store: storage::in_memory::MemDb::new(),
+ garage_store: storage::garage::GarageRoot::new()?,
+ })
+ }
+}
+
+#[async_trait]
+impl LoginProvider for StaticLoginProvider {
+ async fn login(&self, username: &str, password: &str) -> Result<Credentials> {
+ tracing::debug!(user=%username, "login");
+ let user = {
+ let user_db = self.user_db.borrow();
+ match user_db.users.get(username) {
+ None => bail!("User {} does not exist", username),
+ Some(u) => u.clone(),
+ }
+ };
+
+ tracing::debug!(user=%username, "verify password");
+ if !verify_password(password, &user.config.password)? {
+ bail!("Wrong password");
+ }
+
+ tracing::debug!(user=%username, "fetch keys");
+ let storage: storage::Builder = match &user.config.storage {
+ StaticStorage::InMemory => self.in_memory_store.builder(username).await,
+ StaticStorage::Garage(grgconf) => {
+ self.garage_store.user(storage::garage::GarageConf {
+ region: grgconf.aws_region.clone(),
+ k2v_endpoint: grgconf.k2v_endpoint.clone(),
+ s3_endpoint: grgconf.s3_endpoint.clone(),
+ aws_access_key_id: grgconf.aws_access_key_id.clone(),
+ aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
+ bucket: grgconf.bucket.clone(),
+ })?
+ }
+ };
+
+ let cr = CryptoRoot(user.config.crypto_root.clone());
+ let keys = cr.crypto_keys(password)?;
+
+ tracing::debug!(user=%username, "logged");
+ Ok(Credentials { storage, keys })
+ }
+
+ async fn public_login(&self, email: &str) -> Result<PublicCredentials> {
+ let user = {
+ let user_db = self.user_db.borrow();
+ match user_db.users_by_email.get(email) {
+ None => bail!("Email {} does not exist", email),
+ Some(u) => u.clone(),
+ }
+ };
+ tracing::debug!(user=%user.username, "public_login");
+
+ let storage: storage::Builder = match &user.config.storage {
+ StaticStorage::InMemory => self.in_memory_store.builder(&user.username).await,
+ StaticStorage::Garage(grgconf) => {
+ self.garage_store.user(storage::garage::GarageConf {
+ region: grgconf.aws_region.clone(),
+ k2v_endpoint: grgconf.k2v_endpoint.clone(),
+ s3_endpoint: grgconf.s3_endpoint.clone(),
+ aws_access_key_id: grgconf.aws_access_key_id.clone(),
+ aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
+ bucket: grgconf.bucket.clone(),
+ })?
+ }
+ };
+
+ let cr = CryptoRoot(user.config.crypto_root.clone());
+ let public_key = cr.public_key()?;
+
+ Ok(PublicCredentials {
+ storage,
+ public_key,
+ })
+ }
+}
+
+pub fn hash_password(password: &str) -> Result<String> {
+ use argon2::{
+ password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
+ Argon2,
+ };
+ let salt = SaltString::generate(&mut OsRng);
+ let argon2 = Argon2::default();
+ Ok(argon2
+ .hash_password(password.as_bytes(), &salt)
+ .map_err(|e| anyhow!("Argon2 error: {}", e))?
+ .to_string())
+}
+
+pub fn verify_password(password: &str, hash: &str) -> Result<bool> {
+ use argon2::{
+ password_hash::{PasswordHash, PasswordVerifier},
+ Argon2,
+ };
+ let parsed_hash =
+ PasswordHash::new(hash).map_err(|e| anyhow!("Invalid hashed password: {}", e))?;
+ Ok(Argon2::default()
+ .verify_password(password.as_bytes(), &parsed_hash)
+ .is_ok())
+}
diff --git a/aero-user/src/storage/garage.rs b/aero-user/src/storage/garage.rs
new file mode 100644
index 0000000..1164839
--- /dev/null
+++ b/aero-user/src/storage/garage.rs
@@ -0,0 +1,542 @@
+use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError};
+use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
+use aws_smithy_runtime_api::client::http::SharedHttpClient;
+use hyper_rustls::HttpsConnector;
+use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient};
+use hyper_util::rt::TokioExecutor;
+use serde::Serialize;
+
+use super::*;
+
+pub struct GarageRoot {
+ k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>,
+ aws_http: SharedHttpClient,
+}
+
+impl GarageRoot {
+ pub fn new() -> anyhow::Result<Self> {
+ let connector = hyper_rustls::HttpsConnectorBuilder::new()
+ .with_native_roots()?
+ .https_or_http()
+ .enable_http1()
+ .enable_http2()
+ .build();
+ let k2v_http = HttpClient::builder(TokioExecutor::new()).build(connector);
+ let aws_http = HyperClientBuilder::new().build_https();
+ Ok(Self { k2v_http, aws_http })
+ }
+
+ pub fn user(&self, conf: GarageConf) -> anyhow::Result<Arc<GarageUser>> {
+ let mut unicity: Vec<u8> = vec![];
+ unicity.extend_from_slice(file!().as_bytes());
+ unicity.append(&mut rmp_serde::to_vec(&conf)?);
+
+ Ok(Arc::new(GarageUser {
+ conf,
+ aws_http: self.aws_http.clone(),
+ k2v_http: self.k2v_http.clone(),
+ unicity,
+ }))
+ }
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct GarageConf {
+ pub region: String,
+ pub s3_endpoint: String,
+ pub k2v_endpoint: String,
+ pub aws_access_key_id: String,
+ pub aws_secret_access_key: String,
+ pub bucket: String,
+}
+
+//@FIXME we should get rid of this builder
+//and allocate a S3 + K2V client only once per user
+//(and using a shared HTTP client)
+#[derive(Clone, Debug)]
+pub struct GarageUser {
+ conf: GarageConf,
+ aws_http: SharedHttpClient,
+ k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>,
+ unicity: Vec<u8>,
+}
+
+#[async_trait]
+impl IBuilder for GarageUser {
+ async fn build(&self) -> Result<Store, StorageError> {
+ let s3_creds = s3::config::Credentials::new(
+ self.conf.aws_access_key_id.clone(),
+ self.conf.aws_secret_access_key.clone(),
+ None,
+ None,
+ "aerogramme",
+ );
+
+ let sdk_config = aws_config::from_env()
+ .region(aws_config::Region::new(self.conf.region.clone()))
+ .credentials_provider(s3_creds)
+ .http_client(self.aws_http.clone())
+ .endpoint_url(self.conf.s3_endpoint.clone())
+ .load()
+ .await;
+
+ let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
+ .force_path_style(true)
+ .build();
+
+ let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
+
+ let k2v_config = k2v_client::K2vClientConfig {
+ endpoint: self.conf.k2v_endpoint.clone(),
+ region: self.conf.region.clone(),
+ aws_access_key_id: self.conf.aws_access_key_id.clone(),
+ aws_secret_access_key: self.conf.aws_secret_access_key.clone(),
+ bucket: self.conf.bucket.clone(),
+ user_agent: None,
+ };
+
+ let k2v_client =
+ match k2v_client::K2vClient::new_with_client(k2v_config, self.k2v_http.clone()) {
+ Err(e) => {
+ tracing::error!("unable to build k2v client: {}", e);
+ return Err(StorageError::Internal);
+ }
+ Ok(v) => v,
+ };
+
+ Ok(Box::new(GarageStore {
+ bucket: self.conf.bucket.clone(),
+ s3: s3_client,
+ k2v: k2v_client,
+ }))
+ }
+ fn unique(&self) -> UnicityBuffer {
+ UnicityBuffer(self.unicity.clone())
+ }
+}
+
+pub struct GarageStore {
+ bucket: String,
+ s3: s3::Client,
+ k2v: k2v_client::K2vClient,
+}
+
+fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal {
+ let new_row_ref = row_ref.with_causality(causal_value.causality.into());
+ let row_values = causal_value
+ .value
+ .into_iter()
+ .map(|k2v_value| match k2v_value {
+ k2v_client::K2vValue::Tombstone => Alternative::Tombstone,
+ k2v_client::K2vValue::Value(v) => Alternative::Value(v),
+ })
+ .collect::<Vec<_>>();
+
+ RowVal {
+ row_ref: new_row_ref,
+ value: row_values,
+ }
+}
+
+#[async_trait]
+impl IStore for GarageStore {
+ async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
+ tracing::trace!(select=%select, command="row_fetch");
+ let (pk_list, batch_op) = match select {
+ Selector::Range {
+ shard,
+ sort_begin,
+ sort_end,
+ } => (
+ vec![shard.to_string()],
+ vec![k2v_client::BatchReadOp {
+ partition_key: shard,
+ filter: k2v_client::Filter {
+ start: Some(sort_begin),
+ end: Some(sort_end),
+ ..k2v_client::Filter::default()
+ },
+ ..k2v_client::BatchReadOp::default()
+ }],
+ ),
+ Selector::List(row_ref_list) => (
+ row_ref_list
+ .iter()
+ .map(|row_ref| row_ref.uid.shard.to_string())
+ .collect::<Vec<_>>(),
+ row_ref_list
+ .iter()
+ .map(|row_ref| k2v_client::BatchReadOp {
+ partition_key: &row_ref.uid.shard,
+ filter: k2v_client::Filter {
+ start: Some(&row_ref.uid.sort),
+ ..k2v_client::Filter::default()
+ },
+ single_item: true,
+ ..k2v_client::BatchReadOp::default()
+ })
+ .collect::<Vec<_>>(),
+ ),
+ Selector::Prefix { shard, sort_prefix } => (
+ vec![shard.to_string()],
+ vec![k2v_client::BatchReadOp {
+ partition_key: shard,
+ filter: k2v_client::Filter {
+ prefix: Some(sort_prefix),
+ ..k2v_client::Filter::default()
+ },
+ ..k2v_client::BatchReadOp::default()
+ }],
+ ),
+ Selector::Single(row_ref) => {
+ let causal_value = match self
+ .k2v
+ .read_item(&row_ref.uid.shard, &row_ref.uid.sort)
+ .await
+ {
+ Err(k2v_client::Error::NotFound) => {
+ tracing::debug!(
+ "K2V item not found shard={}, sort={}, bucket={}",
+ row_ref.uid.shard,
+ row_ref.uid.sort,
+ self.bucket,
+ );
+ return Err(StorageError::NotFound);
+ }
+ Err(e) => {
+ tracing::error!(
+ "K2V read item shard={}, sort={}, bucket={} failed: {}",
+ row_ref.uid.shard,
+ row_ref.uid.sort,
+ self.bucket,
+ e
+ );
+ return Err(StorageError::Internal);
+ }
+ Ok(v) => v,
+ };
+
+ let row_val = causal_to_row_val((*row_ref).clone(), causal_value);
+ return Ok(vec![row_val]);
+ }
+ };
+
+ let all_raw_res = match self.k2v.read_batch(&batch_op).await {
+ Err(e) => {
+ tracing::error!(
+ "k2v read batch failed for {:?}, bucket {} with err: {}",
+ select,
+ self.bucket,
+ e
+ );
+ return Err(StorageError::Internal);
+ }
+ Ok(v) => v,
+ };
+ //println!("fetch res -> {:?}", all_raw_res);
+
+ let row_vals =
+ all_raw_res
+ .into_iter()
+ .zip(pk_list.into_iter())
+ .fold(vec![], |mut acc, (page, pk)| {
+ page.items
+ .into_iter()
+ .map(|(sk, cv)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
+ .for_each(|rr| acc.push(rr));
+
+ acc
+ });
+ tracing::debug!(fetch_count = row_vals.len(), command = "row_fetch");
+
+ Ok(row_vals)
+ }
+ async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
+ tracing::trace!(select=%select, command="row_rm");
+ let del_op = match select {
+ Selector::Range {
+ shard,
+ sort_begin,
+ sort_end,
+ } => vec![k2v_client::BatchDeleteOp {
+ partition_key: shard,
+ prefix: None,
+ start: Some(sort_begin),
+ end: Some(sort_end),
+ single_item: false,
+ }],
+ Selector::List(row_ref_list) => {
+ // Insert null values with causality token = delete
+ let batch_op = row_ref_list
+ .iter()
+ .map(|v| k2v_client::BatchInsertOp {
+ partition_key: &v.uid.shard,
+ sort_key: &v.uid.sort,
+ causality: v.causality.clone().map(|ct| ct.into()),
+ value: k2v_client::K2vValue::Tombstone,
+ })
+ .collect::<Vec<_>>();
+
+ return match self.k2v.insert_batch(&batch_op).await {
+ Err(e) => {
+ tracing::error!("Unable to delete the list of values: {}", e);
+ Err(StorageError::Internal)
+ }
+ Ok(_) => Ok(()),
+ };
+ }
+ Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp {
+ partition_key: shard,
+ prefix: Some(sort_prefix),
+ start: None,
+ end: None,
+ single_item: false,
+ }],
+ Selector::Single(row_ref) => {
+ // Insert null values with causality token = delete
+ let batch_op = vec![k2v_client::BatchInsertOp {
+ partition_key: &row_ref.uid.shard,
+ sort_key: &row_ref.uid.sort,
+ causality: row_ref.causality.clone().map(|ct| ct.into()),
+ value: k2v_client::K2vValue::Tombstone,
+ }];
+
+ return match self.k2v.insert_batch(&batch_op).await {
+ Err(e) => {
+ tracing::error!("Unable to delete the list of values: {}", e);
+ Err(StorageError::Internal)
+ }
+ Ok(_) => Ok(()),
+ };
+ }
+ };
+
+ // Finally here we only have prefix & range
+ match self.k2v.delete_batch(&del_op).await {
+ Err(e) => {
+ tracing::error!("delete batch error: {}", e);
+ Err(StorageError::Internal)
+ }
+ Ok(_) => Ok(()),
+ }
+ }
+
+ async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
+ tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
+ let batch_ops = values
+ .iter()
+ .map(|v| k2v_client::BatchInsertOp {
+ partition_key: &v.row_ref.uid.shard,
+ sort_key: &v.row_ref.uid.sort,
+ causality: v.row_ref.causality.clone().map(|ct| ct.into()),
+ value: v
+ .value
+ .iter()
+ .next()
+ .map(|cv| match cv {
+ Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()),
+ Alternative::Tombstone => k2v_client::K2vValue::Tombstone,
+ })
+ .unwrap_or(k2v_client::K2vValue::Tombstone),
+ })
+ .collect::<Vec<_>>();
+
+ match self.k2v.insert_batch(&batch_ops).await {
+ Err(e) => {
+ tracing::error!("k2v can't insert some value: {}", e);
+ Err(StorageError::Internal)
+ }
+ Ok(v) => Ok(v),
+ }
+ }
+ async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
+ tracing::trace!(entry=%value, command="row_poll");
+ loop {
+ if let Some(ct) = &value.causality {
+ match self
+ .k2v
+ .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None)
+ .await
+ {
+ Err(e) => {
+ tracing::error!("Unable to poll item: {}", e);
+ return Err(StorageError::Internal);
+ }
+ Ok(None) => continue,
+ Ok(Some(cv)) => return Ok(causal_to_row_val(value.clone(), cv)),
+ }
+ } else {
+ match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await {
+ Err(k2v_client::Error::NotFound) => {
+ self.k2v
+ .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None)
+ .await
+ .map_err(|e| {
+ tracing::error!("Unable to insert item in polling logic: {}", e);
+ StorageError::Internal
+ })?;
+ }
+ Err(e) => {
+ tracing::error!("Unable to read item in polling logic: {}", e);
+ return Err(StorageError::Internal);
+ }
+ Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)),
+ }
+ }
+ }
+ }
+
+ async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
+ tracing::trace!(entry=%blob_ref, command="blob_fetch");
+ let maybe_out = self
+ .s3
+ .get_object()
+ .bucket(self.bucket.to_string())
+ .key(blob_ref.0.to_string())
+ .send()
+ .await;
+
+ let object_output = match maybe_out {
+ Ok(output) => output,
+ Err(SdkError::ServiceError(x)) => match x.err() {
+ GetObjectError::NoSuchKey(_) => return Err(StorageError::NotFound),
+ e => {
+ tracing::warn!("Blob Fetch Error, Service Error: {}", e);
+ return Err(StorageError::Internal);
+ }
+ },
+ Err(e) => {
+ tracing::warn!("Blob Fetch Error, {}", e);
+ return Err(StorageError::Internal);
+ }
+ };
+
+ let buffer = match object_output.body.collect().await {
+ Ok(aggreg) => aggreg.to_vec(),
+ Err(e) => {
+ tracing::warn!("Fetching body failed with {}", e);
+ return Err(StorageError::Internal);
+ }
+ };
+
+ let mut bv = BlobVal::new(blob_ref.clone(), buffer);
+ if let Some(meta) = object_output.metadata {
+ bv.meta = meta;
+ }
+ tracing::debug!("Fetched {}/{}", self.bucket, blob_ref.0);
+ Ok(bv)
+ }
+ async fn blob_insert(&self, blob_val: BlobVal) -> Result<String, StorageError> {
+ tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
+ let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
+ let obj_key = blob_val.blob_ref.0;
+
+ let maybe_send = self
+ .s3
+ .put_object()
+ .bucket(self.bucket.to_string())
+ .key(obj_key.to_string())
+ .set_metadata(Some(blob_val.meta))
+ .body(streamable_value)
+ .send()
+ .await;
+
+ match maybe_send {
+ Err(e) => {
+ tracing::error!("unable to send object: {}", e);
+ Err(StorageError::Internal)
+ }
+ Ok(put_output) => {
+ tracing::debug!("Inserted {}/{}", self.bucket, obj_key);
+ Ok(put_output
+ .e_tag()
+ .map(|v| format!("\"{}\"", v))
+ .unwrap_or(format!("W/\"{}\"", obj_key)))
+ }
+ }
+ }
+ async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
+ tracing::trace!(src=%src, dst=%dst, command="blob_copy");
+ let maybe_copy = self
+ .s3
+ .copy_object()
+ .bucket(self.bucket.to_string())
+ .key(dst.0.clone())
+ .copy_source(format!("/{}/{}", self.bucket.to_string(), src.0.clone()))
+ .send()
+ .await;
+
+ match maybe_copy {
+ Err(e) => {
+ tracing::error!(
+ "unable to copy object {} to {} (bucket: {}), error: {}",
+ src.0,
+ dst.0,
+ self.bucket,
+ e
+ );
+ Err(StorageError::Internal)
+ }
+ Ok(_) => {
+ tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket);
+ Ok(())
+ }
+ }
+ }
+ async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
+ tracing::trace!(prefix = prefix, command = "blob_list");
+ let maybe_list = self
+ .s3
+ .list_objects_v2()
+ .bucket(self.bucket.to_string())
+ .prefix(prefix)
+ .into_paginator()
+ .send()
+ .try_collect()
+ .await;
+
+ match maybe_list {
+ Err(e) => {
+ tracing::error!(
+ "listing prefix {} on bucket {} failed: {}",
+ prefix,
+ self.bucket,
+ e
+ );
+ Err(StorageError::Internal)
+ }
+ Ok(pagin_list_out) => Ok(pagin_list_out
+ .into_iter()
+ .map(|list_out| list_out.contents.unwrap_or(vec![]))
+ .flatten()
+ .map(|obj| BlobRef(obj.key.unwrap_or(String::new())))
+ .collect::<Vec<_>>()),
+ }
+ }
+ async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
+ tracing::trace!(entry=%blob_ref, command="blob_rm");
+ let maybe_delete = self
+ .s3
+ .delete_object()
+ .bucket(self.bucket.to_string())
+ .key(blob_ref.0.clone())
+ .send()
+ .await;
+
+ match maybe_delete {
+ Err(e) => {
+ tracing::error!(
+ "unable to delete {} (bucket: {}), error {}",
+ blob_ref.0,
+ self.bucket,
+ e
+ );
+ Err(StorageError::Internal)
+ }
+ Ok(_) => {
+ tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket);
+ Ok(())
+ }
+ }
+ }
+}
diff --git a/aero-user/src/storage/in_memory.rs b/aero-user/src/storage/in_memory.rs
new file mode 100644
index 0000000..5c8eb26
--- /dev/null
+++ b/aero-user/src/storage/in_memory.rs
@@ -0,0 +1,344 @@
+use std::collections::BTreeMap;
+use std::ops::Bound::{self, Excluded, Included, Unbounded};
+use std::sync::RwLock;
+
+use sodiumoxide::{crypto::hash, hex};
+use tokio::sync::Notify;
+
+use crate::storage::*;
+
+/// This implementation is very inneficient, and not completely correct
+/// Indeed, when the connector is dropped, the memory is freed.
+/// It means that when a user disconnects, its data are lost.
+/// It's intended only for basic debugging, do not use it for advanced tests...
+
+#[derive(Debug, Default)]
+pub struct MemDb(tokio::sync::Mutex<HashMap<String, Arc<MemBuilder>>>);
+impl MemDb {
+ pub fn new() -> Self {
+ Self(tokio::sync::Mutex::new(HashMap::new()))
+ }
+
+ pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
+ let mut global_storage = self.0.lock().await;
+ global_storage
+ .entry(username.to_string())
+ .or_insert(MemBuilder::new(username))
+ .clone()
+ }
+}
+
+#[derive(Debug, Clone)]
+enum InternalData {
+ Tombstone,
+ Value(Vec<u8>),
+}
+impl InternalData {
+ fn to_alternative(&self) -> Alternative {
+ match self {
+ Self::Tombstone => Alternative::Tombstone,
+ Self::Value(x) => Alternative::Value(x.clone()),
+ }
+ }
+}
+
+#[derive(Debug)]
+struct InternalRowVal {
+ data: Vec<InternalData>,
+ version: u64,
+ change: Arc<Notify>,
+}
+impl std::default::Default for InternalRowVal {
+ fn default() -> Self {
+ Self {
+ data: vec![],
+ version: 1,
+ change: Arc::new(Notify::new()),
+ }
+ }
+}
+impl InternalRowVal {
+ fn concurrent_values(&self) -> Vec<Alternative> {
+ self.data.iter().map(InternalData::to_alternative).collect()
+ }
+
+ fn to_row_val(&self, row_ref: RowRef) -> RowVal {
+ RowVal {
+ row_ref: row_ref.with_causality(self.version.to_string()),
+ value: self.concurrent_values(),
+ }
+ }
+}
+
+#[derive(Debug, Default, Clone)]
+struct InternalBlobVal {
+ data: Vec<u8>,
+ metadata: HashMap<String, String>,
+}
+impl InternalBlobVal {
+ fn to_blob_val(&self, bref: &BlobRef) -> BlobVal {
+ BlobVal {
+ blob_ref: bref.clone(),
+ meta: self.metadata.clone(),
+ value: self.data.clone(),
+ }
+ }
+ fn etag(&self) -> String {
+ let digest = hash::hash(self.data.as_ref());
+ let buff = digest.as_ref();
+ let hexstr = hex::encode(buff);
+ format!("\"{}\"", hexstr)
+ }
+}
+
+type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>;
+type ArcBlob = Arc<RwLock<BTreeMap<String, InternalBlobVal>>>;
+
+#[derive(Clone, Debug)]
+pub struct MemBuilder {
+ unicity: Vec<u8>,
+ row: ArcRow,
+ blob: ArcBlob,
+}
+
+impl MemBuilder {
+ pub fn new(user: &str) -> Arc<Self> {
+ tracing::debug!("initialize membuilder for {}", user);
+ let mut unicity: Vec<u8> = vec![];
+ unicity.extend_from_slice(file!().as_bytes());
+ unicity.extend_from_slice(user.as_bytes());
+ Arc::new(Self {
+ unicity,
+ row: Arc::new(RwLock::new(HashMap::new())),
+ blob: Arc::new(RwLock::new(BTreeMap::new())),
+ })
+ }
+}
+
+#[async_trait]
+impl IBuilder for MemBuilder {
+ async fn build(&self) -> Result<Store, StorageError> {
+ Ok(Box::new(MemStore {
+ row: self.row.clone(),
+ blob: self.blob.clone(),
+ }))
+ }
+
+ fn unique(&self) -> UnicityBuffer {
+ UnicityBuffer(self.unicity.clone())
+ }
+}
+
+pub struct MemStore {
+ row: ArcRow,
+ blob: ArcBlob,
+}
+
+fn prefix_last_bound(prefix: &str) -> Bound<String> {
+ let mut sort_end = prefix.to_string();
+ match sort_end.pop() {
+ None => Unbounded,
+ Some(ch) => {
+ let nc = char::from_u32(ch as u32 + 1).unwrap();
+ sort_end.push(nc);
+ Excluded(sort_end)
+ }
+ }
+}
+
+impl MemStore {
+ fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
+ tracing::trace!(entry=%entry, command="row_rm_single");
+ let mut store = self.row.write().or(Err(StorageError::Internal))?;
+ let shard = &entry.uid.shard;
+ let sort = &entry.uid.sort;
+
+ let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
+ Some(Ok(v)) => v,
+ _ => 0,
+ };
+
+ let bt = store.entry(shard.to_string()).or_default();
+ let intval = bt.entry(sort.to_string()).or_default();
+
+ if cauz == intval.version {
+ intval.data.clear();
+ }
+ intval.data.push(InternalData::Tombstone);
+ intval.version += 1;
+ intval.change.notify_waiters();
+
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl IStore for MemStore {
+ async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
+ tracing::trace!(select=%select, command="row_fetch");
+ let store = self.row.read().or(Err(StorageError::Internal))?;
+
+ match select {
+ Selector::Range {
+ shard,
+ sort_begin,
+ sort_end,
+ } => Ok(store
+ .get(*shard)
+ .unwrap_or(&BTreeMap::new())
+ .range((
+ Included(sort_begin.to_string()),
+ Excluded(sort_end.to_string()),
+ ))
+ .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
+ .collect::<Vec<_>>()),
+ Selector::List(rlist) => {
+ let mut acc = vec![];
+ for row_ref in rlist {
+ let maybe_intval = store
+ .get(&row_ref.uid.shard)
+ .map(|v| v.get(&row_ref.uid.sort))
+ .flatten();
+ if let Some(intval) = maybe_intval {
+ acc.push(intval.to_row_val(row_ref.clone()));
+ }
+ }
+ Ok(acc)
+ }
+ Selector::Prefix { shard, sort_prefix } => {
+ let last_bound = prefix_last_bound(sort_prefix);
+
+ Ok(store
+ .get(*shard)
+ .unwrap_or(&BTreeMap::new())
+ .range((Included(sort_prefix.to_string()), last_bound))
+ .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
+ .collect::<Vec<_>>())
+ }
+ Selector::Single(row_ref) => {
+ let intval = store
+ .get(&row_ref.uid.shard)
+ .ok_or(StorageError::NotFound)?
+ .get(&row_ref.uid.sort)
+ .ok_or(StorageError::NotFound)?;
+ Ok(vec![intval.to_row_val((*row_ref).clone())])
+ }
+ }
+ }
+
+ async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
+ tracing::trace!(select=%select, command="row_rm");
+
+ let values = match select {
+ Selector::Range { .. } | Selector::Prefix { .. } => self
+ .row_fetch(select)
+ .await?
+ .into_iter()
+ .map(|rv| rv.row_ref)
+ .collect::<Vec<_>>(),
+ Selector::List(rlist) => rlist.clone(),
+ Selector::Single(row_ref) => vec![(*row_ref).clone()],
+ };
+
+ for v in values.into_iter() {
+ self.row_rm_single(&v)?;
+ }
+ Ok(())
+ }
+
+ async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
+ tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
+ let mut store = self.row.write().or(Err(StorageError::Internal))?;
+ for v in values.into_iter() {
+ let shard = v.row_ref.uid.shard;
+ let sort = v.row_ref.uid.sort;
+
+ let val = match v.value.into_iter().next() {
+ Some(Alternative::Value(x)) => x,
+ _ => vec![],
+ };
+
+ let cauz = match v.row_ref.causality.map(|v| v.parse::<u64>()) {
+ Some(Ok(v)) => v,
+ _ => 0,
+ };
+
+ let bt = store.entry(shard).or_default();
+ let intval = bt.entry(sort).or_default();
+
+ if cauz == intval.version {
+ intval.data.clear();
+ }
+ intval.data.push(InternalData::Value(val));
+ intval.version += 1;
+ intval.change.notify_waiters();
+ }
+ Ok(())
+ }
+ async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
+ tracing::trace!(entry=%value, command="row_poll");
+ let shard = &value.uid.shard;
+ let sort = &value.uid.sort;
+ let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) {
+ Some(Ok(v)) => v,
+ _ => 0,
+ };
+
+ let notify_me = {
+ let mut store = self.row.write().or(Err(StorageError::Internal))?;
+ let bt = store.entry(shard.to_string()).or_default();
+ let intval = bt.entry(sort.to_string()).or_default();
+
+ if intval.version != cauz {
+ return Ok(intval.to_row_val(value.clone()));
+ }
+ intval.change.clone()
+ };
+
+ notify_me.notified().await;
+
+ let res = self.row_fetch(&Selector::Single(value)).await?;
+ res.into_iter().next().ok_or(StorageError::NotFound)
+ }
+
+ async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
+ tracing::trace!(entry=%blob_ref, command="blob_fetch");
+ let store = self.blob.read().or(Err(StorageError::Internal))?;
+ store
+ .get(&blob_ref.0)
+ .ok_or(StorageError::NotFound)
+ .map(|v| v.to_blob_val(blob_ref))
+ }
+ async fn blob_insert(&self, blob_val: BlobVal) -> Result<String, StorageError> {
+ tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
+ let mut store = self.blob.write().or(Err(StorageError::Internal))?;
+ let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
+ entry.data = blob_val.value.clone();
+ entry.metadata = blob_val.meta.clone();
+
+ Ok(entry.etag())
+ }
+ async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
+ tracing::trace!(src=%src, dst=%dst, command="blob_copy");
+ let mut store = self.blob.write().or(Err(StorageError::Internal))?;
+ let blob_src = store.entry(src.0.clone()).or_default().clone();
+ store.insert(dst.0.clone(), blob_src);
+ Ok(())
+ }
+ async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
+ tracing::trace!(prefix = prefix, command = "blob_list");
+ let store = self.blob.read().or(Err(StorageError::Internal))?;
+ let last_bound = prefix_last_bound(prefix);
+ let blist = store
+ .range((Included(prefix.to_string()), last_bound))
+ .map(|(k, _)| BlobRef(k.to_string()))
+ .collect::<Vec<_>>();
+ Ok(blist)
+ }
+ async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
+ tracing::trace!(entry=%blob_ref, command="blob_rm");
+ let mut store = self.blob.write().or(Err(StorageError::Internal))?;
+ store.remove(&blob_ref.0);
+ Ok(())
+ }
+}
diff --git a/aero-user/src/storage/mod.rs b/aero-user/src/storage/mod.rs
new file mode 100644
index 0000000..527765f
--- /dev/null
+++ b/aero-user/src/storage/mod.rs
@@ -0,0 +1,180 @@
+/*
+ *
+ * This abstraction goal is to leverage all the semantic of Garage K2V+S3,
+ * to be as tailored as possible to it ; it aims to be a zero-cost abstraction
+ * compared to when we where directly using the K2V+S3 client.
+ *
+ * My idea: we can encapsulate the causality token
+ * into the object system so it is not exposed.
+ */
+
+pub mod garage;
+pub mod in_memory;
+
+use std::collections::HashMap;
+use std::hash::Hash;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+#[derive(Debug, Clone)]
+pub enum Alternative {
+ Tombstone,
+ Value(Vec<u8>),
+}
+type ConcurrentValues = Vec<Alternative>;
+
+#[derive(Debug, Clone)]
+pub enum StorageError {
+ NotFound,
+ Internal,
+}
+impl std::fmt::Display for StorageError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str("Storage Error: ")?;
+ match self {
+ Self::NotFound => f.write_str("Item not found"),
+ Self::Internal => f.write_str("An internal error occured"),
+ }
+ }
+}
+impl std::error::Error for StorageError {}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct RowUid {
+ pub shard: String,
+ pub sort: String,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct RowRef {
+ pub uid: RowUid,
+ pub causality: Option<String>,
+}
+impl std::fmt::Display for RowRef {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "RowRef({}, {}, {:?})",
+ self.uid.shard, self.uid.sort, self.causality
+ )
+ }
+}
+
+impl RowRef {
+ pub fn new(shard: &str, sort: &str) -> Self {
+ Self {
+ uid: RowUid {
+ shard: shard.to_string(),
+ sort: sort.to_string(),
+ },
+ causality: None,
+ }
+ }
+ pub fn with_causality(mut self, causality: String) -> Self {
+ self.causality = Some(causality);
+ self
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct RowVal {
+ pub row_ref: RowRef,
+ pub value: ConcurrentValues,
+}
+
+impl RowVal {
+ pub fn new(row_ref: RowRef, value: Vec<u8>) -> Self {
+ Self {
+ row_ref,
+ value: vec![Alternative::Value(value)],
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct BlobRef(pub String);
+impl std::fmt::Display for BlobRef {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "BlobRef({})", self.0)
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct BlobVal {
+ pub blob_ref: BlobRef,
+ pub meta: HashMap<String, String>,
+ pub value: Vec<u8>,
+}
+impl BlobVal {
+ pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self {
+ Self {
+ blob_ref,
+ value,
+ meta: HashMap::new(),
+ }
+ }
+
+ pub fn with_meta(mut self, k: String, v: String) -> Self {
+ self.meta.insert(k, v);
+ self
+ }
+}
+
+#[derive(Debug)]
+pub enum Selector<'a> {
+ Range {
+ shard: &'a str,
+ sort_begin: &'a str,
+ sort_end: &'a str,
+ },
+ List(Vec<RowRef>), // list of (shard_key, sort_key)
+ #[allow(dead_code)]
+ Prefix {
+ shard: &'a str,
+ sort_prefix: &'a str,
+ },
+ Single(&'a RowRef),
+}
+impl<'a> std::fmt::Display for Selector<'a> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::Range {
+ shard,
+ sort_begin,
+ sort_end,
+ } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end),
+ Self::List(list) => write!(f, "List({:?})", list),
+ Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix),
+ Self::Single(row_ref) => write!(f, "Single({})", row_ref),
+ }
+ }
+}
+
+#[async_trait]
+pub trait IStore {
+ async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
+ async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
+ async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
+ async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;
+
+ async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
+ async fn blob_insert(&self, blob_val: BlobVal) -> Result<String, StorageError>;
+ async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>;
+ async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
+ async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub struct UnicityBuffer(Vec<u8>);
+
+#[async_trait]
+pub trait IBuilder: std::fmt::Debug {
+ async fn build(&self) -> Result<Store, StorageError>;
+
+ /// Returns an opaque buffer that uniquely identifies this builder
+ fn unique(&self) -> UnicityBuffer;
+}
+
+pub type Builder = Arc<dyn IBuilder + Send + Sync>;
+pub type Store = Box<dyn IStore + Send + Sync>;