diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bayou.rs | 54 | ||||
-rw-r--r-- | src/login/ldap_provider.rs | 53 | ||||
-rw-r--r-- | src/login/mod.rs | 70 | ||||
-rw-r--r-- | src/login/static_provider.rs | 63 | ||||
-rw-r--r-- | src/mail/incoming.rs | 21 | ||||
-rw-r--r-- | src/mail/mailbox.rs | 64 | ||||
-rw-r--r-- | src/mail/user.rs | 6 | ||||
-rw-r--r-- | src/main.rs | 179 | ||||
-rw-r--r-- | src/server.rs | 19 | ||||
-rw-r--r-- | src/storage/garage.rs | 252 | ||||
-rw-r--r-- | src/storage/in_memory.rs | 71 | ||||
-rw-r--r-- | src/storage/mod.rs | 41 |
12 files changed, 546 insertions, 347 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index 1361e49..7253a30 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -9,9 +9,8 @@ use tokio::sync::{watch, Notify}; use crate::cryptoblob::*; use crate::login::Credentials; -use crate::timestamp::*; use crate::storage; - +use crate::timestamp::*; const KEEP_STATE_EVERY: usize = 64; @@ -94,7 +93,11 @@ impl<S: BayouState> Bayou<S> { } else { debug!("(sync) loading checkpoint: {}", key); - let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value; + let buf = self + .storage + .blob_fetch(&storage::BlobRef(key.to_string())) + .await? + .value; debug!("(sync) checkpoint body length: {}", buf.len()); let ck = open_deserialize::<S>(&buf, &self.key)?; @@ -125,17 +128,22 @@ impl<S: BayouState> Bayou<S> { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); - let ops_map = self.storage.row_fetch(&storage::Selector::Range { - shard: &self.path, - sort_begin: &ts_ser, - sort_end: WATCH_SK - }).await?; + let ops_map = self + .storage + .row_fetch(&storage::Selector::Range { + shard: &self.path, + sort_begin: &ts_ser, + sort_end: WATCH_SK, + }) + .await?; let mut ops = vec![]; for row_value in ops_map { let row = row_value.row_ref; let sort_key = row.uid.sort; - let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?; + let ts = sort_key + .parse::<Timestamp>() + .map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?; let val = row_value.value; if val.len() != 1 { @@ -211,7 +219,7 @@ impl<S: BayouState> Bayou<S> { // Save info that sync has been done self.last_sync = new_last_sync; - self.last_sync_watch_ct = new_last_sync_watch_ct; + self.last_sync_watch_ct = new_last_sync_watch_ct; Ok(()) } @@ -362,16 +370,20 @@ impl<S: BayouState> Bayou<S> { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { debug!("(cp) drop old checkpoint {}", key); - self.storage.blob_rm(&storage::BlobRef(key.to_string())).await?; + self.storage + .blob_rm(&storage::BlobRef(key.to_string())) + .await?; } // Delete corresponding range of operations let ts_ser = existing_checkpoints[last_to_keep].0.to_string(); - self.storage.row_rm(&storage::Selector::Range { - shard: &self.path, - sort_begin: "", - sort_end: &ts_ser - }).await? + self.storage + .row_rm(&storage::Selector::Range { + shard: &self.path, + sort_begin: "", + sort_end: &ts_ser, + }) + .await? } Ok(()) @@ -426,11 +438,7 @@ impl K2vWatch { let watch = Arc::new(K2vWatch { target, rx, notify }); - tokio::spawn(Self::background_task( - Arc::downgrade(&watch), - storage, - tx, - )); + tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx)); Ok(watch) } @@ -444,8 +452,8 @@ impl K2vWatch { Some(this) => this.target.clone(), None => { error!("can't start loop"); - return - }, + return; + } }; while let Some(this) = Weak::upgrade(&self_weak) { diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index a7f56e4..81e5879 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -30,7 +30,10 @@ enum BucketSource { enum StorageSpecific { InMemory, - Garage { from_config: LdapGarageConfig, bucket_source: BucketSource }, + Garage { + from_config: LdapGarageConfig, + bucket_source: BucketSource, + }, } impl LdapLoginProvider { @@ -57,22 +60,24 @@ impl LdapLoginProvider { let specific = match config.storage { LdapStorage::InMemory => StorageSpecific::InMemory, LdapStorage::Garage(grgconf) => { - let bucket_source = match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) { - (Some(b), None) => BucketSource::Constant(b), - (None, Some(a)) => BucketSource::Attr(a), - _ => bail!("Must set `bucket` or `bucket_attr`, but not both"), - }; + let bucket_source = + match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) { + (Some(b), None) => BucketSource::Constant(b), + (None, Some(a)) => BucketSource::Attr(a), + _ => bail!("Must set `bucket` or `bucket_attr`, but not both"), + }; if let BucketSource::Attr(a) = &bucket_source { attrs_to_retrieve.push(a.clone()); } - StorageSpecific::Garage { from_config: grgconf, bucket_source } - }, + StorageSpecific::Garage { + from_config: grgconf, + bucket_source, + } + } }; - - Ok(Self { ldap_server: config.ldap_server, pre_bind_on_login: config.pre_bind_on_login, @@ -89,27 +94,32 @@ impl LdapLoginProvider { async fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builder> { let storage: Builder = match &self.storage_specific { - StorageSpecific::InMemory => self.in_memory_store.builder( - &get_attr(user, &self.username_attr)? - ).await, - StorageSpecific::Garage { from_config, bucket_source } => { + StorageSpecific::InMemory => { + self.in_memory_store + .builder(&get_attr(user, &self.username_attr)?) + .await + } + StorageSpecific::Garage { + from_config, + bucket_source, + } => { let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?; - let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?; + let aws_secret_access_key = + get_attr(user, &from_config.aws_secret_access_key_attr)?; let bucket = match bucket_source { BucketSource::Constant(b) => b.clone(), BucketSource::Attr(a) => get_attr(user, &a)?, }; - storage::garage::GarageBuilder::new(storage::garage::GarageConf { - region: from_config.aws_region.clone(), + region: from_config.aws_region.clone(), s3_endpoint: from_config.s3_endpoint.clone(), k2v_endpoint: from_config.k2v_endpoint.clone(), - aws_access_key_id, - aws_secret_access_key, + aws_access_key_id, + aws_secret_access_key, bucket, })? - }, + } }; Ok(storage) @@ -172,7 +182,6 @@ impl LoginProvider for LdapLoginProvider { drop(ldap); - Ok(Credentials { storage, keys }) } @@ -215,7 +224,7 @@ impl LoginProvider for LdapLoginProvider { let cr = CryptoRoot(crstr); let public_key = cr.public_key()?; - // storage + // storage let storage = self.storage_creds_from_ldap_user(&user).await?; drop(ldap); diff --git a/src/login/mod.rs b/src/login/mod.rs index 3369ac2..2926738 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,8 +1,8 @@ pub mod ldap_provider; pub mod static_provider; -use std::sync::Arc; use base64::Engine; +use std::sync::Arc; use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; @@ -45,7 +45,7 @@ pub struct PublicCredentials { pub public_key: PublicKey, } -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct CryptoRoot(pub String); @@ -73,47 +73,59 @@ impl CryptoRoot { pub fn public_key(&self) -> Result<PublicKey> { match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] { - [ "aero", "cryptoroot", "pass", b64blob ] => { + ["aero", "cryptoroot", "pass", b64blob] => { let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; if blob.len() < 32 { - bail!("Decoded data is {} bytes long, expect at least 32 bytes", blob.len()); + bail!( + "Decoded data is {} bytes long, expect at least 32 bytes", + blob.len() + ); } PublicKey::from_slice(&blob[..32]).context("must be a valid public key") - }, - [ "aero", "cryptoroot", "cleartext", b64blob ] => { + } + ["aero", "cryptoroot", "cleartext", b64blob] => { let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; Ok(CryptoKeys::deserialize(&blob)?.public) - }, - [ "aero", "cryptoroot", "incoming", b64blob ] => { + } + ["aero", "cryptoroot", "incoming", b64blob] => { let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; if blob.len() < 32 { - bail!("Decoded data is {} bytes long, expect at least 32 bytes", blob.len()); + bail!( + "Decoded data is {} bytes long, expect at least 32 bytes", + blob.len() + ); } PublicKey::from_slice(&blob[..32]).context("must be a valid public key") - }, - [ "aero", "cryptoroot", "keyring", _ ] => { + } + ["aero", "cryptoroot", "keyring", _] => { bail!("keyring is not yet implemented!") - }, - _ => bail!(format!("passed string '{}' is not a valid cryptoroot", self.0)), + } + _ => bail!(format!( + "passed string '{}' is not a valid cryptoroot", + self.0 + )), } } pub fn crypto_keys(&self, password: &str) -> Result<CryptoKeys> { match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] { - [ "aero", "cryptoroot", "pass", b64blob ] => { + ["aero", "cryptoroot", "pass", b64blob] => { let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; CryptoKeys::password_open(password, &blob) - }, - [ "aero", "cryptoroot", "cleartext", b64blob ] => { + } + ["aero", "cryptoroot", "cleartext", b64blob] => { let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; CryptoKeys::deserialize(&blob) - }, - [ "aero", "cryptoroot", "incoming", _ ] => { + } + ["aero", "cryptoroot", "incoming", _] => { bail!("incoming cryptoroot does not contain a crypto key!") - }, - [ "aero", "cryptoroot", "keyring", _ ] =>{ + } + ["aero", "cryptoroot", "keyring", _] => { bail!("keyring is not yet implemented!") - }, - _ => bail!(format!("passed string '{}' is not a valid cryptoroot", self.0)), + } + _ => bail!(format!( + "passed string '{}' is not a valid cryptoroot", + self.0 + )), } } } @@ -132,9 +144,6 @@ pub struct CryptoKeys { // ---- - - - impl CryptoKeys { /// Initialize a new cryptography root pub fn init() -> Self { @@ -202,7 +211,11 @@ fn derive_password_key(kdf_salt: &[u8], password: &str) -> Result<Key> { Ok(Key::from_slice(&argon2_kdf(kdf_salt, password.as_bytes(), 32)?).unwrap()) } -fn try_open_encrypted_keys(kdf_salt: &[u8], password: &str, encrypted_keys: &[u8]) -> Result<Vec<u8>> { +fn try_open_encrypted_keys( + kdf_salt: &[u8], + password: &str, + encrypted_keys: &[u8], +) -> Result<Vec<u8>> { let password_key = derive_password_key(kdf_salt, password)?; open(encrypted_keys, &password_key) } @@ -210,7 +223,7 @@ fn try_open_encrypted_keys(kdf_salt: &[u8], password: &str, encrypted_keys: &[u8 // ---- UTIL ---- pub fn argon2_kdf(salt: &[u8], password: &[u8], output_len: usize) -> Result<Vec<u8>> { - use argon2::{Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version, password_hash}; + use argon2::{password_hash, Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version}; let params = ParamsBuilder::new() .output_len(output_len) @@ -219,7 +232,8 @@ pub fn argon2_kdf(salt: &[u8], password: &[u8], output_len: usize) -> Result<Vec let argon2 = Argon2::new(Algorithm::default(), Version::default(), params); let b64_salt = base64::engine::general_purpose::STANDARD_NO_PAD.encode(salt); - let valid_salt = password_hash::Salt::from_b64(&b64_salt).map_err(|e| anyhow!("Invalid salt, error {}", e))?; + let valid_salt = password_hash::Salt::from_b64(&b64_salt) + .map_err(|e| anyhow!("Invalid salt, error {}", e))?; let hash = argon2 .hash_password(password, valid_salt) .map_err(|e| anyhow!("Unable to hash: {}", e))?; diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index b11123c..1e1ecbf 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; -use std::sync::Arc; use std::path::PathBuf; -use tokio::sync::watch; +use std::sync::Arc; use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::watch; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; @@ -28,7 +28,8 @@ pub struct StaticLoginProvider { } pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) -> Result<()> { - let mut stream = signal(SignalKind::user_defined1()).expect("failed to install SIGUSR1 signal hander for reload"); + let mut stream = signal(SignalKind::user_defined1()) + .expect("failed to install SIGUSR1 signal hander for reload"); loop { let ulist: UserList = match read_config(config.clone()) { @@ -42,7 +43,12 @@ pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) let users = ulist .into_iter() - .map(|(username, config)| (username.clone() , Arc::new(ContextualUserEntry { username, config }))) + .map(|(username, config)| { + ( + username.clone(), + Arc::new(ContextualUserEntry { username, config }), + ) + }) .collect::<HashMap<_, _>>(); let mut users_by_email = HashMap::new(); @@ -51,14 +57,18 @@ pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) if users_by_email.contains_key(m) { tracing::warn!("Several users have the same email address: {}", m); stream.recv().await; - continue + continue; } users_by_email.insert(m.clone(), u.clone()); } } tracing::info!("{} users loaded", users.len()); - up.send(UserDatabase { users, users_by_email }).context("update user db config")?; + up.send(UserDatabase { + users, + users_by_email, + }) + .context("update user db config")?; stream.recv().await; tracing::info!("Received SIGUSR1, reloading"); } @@ -71,7 +81,10 @@ impl StaticLoginProvider { tokio::spawn(update_user_list(config.user_list, tx)); rx.changed().await?; - Ok(Self { user_db: rx, in_memory_store: storage::in_memory::MemDb::new() }) + Ok(Self { + user_db: rx, + in_memory_store: storage::in_memory::MemDb::new(), + }) } } @@ -95,14 +108,16 @@ impl LoginProvider for StaticLoginProvider { tracing::debug!(user=%username, "fetch keys"); let storage: storage::Builder = match &user.config.storage { StaticStorage::InMemory => self.in_memory_store.builder(username).await, - StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf { - region: grgconf.aws_region.clone(), - k2v_endpoint: grgconf.k2v_endpoint.clone(), - s3_endpoint: grgconf.s3_endpoint.clone(), - aws_access_key_id: grgconf.aws_access_key_id.clone(), - aws_secret_access_key: grgconf.aws_secret_access_key.clone(), - bucket: grgconf.bucket.clone(), - })?, + StaticStorage::Garage(grgconf) => { + storage::garage::GarageBuilder::new(storage::garage::GarageConf { + region: grgconf.aws_region.clone(), + k2v_endpoint: grgconf.k2v_endpoint.clone(), + s3_endpoint: grgconf.s3_endpoint.clone(), + aws_access_key_id: grgconf.aws_access_key_id.clone(), + aws_secret_access_key: grgconf.aws_secret_access_key.clone(), + bucket: grgconf.bucket.clone(), + })? + } }; let cr = CryptoRoot(user.config.crypto_root.clone()); @@ -124,14 +139,16 @@ impl LoginProvider for StaticLoginProvider { let storage: storage::Builder = match &user.config.storage { StaticStorage::InMemory => self.in_memory_store.builder(&user.username).await, - StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf { - region: grgconf.aws_region.clone(), - k2v_endpoint: grgconf.k2v_endpoint.clone(), - s3_endpoint: grgconf.s3_endpoint.clone(), - aws_access_key_id: grgconf.aws_access_key_id.clone(), - aws_secret_access_key: grgconf.aws_secret_access_key.clone(), - bucket: grgconf.bucket.clone(), - })?, + StaticStorage::Garage(grgconf) => { + storage::garage::GarageBuilder::new(storage::garage::GarageConf { + region: grgconf.aws_region.clone(), + k2v_endpoint: grgconf.k2v_endpoint.clone(), + s3_endpoint: grgconf.s3_endpoint.clone(), + aws_access_key_id: grgconf.aws_access_key_id.clone(), + aws_secret_access_key: grgconf.aws_secret_access_key.clone(), + bucket: grgconf.bucket.clone(), + })? + } }; let cr = CryptoRoot(user.config.crypto_root.clone()); diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 7e33a9a..04d2ef1 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -51,7 +51,10 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.storage.build().await?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); + let mut lock_held = k2v_lock_loop( + creds.storage.build().await?, + storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK), + ); let storage = creds.storage.build().await?; let mut inbox: Option<Arc<Mailbox>> = None; @@ -63,8 +66,7 @@ async fn incoming_mail_watch_process_internal( let wait_new_mail = async { loop { - match storage.row_poll(&incoming_key).await - { + match storage.row_poll(&incoming_key).await { Ok(row_val) => break row_val.row_ref, Err(e) => { error!("Error in wait_new_mail: {}", e); @@ -360,7 +362,10 @@ async fn k2v_lock_loop_internal( Some(existing) => existing, None => row_ref.clone(), }; - if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await { + if let Err(e) = storage + .row_insert(vec![storage::RowVal::new(row, lock)]) + .await + { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -428,14 +433,12 @@ impl EncryptedMessage { let blob_val = storage::BlobVal::new( storage::BlobRef(format!("incoming/{}", gen_ident())), self.encrypted_body.clone().into(), - ).with_meta(MESSAGE_KEY.to_string(), key_header); + ) + .with_meta(MESSAGE_KEY.to_string(), key_header); storage.blob_insert(blob_val).await?; // Update watch key to signal new mail - let watch_val = storage::RowVal::new( - watch_ct.clone(), - gen_ident().0.to_vec(), - ); + let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec()); storage.row_insert(vec![watch_val]).await?; Ok(()) } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 6fb7dea..e424ba3 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -8,7 +8,7 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self}; +use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; use crate::timestamp::now_msec; pub struct Mailbox { @@ -196,7 +196,10 @@ impl MailboxInternal { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> { let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>(); - let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>(); + let ops = ids + .iter() + .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())) + .collect::<Vec<_>>(); let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; @@ -231,7 +234,10 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> { - let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?; + let obj_res = self + .storage + .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))) + .await?; let body = obj_res.value; cryptoblob::open(&body, message_key) } @@ -266,10 +272,12 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.storage.blob_insert(BlobVal::new( - BlobRef(format!("{}/{}", self.mail_path, ident)), - message_blob, - )).await?; + self.storage + .blob_insert(BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )) + .await?; Ok::<_, anyhow::Error>(()) }, async { @@ -281,10 +289,12 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.storage.row_insert(vec![RowVal::new( - RowRef::new(&self.mail_path, &ident.to_string()), - meta_blob, - )]).await?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) + .await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -328,10 +338,12 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.storage.row_insert(vec![RowVal::new( + self.storage + .row_insert(vec![RowVal::new( RowRef::new(&self.mail_path, &ident.to_string()), meta_blob, - )]).await?; + )]) + .await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -355,17 +367,25 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?; + self.storage + .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))) + .await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - let res = self.storage - .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk))) + let res = self + .storage + .row_fetch(&storage::Selector::Single(&RowRef::new( + &self.mail_path, + &sk, + ))) .await?; if let Some(row_val) = res.into_iter().next() { - self.storage.row_rm(&storage::Selector::Single(&row_val.row_ref)).await?; + self.storage + .row_rm(&storage::Selector::Single(&row_val.row_ref)) + .await?; } Ok::<_, anyhow::Error>(()) } @@ -421,10 +441,12 @@ impl MailboxInternal { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.storage.row_insert(vec![RowVal::new( - RowRef::new(&self.mail_path, &new_id.to_string()), - meta_blob, - )]).await?; + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]) + .await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), diff --git a/src/mail/user.rs b/src/mail/user.rs index 8d12c58..da0d509 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -226,7 +226,11 @@ impl User { async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> { let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); - let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await { + let (mut list, row) = match self + .storage + .row_fetch(&storage::Selector::Single(&row_ref)) + .await + { Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), Ok(rv) => { diff --git a/src/main.rs b/src/main.rs index f08f1a3..3221c2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ #![feature(async_fn_in_trait)] -mod timestamp; mod bayou; mod config; mod cryptoblob; @@ -11,17 +10,18 @@ mod login; mod mail; mod server; mod storage; +mod timestamp; -use std::path::PathBuf; use std::io::Read; +use std::path::PathBuf; -use anyhow::{bail, Result, Context}; +use anyhow::{bail, Context, Result}; use clap::{Parser, Subcommand}; -use nix::{unistd::Pid, sys::signal}; +use nix::{sys::signal, unistd::Pid}; use config::*; -use server::Server; use login::{static_provider::*, *}; +use server::Server; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -58,7 +58,7 @@ enum ToolsCommand { PasswordHash { #[clap(env = "AEROGRAMME_PASSWORD")] maybe_password: Option<String>, - } + }, } #[derive(Subcommand, Debug)] @@ -138,7 +138,7 @@ enum AccountManagement { maybe_new_password: Option<String>, #[clap(short, long)] - login: String + login: String, }, } @@ -165,11 +165,11 @@ async fn main() -> Result<()> { CompanionCommand::Daemon => { let server = Server::from_companion_config(config).await?; server.run().await?; - }, + } CompanionCommand::Reload { pid } => reload(*pid, config.pid)?, CompanionCommand::Wizard => { unimplemented!(); - }, + } CompanionCommand::Account(cmd) => { let user_file = config.users.user_list; account_management(&args.command, cmd, user_file)?; @@ -179,22 +179,24 @@ async fn main() -> Result<()> { ProviderCommand::Daemon => { let server = Server::from_provider_config(config).await?; server.run().await?; - }, + } ProviderCommand::Reload { pid } => reload(*pid, config.pid)?, ProviderCommand::Account(cmd) => { let user_file = match config.users { UserManagement::Static(conf) => conf.user_list, - UserManagement::Ldap(_) => panic!("LDAP account management is not supported from Aerogramme.") + UserManagement::Ldap(_) => { + panic!("LDAP account management is not supported from Aerogramme.") + } }; account_management(&args.command, cmd, user_file)?; } }, (Command::Provider(_), AnyConfig::Companion(_)) => { bail!("Your want to run a 'Provider' command but your configuration file has role 'Companion'."); - }, + } (Command::Companion(_), AnyConfig::Provider(_)) => { bail!("Your want to run a 'Companion' command but your configuration file has role 'Provider'."); - }, + } (Command::Tools(subcommand), _) => match subcommand { ToolsCommand::PasswordHash { maybe_password } => { let password = match maybe_password { @@ -202,60 +204,64 @@ async fn main() -> Result<()> { None => rpassword::prompt_password("Enter password: ")?, }; println!("{}", hash_password(&password)?); - }, - ToolsCommand::CryptoRoot(crcommand) => { - match crcommand { - CryptoRootCommand::New { maybe_password } => { - let password = match maybe_password { - Some(pwd) => pwd.clone(), - None => { - let password = rpassword::prompt_password("Enter password: ")?; - let password_confirm = rpassword::prompt_password("Confirm password: ")?; - if password != password_confirm { - bail!("Passwords don't match."); - } - password + } + ToolsCommand::CryptoRoot(crcommand) => match crcommand { + CryptoRootCommand::New { maybe_password } => { + let password = match maybe_password { + Some(pwd) => pwd.clone(), + None => { + let password = rpassword::prompt_password("Enter password: ")?; + let password_confirm = + rpassword::prompt_password("Confirm password: ")?; + if password != password_confirm { + bail!("Passwords don't match."); } - }; - let crypto_keys = CryptoKeys::init(); - let cr = CryptoRoot::create_pass(&password, &crypto_keys)?; - println!("{}", cr.0); - }, - CryptoRootCommand::NewClearText => { - let crypto_keys = CryptoKeys::init(); - let cr = CryptoRoot::create_cleartext(&crypto_keys); - println!("{}", cr.0); - }, - CryptoRootCommand::ChangePassword { maybe_old_password, maybe_new_password, crypto_root } => { - let old_password = match maybe_old_password { - Some(pwd) => pwd.to_string(), - None => rpassword::prompt_password("Enter old password: ")?, - }; - - let new_password = match maybe_new_password { - Some(pwd) => pwd.to_string(), - None => { - let password = rpassword::prompt_password("Enter new password: ")?; - let password_confirm = rpassword::prompt_password("Confirm new password: ")?; - if password != password_confirm { - bail!("Passwords don't match."); - } - password + password + } + }; + let crypto_keys = CryptoKeys::init(); + let cr = CryptoRoot::create_pass(&password, &crypto_keys)?; + println!("{}", cr.0); + } + CryptoRootCommand::NewClearText => { + let crypto_keys = CryptoKeys::init(); + let cr = CryptoRoot::create_cleartext(&crypto_keys); + println!("{}", cr.0); + } + CryptoRootCommand::ChangePassword { + maybe_old_password, + maybe_new_password, + crypto_root, + } => { + let old_password = match maybe_old_password { + Some(pwd) => pwd.to_string(), + None => rpassword::prompt_password("Enter old password: ")?, + }; + + let new_password = match maybe_new_password { + Some(pwd) => pwd.to_string(), + None => { + let password = rpassword::prompt_password("Enter new password: ")?; + let password_confirm = + rpassword::prompt_password("Confirm new password: ")?; + if password != password_confirm { + bail!("Passwords don't match."); } - }; - - let keys = CryptoRoot(crypto_root.to_string()).crypto_keys(&old_password)?; - let cr = CryptoRoot::create_pass(&new_password, &keys)?; - println!("{}", cr.0); - }, - CryptoRootCommand::DeriveIncoming { crypto_root } => { - let pubkey = CryptoRoot(crypto_root.to_string()).public_key()?; - let cr = CryptoRoot::create_incoming(&pubkey); - println!("{}", cr.0); - }, + password + } + }; + + let keys = CryptoRoot(crypto_root.to_string()).crypto_keys(&old_password)?; + let cr = CryptoRoot::create_pass(&new_password, &keys)?; + println!("{}", cr.0); + } + CryptoRootCommand::DeriveIncoming { crypto_root } => { + let pubkey = CryptoRoot(crypto_root.to_string()).public_key()?; + let cr = CryptoRoot::create_incoming(&pubkey); + println!("{}", cr.0); } }, - } + }, } Ok(()) @@ -264,12 +270,12 @@ async fn main() -> Result<()> { fn reload(pid: Option<i32>, pid_path: Option<PathBuf>) -> Result<()> { let final_pid = match (pid, pid_path) { (Some(pid), _) => pid, - (_, Some(path)) => { + (_, Some(path)) => { let mut f = std::fs::OpenOptions::new().read(true).open(path)?; let mut pidstr = String::new(); f.read_to_string(&mut pidstr)?; pidstr.parse::<i32>()? - }, + } _ => bail!("Unable to infer your daemon's PID"), }; let pid = Pid::from_raw(final_pid); @@ -278,13 +284,15 @@ fn reload(pid: Option<i32>, pid_path: Option<PathBuf>) -> Result<()> { } fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) -> Result<()> { - let mut ulist: UserList = read_config(users.clone()).context(format!("'{:?}' must be a user database", users))?; + let mut ulist: UserList = + read_config(users.clone()).context(format!("'{:?}' must be a user database", users))?; match cmd { AccountManagement::Add { login, setup } => { - tracing::debug!(user=login, "will-create"); - let stp: SetupEntry = read_config(setup.clone()).context(format!("'{:?}' must be a setup file", setup))?; - tracing::debug!(user=login, "loaded setup entry"); + tracing::debug!(user = login, "will-create"); + let stp: SetupEntry = read_config(setup.clone()) + .context(format!("'{:?}' must be a setup file", setup))?; + tracing::debug!(user = login, "loaded setup entry"); let password = match stp.clear_password { Some(pwd) => pwd, @@ -307,21 +315,28 @@ fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) - let hash = hash_password(password.as_str()).context("unable to hash password")?; - ulist.insert(login.clone(), UserEntry { - email_addresses: stp.email_addresses, - password: hash, - crypto_root: crypto_root.0, - storage: stp.storage, - }); + ulist.insert( + login.clone(), + UserEntry { + email_addresses: stp.email_addresses, + password: hash, + crypto_root: crypto_root.0, + storage: stp.storage, + }, + ); write_config(users.clone(), &ulist)?; - }, + } AccountManagement::Delete { login } => { - tracing::debug!(user=login, "will-delete"); + tracing::debug!(user = login, "will-delete"); ulist.remove(login); write_config(users.clone(), &ulist)?; - }, - AccountManagement::ChangePassword { maybe_old_password, maybe_new_password, login } => { + } + AccountManagement::ChangePassword { + maybe_old_password, + maybe_new_password, + login, + } => { let mut user = ulist.remove(login).context("user must exist first")?; let old_password = match maybe_old_password { @@ -345,16 +360,16 @@ fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) - } password } - }; + }; let new_hash = hash_password(&new_password)?; let new_crypto_root = CryptoRoot::create_pass(&new_password, &crypto_keys)?; - + user.password = new_hash; user.crypto_root = new_crypto_root.0; ulist.insert(login.clone(), user); write_config(users.clone(), &ulist)?; - }, + } }; Ok(()) diff --git a/src/server.rs b/src/server.rs index 552a0e6..28e0b27 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; -use std::path::PathBuf; use std::io::Write; +use std::path::PathBuf; +use std::sync::Arc; use anyhow::Result; use futures::try_join; @@ -26,7 +26,11 @@ impl Server { let lmtp_server = None; let imap_server = Some(imap::new(config.imap, login.clone()).await?); - Ok(Self { lmtp_server, imap_server, pid_file: config.pid }) + Ok(Self { + lmtp_server, + imap_server, + pid_file: config.pid, + }) } pub async fn from_provider_config(config: ProviderConfig) -> Result<Self> { @@ -39,12 +43,16 @@ impl Server { let lmtp_server = Some(LmtpServer::new(config.lmtp, login.clone())); let imap_server = Some(imap::new(config.imap, login.clone()).await?); - Ok(Self { lmtp_server, imap_server, pid_file: config.pid }) + Ok(Self { + lmtp_server, + imap_server, + pid_file: config.pid, + }) } pub async fn run(self) -> Result<()> { let pid = std::process::id(); - tracing::info!(pid=pid, "Starting main loops"); + tracing::info!(pid = pid, "Starting main loops"); // write the pid file if let Some(pid_file) = self.pid_file { @@ -57,7 +65,6 @@ impl Server { drop(file); } - let (exit_signal, provoke_exit) = watch_ctrl_c(); let _exit_on_err = move |err: anyhow::Error| { error!("Error: {}", err); diff --git a/src/storage/garage.rs b/src/storage/garage.rs index f9ba756..d08585f 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -1,10 +1,6 @@ use crate::storage::*; +use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError}; use serde::Serialize; -use aws_sdk_s3::{ - self as s3, - error::SdkError, - operation::get_object::GetObjectError, -}; #[derive(Clone, Debug, Serialize)] pub struct GarageConf { @@ -28,18 +24,18 @@ impl GarageBuilder { unicity.extend_from_slice(file!().as_bytes()); unicity.append(&mut rmp_serde::to_vec(&conf)?); Ok(Arc::new(Self { conf, unicity })) - } + } } #[async_trait] impl IBuilder for GarageBuilder { async fn build(&self) -> Result<Store, StorageError> { let s3_creds = s3::config::Credentials::new( - self.conf.aws_access_key_id.clone(), - self.conf.aws_secret_access_key.clone(), - None, - None, - "aerogramme" + self.conf.aws_access_key_id.clone(), + self.conf.aws_secret_access_key.clone(), + None, + None, + "aerogramme", ); let s3_config = aws_config::from_env() @@ -51,12 +47,12 @@ impl IBuilder for GarageBuilder { let s3_client = aws_sdk_s3::Client::new(&s3_config); let k2v_config = k2v_client::K2vClientConfig { - endpoint: self.conf.k2v_endpoint.clone(), - region: self.conf.region.clone(), - aws_access_key_id: self.conf.aws_access_key_id.clone(), - aws_secret_access_key: self.conf.aws_secret_access_key.clone(), - bucket: self.conf.bucket.clone(), - user_agent: None, + endpoint: self.conf.k2v_endpoint.clone(), + region: self.conf.region.clone(), + aws_access_key_id: self.conf.aws_access_key_id.clone(), + aws_secret_access_key: self.conf.aws_secret_access_key.clone(), + bucket: self.conf.bucket.clone(), + user_agent: None, }; let k2v_client = match k2v_client::K2vClient::new(k2v_config) { @@ -67,7 +63,7 @@ impl IBuilder for GarageBuilder { Ok(v) => v, }; - Ok(Box::new(GarageStore { + Ok(Box::new(GarageStore { bucket: self.conf.bucket.clone(), s3: s3_client, k2v: k2v_client, @@ -86,19 +82,30 @@ pub struct GarageStore { fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal { let new_row_ref = row_ref.with_causality(causal_value.causality.into()); - let row_values = causal_value.value.into_iter().map(|k2v_value| match k2v_value { - k2v_client::K2vValue::Tombstone => Alternative::Tombstone, - k2v_client::K2vValue::Value(v) => Alternative::Value(v), - }).collect::<Vec<_>>(); + let row_values = causal_value + .value + .into_iter() + .map(|k2v_value| match k2v_value { + k2v_client::K2vValue::Tombstone => Alternative::Tombstone, + k2v_client::K2vValue::Value(v) => Alternative::Value(v), + }) + .collect::<Vec<_>>(); - RowVal { row_ref: new_row_ref, value: row_values } + RowVal { + row_ref: new_row_ref, + value: row_values, + } } #[async_trait] impl IStore for GarageStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { let (pk_list, batch_op) = match select { - Selector::Range { shard, sort_begin, sort_end } => ( + Selector::Range { + shard, + sort_begin, + sort_end, + } => ( vec![shard.to_string()], vec![k2v_client::BatchReadOp { partition_key: shard, @@ -108,49 +115,71 @@ impl IStore for GarageStore { ..k2v_client::Filter::default() }, ..k2v_client::BatchReadOp::default() - }] + }], ), Selector::List(row_ref_list) => ( - row_ref_list.iter().map(|row_ref| row_ref.uid.shard.to_string()).collect::<Vec<_>>(), - row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp { - partition_key: &row_ref.uid.shard, + row_ref_list + .iter() + .map(|row_ref| row_ref.uid.shard.to_string()) + .collect::<Vec<_>>(), + row_ref_list + .iter() + .map(|row_ref| k2v_client::BatchReadOp { + partition_key: &row_ref.uid.shard, + filter: k2v_client::Filter { + start: Some(&row_ref.uid.sort), + ..k2v_client::Filter::default() + }, + single_item: true, + ..k2v_client::BatchReadOp::default() + }) + .collect::<Vec<_>>(), + ), + Selector::Prefix { shard, sort_prefix } => ( + vec![shard.to_string()], + vec![k2v_client::BatchReadOp { + partition_key: shard, filter: k2v_client::Filter { - start: Some(&row_ref.uid.sort), + prefix: Some(sort_prefix), ..k2v_client::Filter::default() }, - single_item: true, ..k2v_client::BatchReadOp::default() - }).collect::<Vec<_>>() + }], ), - Selector::Prefix { shard, sort_prefix } => ( - vec![shard.to_string()], - vec![k2v_client::BatchReadOp { - partition_key: shard, - filter: k2v_client::Filter { - prefix: Some(sort_prefix), - ..k2v_client::Filter::default() - }, - ..k2v_client::BatchReadOp::default() - }]), Selector::Single(row_ref) => { - let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await { + let causal_value = match self + .k2v + .read_item(&row_ref.uid.shard, &row_ref.uid.sort) + .await + { Err(e) => { - tracing::error!("K2V read item shard={}, sort={}, bucket={} failed: {}", row_ref.uid.shard, row_ref.uid.sort, self.bucket, e); + tracing::error!( + "K2V read item shard={}, sort={}, bucket={} failed: {}", + row_ref.uid.shard, + row_ref.uid.sort, + self.bucket, + e + ); return Err(StorageError::Internal); - }, + } Ok(v) => v, }; let row_val = causal_to_row_val((*row_ref).clone(), causal_value); - return Ok(vec![row_val]) - }, + return Ok(vec![row_val]); + } }; let all_raw_res = match self.k2v.read_batch(&batch_op).await { Err(e) => { - tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e); + tracing::error!( + "k2v read batch failed for {:?}, bucket {} with err: {}", + select, + self.bucket, + e + ); return Err(StorageError::Internal); - }, + } Ok(v) => v, }; @@ -163,13 +192,17 @@ impl IStore for GarageStore { .into_iter() .zip(pk_list.into_iter()) .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) - .collect::<Vec<_>>(); + .collect::<Vec<_>>(); Ok(row_vals) } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { let del_op = match select { - Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp { + Selector::Range { + shard, + sort_begin, + sort_end, + } => vec![k2v_client::BatchDeleteOp { partition_key: shard, prefix: None, start: Some(sort_begin), @@ -178,21 +211,24 @@ impl IStore for GarageStore { }], Selector::List(row_ref_list) => { // Insert null values with causality token = delete - let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp { - partition_key: &v.uid.shard, - sort_key: &v.uid.sort, - causality: v.causality.clone().map(|ct| ct.into()), - value: k2v_client::K2vValue::Tombstone, - }).collect::<Vec<_>>(); - + let batch_op = row_ref_list + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.uid.shard, + sort_key: &v.uid.sort, + causality: v.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }) + .collect::<Vec<_>>(); + return match self.k2v.insert_batch(&batch_op).await { Err(e) => { tracing::error!("Unable to delete the list of values: {}", e); Err(StorageError::Internal) - }, + } Ok(_) => Ok(()), }; - }, + } Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp { partition_key: shard, prefix: Some(sort_prefix), @@ -208,15 +244,15 @@ impl IStore for GarageStore { causality: row_ref.causality.clone().map(|ct| ct.into()), value: k2v_client::K2vValue::Tombstone, }]; - + return match self.k2v.insert_batch(&batch_op).await { Err(e) => { tracing::error!("Unable to delete the list of values: {}", e); Err(StorageError::Internal) - }, + } Ok(_) => Ok(()), }; - }, + } }; // Finally here we only have prefix & range @@ -224,34 +260,46 @@ impl IStore for GarageStore { Err(e) => { tracing::error!("delete batch error: {}", e); Err(StorageError::Internal) - }, + } Ok(_) => Ok(()), } } async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { - let batch_ops = values.iter().map(|v| k2v_client::BatchInsertOp { - partition_key: &v.row_ref.uid.shard, - sort_key: &v.row_ref.uid.sort, - causality: v.row_ref.causality.clone().map(|ct| ct.into()), - value: v.value.iter().next().map(|cv| match cv { - Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), - Alternative::Tombstone => k2v_client::K2vValue::Tombstone, - }).unwrap_or(k2v_client::K2vValue::Tombstone) - }).collect::<Vec<_>>(); + let batch_ops = values + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.row_ref.uid.shard, + sort_key: &v.row_ref.uid.sort, + causality: v.row_ref.causality.clone().map(|ct| ct.into()), + value: v + .value + .iter() + .next() + .map(|cv| match cv { + Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), + Alternative::Tombstone => k2v_client::K2vValue::Tombstone, + }) + .unwrap_or(k2v_client::K2vValue::Tombstone), + }) + .collect::<Vec<_>>(); match self.k2v.insert_batch(&batch_ops).await { Err(e) => { tracing::error!("k2v can't insert some value: {}", e); Err(StorageError::Internal) - }, + } Ok(v) => Ok(v), } } async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { loop { if let Some(ct) = &value.causality { - match self.k2v.poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None).await { + match self + .k2v + .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None) + .await + { Err(e) => { tracing::error!("Unable to poll item: {}", e); return Err(StorageError::Internal); @@ -262,8 +310,7 @@ impl IStore for GarageStore { } else { match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await { Err(k2v_client::Error::NotFound) => { - self - .k2v + self.k2v .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None) .await .map_err(|e| { @@ -273,8 +320,8 @@ impl IStore for GarageStore { } Err(e) => { tracing::error!("Unable to read item in polling logic: {}", e); - return Err(StorageError::Internal) - }, + return Err(StorageError::Internal); + } Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)), } } @@ -282,7 +329,8 @@ impl IStore for GarageStore { } async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { - let maybe_out = self.s3 + let maybe_out = self + .s3 .get_object() .bucket(self.bucket.to_string()) .key(blob_ref.0.to_string()) @@ -296,12 +344,12 @@ impl IStore for GarageStore { e => { tracing::warn!("Blob Fetch Error, Service Error: {}", e); return Err(StorageError::Internal); - }, + } }, Err(e) => { tracing::warn!("Blob Fetch Error, {}", e); return Err(StorageError::Internal); - }, + } }; let buffer = match object_output.body.collect().await { @@ -316,9 +364,10 @@ impl IStore for GarageStore { Ok(BlobVal::new(blob_ref.clone(), buffer)) } async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { - let streamable_value = s3::primitives::ByteStream::from(blob_val.value); + let streamable_value = s3::primitives::ByteStream::from(blob_val.value); - let maybe_send = self.s3 + let maybe_send = self + .s3 .put_object() .bucket(self.bucket.to_string()) .key(blob_val.blob_ref.0.to_string()) @@ -338,7 +387,8 @@ impl IStore for GarageStore { } } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { - let maybe_copy = self.s3 + let maybe_copy = self + .s3 .copy_object() .bucket(self.bucket.to_string()) .key(dst.0.clone()) @@ -348,18 +398,24 @@ impl IStore for GarageStore { match maybe_copy { Err(e) => { - tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.bucket, e); + tracing::error!( + "unable to copy object {} to {} (bucket: {}), error: {}", + src.0, + dst.0, + self.bucket, + e + ); Err(StorageError::Internal) - }, + } Ok(_) => { tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket); Ok(()) } } - } async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { - let maybe_list = self.s3 + let maybe_list = self + .s3 .list_objects_v2() .bucket(self.bucket.to_string()) .prefix(prefix) @@ -370,7 +426,12 @@ impl IStore for GarageStore { match maybe_list { Err(e) => { - tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.bucket, e); + tracing::error!( + "listing prefix {} on bucket {} failed: {}", + prefix, + self.bucket, + e + ); Err(StorageError::Internal) } Ok(pagin_list_out) => Ok(pagin_list_out @@ -382,7 +443,8 @@ impl IStore for GarageStore { } } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { - let maybe_delete = self.s3 + let maybe_delete = self + .s3 .delete_object() .bucket(self.bucket.to_string()) .key(blob_ref.0.clone()) @@ -391,9 +453,14 @@ impl IStore for GarageStore { match maybe_delete { Err(e) => { - tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.bucket, e); + tracing::error!( + "unable to delete {} (bucket: {}), error {}", + blob_ref.0, + self.bucket, + e + ); Err(StorageError::Internal) - }, + } Ok(_) => { tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket); Ok(()) @@ -401,4 +468,3 @@ impl IStore for GarageStore { } } } - diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index ee7c9a6..3c3a94c 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -1,6 +1,6 @@ use crate::storage::*; -use std::collections::{HashMap, BTreeMap}; -use std::ops::Bound::{Included, Unbounded, Excluded, self}; +use std::collections::{BTreeMap, HashMap}; +use std::ops::Bound::{self, Excluded, Included, Unbounded}; use std::sync::{Arc, RwLock}; use tokio::sync::Notify; @@ -16,7 +16,7 @@ impl MemDb { Self(tokio::sync::Mutex::new(HashMap::new())) } - pub async fn builder(&self, username: &str) -> Arc<MemBuilder> { + pub async fn builder(&self, username: &str) -> Arc<MemBuilder> { let mut global_storage = self.0.lock().await; global_storage .entry(username.to_string()) @@ -60,8 +60,8 @@ impl InternalRowVal { } fn to_row_val(&self, row_ref: RowRef) -> RowVal { - RowVal{ - row_ref: row_ref.with_causality(self.version.to_string()), + RowVal { + row_ref: row_ref.with_causality(self.version.to_string()), value: self.concurrent_values(), } } @@ -75,7 +75,7 @@ struct InternalBlobVal { impl InternalBlobVal { fn to_blob_val(&self, bref: &BlobRef) -> BlobVal { BlobVal { - blob_ref: bref.clone(), + blob_ref: bref.clone(), meta: self.metadata.clone(), value: self.data.clone(), } @@ -113,7 +113,7 @@ impl IBuilder for MemBuilder { row: self.row.clone(), blob: self.blob.clone(), })) - } + } fn unique(&self) -> UnicityBuffer { UnicityBuffer(self.unicity.clone()) @@ -170,24 +170,32 @@ impl IStore for MemStore { let store = self.row.read().or(Err(StorageError::Internal))?; match select { - Selector::Range { shard, sort_begin, sort_end } => { - Ok(store - .get(*shard) - .unwrap_or(&BTreeMap::new()) - .range((Included(sort_begin.to_string()), Excluded(sort_end.to_string()))) - .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) - .collect::<Vec<_>>()) - }, + Selector::Range { + shard, + sort_begin, + sort_end, + } => Ok(store + .get(*shard) + .unwrap_or(&BTreeMap::new()) + .range(( + Included(sort_begin.to_string()), + Excluded(sort_end.to_string()), + )) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) + .collect::<Vec<_>>()), Selector::List(rlist) => { let mut acc = vec![]; for row_ref in rlist { - let maybe_intval = store.get(&row_ref.uid.shard).map(|v| v.get(&row_ref.uid.sort)).flatten(); + let maybe_intval = store + .get(&row_ref.uid.shard) + .map(|v| v.get(&row_ref.uid.sort)) + .flatten(); if let Some(intval) = maybe_intval { acc.push(intval.to_row_val(row_ref.clone())); } } Ok(acc) - }, + } Selector::Prefix { shard, sort_prefix } => { let last_bound = prefix_last_bound(sort_prefix); @@ -197,13 +205,13 @@ impl IStore for MemStore { .range((Included(sort_prefix.to_string()), last_bound)) .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) .collect::<Vec<_>>()) - }, + } Selector::Single(row_ref) => { let intval = store - .get(&row_ref.uid.shard) - .ok_or(StorageError::NotFound)? - .get(&row_ref.uid.sort) - .ok_or(StorageError::NotFound)?; + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)?; Ok(vec![intval.to_row_val((*row_ref).clone())]) } } @@ -213,7 +221,12 @@ impl IStore for MemStore { tracing::trace!(select=%select, command="row_rm"); let values = match select { - Selector::Range { .. } | Selector::Prefix { .. } => self.row_fetch(select).await?.into_iter().map(|rv| rv.row_ref).collect::<Vec<_>>(), + Selector::Range { .. } | Selector::Prefix { .. } => self + .row_fetch(select) + .await? + .into_iter() + .map(|rv| rv.row_ref) + .collect::<Vec<_>>(), Selector::List(rlist) => rlist.clone(), Selector::Single(row_ref) => vec![(*row_ref).clone()], }; @@ -282,7 +295,10 @@ impl IStore for MemStore { async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { tracing::trace!(entry=%blob_ref, command="blob_fetch"); let store = self.blob.read().or(Err(StorageError::Internal))?; - store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref)) + store + .get(&blob_ref.0) + .ok_or(StorageError::NotFound) + .map(|v| v.to_blob_val(blob_ref)) } async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); @@ -300,10 +316,13 @@ impl IStore for MemStore { Ok(()) } async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { - tracing::trace!(prefix=prefix, command="blob_list"); + tracing::trace!(prefix = prefix, command = "blob_list"); let store = self.blob.read().or(Err(StorageError::Internal))?; let last_bound = prefix_last_bound(prefix); - let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::<Vec<_>>(); + let blist = store + .range((Included(prefix.to_string()), last_bound)) + .map(|(k, _)| BlobRef(k.to_string())) + .collect::<Vec<_>>(); Ok(blist) } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c81ffe4..1f86f71 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -8,13 +8,13 @@ * into the object system so it is not exposed. */ -pub mod in_memory; pub mod garage; +pub mod in_memory; -use std::sync::Arc; -use std::hash::Hash; -use std::collections::HashMap; use async_trait::async_trait; +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Arc; #[derive(Debug, Clone)] pub enum Alternative { @@ -52,14 +52,18 @@ pub struct RowRef { } impl std::fmt::Display for RowRef { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "RowRef({}, {}, {:?})", self.uid.shard, self.uid.sort, self.causality) + write!( + f, + "RowRef({}, {}, {:?})", + self.uid.shard, self.uid.sort, self.causality + ) } } impl RowRef { pub fn new(shard: &str, sort: &str) -> Self { Self { - uid: RowUid { + uid: RowUid { shard: shard.to_string(), sort: sort.to_string(), }, @@ -87,7 +91,6 @@ impl RowVal { } } - #[derive(Debug, Clone)] pub struct BlobRef(pub String); impl std::fmt::Display for BlobRef { @@ -105,7 +108,8 @@ pub struct BlobVal { impl BlobVal { pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self { Self { - blob_ref, value, + blob_ref, + value, meta: HashMap::new(), } } @@ -118,16 +122,27 @@ impl BlobVal { #[derive(Debug)] pub enum Selector<'a> { - Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str }, - List (Vec<RowRef>), // list of (shard_key, sort_key) + Range { + shard: &'a str, + sort_begin: &'a str, + sort_end: &'a str, + }, + List(Vec<RowRef>), // list of (shard_key, sort_key) #[allow(dead_code)] - Prefix { shard: &'a str, sort_prefix: &'a str }, + Prefix { + shard: &'a str, + sort_prefix: &'a str, + }, Single(&'a RowRef), } impl<'a> std::fmt::Display for Selector<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Range { shard, sort_begin, sort_end } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end), + Self::Range { + shard, + sort_begin, + sort_end, + } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end), Self::List(list) => write!(f, "List({:?})", list), Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix), Self::Single(row_ref) => write!(f, "Single({})", row_ref), @@ -149,7 +164,7 @@ pub trait IStore { async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; } -#[derive(Clone,Debug,PartialEq,Eq,Hash)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct UnicityBuffer(Vec<u8>); #[async_trait] |