aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bayou.rs54
-rw-r--r--src/login/ldap_provider.rs53
-rw-r--r--src/login/mod.rs70
-rw-r--r--src/login/static_provider.rs63
-rw-r--r--src/mail/incoming.rs21
-rw-r--r--src/mail/mailbox.rs64
-rw-r--r--src/mail/user.rs6
-rw-r--r--src/main.rs179
-rw-r--r--src/server.rs19
-rw-r--r--src/storage/garage.rs252
-rw-r--r--src/storage/in_memory.rs71
-rw-r--r--src/storage/mod.rs41
12 files changed, 546 insertions, 347 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 1361e49..7253a30 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -9,9 +9,8 @@ use tokio::sync::{watch, Notify};
use crate::cryptoblob::*;
use crate::login::Credentials;
-use crate::timestamp::*;
use crate::storage;
-
+use crate::timestamp::*;
const KEEP_STATE_EVERY: usize = 64;
@@ -94,7 +93,11 @@ impl<S: BayouState> Bayou<S> {
} else {
debug!("(sync) loading checkpoint: {}", key);
- let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value;
+ let buf = self
+ .storage
+ .blob_fetch(&storage::BlobRef(key.to_string()))
+ .await?
+ .value;
debug!("(sync) checkpoint body length: {}", buf.len());
let ck = open_deserialize::<S>(&buf, &self.key)?;
@@ -125,17 +128,22 @@ impl<S: BayouState> Bayou<S> {
// 3. List all operations starting from checkpoint
let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser);
- let ops_map = self.storage.row_fetch(&storage::Selector::Range {
- shard: &self.path,
- sort_begin: &ts_ser,
- sort_end: WATCH_SK
- }).await?;
+ let ops_map = self
+ .storage
+ .row_fetch(&storage::Selector::Range {
+ shard: &self.path,
+ sort_begin: &ts_ser,
+ sort_end: WATCH_SK,
+ })
+ .await?;
let mut ops = vec![];
for row_value in ops_map {
let row = row_value.row_ref;
let sort_key = row.uid.sort;
- let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
+ let ts = sort_key
+ .parse::<Timestamp>()
+ .map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
let val = row_value.value;
if val.len() != 1 {
@@ -211,7 +219,7 @@ impl<S: BayouState> Bayou<S> {
// Save info that sync has been done
self.last_sync = new_last_sync;
- self.last_sync_watch_ct = new_last_sync_watch_ct;
+ self.last_sync_watch_ct = new_last_sync_watch_ct;
Ok(())
}
@@ -362,16 +370,20 @@ impl<S: BayouState> Bayou<S> {
// Delete blobs
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
debug!("(cp) drop old checkpoint {}", key);
- self.storage.blob_rm(&storage::BlobRef(key.to_string())).await?;
+ self.storage
+ .blob_rm(&storage::BlobRef(key.to_string()))
+ .await?;
}
// Delete corresponding range of operations
let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
- self.storage.row_rm(&storage::Selector::Range {
- shard: &self.path,
- sort_begin: "",
- sort_end: &ts_ser
- }).await?
+ self.storage
+ .row_rm(&storage::Selector::Range {
+ shard: &self.path,
+ sort_begin: "",
+ sort_end: &ts_ser,
+ })
+ .await?
}
Ok(())
@@ -426,11 +438,7 @@ impl K2vWatch {
let watch = Arc::new(K2vWatch { target, rx, notify });
- tokio::spawn(Self::background_task(
- Arc::downgrade(&watch),
- storage,
- tx,
- ));
+ tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));
Ok(watch)
}
@@ -444,8 +452,8 @@ impl K2vWatch {
Some(this) => this.target.clone(),
None => {
error!("can't start loop");
- return
- },
+ return;
+ }
};
while let Some(this) = Weak::upgrade(&self_weak) {
diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs
index a7f56e4..81e5879 100644
--- a/src/login/ldap_provider.rs
+++ b/src/login/ldap_provider.rs
@@ -30,7 +30,10 @@ enum BucketSource {
enum StorageSpecific {
InMemory,
- Garage { from_config: LdapGarageConfig, bucket_source: BucketSource },
+ Garage {
+ from_config: LdapGarageConfig,
+ bucket_source: BucketSource,
+ },
}
impl LdapLoginProvider {
@@ -57,22 +60,24 @@ impl LdapLoginProvider {
let specific = match config.storage {
LdapStorage::InMemory => StorageSpecific::InMemory,
LdapStorage::Garage(grgconf) => {
- let bucket_source = match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) {
- (Some(b), None) => BucketSource::Constant(b),
- (None, Some(a)) => BucketSource::Attr(a),
- _ => bail!("Must set `bucket` or `bucket_attr`, but not both"),
- };
+ let bucket_source =
+ match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) {
+ (Some(b), None) => BucketSource::Constant(b),
+ (None, Some(a)) => BucketSource::Attr(a),
+ _ => bail!("Must set `bucket` or `bucket_attr`, but not both"),
+ };
if let BucketSource::Attr(a) = &bucket_source {
attrs_to_retrieve.push(a.clone());
}
- StorageSpecific::Garage { from_config: grgconf, bucket_source }
- },
+ StorageSpecific::Garage {
+ from_config: grgconf,
+ bucket_source,
+ }
+ }
};
-
-
Ok(Self {
ldap_server: config.ldap_server,
pre_bind_on_login: config.pre_bind_on_login,
@@ -89,27 +94,32 @@ impl LdapLoginProvider {
async fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builder> {
let storage: Builder = match &self.storage_specific {
- StorageSpecific::InMemory => self.in_memory_store.builder(
- &get_attr(user, &self.username_attr)?
- ).await,
- StorageSpecific::Garage { from_config, bucket_source } => {
+ StorageSpecific::InMemory => {
+ self.in_memory_store
+ .builder(&get_attr(user, &self.username_attr)?)
+ .await
+ }
+ StorageSpecific::Garage {
+ from_config,
+ bucket_source,
+ } => {
let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?;
- let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?;
+ let aws_secret_access_key =
+ get_attr(user, &from_config.aws_secret_access_key_attr)?;
let bucket = match bucket_source {
BucketSource::Constant(b) => b.clone(),
BucketSource::Attr(a) => get_attr(user, &a)?,
};
-
storage::garage::GarageBuilder::new(storage::garage::GarageConf {
- region: from_config.aws_region.clone(),
+ region: from_config.aws_region.clone(),
s3_endpoint: from_config.s3_endpoint.clone(),
k2v_endpoint: from_config.k2v_endpoint.clone(),
- aws_access_key_id,
- aws_secret_access_key,
+ aws_access_key_id,
+ aws_secret_access_key,
bucket,
})?
- },
+ }
};
Ok(storage)
@@ -172,7 +182,6 @@ impl LoginProvider for LdapLoginProvider {
drop(ldap);
-
Ok(Credentials { storage, keys })
}
@@ -215,7 +224,7 @@ impl LoginProvider for LdapLoginProvider {
let cr = CryptoRoot(crstr);
let public_key = cr.public_key()?;
- // storage
+ // storage
let storage = self.storage_creds_from_ldap_user(&user).await?;
drop(ldap);
diff --git a/src/login/mod.rs b/src/login/mod.rs
index 3369ac2..2926738 100644
--- a/src/login/mod.rs
+++ b/src/login/mod.rs
@@ -1,8 +1,8 @@
pub mod ldap_provider;
pub mod static_provider;
-use std::sync::Arc;
use base64::Engine;
+use std::sync::Arc;
use anyhow::{anyhow, bail, Context, Result};
use async_trait::async_trait;
@@ -45,7 +45,7 @@ pub struct PublicCredentials {
pub public_key: PublicKey,
}
-use serde::{Serialize, Deserialize};
+use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CryptoRoot(pub String);
@@ -73,47 +73,59 @@ impl CryptoRoot {
pub fn public_key(&self) -> Result<PublicKey> {
match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] {
- [ "aero", "cryptoroot", "pass", b64blob ] => {
+ ["aero", "cryptoroot", "pass", b64blob] => {
let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
if blob.len() < 32 {
- bail!("Decoded data is {} bytes long, expect at least 32 bytes", blob.len());
+ bail!(
+ "Decoded data is {} bytes long, expect at least 32 bytes",
+ blob.len()
+ );
}
PublicKey::from_slice(&blob[..32]).context("must be a valid public key")
- },
- [ "aero", "cryptoroot", "cleartext", b64blob ] => {
+ }
+ ["aero", "cryptoroot", "cleartext", b64blob] => {
let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
Ok(CryptoKeys::deserialize(&blob)?.public)
- },
- [ "aero", "cryptoroot", "incoming", b64blob ] => {
+ }
+ ["aero", "cryptoroot", "incoming", b64blob] => {
let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
if blob.len() < 32 {
- bail!("Decoded data is {} bytes long, expect at least 32 bytes", blob.len());
+ bail!(
+ "Decoded data is {} bytes long, expect at least 32 bytes",
+ blob.len()
+ );
}
PublicKey::from_slice(&blob[..32]).context("must be a valid public key")
- },
- [ "aero", "cryptoroot", "keyring", _ ] => {
+ }
+ ["aero", "cryptoroot", "keyring", _] => {
bail!("keyring is not yet implemented!")
- },
- _ => bail!(format!("passed string '{}' is not a valid cryptoroot", self.0)),
+ }
+ _ => bail!(format!(
+ "passed string '{}' is not a valid cryptoroot",
+ self.0
+ )),
}
}
pub fn crypto_keys(&self, password: &str) -> Result<CryptoKeys> {
match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] {
- [ "aero", "cryptoroot", "pass", b64blob ] => {
+ ["aero", "cryptoroot", "pass", b64blob] => {
let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
CryptoKeys::password_open(password, &blob)
- },
- [ "aero", "cryptoroot", "cleartext", b64blob ] => {
+ }
+ ["aero", "cryptoroot", "cleartext", b64blob] => {
let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
CryptoKeys::deserialize(&blob)
- },
- [ "aero", "cryptoroot", "incoming", _ ] => {
+ }
+ ["aero", "cryptoroot", "incoming", _] => {
bail!("incoming cryptoroot does not contain a crypto key!")
- },
- [ "aero", "cryptoroot", "keyring", _ ] =>{
+ }
+ ["aero", "cryptoroot", "keyring", _] => {
bail!("keyring is not yet implemented!")
- },
- _ => bail!(format!("passed string '{}' is not a valid cryptoroot", self.0)),
+ }
+ _ => bail!(format!(
+ "passed string '{}' is not a valid cryptoroot",
+ self.0
+ )),
}
}
}
@@ -132,9 +144,6 @@ pub struct CryptoKeys {
// ----
-
-
-
impl CryptoKeys {
/// Initialize a new cryptography root
pub fn init() -> Self {
@@ -202,7 +211,11 @@ fn derive_password_key(kdf_salt: &[u8], password: &str) -> Result<Key> {
Ok(Key::from_slice(&argon2_kdf(kdf_salt, password.as_bytes(), 32)?).unwrap())
}
-fn try_open_encrypted_keys(kdf_salt: &[u8], password: &str, encrypted_keys: &[u8]) -> Result<Vec<u8>> {
+fn try_open_encrypted_keys(
+ kdf_salt: &[u8],
+ password: &str,
+ encrypted_keys: &[u8],
+) -> Result<Vec<u8>> {
let password_key = derive_password_key(kdf_salt, password)?;
open(encrypted_keys, &password_key)
}
@@ -210,7 +223,7 @@ fn try_open_encrypted_keys(kdf_salt: &[u8], password: &str, encrypted_keys: &[u8
// ---- UTIL ----
pub fn argon2_kdf(salt: &[u8], password: &[u8], output_len: usize) -> Result<Vec<u8>> {
- use argon2::{Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version, password_hash};
+ use argon2::{password_hash, Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version};
let params = ParamsBuilder::new()
.output_len(output_len)
@@ -219,7 +232,8 @@ pub fn argon2_kdf(salt: &[u8], password: &[u8], output_len: usize) -> Result<Vec
let argon2 = Argon2::new(Algorithm::default(), Version::default(), params);
let b64_salt = base64::engine::general_purpose::STANDARD_NO_PAD.encode(salt);
- let valid_salt = password_hash::Salt::from_b64(&b64_salt).map_err(|e| anyhow!("Invalid salt, error {}", e))?;
+ let valid_salt = password_hash::Salt::from_b64(&b64_salt)
+ .map_err(|e| anyhow!("Invalid salt, error {}", e))?;
let hash = argon2
.hash_password(password, valid_salt)
.map_err(|e| anyhow!("Unable to hash: {}", e))?;
diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs
index b11123c..1e1ecbf 100644
--- a/src/login/static_provider.rs
+++ b/src/login/static_provider.rs
@@ -1,8 +1,8 @@
use std::collections::HashMap;
-use std::sync::Arc;
use std::path::PathBuf;
-use tokio::sync::watch;
+use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
+use tokio::sync::watch;
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
@@ -28,7 +28,8 @@ pub struct StaticLoginProvider {
}
pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) -> Result<()> {
- let mut stream = signal(SignalKind::user_defined1()).expect("failed to install SIGUSR1 signal hander for reload");
+ let mut stream = signal(SignalKind::user_defined1())
+ .expect("failed to install SIGUSR1 signal hander for reload");
loop {
let ulist: UserList = match read_config(config.clone()) {
@@ -42,7 +43,12 @@ pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>)
let users = ulist
.into_iter()
- .map(|(username, config)| (username.clone() , Arc::new(ContextualUserEntry { username, config })))
+ .map(|(username, config)| {
+ (
+ username.clone(),
+ Arc::new(ContextualUserEntry { username, config }),
+ )
+ })
.collect::<HashMap<_, _>>();
let mut users_by_email = HashMap::new();
@@ -51,14 +57,18 @@ pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>)
if users_by_email.contains_key(m) {
tracing::warn!("Several users have the same email address: {}", m);
stream.recv().await;
- continue
+ continue;
}
users_by_email.insert(m.clone(), u.clone());
}
}
tracing::info!("{} users loaded", users.len());
- up.send(UserDatabase { users, users_by_email }).context("update user db config")?;
+ up.send(UserDatabase {
+ users,
+ users_by_email,
+ })
+ .context("update user db config")?;
stream.recv().await;
tracing::info!("Received SIGUSR1, reloading");
}
@@ -71,7 +81,10 @@ impl StaticLoginProvider {
tokio::spawn(update_user_list(config.user_list, tx));
rx.changed().await?;
- Ok(Self { user_db: rx, in_memory_store: storage::in_memory::MemDb::new() })
+ Ok(Self {
+ user_db: rx,
+ in_memory_store: storage::in_memory::MemDb::new(),
+ })
}
}
@@ -95,14 +108,16 @@ impl LoginProvider for StaticLoginProvider {
tracing::debug!(user=%username, "fetch keys");
let storage: storage::Builder = match &user.config.storage {
StaticStorage::InMemory => self.in_memory_store.builder(username).await,
- StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf {
- region: grgconf.aws_region.clone(),
- k2v_endpoint: grgconf.k2v_endpoint.clone(),
- s3_endpoint: grgconf.s3_endpoint.clone(),
- aws_access_key_id: grgconf.aws_access_key_id.clone(),
- aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
- bucket: grgconf.bucket.clone(),
- })?,
+ StaticStorage::Garage(grgconf) => {
+ storage::garage::GarageBuilder::new(storage::garage::GarageConf {
+ region: grgconf.aws_region.clone(),
+ k2v_endpoint: grgconf.k2v_endpoint.clone(),
+ s3_endpoint: grgconf.s3_endpoint.clone(),
+ aws_access_key_id: grgconf.aws_access_key_id.clone(),
+ aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
+ bucket: grgconf.bucket.clone(),
+ })?
+ }
};
let cr = CryptoRoot(user.config.crypto_root.clone());
@@ -124,14 +139,16 @@ impl LoginProvider for StaticLoginProvider {
let storage: storage::Builder = match &user.config.storage {
StaticStorage::InMemory => self.in_memory_store.builder(&user.username).await,
- StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf {
- region: grgconf.aws_region.clone(),
- k2v_endpoint: grgconf.k2v_endpoint.clone(),
- s3_endpoint: grgconf.s3_endpoint.clone(),
- aws_access_key_id: grgconf.aws_access_key_id.clone(),
- aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
- bucket: grgconf.bucket.clone(),
- })?,
+ StaticStorage::Garage(grgconf) => {
+ storage::garage::GarageBuilder::new(storage::garage::GarageConf {
+ region: grgconf.aws_region.clone(),
+ k2v_endpoint: grgconf.k2v_endpoint.clone(),
+ s3_endpoint: grgconf.s3_endpoint.clone(),
+ aws_access_key_id: grgconf.aws_access_key_id.clone(),
+ aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
+ bucket: grgconf.bucket.clone(),
+ })?
+ }
};
let cr = CryptoRoot(user.config.crypto_root.clone());
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index 7e33a9a..04d2ef1 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -51,7 +51,10 @@ async fn incoming_mail_watch_process_internal(
creds: Credentials,
mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
) -> Result<()> {
- let mut lock_held = k2v_lock_loop(creds.storage.build().await?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK));
+ let mut lock_held = k2v_lock_loop(
+ creds.storage.build().await?,
+ storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK),
+ );
let storage = creds.storage.build().await?;
let mut inbox: Option<Arc<Mailbox>> = None;
@@ -63,8 +66,7 @@ async fn incoming_mail_watch_process_internal(
let wait_new_mail = async {
loop {
- match storage.row_poll(&incoming_key).await
- {
+ match storage.row_poll(&incoming_key).await {
Ok(row_val) => break row_val.row_ref,
Err(e) => {
error!("Error in wait_new_mail: {}", e);
@@ -360,7 +362,10 @@ async fn k2v_lock_loop_internal(
Some(existing) => existing,
None => row_ref.clone(),
};
- if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await {
+ if let Err(e) = storage
+ .row_insert(vec![storage::RowVal::new(row, lock)])
+ .await
+ {
error!("Could not take lock: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
@@ -428,14 +433,12 @@ impl EncryptedMessage {
let blob_val = storage::BlobVal::new(
storage::BlobRef(format!("incoming/{}", gen_ident())),
self.encrypted_body.clone().into(),
- ).with_meta(MESSAGE_KEY.to_string(), key_header);
+ )
+ .with_meta(MESSAGE_KEY.to_string(), key_header);
storage.blob_insert(blob_val).await?;
// Update watch key to signal new mail
- let watch_val = storage::RowVal::new(
- watch_ct.clone(),
- gen_ident().0.to_vec(),
- );
+ let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec());
storage.row_insert(vec![watch_val]).await?;
Ok(())
}
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index 6fb7dea..e424ba3 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -8,7 +8,7 @@ use crate::login::Credentials;
use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
use crate::mail::IMF;
-use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self};
+use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
use crate::timestamp::now_msec;
pub struct Mailbox {
@@ -196,7 +196,10 @@ impl MailboxInternal {
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
- let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>();
+ let ops = ids
+ .iter()
+ .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str()))
+ .collect::<Vec<_>>();
let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;
let mut meta_vec = vec![];
@@ -231,7 +234,10 @@ impl MailboxInternal {
}
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
- let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?;
+ let obj_res = self
+ .storage
+ .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id)))
+ .await?;
let body = obj_res.value;
cryptoblob::open(&body, message_key)
}
@@ -266,10 +272,12 @@ impl MailboxInternal {
async {
// Encrypt and save mail body
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
- self.storage.blob_insert(BlobVal::new(
- BlobRef(format!("{}/{}", self.mail_path, ident)),
- message_blob,
- )).await?;
+ self.storage
+ .blob_insert(BlobVal::new(
+ BlobRef(format!("{}/{}", self.mail_path, ident)),
+ message_blob,
+ ))
+ .await?;
Ok::<_, anyhow::Error>(())
},
async {
@@ -281,10 +289,12 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.storage.row_insert(vec![RowVal::new(
- RowRef::new(&self.mail_path, &ident.to_string()),
- meta_blob,
- )]).await?;
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &ident.to_string()),
+ meta_blob,
+ )])
+ .await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
@@ -328,10 +338,12 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
- self.storage.row_insert(vec![RowVal::new(
+ self.storage
+ .row_insert(vec![RowVal::new(
RowRef::new(&self.mail_path, &ident.to_string()),
meta_blob,
- )]).await?;
+ )])
+ .await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
@@ -355,17 +367,25 @@ impl MailboxInternal {
futures::try_join!(
async {
// Delete mail body from S3
- self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?;
+ self.storage
+ .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident)))
+ .await?;
Ok::<_, anyhow::Error>(())
},
async {
// Delete mail meta from K2V
let sk = ident.to_string();
- let res = self.storage
- .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk)))
+ let res = self
+ .storage
+ .row_fetch(&storage::Selector::Single(&RowRef::new(
+ &self.mail_path,
+ &sk,
+ )))
.await?;
if let Some(row_val) = res.into_iter().next() {
- self.storage.row_rm(&storage::Selector::Single(&row_val.row_ref)).await?;
+ self.storage
+ .row_rm(&storage::Selector::Single(&row_val.row_ref))
+ .await?;
}
Ok::<_, anyhow::Error>(())
}
@@ -421,10 +441,12 @@ impl MailboxInternal {
// Copy mail meta in K2V
let meta = &from.fetch_meta(&[source_id]).await?[0];
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
- self.storage.row_insert(vec![RowVal::new(
- RowRef::new(&self.mail_path, &new_id.to_string()),
- meta_blob,
- )]).await?;
+ self.storage
+ .row_insert(vec![RowVal::new(
+ RowRef::new(&self.mail_path, &new_id.to_string()),
+ meta_blob,
+ )])
+ .await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync(),
diff --git a/src/mail/user.rs b/src/mail/user.rs
index 8d12c58..da0d509 100644
--- a/src/mail/user.rs
+++ b/src/mail/user.rs
@@ -226,7 +226,11 @@ impl User {
async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> {
let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK);
- let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await {
+ let (mut list, row) = match self
+ .storage
+ .row_fetch(&storage::Selector::Single(&row_ref))
+ .await
+ {
Err(storage::StorageError::NotFound) => (MailboxList::new(), None),
Err(e) => return Err(e.into()),
Ok(rv) => {
diff --git a/src/main.rs b/src/main.rs
index f08f1a3..3221c2e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,5 @@
#![feature(async_fn_in_trait)]
-mod timestamp;
mod bayou;
mod config;
mod cryptoblob;
@@ -11,17 +10,18 @@ mod login;
mod mail;
mod server;
mod storage;
+mod timestamp;
-use std::path::PathBuf;
use std::io::Read;
+use std::path::PathBuf;
-use anyhow::{bail, Result, Context};
+use anyhow::{bail, Context, Result};
use clap::{Parser, Subcommand};
-use nix::{unistd::Pid, sys::signal};
+use nix::{sys::signal, unistd::Pid};
use config::*;
-use server::Server;
use login::{static_provider::*, *};
+use server::Server;
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
@@ -58,7 +58,7 @@ enum ToolsCommand {
PasswordHash {
#[clap(env = "AEROGRAMME_PASSWORD")]
maybe_password: Option<String>,
- }
+ },
}
#[derive(Subcommand, Debug)]
@@ -138,7 +138,7 @@ enum AccountManagement {
maybe_new_password: Option<String>,
#[clap(short, long)]
- login: String
+ login: String,
},
}
@@ -165,11 +165,11 @@ async fn main() -> Result<()> {
CompanionCommand::Daemon => {
let server = Server::from_companion_config(config).await?;
server.run().await?;
- },
+ }
CompanionCommand::Reload { pid } => reload(*pid, config.pid)?,
CompanionCommand::Wizard => {
unimplemented!();
- },
+ }
CompanionCommand::Account(cmd) => {
let user_file = config.users.user_list;
account_management(&args.command, cmd, user_file)?;
@@ -179,22 +179,24 @@ async fn main() -> Result<()> {
ProviderCommand::Daemon => {
let server = Server::from_provider_config(config).await?;
server.run().await?;
- },
+ }
ProviderCommand::Reload { pid } => reload(*pid, config.pid)?,
ProviderCommand::Account(cmd) => {
let user_file = match config.users {
UserManagement::Static(conf) => conf.user_list,
- UserManagement::Ldap(_) => panic!("LDAP account management is not supported from Aerogramme.")
+ UserManagement::Ldap(_) => {
+ panic!("LDAP account management is not supported from Aerogramme.")
+ }
};
account_management(&args.command, cmd, user_file)?;
}
},
(Command::Provider(_), AnyConfig::Companion(_)) => {
bail!("Your want to run a 'Provider' command but your configuration file has role 'Companion'.");
- },
+ }
(Command::Companion(_), AnyConfig::Provider(_)) => {
bail!("Your want to run a 'Companion' command but your configuration file has role 'Provider'.");
- },
+ }
(Command::Tools(subcommand), _) => match subcommand {
ToolsCommand::PasswordHash { maybe_password } => {
let password = match maybe_password {
@@ -202,60 +204,64 @@ async fn main() -> Result<()> {
None => rpassword::prompt_password("Enter password: ")?,
};
println!("{}", hash_password(&password)?);
- },
- ToolsCommand::CryptoRoot(crcommand) => {
- match crcommand {
- CryptoRootCommand::New { maybe_password } => {
- let password = match maybe_password {
- Some(pwd) => pwd.clone(),
- None => {
- let password = rpassword::prompt_password("Enter password: ")?;
- let password_confirm = rpassword::prompt_password("Confirm password: ")?;
- if password != password_confirm {
- bail!("Passwords don't match.");
- }
- password
+ }
+ ToolsCommand::CryptoRoot(crcommand) => match crcommand {
+ CryptoRootCommand::New { maybe_password } => {
+ let password = match maybe_password {
+ Some(pwd) => pwd.clone(),
+ None => {
+ let password = rpassword::prompt_password("Enter password: ")?;
+ let password_confirm =
+ rpassword::prompt_password("Confirm password: ")?;
+ if password != password_confirm {
+ bail!("Passwords don't match.");
}
- };
- let crypto_keys = CryptoKeys::init();
- let cr = CryptoRoot::create_pass(&password, &crypto_keys)?;
- println!("{}", cr.0);
- },
- CryptoRootCommand::NewClearText => {
- let crypto_keys = CryptoKeys::init();
- let cr = CryptoRoot::create_cleartext(&crypto_keys);
- println!("{}", cr.0);
- },
- CryptoRootCommand::ChangePassword { maybe_old_password, maybe_new_password, crypto_root } => {
- let old_password = match maybe_old_password {
- Some(pwd) => pwd.to_string(),
- None => rpassword::prompt_password("Enter old password: ")?,
- };
-
- let new_password = match maybe_new_password {
- Some(pwd) => pwd.to_string(),
- None => {
- let password = rpassword::prompt_password("Enter new password: ")?;
- let password_confirm = rpassword::prompt_password("Confirm new password: ")?;
- if password != password_confirm {
- bail!("Passwords don't match.");
- }
- password
+ password
+ }
+ };
+ let crypto_keys = CryptoKeys::init();
+ let cr = CryptoRoot::create_pass(&password, &crypto_keys)?;
+ println!("{}", cr.0);
+ }
+ CryptoRootCommand::NewClearText => {
+ let crypto_keys = CryptoKeys::init();
+ let cr = CryptoRoot::create_cleartext(&crypto_keys);
+ println!("{}", cr.0);
+ }
+ CryptoRootCommand::ChangePassword {
+ maybe_old_password,
+ maybe_new_password,
+ crypto_root,
+ } => {
+ let old_password = match maybe_old_password {
+ Some(pwd) => pwd.to_string(),
+ None => rpassword::prompt_password("Enter old password: ")?,
+ };
+
+ let new_password = match maybe_new_password {
+ Some(pwd) => pwd.to_string(),
+ None => {
+ let password = rpassword::prompt_password("Enter new password: ")?;
+ let password_confirm =
+ rpassword::prompt_password("Confirm new password: ")?;
+ if password != password_confirm {
+ bail!("Passwords don't match.");
}
- };
-
- let keys = CryptoRoot(crypto_root.to_string()).crypto_keys(&old_password)?;
- let cr = CryptoRoot::create_pass(&new_password, &keys)?;
- println!("{}", cr.0);
- },
- CryptoRootCommand::DeriveIncoming { crypto_root } => {
- let pubkey = CryptoRoot(crypto_root.to_string()).public_key()?;
- let cr = CryptoRoot::create_incoming(&pubkey);
- println!("{}", cr.0);
- },
+ password
+ }
+ };
+
+ let keys = CryptoRoot(crypto_root.to_string()).crypto_keys(&old_password)?;
+ let cr = CryptoRoot::create_pass(&new_password, &keys)?;
+ println!("{}", cr.0);
+ }
+ CryptoRootCommand::DeriveIncoming { crypto_root } => {
+ let pubkey = CryptoRoot(crypto_root.to_string()).public_key()?;
+ let cr = CryptoRoot::create_incoming(&pubkey);
+ println!("{}", cr.0);
}
},
- }
+ },
}
Ok(())
@@ -264,12 +270,12 @@ async fn main() -> Result<()> {
fn reload(pid: Option<i32>, pid_path: Option<PathBuf>) -> Result<()> {
let final_pid = match (pid, pid_path) {
(Some(pid), _) => pid,
- (_, Some(path)) => {
+ (_, Some(path)) => {
let mut f = std::fs::OpenOptions::new().read(true).open(path)?;
let mut pidstr = String::new();
f.read_to_string(&mut pidstr)?;
pidstr.parse::<i32>()?
- },
+ }
_ => bail!("Unable to infer your daemon's PID"),
};
let pid = Pid::from_raw(final_pid);
@@ -278,13 +284,15 @@ fn reload(pid: Option<i32>, pid_path: Option<PathBuf>) -> Result<()> {
}
fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) -> Result<()> {
- let mut ulist: UserList = read_config(users.clone()).context(format!("'{:?}' must be a user database", users))?;
+ let mut ulist: UserList =
+ read_config(users.clone()).context(format!("'{:?}' must be a user database", users))?;
match cmd {
AccountManagement::Add { login, setup } => {
- tracing::debug!(user=login, "will-create");
- let stp: SetupEntry = read_config(setup.clone()).context(format!("'{:?}' must be a setup file", setup))?;
- tracing::debug!(user=login, "loaded setup entry");
+ tracing::debug!(user = login, "will-create");
+ let stp: SetupEntry = read_config(setup.clone())
+ .context(format!("'{:?}' must be a setup file", setup))?;
+ tracing::debug!(user = login, "loaded setup entry");
let password = match stp.clear_password {
Some(pwd) => pwd,
@@ -307,21 +315,28 @@ fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) -
let hash = hash_password(password.as_str()).context("unable to hash password")?;
- ulist.insert(login.clone(), UserEntry {
- email_addresses: stp.email_addresses,
- password: hash,
- crypto_root: crypto_root.0,
- storage: stp.storage,
- });
+ ulist.insert(
+ login.clone(),
+ UserEntry {
+ email_addresses: stp.email_addresses,
+ password: hash,
+ crypto_root: crypto_root.0,
+ storage: stp.storage,
+ },
+ );
write_config(users.clone(), &ulist)?;
- },
+ }
AccountManagement::Delete { login } => {
- tracing::debug!(user=login, "will-delete");
+ tracing::debug!(user = login, "will-delete");
ulist.remove(login);
write_config(users.clone(), &ulist)?;
- },
- AccountManagement::ChangePassword { maybe_old_password, maybe_new_password, login } => {
+ }
+ AccountManagement::ChangePassword {
+ maybe_old_password,
+ maybe_new_password,
+ login,
+ } => {
let mut user = ulist.remove(login).context("user must exist first")?;
let old_password = match maybe_old_password {
@@ -345,16 +360,16 @@ fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) -
}
password
}
- };
+ };
let new_hash = hash_password(&new_password)?;
let new_crypto_root = CryptoRoot::create_pass(&new_password, &crypto_keys)?;
-
+
user.password = new_hash;
user.crypto_root = new_crypto_root.0;
ulist.insert(login.clone(), user);
write_config(users.clone(), &ulist)?;
- },
+ }
};
Ok(())
diff --git a/src/server.rs b/src/server.rs
index 552a0e6..28e0b27 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,6 +1,6 @@
-use std::sync::Arc;
-use std::path::PathBuf;
use std::io::Write;
+use std::path::PathBuf;
+use std::sync::Arc;
use anyhow::Result;
use futures::try_join;
@@ -26,7 +26,11 @@ impl Server {
let lmtp_server = None;
let imap_server = Some(imap::new(config.imap, login.clone()).await?);
- Ok(Self { lmtp_server, imap_server, pid_file: config.pid })
+ Ok(Self {
+ lmtp_server,
+ imap_server,
+ pid_file: config.pid,
+ })
}
pub async fn from_provider_config(config: ProviderConfig) -> Result<Self> {
@@ -39,12 +43,16 @@ impl Server {
let lmtp_server = Some(LmtpServer::new(config.lmtp, login.clone()));
let imap_server = Some(imap::new(config.imap, login.clone()).await?);
- Ok(Self { lmtp_server, imap_server, pid_file: config.pid })
+ Ok(Self {
+ lmtp_server,
+ imap_server,
+ pid_file: config.pid,
+ })
}
pub async fn run(self) -> Result<()> {
let pid = std::process::id();
- tracing::info!(pid=pid, "Starting main loops");
+ tracing::info!(pid = pid, "Starting main loops");
// write the pid file
if let Some(pid_file) = self.pid_file {
@@ -57,7 +65,6 @@ impl Server {
drop(file);
}
-
let (exit_signal, provoke_exit) = watch_ctrl_c();
let _exit_on_err = move |err: anyhow::Error| {
error!("Error: {}", err);
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index f9ba756..d08585f 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -1,10 +1,6 @@
use crate::storage::*;
+use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError};
use serde::Serialize;
-use aws_sdk_s3::{
- self as s3,
- error::SdkError,
- operation::get_object::GetObjectError,
-};
#[derive(Clone, Debug, Serialize)]
pub struct GarageConf {
@@ -28,18 +24,18 @@ impl GarageBuilder {
unicity.extend_from_slice(file!().as_bytes());
unicity.append(&mut rmp_serde::to_vec(&conf)?);
Ok(Arc::new(Self { conf, unicity }))
- }
+ }
}
#[async_trait]
impl IBuilder for GarageBuilder {
async fn build(&self) -> Result<Store, StorageError> {
let s3_creds = s3::config::Credentials::new(
- self.conf.aws_access_key_id.clone(),
- self.conf.aws_secret_access_key.clone(),
- None,
- None,
- "aerogramme"
+ self.conf.aws_access_key_id.clone(),
+ self.conf.aws_secret_access_key.clone(),
+ None,
+ None,
+ "aerogramme",
);
let s3_config = aws_config::from_env()
@@ -51,12 +47,12 @@ impl IBuilder for GarageBuilder {
let s3_client = aws_sdk_s3::Client::new(&s3_config);
let k2v_config = k2v_client::K2vClientConfig {
- endpoint: self.conf.k2v_endpoint.clone(),
- region: self.conf.region.clone(),
- aws_access_key_id: self.conf.aws_access_key_id.clone(),
- aws_secret_access_key: self.conf.aws_secret_access_key.clone(),
- bucket: self.conf.bucket.clone(),
- user_agent: None,
+ endpoint: self.conf.k2v_endpoint.clone(),
+ region: self.conf.region.clone(),
+ aws_access_key_id: self.conf.aws_access_key_id.clone(),
+ aws_secret_access_key: self.conf.aws_secret_access_key.clone(),
+ bucket: self.conf.bucket.clone(),
+ user_agent: None,
};
let k2v_client = match k2v_client::K2vClient::new(k2v_config) {
@@ -67,7 +63,7 @@ impl IBuilder for GarageBuilder {
Ok(v) => v,
};
- Ok(Box::new(GarageStore {
+ Ok(Box::new(GarageStore {
bucket: self.conf.bucket.clone(),
s3: s3_client,
k2v: k2v_client,
@@ -86,19 +82,30 @@ pub struct GarageStore {
fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal {
let new_row_ref = row_ref.with_causality(causal_value.causality.into());
- let row_values = causal_value.value.into_iter().map(|k2v_value| match k2v_value {
- k2v_client::K2vValue::Tombstone => Alternative::Tombstone,
- k2v_client::K2vValue::Value(v) => Alternative::Value(v),
- }).collect::<Vec<_>>();
+ let row_values = causal_value
+ .value
+ .into_iter()
+ .map(|k2v_value| match k2v_value {
+ k2v_client::K2vValue::Tombstone => Alternative::Tombstone,
+ k2v_client::K2vValue::Value(v) => Alternative::Value(v),
+ })
+ .collect::<Vec<_>>();
- RowVal { row_ref: new_row_ref, value: row_values }
+ RowVal {
+ row_ref: new_row_ref,
+ value: row_values,
+ }
}
#[async_trait]
impl IStore for GarageStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
let (pk_list, batch_op) = match select {
- Selector::Range { shard, sort_begin, sort_end } => (
+ Selector::Range {
+ shard,
+ sort_begin,
+ sort_end,
+ } => (
vec![shard.to_string()],
vec![k2v_client::BatchReadOp {
partition_key: shard,
@@ -108,49 +115,71 @@ impl IStore for GarageStore {
..k2v_client::Filter::default()
},
..k2v_client::BatchReadOp::default()
- }]
+ }],
),
Selector::List(row_ref_list) => (
- row_ref_list.iter().map(|row_ref| row_ref.uid.shard.to_string()).collect::<Vec<_>>(),
- row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp {
- partition_key: &row_ref.uid.shard,
+ row_ref_list
+ .iter()
+ .map(|row_ref| row_ref.uid.shard.to_string())
+ .collect::<Vec<_>>(),
+ row_ref_list
+ .iter()
+ .map(|row_ref| k2v_client::BatchReadOp {
+ partition_key: &row_ref.uid.shard,
+ filter: k2v_client::Filter {
+ start: Some(&row_ref.uid.sort),
+ ..k2v_client::Filter::default()
+ },
+ single_item: true,
+ ..k2v_client::BatchReadOp::default()
+ })
+ .collect::<Vec<_>>(),
+ ),
+ Selector::Prefix { shard, sort_prefix } => (
+ vec![shard.to_string()],
+ vec![k2v_client::BatchReadOp {
+ partition_key: shard,
filter: k2v_client::Filter {
- start: Some(&row_ref.uid.sort),
+ prefix: Some(sort_prefix),
..k2v_client::Filter::default()
},
- single_item: true,
..k2v_client::BatchReadOp::default()
- }).collect::<Vec<_>>()
+ }],
),
- Selector::Prefix { shard, sort_prefix } => (
- vec![shard.to_string()],
- vec![k2v_client::BatchReadOp {
- partition_key: shard,
- filter: k2v_client::Filter {
- prefix: Some(sort_prefix),
- ..k2v_client::Filter::default()
- },
- ..k2v_client::BatchReadOp::default()
- }]),
Selector::Single(row_ref) => {
- let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await {
+ let causal_value = match self
+ .k2v
+ .read_item(&row_ref.uid.shard, &row_ref.uid.sort)
+ .await
+ {
Err(e) => {
- tracing::error!("K2V read item shard={}, sort={}, bucket={} failed: {}", row_ref.uid.shard, row_ref.uid.sort, self.bucket, e);
+ tracing::error!(
+ "K2V read item shard={}, sort={}, bucket={} failed: {}",
+ row_ref.uid.shard,
+ row_ref.uid.sort,
+ self.bucket,
+ e
+ );
return Err(StorageError::Internal);
- },
+ }
Ok(v) => v,
};
let row_val = causal_to_row_val((*row_ref).clone(), causal_value);
- return Ok(vec![row_val])
- },
+ return Ok(vec![row_val]);
+ }
};
let all_raw_res = match self.k2v.read_batch(&batch_op).await {
Err(e) => {
- tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e);
+ tracing::error!(
+ "k2v read batch failed for {:?}, bucket {} with err: {}",
+ select,
+ self.bucket,
+ e
+ );
return Err(StorageError::Internal);
- },
+ }
Ok(v) => v,
};
@@ -163,13 +192,17 @@ impl IStore for GarageStore {
.into_iter()
.zip(pk_list.into_iter())
.map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
- .collect::<Vec<_>>();
+ .collect::<Vec<_>>();
Ok(row_vals)
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
let del_op = match select {
- Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp {
+ Selector::Range {
+ shard,
+ sort_begin,
+ sort_end,
+ } => vec![k2v_client::BatchDeleteOp {
partition_key: shard,
prefix: None,
start: Some(sort_begin),
@@ -178,21 +211,24 @@ impl IStore for GarageStore {
}],
Selector::List(row_ref_list) => {
// Insert null values with causality token = delete
- let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp {
- partition_key: &v.uid.shard,
- sort_key: &v.uid.sort,
- causality: v.causality.clone().map(|ct| ct.into()),
- value: k2v_client::K2vValue::Tombstone,
- }).collect::<Vec<_>>();
-
+ let batch_op = row_ref_list
+ .iter()
+ .map(|v| k2v_client::BatchInsertOp {
+ partition_key: &v.uid.shard,
+ sort_key: &v.uid.sort,
+ causality: v.causality.clone().map(|ct| ct.into()),
+ value: k2v_client::K2vValue::Tombstone,
+ })
+ .collect::<Vec<_>>();
+
return match self.k2v.insert_batch(&batch_op).await {
Err(e) => {
tracing::error!("Unable to delete the list of values: {}", e);
Err(StorageError::Internal)
- },
+ }
Ok(_) => Ok(()),
};
- },
+ }
Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp {
partition_key: shard,
prefix: Some(sort_prefix),
@@ -208,15 +244,15 @@ impl IStore for GarageStore {
causality: row_ref.causality.clone().map(|ct| ct.into()),
value: k2v_client::K2vValue::Tombstone,
}];
-
+
return match self.k2v.insert_batch(&batch_op).await {
Err(e) => {
tracing::error!("Unable to delete the list of values: {}", e);
Err(StorageError::Internal)
- },
+ }
Ok(_) => Ok(()),
};
- },
+ }
};
// Finally here we only have prefix & range
@@ -224,34 +260,46 @@ impl IStore for GarageStore {
Err(e) => {
tracing::error!("delete batch error: {}", e);
Err(StorageError::Internal)
- },
+ }
Ok(_) => Ok(()),
}
}
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
- let batch_ops = values.iter().map(|v| k2v_client::BatchInsertOp {
- partition_key: &v.row_ref.uid.shard,
- sort_key: &v.row_ref.uid.sort,
- causality: v.row_ref.causality.clone().map(|ct| ct.into()),
- value: v.value.iter().next().map(|cv| match cv {
- Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()),
- Alternative::Tombstone => k2v_client::K2vValue::Tombstone,
- }).unwrap_or(k2v_client::K2vValue::Tombstone)
- }).collect::<Vec<_>>();
+ let batch_ops = values
+ .iter()
+ .map(|v| k2v_client::BatchInsertOp {
+ partition_key: &v.row_ref.uid.shard,
+ sort_key: &v.row_ref.uid.sort,
+ causality: v.row_ref.causality.clone().map(|ct| ct.into()),
+ value: v
+ .value
+ .iter()
+ .next()
+ .map(|cv| match cv {
+ Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()),
+ Alternative::Tombstone => k2v_client::K2vValue::Tombstone,
+ })
+ .unwrap_or(k2v_client::K2vValue::Tombstone),
+ })
+ .collect::<Vec<_>>();
match self.k2v.insert_batch(&batch_ops).await {
Err(e) => {
tracing::error!("k2v can't insert some value: {}", e);
Err(StorageError::Internal)
- },
+ }
Ok(v) => Ok(v),
}
}
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
loop {
if let Some(ct) = &value.causality {
- match self.k2v.poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None).await {
+ match self
+ .k2v
+ .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None)
+ .await
+ {
Err(e) => {
tracing::error!("Unable to poll item: {}", e);
return Err(StorageError::Internal);
@@ -262,8 +310,7 @@ impl IStore for GarageStore {
} else {
match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await {
Err(k2v_client::Error::NotFound) => {
- self
- .k2v
+ self.k2v
.insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None)
.await
.map_err(|e| {
@@ -273,8 +320,8 @@ impl IStore for GarageStore {
}
Err(e) => {
tracing::error!("Unable to read item in polling logic: {}", e);
- return Err(StorageError::Internal)
- },
+ return Err(StorageError::Internal);
+ }
Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)),
}
}
@@ -282,7 +329,8 @@ impl IStore for GarageStore {
}
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
- let maybe_out = self.s3
+ let maybe_out = self
+ .s3
.get_object()
.bucket(self.bucket.to_string())
.key(blob_ref.0.to_string())
@@ -296,12 +344,12 @@ impl IStore for GarageStore {
e => {
tracing::warn!("Blob Fetch Error, Service Error: {}", e);
return Err(StorageError::Internal);
- },
+ }
},
Err(e) => {
tracing::warn!("Blob Fetch Error, {}", e);
return Err(StorageError::Internal);
- },
+ }
};
let buffer = match object_output.body.collect().await {
@@ -316,9 +364,10 @@ impl IStore for GarageStore {
Ok(BlobVal::new(blob_ref.clone(), buffer))
}
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
- let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
+ let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
- let maybe_send = self.s3
+ let maybe_send = self
+ .s3
.put_object()
.bucket(self.bucket.to_string())
.key(blob_val.blob_ref.0.to_string())
@@ -338,7 +387,8 @@ impl IStore for GarageStore {
}
}
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
- let maybe_copy = self.s3
+ let maybe_copy = self
+ .s3
.copy_object()
.bucket(self.bucket.to_string())
.key(dst.0.clone())
@@ -348,18 +398,24 @@ impl IStore for GarageStore {
match maybe_copy {
Err(e) => {
- tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.bucket, e);
+ tracing::error!(
+ "unable to copy object {} to {} (bucket: {}), error: {}",
+ src.0,
+ dst.0,
+ self.bucket,
+ e
+ );
Err(StorageError::Internal)
- },
+ }
Ok(_) => {
tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket);
Ok(())
}
}
-
}
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
- let maybe_list = self.s3
+ let maybe_list = self
+ .s3
.list_objects_v2()
.bucket(self.bucket.to_string())
.prefix(prefix)
@@ -370,7 +426,12 @@ impl IStore for GarageStore {
match maybe_list {
Err(e) => {
- tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.bucket, e);
+ tracing::error!(
+ "listing prefix {} on bucket {} failed: {}",
+ prefix,
+ self.bucket,
+ e
+ );
Err(StorageError::Internal)
}
Ok(pagin_list_out) => Ok(pagin_list_out
@@ -382,7 +443,8 @@ impl IStore for GarageStore {
}
}
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
- let maybe_delete = self.s3
+ let maybe_delete = self
+ .s3
.delete_object()
.bucket(self.bucket.to_string())
.key(blob_ref.0.clone())
@@ -391,9 +453,14 @@ impl IStore for GarageStore {
match maybe_delete {
Err(e) => {
- tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.bucket, e);
+ tracing::error!(
+ "unable to delete {} (bucket: {}), error {}",
+ blob_ref.0,
+ self.bucket,
+ e
+ );
Err(StorageError::Internal)
- },
+ }
Ok(_) => {
tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket);
Ok(())
@@ -401,4 +468,3 @@ impl IStore for GarageStore {
}
}
}
-
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
index ee7c9a6..3c3a94c 100644
--- a/src/storage/in_memory.rs
+++ b/src/storage/in_memory.rs
@@ -1,6 +1,6 @@
use crate::storage::*;
-use std::collections::{HashMap, BTreeMap};
-use std::ops::Bound::{Included, Unbounded, Excluded, self};
+use std::collections::{BTreeMap, HashMap};
+use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::sync::{Arc, RwLock};
use tokio::sync::Notify;
@@ -16,7 +16,7 @@ impl MemDb {
Self(tokio::sync::Mutex::new(HashMap::new()))
}
- pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
+ pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
let mut global_storage = self.0.lock().await;
global_storage
.entry(username.to_string())
@@ -60,8 +60,8 @@ impl InternalRowVal {
}
fn to_row_val(&self, row_ref: RowRef) -> RowVal {
- RowVal{
- row_ref: row_ref.with_causality(self.version.to_string()),
+ RowVal {
+ row_ref: row_ref.with_causality(self.version.to_string()),
value: self.concurrent_values(),
}
}
@@ -75,7 +75,7 @@ struct InternalBlobVal {
impl InternalBlobVal {
fn to_blob_val(&self, bref: &BlobRef) -> BlobVal {
BlobVal {
- blob_ref: bref.clone(),
+ blob_ref: bref.clone(),
meta: self.metadata.clone(),
value: self.data.clone(),
}
@@ -113,7 +113,7 @@ impl IBuilder for MemBuilder {
row: self.row.clone(),
blob: self.blob.clone(),
}))
- }
+ }
fn unique(&self) -> UnicityBuffer {
UnicityBuffer(self.unicity.clone())
@@ -170,24 +170,32 @@ impl IStore for MemStore {
let store = self.row.read().or(Err(StorageError::Internal))?;
match select {
- Selector::Range { shard, sort_begin, sort_end } => {
- Ok(store
- .get(*shard)
- .unwrap_or(&BTreeMap::new())
- .range((Included(sort_begin.to_string()), Excluded(sort_end.to_string())))
- .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
- .collect::<Vec<_>>())
- },
+ Selector::Range {
+ shard,
+ sort_begin,
+ sort_end,
+ } => Ok(store
+ .get(*shard)
+ .unwrap_or(&BTreeMap::new())
+ .range((
+ Included(sort_begin.to_string()),
+ Excluded(sort_end.to_string()),
+ ))
+ .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
+ .collect::<Vec<_>>()),
Selector::List(rlist) => {
let mut acc = vec![];
for row_ref in rlist {
- let maybe_intval = store.get(&row_ref.uid.shard).map(|v| v.get(&row_ref.uid.sort)).flatten();
+ let maybe_intval = store
+ .get(&row_ref.uid.shard)
+ .map(|v| v.get(&row_ref.uid.sort))
+ .flatten();
if let Some(intval) = maybe_intval {
acc.push(intval.to_row_val(row_ref.clone()));
}
}
Ok(acc)
- },
+ }
Selector::Prefix { shard, sort_prefix } => {
let last_bound = prefix_last_bound(sort_prefix);
@@ -197,13 +205,13 @@ impl IStore for MemStore {
.range((Included(sort_prefix.to_string()), last_bound))
.map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
.collect::<Vec<_>>())
- },
+ }
Selector::Single(row_ref) => {
let intval = store
- .get(&row_ref.uid.shard)
- .ok_or(StorageError::NotFound)?
- .get(&row_ref.uid.sort)
- .ok_or(StorageError::NotFound)?;
+ .get(&row_ref.uid.shard)
+ .ok_or(StorageError::NotFound)?
+ .get(&row_ref.uid.sort)
+ .ok_or(StorageError::NotFound)?;
Ok(vec![intval.to_row_val((*row_ref).clone())])
}
}
@@ -213,7 +221,12 @@ impl IStore for MemStore {
tracing::trace!(select=%select, command="row_rm");
let values = match select {
- Selector::Range { .. } | Selector::Prefix { .. } => self.row_fetch(select).await?.into_iter().map(|rv| rv.row_ref).collect::<Vec<_>>(),
+ Selector::Range { .. } | Selector::Prefix { .. } => self
+ .row_fetch(select)
+ .await?
+ .into_iter()
+ .map(|rv| rv.row_ref)
+ .collect::<Vec<_>>(),
Selector::List(rlist) => rlist.clone(),
Selector::Single(row_ref) => vec![(*row_ref).clone()],
};
@@ -282,7 +295,10 @@ impl IStore for MemStore {
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
tracing::trace!(entry=%blob_ref, command="blob_fetch");
let store = self.blob.read().or(Err(StorageError::Internal))?;
- store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref))
+ store
+ .get(&blob_ref.0)
+ .ok_or(StorageError::NotFound)
+ .map(|v| v.to_blob_val(blob_ref))
}
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
@@ -300,10 +316,13 @@ impl IStore for MemStore {
Ok(())
}
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
- tracing::trace!(prefix=prefix, command="blob_list");
+ tracing::trace!(prefix = prefix, command = "blob_list");
let store = self.blob.read().or(Err(StorageError::Internal))?;
let last_bound = prefix_last_bound(prefix);
- let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::<Vec<_>>();
+ let blist = store
+ .range((Included(prefix.to_string()), last_bound))
+ .map(|(k, _)| BlobRef(k.to_string()))
+ .collect::<Vec<_>>();
Ok(blist)
}
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index c81ffe4..1f86f71 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -8,13 +8,13 @@
* into the object system so it is not exposed.
*/
-pub mod in_memory;
pub mod garage;
+pub mod in_memory;
-use std::sync::Arc;
-use std::hash::Hash;
-use std::collections::HashMap;
use async_trait::async_trait;
+use std::collections::HashMap;
+use std::hash::Hash;
+use std::sync::Arc;
#[derive(Debug, Clone)]
pub enum Alternative {
@@ -52,14 +52,18 @@ pub struct RowRef {
}
impl std::fmt::Display for RowRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "RowRef({}, {}, {:?})", self.uid.shard, self.uid.sort, self.causality)
+ write!(
+ f,
+ "RowRef({}, {}, {:?})",
+ self.uid.shard, self.uid.sort, self.causality
+ )
}
}
impl RowRef {
pub fn new(shard: &str, sort: &str) -> Self {
Self {
- uid: RowUid {
+ uid: RowUid {
shard: shard.to_string(),
sort: sort.to_string(),
},
@@ -87,7 +91,6 @@ impl RowVal {
}
}
-
#[derive(Debug, Clone)]
pub struct BlobRef(pub String);
impl std::fmt::Display for BlobRef {
@@ -105,7 +108,8 @@ pub struct BlobVal {
impl BlobVal {
pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self {
Self {
- blob_ref, value,
+ blob_ref,
+ value,
meta: HashMap::new(),
}
}
@@ -118,16 +122,27 @@ impl BlobVal {
#[derive(Debug)]
pub enum Selector<'a> {
- Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str },
- List (Vec<RowRef>), // list of (shard_key, sort_key)
+ Range {
+ shard: &'a str,
+ sort_begin: &'a str,
+ sort_end: &'a str,
+ },
+ List(Vec<RowRef>), // list of (shard_key, sort_key)
#[allow(dead_code)]
- Prefix { shard: &'a str, sort_prefix: &'a str },
+ Prefix {
+ shard: &'a str,
+ sort_prefix: &'a str,
+ },
Single(&'a RowRef),
}
impl<'a> std::fmt::Display for Selector<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
- Self::Range { shard, sort_begin, sort_end } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end),
+ Self::Range {
+ shard,
+ sort_begin,
+ sort_end,
+ } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end),
Self::List(list) => write!(f, "List({:?})", list),
Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix),
Self::Single(row_ref) => write!(f, "Single({})", row_ref),
@@ -149,7 +164,7 @@ pub trait IStore {
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
}
-#[derive(Clone,Debug,PartialEq,Eq,Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct UnicityBuffer(Vec<u8>);
#[async_trait]