diff options
Diffstat (limited to 'aero-user')
-rw-r--r-- | aero-user/Cargo.toml | 30 | ||||
-rw-r--r-- | aero-user/src/config.rs | 191 | ||||
-rw-r--r-- | aero-user/src/cryptoblob.rs | 67 | ||||
-rw-r--r-- | aero-user/src/lib.rs | 9 | ||||
-rw-r--r-- | aero-user/src/login/demo_provider.rs | 51 | ||||
-rw-r--r-- | aero-user/src/login/ldap_provider.rs | 264 | ||||
-rw-r--r-- | aero-user/src/login/mod.rs | 245 | ||||
-rw-r--r-- | aero-user/src/login/static_provider.rs | 188 | ||||
-rw-r--r-- | aero-user/src/storage/garage.rs | 538 | ||||
-rw-r--r-- | aero-user/src/storage/in_memory.rs | 334 | ||||
-rw-r--r-- | aero-user/src/storage/mod.rs | 180 |
11 files changed, 2097 insertions, 0 deletions
diff --git a/aero-user/Cargo.toml b/aero-user/Cargo.toml new file mode 100644 index 0000000..fc851e2 --- /dev/null +++ b/aero-user/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "aero-user" +version = "0.3.0" +authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"] +edition = "2021" +license = "EUPL-1.2" +description = "Represent an encrypted user profile" + +[dependencies] +anyhow.workspace = true +serde.workspace = true +zstd.workspace = true +sodiumoxide.workspace = true +log.workspace = true +async-trait.workspace = true +ldap3.workspace = true +base64.workspace = true +rand.workspace = true +tokio.workspace = true +aws-config.workspace = true +aws-sdk-s3.workspace = true +aws-smithy-runtime.workspace = true +aws-smithy-runtime-api.workspace = true +hyper-rustls.workspace = true +hyper-util.workspace = true +k2v-client.workspace = true +rmp-serde.workspace = true +toml.workspace = true +tracing.workspace = true +argon2.workspace = true diff --git a/aero-user/src/config.rs b/aero-user/src/config.rs new file mode 100644 index 0000000..7de2eac --- /dev/null +++ b/aero-user/src/config.rs @@ -0,0 +1,191 @@ +use std::collections::HashMap; +use std::io::{Read, Write}; +use std::net::SocketAddr; +use std::path::PathBuf; + +use anyhow::Result; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CompanionConfig { + pub pid: Option<PathBuf>, + pub imap: ImapUnsecureConfig, + // @FIXME Add DAV + + #[serde(flatten)] + pub users: LoginStaticConfig, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ProviderConfig { + pub pid: Option<PathBuf>, + pub imap: Option<ImapConfig>, + pub imap_unsecure: Option<ImapUnsecureConfig>, + pub lmtp: Option<LmtpConfig>, + pub auth: Option<AuthConfig>, + pub dav_unsecure: Option<DavUnsecureConfig>, + pub users: UserManagement, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "user_driver")] +pub enum UserManagement { + Demo, + Static(LoginStaticConfig), + Ldap(LoginLdapConfig), +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct AuthConfig { + pub bind_addr: SocketAddr, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LmtpConfig { + pub bind_addr: SocketAddr, + pub hostname: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ImapConfig { + pub bind_addr: SocketAddr, + pub certs: PathBuf, + pub key: PathBuf, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DavUnsecureConfig { + pub bind_addr: SocketAddr, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ImapUnsecureConfig { + pub bind_addr: SocketAddr, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LoginStaticConfig { + pub user_list: PathBuf, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "storage_driver")] +pub enum LdapStorage { + Garage(LdapGarageConfig), + InMemory, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LdapGarageConfig { + pub s3_endpoint: String, + pub k2v_endpoint: String, + pub aws_region: String, + + pub aws_access_key_id_attr: String, + pub aws_secret_access_key_attr: String, + pub bucket_attr: Option<String>, + pub default_bucket: Option<String>, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LoginLdapConfig { + // LDAP connection info + pub ldap_server: String, + #[serde(default)] + pub pre_bind_on_login: bool, + pub bind_dn: Option<String>, + pub bind_password: Option<String>, + pub search_base: String, + + // Schema-like info required for Aerogramme's logic + pub username_attr: String, + #[serde(default = "default_mail_attr")] + pub mail_attr: String, + + // The field that will contain the crypto root thingy + pub crypto_root_attr: String, + + // Storage related thing + #[serde(flatten)] + pub storage: LdapStorage, +} + +// ---- + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "storage_driver")] +pub enum StaticStorage { + Garage(StaticGarageConfig), + InMemory, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct StaticGarageConfig { + pub s3_endpoint: String, + pub k2v_endpoint: String, + pub aws_region: String, + + pub aws_access_key_id: String, + pub aws_secret_access_key: String, + pub bucket: String, +} + +pub type UserList = HashMap<String, UserEntry>; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct UserEntry { + #[serde(default)] + pub email_addresses: Vec<String>, + pub password: String, + pub crypto_root: String, + + #[serde(flatten)] + pub storage: StaticStorage, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SetupEntry { + #[serde(default)] + pub email_addresses: Vec<String>, + + #[serde(default)] + pub clear_password: Option<String>, + + #[serde(flatten)] + pub storage: StaticStorage, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "role")] +pub enum AnyConfig { + Companion(CompanionConfig), + Provider(ProviderConfig), +} + +// --- +pub fn read_config<T: serde::de::DeserializeOwned>(config_file: PathBuf) -> Result<T> { + let mut file = std::fs::OpenOptions::new() + .read(true) + .open(config_file.as_path())?; + + let mut config = String::new(); + file.read_to_string(&mut config)?; + + Ok(toml::from_str(&config)?) +} + +pub fn write_config<T: Serialize>(config_file: PathBuf, config: &T) -> Result<()> { + let mut file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(config_file.as_path())?; + + file.write_all(toml::to_string(config)?.as_bytes())?; + + Ok(()) +} + +fn default_mail_attr() -> String { + "mail".into() +} diff --git a/aero-user/src/cryptoblob.rs b/aero-user/src/cryptoblob.rs new file mode 100644 index 0000000..327a642 --- /dev/null +++ b/aero-user/src/cryptoblob.rs @@ -0,0 +1,67 @@ +//! Helper functions for secret-key encrypted blobs +//! that contain Zstd encrypted data + +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; +use zstd::stream::{decode_all as zstd_decode, encode_all as zstd_encode}; + +//use sodiumoxide::crypto::box_ as publicbox; +use sodiumoxide::crypto::secretbox::xsalsa20poly1305 as secretbox; + +pub use sodiumoxide::crypto::box_::{ + gen_keypair, PublicKey, SecretKey, PUBLICKEYBYTES, SECRETKEYBYTES, +}; +pub use sodiumoxide::crypto::secretbox::xsalsa20poly1305::{gen_key, Key, KEYBYTES}; + +pub fn open(cryptoblob: &[u8], key: &Key) -> Result<Vec<u8>> { + use secretbox::{Nonce, NONCEBYTES}; + + if cryptoblob.len() < NONCEBYTES { + return Err(anyhow!("Cyphertext too short")); + } + + // Decrypt -> get Zstd data + let nonce = Nonce::from_slice(&cryptoblob[..NONCEBYTES]).unwrap(); + let zstdblob = secretbox::open(&cryptoblob[NONCEBYTES..], &nonce, key) + .map_err(|_| anyhow!("Could not decrypt blob"))?; + + // Decompress zstd data + let mut reader = &zstdblob[..]; + let data = zstd_decode(&mut reader)?; + + Ok(data) +} + +pub fn seal(plainblob: &[u8], key: &Key) -> Result<Vec<u8>> { + use secretbox::{gen_nonce, NONCEBYTES}; + + // Compress data using zstd + let mut reader = plainblob; + let zstdblob = zstd_encode(&mut reader, 0)?; + + // Encrypt + let nonce = gen_nonce(); + let cryptoblob = secretbox::seal(&zstdblob, &nonce, key); + + let mut res = Vec::with_capacity(NONCEBYTES + cryptoblob.len()); + res.extend(nonce.as_ref()); + res.extend(cryptoblob); + + Ok(res) +} + +pub fn open_deserialize<T: for<'de> Deserialize<'de>>(cryptoblob: &[u8], key: &Key) -> Result<T> { + let blob = open(cryptoblob, key)?; + + Ok(rmp_serde::decode::from_read_ref::<_, T>(&blob)?) +} + +pub fn seal_serialize<T: Serialize>(obj: T, key: &Key) -> Result<Vec<u8>> { + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + obj.serialize(&mut se)?; + + seal(&wr, key) +} diff --git a/aero-user/src/lib.rs b/aero-user/src/lib.rs new file mode 100644 index 0000000..9b08fe2 --- /dev/null +++ b/aero-user/src/lib.rs @@ -0,0 +1,9 @@ +pub mod config; +pub mod cryptoblob; +pub mod login; +pub mod storage; + +// A user is composed of 3 things: +// - An identity (login) +// - A storage profile (storage) +// - Some cryptography data (cryptoblob) diff --git a/aero-user/src/login/demo_provider.rs b/aero-user/src/login/demo_provider.rs new file mode 100644 index 0000000..11c7d54 --- /dev/null +++ b/aero-user/src/login/demo_provider.rs @@ -0,0 +1,51 @@ +use crate::login::*; +use crate::storage::*; + +pub struct DemoLoginProvider { + keys: CryptoKeys, + in_memory_store: in_memory::MemDb, +} + +impl DemoLoginProvider { + pub fn new() -> Self { + Self { + keys: CryptoKeys::init(), + in_memory_store: in_memory::MemDb::new(), + } + } +} + +#[async_trait] +impl LoginProvider for DemoLoginProvider { + async fn login(&self, username: &str, password: &str) -> Result<Credentials> { + tracing::debug!(user=%username, "login"); + + if username != "alice" { + bail!("user does not exist"); + } + + if password != "hunter2" { + bail!("wrong password"); + } + + let storage = self.in_memory_store.builder("alice").await; + let keys = self.keys.clone(); + + Ok(Credentials { storage, keys }) + } + + async fn public_login(&self, email: &str) -> Result<PublicCredentials> { + tracing::debug!(user=%email, "public_login"); + if email != "alice@example.tld" { + bail!("invalid email address"); + } + + let storage = self.in_memory_store.builder("alice").await; + let public_key = self.keys.public.clone(); + + Ok(PublicCredentials { + storage, + public_key, + }) + } +} diff --git a/aero-user/src/login/ldap_provider.rs b/aero-user/src/login/ldap_provider.rs new file mode 100644 index 0000000..ca5a356 --- /dev/null +++ b/aero-user/src/login/ldap_provider.rs @@ -0,0 +1,264 @@ +use async_trait::async_trait; +use ldap3::{LdapConnAsync, Scope, SearchEntry}; +use log::debug; + +use crate::config::*; +use crate::storage; +use super::*; + +pub struct LdapLoginProvider { + ldap_server: String, + + pre_bind_on_login: bool, + bind_dn_and_pw: Option<(String, String)>, + + search_base: String, + attrs_to_retrieve: Vec<String>, + username_attr: String, + mail_attr: String, + crypto_root_attr: String, + + storage_specific: StorageSpecific, + in_memory_store: storage::in_memory::MemDb, + garage_store: storage::garage::GarageRoot, +} + +enum BucketSource { + Constant(String), + Attr(String), +} + +enum StorageSpecific { + InMemory, + Garage { + from_config: LdapGarageConfig, + bucket_source: BucketSource, + }, +} + +impl LdapLoginProvider { + pub fn new(config: LoginLdapConfig) -> Result<Self> { + let bind_dn_and_pw = match (config.bind_dn, config.bind_password) { + (Some(dn), Some(pw)) => Some((dn, pw)), + (None, None) => None, + _ => bail!( + "If either of `bind_dn` or `bind_password` is set, the other must be set as well." + ), + }; + + if config.pre_bind_on_login && bind_dn_and_pw.is_none() { + bail!("Cannot use `pre_bind_on_login` without setting `bind_dn` and `bind_password`"); + } + + let mut attrs_to_retrieve = vec![ + config.username_attr.clone(), + config.mail_attr.clone(), + config.crypto_root_attr.clone(), + ]; + + // storage specific + let specific = match config.storage { + LdapStorage::InMemory => StorageSpecific::InMemory, + LdapStorage::Garage(grgconf) => { + attrs_to_retrieve.push(grgconf.aws_access_key_id_attr.clone()); + attrs_to_retrieve.push(grgconf.aws_secret_access_key_attr.clone()); + + let bucket_source = + match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) { + (Some(b), None) => BucketSource::Constant(b), + (None, Some(a)) => BucketSource::Attr(a), + _ => bail!("Must set `bucket` or `bucket_attr`, but not both"), + }; + + if let BucketSource::Attr(a) = &bucket_source { + attrs_to_retrieve.push(a.clone()); + } + + StorageSpecific::Garage { + from_config: grgconf, + bucket_source, + } + } + }; + + Ok(Self { + ldap_server: config.ldap_server, + pre_bind_on_login: config.pre_bind_on_login, + bind_dn_and_pw, + search_base: config.search_base, + attrs_to_retrieve, + username_attr: config.username_attr, + mail_attr: config.mail_attr, + crypto_root_attr: config.crypto_root_attr, + storage_specific: specific, + //@FIXME should be created outside of the login provider + //Login provider should return only a cryptoroot + a storage URI + //storage URI that should be resolved outside... + in_memory_store: storage::in_memory::MemDb::new(), + garage_store: storage::garage::GarageRoot::new()?, + }) + } + + async fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builder> { + let storage: Builder = match &self.storage_specific { + StorageSpecific::InMemory => { + self.in_memory_store + .builder(&get_attr(user, &self.username_attr)?) + .await + } + StorageSpecific::Garage { + from_config, + bucket_source, + } => { + let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?; + let aws_secret_access_key = + get_attr(user, &from_config.aws_secret_access_key_attr)?; + let bucket = match bucket_source { + BucketSource::Constant(b) => b.clone(), + BucketSource::Attr(a) => get_attr(user, &a)?, + }; + + self.garage_store.user(storage::garage::GarageConf { + region: from_config.aws_region.clone(), + s3_endpoint: from_config.s3_endpoint.clone(), + k2v_endpoint: from_config.k2v_endpoint.clone(), + aws_access_key_id, + aws_secret_access_key, + bucket, + })? + } + }; + + Ok(storage) + } +} + +#[async_trait] +impl LoginProvider for LdapLoginProvider { + async fn login(&self, username: &str, password: &str) -> Result<Credentials> { + check_identifier(username)?; + + let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?; + ldap3::drive!(conn); + + if self.pre_bind_on_login { + let (dn, pw) = self.bind_dn_and_pw.as_ref().unwrap(); + ldap.simple_bind(dn, pw).await?.success()?; + } + + let (matches, _res) = ldap + .search( + &self.search_base, + Scope::Subtree, + &format!( + "(&(objectClass=inetOrgPerson)({}={}))", + self.username_attr, username + ), + &self.attrs_to_retrieve, + ) + .await? + .success()?; + + if matches.is_empty() { + bail!("Invalid username"); + } + if matches.len() > 1 { + bail!("Invalid username (multiple matching accounts)"); + } + let user = SearchEntry::construct(matches.into_iter().next().unwrap()); + debug!( + "Found matching LDAP user for username {}: {}", + username, user.dn + ); + + // Try to login against LDAP server with provided password + // to check user's password + ldap.simple_bind(&user.dn, password) + .await? + .success() + .context("Invalid password")?; + debug!("Ldap login with user name {} successfull", username); + + // cryptography + let crstr = get_attr(&user, &self.crypto_root_attr)?; + let cr = CryptoRoot(crstr); + let keys = cr.crypto_keys(password)?; + + // storage + let storage = self.storage_creds_from_ldap_user(&user).await?; + + drop(ldap); + + Ok(Credentials { storage, keys }) + } + + async fn public_login(&self, email: &str) -> Result<PublicCredentials> { + check_identifier(email)?; + + let (dn, pw) = match self.bind_dn_and_pw.as_ref() { + Some(x) => x, + None => bail!("Missing bind_dn and bind_password in LDAP login provider config"), + }; + + let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?; + ldap3::drive!(conn); + ldap.simple_bind(dn, pw).await?.success()?; + + let (matches, _res) = ldap + .search( + &self.search_base, + Scope::Subtree, + &format!( + "(&(objectClass=inetOrgPerson)({}={}))", + self.mail_attr, email + ), + &self.attrs_to_retrieve, + ) + .await? + .success()?; + + if matches.is_empty() { + bail!("No such user account"); + } + if matches.len() > 1 { + bail!("Multiple matching user accounts"); + } + let user = SearchEntry::construct(matches.into_iter().next().unwrap()); + debug!("Found matching LDAP user for email {}: {}", email, user.dn); + + // cryptography + let crstr = get_attr(&user, &self.crypto_root_attr)?; + let cr = CryptoRoot(crstr); + let public_key = cr.public_key()?; + + // storage + let storage = self.storage_creds_from_ldap_user(&user).await?; + drop(ldap); + + Ok(PublicCredentials { + storage, + public_key, + }) + } +} + +fn get_attr(user: &SearchEntry, attr: &str) -> Result<String> { + Ok(user + .attrs + .get(attr) + .ok_or(anyhow!("Missing attr: {}", attr))? + .iter() + .next() + .ok_or(anyhow!("No value for attr: {}", attr))? + .clone()) +} + +fn check_identifier(id: &str) -> Result<()> { + let is_ok = id + .chars() + .all(|c| c.is_alphanumeric() || "-+_.@".contains(c)); + if !is_ok { + bail!("Invalid username/email address, must contain only a-z A-Z 0-9 - + _ . @"); + } + Ok(()) +} diff --git a/aero-user/src/login/mod.rs b/aero-user/src/login/mod.rs new file mode 100644 index 0000000..5e54b4a --- /dev/null +++ b/aero-user/src/login/mod.rs @@ -0,0 +1,245 @@ +pub mod demo_provider; +pub mod ldap_provider; +pub mod static_provider; + +use std::sync::Arc; + +use anyhow::{anyhow, bail, Context, Result}; +use async_trait::async_trait; +use base64::Engine; +use rand::prelude::*; + +use crate::cryptoblob::*; +use crate::storage::*; + +/// The trait LoginProvider defines the interface for a login provider that allows +/// to retrieve storage and cryptographic credentials for access to a user account +/// from their username and password. +#[async_trait] +pub trait LoginProvider { + /// The login method takes an account's password as an input to decypher + /// decryption keys and obtain full access to the user's account. + async fn login(&self, username: &str, password: &str) -> Result<Credentials>; + /// The public_login method takes an account's email address and returns + /// public credentials for adding mails to the user's inbox. + async fn public_login(&self, email: &str) -> Result<PublicCredentials>; +} + +/// ArcLoginProvider is simply an alias on a structure that is used +/// in many places in the code +pub type ArcLoginProvider = Arc<dyn LoginProvider + Send + Sync>; + +/// The struct Credentials represent all of the necessary information to interact +/// with a user account's data after they are logged in. +#[derive(Clone, Debug)] +pub struct Credentials { + /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V) + pub storage: Builder, + /// The cryptographic keys are used to encrypt and decrypt data stored in S3 and K2V + pub keys: CryptoKeys, +} + +#[derive(Clone, Debug)] +pub struct PublicCredentials { + /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V) + pub storage: Builder, + pub public_key: PublicKey, +} + +use serde::{Deserialize, Serialize}; +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CryptoRoot(pub String); + +impl CryptoRoot { + pub fn create_pass(password: &str, k: &CryptoKeys) -> Result<Self> { + let bytes = k.password_seal(password)?; + let b64 = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes); + let cr = format!("aero:cryptoroot:pass:{}", b64); + Ok(Self(cr)) + } + + pub fn create_cleartext(k: &CryptoKeys) -> Self { + let bytes = k.serialize(); + let b64 = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes); + let cr = format!("aero:cryptoroot:cleartext:{}", b64); + Self(cr) + } + + pub fn create_incoming(pk: &PublicKey) -> Self { + let bytes: &[u8] = &pk[..]; + let b64 = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes); + let cr = format!("aero:cryptoroot:incoming:{}", b64); + Self(cr) + } + + pub fn public_key(&self) -> Result<PublicKey> { + match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] { + ["aero", "cryptoroot", "pass", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + if blob.len() < 32 { + bail!( + "Decoded data is {} bytes long, expect at least 32 bytes", + blob.len() + ); + } + PublicKey::from_slice(&blob[..32]).context("must be a valid public key") + } + ["aero", "cryptoroot", "cleartext", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + Ok(CryptoKeys::deserialize(&blob)?.public) + } + ["aero", "cryptoroot", "incoming", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + if blob.len() < 32 { + bail!( + "Decoded data is {} bytes long, expect at least 32 bytes", + blob.len() + ); + } + PublicKey::from_slice(&blob[..32]).context("must be a valid public key") + } + ["aero", "cryptoroot", "keyring", _] => { + bail!("keyring is not yet implemented!") + } + _ => bail!(format!( + "passed string '{}' is not a valid cryptoroot", + self.0 + )), + } + } + pub fn crypto_keys(&self, password: &str) -> Result<CryptoKeys> { + match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] { + ["aero", "cryptoroot", "pass", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + CryptoKeys::password_open(password, &blob) + } + ["aero", "cryptoroot", "cleartext", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + CryptoKeys::deserialize(&blob) + } + ["aero", "cryptoroot", "incoming", _] => { + bail!("incoming cryptoroot does not contain a crypto key!") + } + ["aero", "cryptoroot", "keyring", _] => { + bail!("keyring is not yet implemented!") + } + _ => bail!(format!( + "passed string '{}' is not a valid cryptoroot", + self.0 + )), + } + } +} + +/// The struct CryptoKeys contains the cryptographic keys used to encrypt and decrypt +/// data in a user's mailbox. +#[derive(Clone, Debug)] +pub struct CryptoKeys { + /// Master key for symmetric encryption of mailbox data + pub master: Key, + /// Public/private keypair for encryption of incomming emails (secret part) + pub secret: SecretKey, + /// Public/private keypair for encryption of incomming emails (public part) + pub public: PublicKey, +} + +// ---- + +impl CryptoKeys { + /// Initialize a new cryptography root + pub fn init() -> Self { + let (public, secret) = gen_keypair(); + let master = gen_key(); + CryptoKeys { + master, + secret, + public, + } + } + + // Clear text serialize/deserialize + /// Serialize the root as bytes without encryption + fn serialize(&self) -> [u8; 64] { + let mut res = [0u8; 64]; + res[..32].copy_from_slice(self.master.as_ref()); + res[32..].copy_from_slice(self.secret.as_ref()); + res + } + + /// Deserialize a clear text crypto root without encryption + fn deserialize(bytes: &[u8]) -> Result<Self> { + if bytes.len() != 64 { + bail!("Invalid length: {}, expected 64", bytes.len()); + } + let master = Key::from_slice(&bytes[..32]).unwrap(); + let secret = SecretKey::from_slice(&bytes[32..]).unwrap(); + let public = secret.public_key(); + Ok(Self { + master, + secret, + public, + }) + } + + // Password sealed keys serialize/deserialize + pub fn password_open(password: &str, blob: &[u8]) -> Result<Self> { + let _pubkey = &blob[0..32]; + let kdf_salt = &blob[32..64]; + let password_openned = try_open_encrypted_keys(kdf_salt, password, &blob[64..])?; + + let keys = Self::deserialize(&password_openned)?; + Ok(keys) + } + + pub fn password_seal(&self, password: &str) -> Result<Vec<u8>> { + let mut kdf_salt = [0u8; 32]; + thread_rng().fill(&mut kdf_salt); + + // Calculate key for password secret box + let password_key = derive_password_key(&kdf_salt, password)?; + + // Seal a secret box that contains our crypto keys + let password_sealed = seal(&self.serialize(), &password_key)?; + + // Create blob + let password_blob = [&self.public[..], &kdf_salt[..], &password_sealed].concat(); + + Ok(password_blob) + } +} + +fn derive_password_key(kdf_salt: &[u8], password: &str) -> Result<Key> { + Ok(Key::from_slice(&argon2_kdf(kdf_salt, password.as_bytes(), 32)?).unwrap()) +} + +fn try_open_encrypted_keys( + kdf_salt: &[u8], + password: &str, + encrypted_keys: &[u8], +) -> Result<Vec<u8>> { + let password_key = derive_password_key(kdf_salt, password)?; + open(encrypted_keys, &password_key) +} + +// ---- UTIL ---- + +pub fn argon2_kdf(salt: &[u8], password: &[u8], output_len: usize) -> Result<Vec<u8>> { + use argon2::{password_hash, Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version}; + + let params = ParamsBuilder::new() + .output_len(output_len) + .build() + .map_err(|e| anyhow!("Invalid argon2 params: {}", e))?; + let argon2 = Argon2::new(Algorithm::default(), Version::default(), params); + + let b64_salt = base64::engine::general_purpose::STANDARD_NO_PAD.encode(salt); + let valid_salt = password_hash::Salt::from_b64(&b64_salt) + .map_err(|e| anyhow!("Invalid salt, error {}", e))?; + let hash = argon2 + .hash_password(password, valid_salt) + .map_err(|e| anyhow!("Unable to hash: {}", e))?; + + let hash = hash.hash.ok_or(anyhow!("Missing output"))?; + assert!(hash.len() == output_len); + Ok(hash.as_bytes().to_vec()) +} diff --git a/aero-user/src/login/static_provider.rs b/aero-user/src/login/static_provider.rs new file mode 100644 index 0000000..ed39343 --- /dev/null +++ b/aero-user/src/login/static_provider.rs @@ -0,0 +1,188 @@ +use std::collections::HashMap; +use std::path::PathBuf; + +use anyhow::{anyhow, bail}; +use async_trait::async_trait; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::watch; + +use crate::config::*; +use crate::login::*; +use crate::storage; + +pub struct ContextualUserEntry { + pub username: String, + pub config: UserEntry, +} + +#[derive(Default)] +pub struct UserDatabase { + users: HashMap<String, Arc<ContextualUserEntry>>, + users_by_email: HashMap<String, Arc<ContextualUserEntry>>, +} + +pub struct StaticLoginProvider { + user_db: watch::Receiver<UserDatabase>, + in_memory_store: storage::in_memory::MemDb, + garage_store: storage::garage::GarageRoot, +} + +pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) -> Result<()> { + let mut stream = signal(SignalKind::user_defined1()) + .expect("failed to install SIGUSR1 signal hander for reload"); + + loop { + let ulist: UserList = match read_config(config.clone()) { + Ok(x) => x, + Err(e) => { + tracing::warn!(path=%config.as_path().to_string_lossy(), error=%e, "Unable to load config"); + stream.recv().await; + continue; + } + }; + + let users = ulist + .into_iter() + .map(|(username, config)| { + ( + username.clone(), + Arc::new(ContextualUserEntry { username, config }), + ) + }) + .collect::<HashMap<_, _>>(); + + let mut users_by_email = HashMap::new(); + for (_, u) in users.iter() { + for m in u.config.email_addresses.iter() { + if users_by_email.contains_key(m) { + tracing::warn!("Several users have the same email address: {}", m); + stream.recv().await; + continue; + } + users_by_email.insert(m.clone(), u.clone()); + } + } + + tracing::info!("{} users loaded", users.len()); + up.send(UserDatabase { + users, + users_by_email, + }) + .context("update user db config")?; + stream.recv().await; + tracing::info!("Received SIGUSR1, reloading"); + } +} + +impl StaticLoginProvider { + pub async fn new(config: LoginStaticConfig) -> Result<Self> { + let (tx, mut rx) = watch::channel(UserDatabase::default()); + + tokio::spawn(update_user_list(config.user_list, tx)); + rx.changed().await?; + + Ok(Self { + user_db: rx, + in_memory_store: storage::in_memory::MemDb::new(), + garage_store: storage::garage::GarageRoot::new()?, + }) + } +} + +#[async_trait] +impl LoginProvider for StaticLoginProvider { + async fn login(&self, username: &str, password: &str) -> Result<Credentials> { + tracing::debug!(user=%username, "login"); + let user = { + let user_db = self.user_db.borrow(); + match user_db.users.get(username) { + None => bail!("User {} does not exist", username), + Some(u) => u.clone(), + } + }; + + tracing::debug!(user=%username, "verify password"); + if !verify_password(password, &user.config.password)? { + bail!("Wrong password"); + } + + tracing::debug!(user=%username, "fetch keys"); + let storage: storage::Builder = match &user.config.storage { + StaticStorage::InMemory => self.in_memory_store.builder(username).await, + StaticStorage::Garage(grgconf) => { + self.garage_store.user(storage::garage::GarageConf { + region: grgconf.aws_region.clone(), + k2v_endpoint: grgconf.k2v_endpoint.clone(), + s3_endpoint: grgconf.s3_endpoint.clone(), + aws_access_key_id: grgconf.aws_access_key_id.clone(), + aws_secret_access_key: grgconf.aws_secret_access_key.clone(), + bucket: grgconf.bucket.clone(), + })? + } + }; + + let cr = CryptoRoot(user.config.crypto_root.clone()); + let keys = cr.crypto_keys(password)?; + + tracing::debug!(user=%username, "logged"); + Ok(Credentials { storage, keys }) + } + + async fn public_login(&self, email: &str) -> Result<PublicCredentials> { + let user = { + let user_db = self.user_db.borrow(); + match user_db.users_by_email.get(email) { + None => bail!("Email {} does not exist", email), + Some(u) => u.clone(), + } + }; + tracing::debug!(user=%user.username, "public_login"); + + let storage: storage::Builder = match &user.config.storage { + StaticStorage::InMemory => self.in_memory_store.builder(&user.username).await, + StaticStorage::Garage(grgconf) => { + self.garage_store.user(storage::garage::GarageConf { + region: grgconf.aws_region.clone(), + k2v_endpoint: grgconf.k2v_endpoint.clone(), + s3_endpoint: grgconf.s3_endpoint.clone(), + aws_access_key_id: grgconf.aws_access_key_id.clone(), + aws_secret_access_key: grgconf.aws_secret_access_key.clone(), + bucket: grgconf.bucket.clone(), + })? + } + }; + + let cr = CryptoRoot(user.config.crypto_root.clone()); + let public_key = cr.public_key()?; + + Ok(PublicCredentials { + storage, + public_key, + }) + } +} + +pub fn hash_password(password: &str) -> Result<String> { + use argon2::{ + password_hash::{rand_core::OsRng, PasswordHasher, SaltString}, + Argon2, + }; + let salt = SaltString::generate(&mut OsRng); + let argon2 = Argon2::default(); + Ok(argon2 + .hash_password(password.as_bytes(), &salt) + .map_err(|e| anyhow!("Argon2 error: {}", e))? + .to_string()) +} + +pub fn verify_password(password: &str, hash: &str) -> Result<bool> { + use argon2::{ + password_hash::{PasswordHash, PasswordVerifier}, + Argon2, + }; + let parsed_hash = + PasswordHash::new(hash).map_err(|e| anyhow!("Invalid hashed password: {}", e))?; + Ok(Argon2::default() + .verify_password(password.as_bytes(), &parsed_hash) + .is_ok()) +} diff --git a/aero-user/src/storage/garage.rs b/aero-user/src/storage/garage.rs new file mode 100644 index 0000000..7e930c3 --- /dev/null +++ b/aero-user/src/storage/garage.rs @@ -0,0 +1,538 @@ +use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError}; +use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; +use aws_smithy_runtime_api::client::http::SharedHttpClient; +use hyper_rustls::HttpsConnector; +use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient}; +use hyper_util::rt::TokioExecutor; +use serde::Serialize; + +use super::*; + +pub struct GarageRoot { + k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>, + aws_http: SharedHttpClient, +} + +impl GarageRoot { + pub fn new() -> anyhow::Result<Self> { + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots()? + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + let k2v_http = HttpClient::builder(TokioExecutor::new()).build(connector); + let aws_http = HyperClientBuilder::new().build_https(); + Ok(Self { k2v_http, aws_http }) + } + + pub fn user(&self, conf: GarageConf) -> anyhow::Result<Arc<GarageUser>> { + let mut unicity: Vec<u8> = vec![]; + unicity.extend_from_slice(file!().as_bytes()); + unicity.append(&mut rmp_serde::to_vec(&conf)?); + + Ok(Arc::new(GarageUser { + conf, + aws_http: self.aws_http.clone(), + k2v_http: self.k2v_http.clone(), + unicity, + })) + } +} + +#[derive(Clone, Debug, Serialize)] +pub struct GarageConf { + pub region: String, + pub s3_endpoint: String, + pub k2v_endpoint: String, + pub aws_access_key_id: String, + pub aws_secret_access_key: String, + pub bucket: String, +} + +//@FIXME we should get rid of this builder +//and allocate a S3 + K2V client only once per user +//(and using a shared HTTP client) +#[derive(Clone, Debug)] +pub struct GarageUser { + conf: GarageConf, + aws_http: SharedHttpClient, + k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>, + unicity: Vec<u8>, +} + +#[async_trait] +impl IBuilder for GarageUser { + async fn build(&self) -> Result<Store, StorageError> { + let s3_creds = s3::config::Credentials::new( + self.conf.aws_access_key_id.clone(), + self.conf.aws_secret_access_key.clone(), + None, + None, + "aerogramme", + ); + + let sdk_config = aws_config::from_env() + .region(aws_config::Region::new(self.conf.region.clone())) + .credentials_provider(s3_creds) + .http_client(self.aws_http.clone()) + .endpoint_url(self.conf.s3_endpoint.clone()) + .load() + .await; + + let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) + .force_path_style(true) + .build(); + + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + + let k2v_config = k2v_client::K2vClientConfig { + endpoint: self.conf.k2v_endpoint.clone(), + region: self.conf.region.clone(), + aws_access_key_id: self.conf.aws_access_key_id.clone(), + aws_secret_access_key: self.conf.aws_secret_access_key.clone(), + bucket: self.conf.bucket.clone(), + user_agent: None, + }; + + let k2v_client = + match k2v_client::K2vClient::new_with_client(k2v_config, self.k2v_http.clone()) { + Err(e) => { + tracing::error!("unable to build k2v client: {}", e); + return Err(StorageError::Internal); + } + Ok(v) => v, + }; + + Ok(Box::new(GarageStore { + bucket: self.conf.bucket.clone(), + s3: s3_client, + k2v: k2v_client, + })) + } + fn unique(&self) -> UnicityBuffer { + UnicityBuffer(self.unicity.clone()) + } +} + +pub struct GarageStore { + bucket: String, + s3: s3::Client, + k2v: k2v_client::K2vClient, +} + +fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal { + let new_row_ref = row_ref.with_causality(causal_value.causality.into()); + let row_values = causal_value + .value + .into_iter() + .map(|k2v_value| match k2v_value { + k2v_client::K2vValue::Tombstone => Alternative::Tombstone, + k2v_client::K2vValue::Value(v) => Alternative::Value(v), + }) + .collect::<Vec<_>>(); + + RowVal { + row_ref: new_row_ref, + value: row_values, + } +} + +#[async_trait] +impl IStore for GarageStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { + tracing::trace!(select=%select, command="row_fetch"); + let (pk_list, batch_op) = match select { + Selector::Range { + shard, + sort_begin, + sort_end, + } => ( + vec![shard.to_string()], + vec![k2v_client::BatchReadOp { + partition_key: shard, + filter: k2v_client::Filter { + start: Some(sort_begin), + end: Some(sort_end), + ..k2v_client::Filter::default() + }, + ..k2v_client::BatchReadOp::default() + }], + ), + Selector::List(row_ref_list) => ( + row_ref_list + .iter() + .map(|row_ref| row_ref.uid.shard.to_string()) + .collect::<Vec<_>>(), + row_ref_list + .iter() + .map(|row_ref| k2v_client::BatchReadOp { + partition_key: &row_ref.uid.shard, + filter: k2v_client::Filter { + start: Some(&row_ref.uid.sort), + ..k2v_client::Filter::default() + }, + single_item: true, + ..k2v_client::BatchReadOp::default() + }) + .collect::<Vec<_>>(), + ), + Selector::Prefix { shard, sort_prefix } => ( + vec![shard.to_string()], + vec![k2v_client::BatchReadOp { + partition_key: shard, + filter: k2v_client::Filter { + prefix: Some(sort_prefix), + ..k2v_client::Filter::default() + }, + ..k2v_client::BatchReadOp::default() + }], + ), + Selector::Single(row_ref) => { + let causal_value = match self + .k2v + .read_item(&row_ref.uid.shard, &row_ref.uid.sort) + .await + { + Err(k2v_client::Error::NotFound) => { + tracing::debug!( + "K2V item not found shard={}, sort={}, bucket={}", + row_ref.uid.shard, + row_ref.uid.sort, + self.bucket, + ); + return Err(StorageError::NotFound); + } + Err(e) => { + tracing::error!( + "K2V read item shard={}, sort={}, bucket={} failed: {}", + row_ref.uid.shard, + row_ref.uid.sort, + self.bucket, + e + ); + return Err(StorageError::Internal); + } + Ok(v) => v, + }; + + let row_val = causal_to_row_val((*row_ref).clone(), causal_value); + return Ok(vec![row_val]); + } + }; + + let all_raw_res = match self.k2v.read_batch(&batch_op).await { + Err(e) => { + tracing::error!( + "k2v read batch failed for {:?}, bucket {} with err: {}", + select, + self.bucket, + e + ); + return Err(StorageError::Internal); + } + Ok(v) => v, + }; + //println!("fetch res -> {:?}", all_raw_res); + + let row_vals = + all_raw_res + .into_iter() + .zip(pk_list.into_iter()) + .fold(vec![], |mut acc, (page, pk)| { + page.items + .into_iter() + .map(|(sk, cv)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) + .for_each(|rr| acc.push(rr)); + + acc + }); + tracing::debug!(fetch_count = row_vals.len(), command = "row_fetch"); + + Ok(row_vals) + } + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { + tracing::trace!(select=%select, command="row_rm"); + let del_op = match select { + Selector::Range { + shard, + sort_begin, + sort_end, + } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: None, + start: Some(sort_begin), + end: Some(sort_end), + single_item: false, + }], + Selector::List(row_ref_list) => { + // Insert null values with causality token = delete + let batch_op = row_ref_list + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.uid.shard, + sort_key: &v.uid.sort, + causality: v.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }) + .collect::<Vec<_>>(); + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + } + Ok(_) => Ok(()), + }; + } + Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: Some(sort_prefix), + start: None, + end: None, + single_item: false, + }], + Selector::Single(row_ref) => { + // Insert null values with causality token = delete + let batch_op = vec![k2v_client::BatchInsertOp { + partition_key: &row_ref.uid.shard, + sort_key: &row_ref.uid.sort, + causality: row_ref.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }]; + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + } + Ok(_) => Ok(()), + }; + } + }; + + // Finally here we only have prefix & range + match self.k2v.delete_batch(&del_op).await { + Err(e) => { + tracing::error!("delete batch error: {}", e); + Err(StorageError::Internal) + } + Ok(_) => Ok(()), + } + } + + async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { + tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert"); + let batch_ops = values + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.row_ref.uid.shard, + sort_key: &v.row_ref.uid.sort, + causality: v.row_ref.causality.clone().map(|ct| ct.into()), + value: v + .value + .iter() + .next() + .map(|cv| match cv { + Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), + Alternative::Tombstone => k2v_client::K2vValue::Tombstone, + }) + .unwrap_or(k2v_client::K2vValue::Tombstone), + }) + .collect::<Vec<_>>(); + + match self.k2v.insert_batch(&batch_ops).await { + Err(e) => { + tracing::error!("k2v can't insert some value: {}", e); + Err(StorageError::Internal) + } + Ok(v) => Ok(v), + } + } + async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { + tracing::trace!(entry=%value, command="row_poll"); + loop { + if let Some(ct) = &value.causality { + match self + .k2v + .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None) + .await + { + Err(e) => { + tracing::error!("Unable to poll item: {}", e); + return Err(StorageError::Internal); + } + Ok(None) => continue, + Ok(Some(cv)) => return Ok(causal_to_row_val(value.clone(), cv)), + } + } else { + match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await { + Err(k2v_client::Error::NotFound) => { + self.k2v + .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None) + .await + .map_err(|e| { + tracing::error!("Unable to insert item in polling logic: {}", e); + StorageError::Internal + })?; + } + Err(e) => { + tracing::error!("Unable to read item in polling logic: {}", e); + return Err(StorageError::Internal); + } + Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)), + } + } + } + } + + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_fetch"); + let maybe_out = self + .s3 + .get_object() + .bucket(self.bucket.to_string()) + .key(blob_ref.0.to_string()) + .send() + .await; + + let object_output = match maybe_out { + Ok(output) => output, + Err(SdkError::ServiceError(x)) => match x.err() { + GetObjectError::NoSuchKey(_) => return Err(StorageError::NotFound), + e => { + tracing::warn!("Blob Fetch Error, Service Error: {}", e); + return Err(StorageError::Internal); + } + }, + Err(e) => { + tracing::warn!("Blob Fetch Error, {}", e); + return Err(StorageError::Internal); + } + }; + + let buffer = match object_output.body.collect().await { + Ok(aggreg) => aggreg.to_vec(), + Err(e) => { + tracing::warn!("Fetching body failed with {}", e); + return Err(StorageError::Internal); + } + }; + + let mut bv = BlobVal::new(blob_ref.clone(), buffer); + if let Some(meta) = object_output.metadata { + bv.meta = meta; + } + tracing::debug!("Fetched {}/{}", self.bucket, blob_ref.0); + Ok(bv) + } + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); + let streamable_value = s3::primitives::ByteStream::from(blob_val.value); + + let maybe_send = self + .s3 + .put_object() + .bucket(self.bucket.to_string()) + .key(blob_val.blob_ref.0.to_string()) + .set_metadata(Some(blob_val.meta)) + .body(streamable_value) + .send() + .await; + + match maybe_send { + Err(e) => { + tracing::error!("unable to send object: {}", e); + Err(StorageError::Internal) + } + Ok(_) => { + tracing::debug!("Inserted {}/{}", self.bucket, blob_val.blob_ref.0); + Ok(()) + } + } + } + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(src=%src, dst=%dst, command="blob_copy"); + let maybe_copy = self + .s3 + .copy_object() + .bucket(self.bucket.to_string()) + .key(dst.0.clone()) + .copy_source(format!("/{}/{}", self.bucket.to_string(), src.0.clone())) + .send() + .await; + + match maybe_copy { + Err(e) => { + tracing::error!( + "unable to copy object {} to {} (bucket: {}), error: {}", + src.0, + dst.0, + self.bucket, + e + ); + Err(StorageError::Internal) + } + Ok(_) => { + tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket); + Ok(()) + } + } + } + async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { + tracing::trace!(prefix = prefix, command = "blob_list"); + let maybe_list = self + .s3 + .list_objects_v2() + .bucket(self.bucket.to_string()) + .prefix(prefix) + .into_paginator() + .send() + .try_collect() + .await; + + match maybe_list { + Err(e) => { + tracing::error!( + "listing prefix {} on bucket {} failed: {}", + prefix, + self.bucket, + e + ); + Err(StorageError::Internal) + } + Ok(pagin_list_out) => Ok(pagin_list_out + .into_iter() + .map(|list_out| list_out.contents.unwrap_or(vec![])) + .flatten() + .map(|obj| BlobRef(obj.key.unwrap_or(String::new()))) + .collect::<Vec<_>>()), + } + } + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_rm"); + let maybe_delete = self + .s3 + .delete_object() + .bucket(self.bucket.to_string()) + .key(blob_ref.0.clone()) + .send() + .await; + + match maybe_delete { + Err(e) => { + tracing::error!( + "unable to delete {} (bucket: {}), error {}", + blob_ref.0, + self.bucket, + e + ); + Err(StorageError::Internal) + } + Ok(_) => { + tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket); + Ok(()) + } + } + } +} diff --git a/aero-user/src/storage/in_memory.rs b/aero-user/src/storage/in_memory.rs new file mode 100644 index 0000000..a676797 --- /dev/null +++ b/aero-user/src/storage/in_memory.rs @@ -0,0 +1,334 @@ +use crate::storage::*; +use std::collections::BTreeMap; +use std::ops::Bound::{self, Excluded, Included, Unbounded}; +use std::sync::RwLock; +use tokio::sync::Notify; + +/// This implementation is very inneficient, and not completely correct +/// Indeed, when the connector is dropped, the memory is freed. +/// It means that when a user disconnects, its data are lost. +/// It's intended only for basic debugging, do not use it for advanced tests... + +#[derive(Debug, Default)] +pub struct MemDb(tokio::sync::Mutex<HashMap<String, Arc<MemBuilder>>>); +impl MemDb { + pub fn new() -> Self { + Self(tokio::sync::Mutex::new(HashMap::new())) + } + + pub async fn builder(&self, username: &str) -> Arc<MemBuilder> { + let mut global_storage = self.0.lock().await; + global_storage + .entry(username.to_string()) + .or_insert(MemBuilder::new(username)) + .clone() + } +} + +#[derive(Debug, Clone)] +enum InternalData { + Tombstone, + Value(Vec<u8>), +} +impl InternalData { + fn to_alternative(&self) -> Alternative { + match self { + Self::Tombstone => Alternative::Tombstone, + Self::Value(x) => Alternative::Value(x.clone()), + } + } +} + +#[derive(Debug)] +struct InternalRowVal { + data: Vec<InternalData>, + version: u64, + change: Arc<Notify>, +} +impl std::default::Default for InternalRowVal { + fn default() -> Self { + Self { + data: vec![], + version: 1, + change: Arc::new(Notify::new()), + } + } +} +impl InternalRowVal { + fn concurrent_values(&self) -> Vec<Alternative> { + self.data.iter().map(InternalData::to_alternative).collect() + } + + fn to_row_val(&self, row_ref: RowRef) -> RowVal { + RowVal { + row_ref: row_ref.with_causality(self.version.to_string()), + value: self.concurrent_values(), + } + } +} + +#[derive(Debug, Default, Clone)] +struct InternalBlobVal { + data: Vec<u8>, + metadata: HashMap<String, String>, +} +impl InternalBlobVal { + fn to_blob_val(&self, bref: &BlobRef) -> BlobVal { + BlobVal { + blob_ref: bref.clone(), + meta: self.metadata.clone(), + value: self.data.clone(), + } + } +} + +type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>; +type ArcBlob = Arc<RwLock<BTreeMap<String, InternalBlobVal>>>; + +#[derive(Clone, Debug)] +pub struct MemBuilder { + unicity: Vec<u8>, + row: ArcRow, + blob: ArcBlob, +} + +impl MemBuilder { + pub fn new(user: &str) -> Arc<Self> { + tracing::debug!("initialize membuilder for {}", user); + let mut unicity: Vec<u8> = vec![]; + unicity.extend_from_slice(file!().as_bytes()); + unicity.extend_from_slice(user.as_bytes()); + Arc::new(Self { + unicity, + row: Arc::new(RwLock::new(HashMap::new())), + blob: Arc::new(RwLock::new(BTreeMap::new())), + }) + } +} + +#[async_trait] +impl IBuilder for MemBuilder { + async fn build(&self) -> Result<Store, StorageError> { + Ok(Box::new(MemStore { + row: self.row.clone(), + blob: self.blob.clone(), + })) + } + + fn unique(&self) -> UnicityBuffer { + UnicityBuffer(self.unicity.clone()) + } +} + +pub struct MemStore { + row: ArcRow, + blob: ArcBlob, +} + +fn prefix_last_bound(prefix: &str) -> Bound<String> { + let mut sort_end = prefix.to_string(); + match sort_end.pop() { + None => Unbounded, + Some(ch) => { + let nc = char::from_u32(ch as u32 + 1).unwrap(); + sort_end.push(nc); + Excluded(sort_end) + } + } +} + +impl MemStore { + fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { + tracing::trace!(entry=%entry, command="row_rm_single"); + let mut store = self.row.write().or(Err(StorageError::Internal))?; + let shard = &entry.uid.shard; + let sort = &entry.uid.sort; + + let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let bt = store.entry(shard.to_string()).or_default(); + let intval = bt.entry(sort.to_string()).or_default(); + + if cauz == intval.version { + intval.data.clear(); + } + intval.data.push(InternalData::Tombstone); + intval.version += 1; + intval.change.notify_waiters(); + + Ok(()) + } +} + +#[async_trait] +impl IStore for MemStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { + tracing::trace!(select=%select, command="row_fetch"); + let store = self.row.read().or(Err(StorageError::Internal))?; + + match select { + Selector::Range { + shard, + sort_begin, + sort_end, + } => Ok(store + .get(*shard) + .unwrap_or(&BTreeMap::new()) + .range(( + Included(sort_begin.to_string()), + Excluded(sort_end.to_string()), + )) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) + .collect::<Vec<_>>()), + Selector::List(rlist) => { + let mut acc = vec![]; + for row_ref in rlist { + let maybe_intval = store + .get(&row_ref.uid.shard) + .map(|v| v.get(&row_ref.uid.sort)) + .flatten(); + if let Some(intval) = maybe_intval { + acc.push(intval.to_row_val(row_ref.clone())); + } + } + Ok(acc) + } + Selector::Prefix { shard, sort_prefix } => { + let last_bound = prefix_last_bound(sort_prefix); + + Ok(store + .get(*shard) + .unwrap_or(&BTreeMap::new()) + .range((Included(sort_prefix.to_string()), last_bound)) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) + .collect::<Vec<_>>()) + } + Selector::Single(row_ref) => { + let intval = store + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)?; + Ok(vec![intval.to_row_val((*row_ref).clone())]) + } + } + } + + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { + tracing::trace!(select=%select, command="row_rm"); + + let values = match select { + Selector::Range { .. } | Selector::Prefix { .. } => self + .row_fetch(select) + .await? + .into_iter() + .map(|rv| rv.row_ref) + .collect::<Vec<_>>(), + Selector::List(rlist) => rlist.clone(), + Selector::Single(row_ref) => vec![(*row_ref).clone()], + }; + + for v in values.into_iter() { + self.row_rm_single(&v)?; + } + Ok(()) + } + + async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { + tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert"); + let mut store = self.row.write().or(Err(StorageError::Internal))?; + for v in values.into_iter() { + let shard = v.row_ref.uid.shard; + let sort = v.row_ref.uid.sort; + + let val = match v.value.into_iter().next() { + Some(Alternative::Value(x)) => x, + _ => vec![], + }; + + let cauz = match v.row_ref.causality.map(|v| v.parse::<u64>()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let bt = store.entry(shard).or_default(); + let intval = bt.entry(sort).or_default(); + + if cauz == intval.version { + intval.data.clear(); + } + intval.data.push(InternalData::Value(val)); + intval.version += 1; + intval.change.notify_waiters(); + } + Ok(()) + } + async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { + tracing::trace!(entry=%value, command="row_poll"); + let shard = &value.uid.shard; + let sort = &value.uid.sort; + let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let notify_me = { + let mut store = self.row.write().or(Err(StorageError::Internal))?; + let bt = store.entry(shard.to_string()).or_default(); + let intval = bt.entry(sort.to_string()).or_default(); + + if intval.version != cauz { + return Ok(intval.to_row_val(value.clone())); + } + intval.change.clone() + }; + + notify_me.notified().await; + + let res = self.row_fetch(&Selector::Single(value)).await?; + res.into_iter().next().ok_or(StorageError::NotFound) + } + + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_fetch"); + let store = self.blob.read().or(Err(StorageError::Internal))?; + store + .get(&blob_ref.0) + .ok_or(StorageError::NotFound) + .map(|v| v.to_blob_val(blob_ref)) + } + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + let entry = store.entry(blob_val.blob_ref.0.clone()).or_default(); + entry.data = blob_val.value.clone(); + entry.metadata = blob_val.meta.clone(); + Ok(()) + } + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(src=%src, dst=%dst, command="blob_copy"); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + let blob_src = store.entry(src.0.clone()).or_default().clone(); + store.insert(dst.0.clone(), blob_src); + Ok(()) + } + async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { + tracing::trace!(prefix = prefix, command = "blob_list"); + let store = self.blob.read().or(Err(StorageError::Internal))?; + let last_bound = prefix_last_bound(prefix); + let blist = store + .range((Included(prefix.to_string()), last_bound)) + .map(|(k, _)| BlobRef(k.to_string())) + .collect::<Vec<_>>(); + Ok(blist) + } + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_rm"); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + store.remove(&blob_ref.0); + Ok(()) + } +} diff --git a/aero-user/src/storage/mod.rs b/aero-user/src/storage/mod.rs new file mode 100644 index 0000000..f5eb8d3 --- /dev/null +++ b/aero-user/src/storage/mod.rs @@ -0,0 +1,180 @@ +/* + * + * This abstraction goal is to leverage all the semantic of Garage K2V+S3, + * to be as tailored as possible to it ; it aims to be a zero-cost abstraction + * compared to when we where directly using the K2V+S3 client. + * + * My idea: we can encapsulate the causality token + * into the object system so it is not exposed. + */ + +pub mod garage; +pub mod in_memory; + +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Arc; + +use async_trait::async_trait; + +#[derive(Debug, Clone)] +pub enum Alternative { + Tombstone, + Value(Vec<u8>), +} +type ConcurrentValues = Vec<Alternative>; + +#[derive(Debug, Clone)] +pub enum StorageError { + NotFound, + Internal, +} +impl std::fmt::Display for StorageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Storage Error: ")?; + match self { + Self::NotFound => f.write_str("Item not found"), + Self::Internal => f.write_str("An internal error occured"), + } + } +} +impl std::error::Error for StorageError {} + +#[derive(Debug, Clone, PartialEq)] +pub struct RowUid { + pub shard: String, + pub sort: String, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct RowRef { + pub uid: RowUid, + pub causality: Option<String>, +} +impl std::fmt::Display for RowRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RowRef({}, {}, {:?})", + self.uid.shard, self.uid.sort, self.causality + ) + } +} + +impl RowRef { + pub fn new(shard: &str, sort: &str) -> Self { + Self { + uid: RowUid { + shard: shard.to_string(), + sort: sort.to_string(), + }, + causality: None, + } + } + pub fn with_causality(mut self, causality: String) -> Self { + self.causality = Some(causality); + self + } +} + +#[derive(Debug, Clone)] +pub struct RowVal { + pub row_ref: RowRef, + pub value: ConcurrentValues, +} + +impl RowVal { + pub fn new(row_ref: RowRef, value: Vec<u8>) -> Self { + Self { + row_ref, + value: vec![Alternative::Value(value)], + } + } +} + +#[derive(Debug, Clone)] +pub struct BlobRef(pub String); +impl std::fmt::Display for BlobRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BlobRef({})", self.0) + } +} + +#[derive(Debug, Clone)] +pub struct BlobVal { + pub blob_ref: BlobRef, + pub meta: HashMap<String, String>, + pub value: Vec<u8>, +} +impl BlobVal { + pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self { + Self { + blob_ref, + value, + meta: HashMap::new(), + } + } + + pub fn with_meta(mut self, k: String, v: String) -> Self { + self.meta.insert(k, v); + self + } +} + +#[derive(Debug)] +pub enum Selector<'a> { + Range { + shard: &'a str, + sort_begin: &'a str, + sort_end: &'a str, + }, + List(Vec<RowRef>), // list of (shard_key, sort_key) + #[allow(dead_code)] + Prefix { + shard: &'a str, + sort_prefix: &'a str, + }, + Single(&'a RowRef), +} +impl<'a> std::fmt::Display for Selector<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Range { + shard, + sort_begin, + sort_end, + } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end), + Self::List(list) => write!(f, "List({:?})", list), + Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix), + Self::Single(row_ref) => write!(f, "Single({})", row_ref), + } + } +} + +#[async_trait] +pub trait IStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>; + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>; + async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>; + async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>; + + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>; + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError>; + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>; + async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>; + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct UnicityBuffer(Vec<u8>); + +#[async_trait] +pub trait IBuilder: std::fmt::Debug { + async fn build(&self) -> Result<Store, StorageError>; + + /// Returns an opaque buffer that uniquely identifies this builder + fn unique(&self) -> UnicityBuffer; +} + +pub type Builder = Arc<dyn IBuilder + Send + Sync>; +pub type Store = Box<dyn IStore + Send + Sync>; |