diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/.server.rs.swo | bin | 0 -> 12288 bytes | |||
-rw-r--r-- | src/.service.rs.swo | bin | 0 -> 12288 bytes | |||
-rw-r--r-- | src/.session.rs.swo | bin | 0 -> 20480 bytes | |||
-rw-r--r-- | src/bayou.rs | 38 | ||||
-rw-r--r-- | src/command.rs | 3 | ||||
-rw-r--r-- | src/config.rs | 11 | ||||
-rw-r--r-- | src/lmtp.rs | 263 | ||||
-rw-r--r-- | src/login/ldap_provider.rs | 120 | ||||
-rw-r--r-- | src/login/mod.rs | 12 | ||||
-rw-r--r-- | src/login/static_provider.rs | 144 | ||||
-rw-r--r-- | src/mail_ident.rs | 95 | ||||
-rw-r--r-- | src/mailbox.rs | 10 | ||||
-rw-r--r-- | src/main.rs | 13 | ||||
-rw-r--r-- | src/server.rs | 107 | ||||
-rw-r--r-- | src/service.rs | 16 | ||||
-rw-r--r-- | src/session.rs | 12 | ||||
-rw-r--r-- | src/test.rs | 32 | ||||
-rw-r--r-- | src/uidindex.rs | 38 |
18 files changed, 709 insertions, 205 deletions
diff --git a/src/.server.rs.swo b/src/.server.rs.swo Binary files differnew file mode 100644 index 0000000..9e99bb3 --- /dev/null +++ b/src/.server.rs.swo diff --git a/src/.service.rs.swo b/src/.service.rs.swo Binary files differnew file mode 100644 index 0000000..a69e975 --- /dev/null +++ b/src/.service.rs.swo diff --git a/src/.session.rs.swo b/src/.session.rs.swo Binary files differnew file mode 100644 index 0000000..a6de20e --- /dev/null +++ b/src/.session.rs.swo diff --git a/src/bayou.rs b/src/bayou.rs index c9ae67f..7a76222 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -1,3 +1,4 @@ +use std::str::FromStr; use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Result}; @@ -123,7 +124,7 @@ impl<S: BayouState> Bayou<S> { .collect(); // 3. List all operations starting from checkpoint - let ts_ser = self.checkpoint.0.serialize(); + let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); let ops_map = self .k2v @@ -148,8 +149,9 @@ impl<S: BayouState> Bayou<S> { let mut ops = vec![]; for (tsstr, val) in ops_map { - let ts = Timestamp::parse(&tsstr) - .ok_or(anyhow!("Invalid operation timestamp: {}", tsstr))?; + let ts = tsstr + .parse::<Timestamp>() + .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?; if val.value.len() != 1 { bail!("Invalid operation, has {} values", val.value.len()); } @@ -251,7 +253,7 @@ impl<S: BayouState> Bayou<S> { self.k2v .insert_item( &self.path, - &ts.serialize(), + &ts.to_string(), seal_serialize(&op, &self.key)?, None, ) @@ -316,7 +318,7 @@ impl<S: BayouState> Bayou<S> { let ts_cp = self.history[i_cp].0; debug!( "(cp) we could checkpoint at time {} (index {} in history)", - ts_cp.serialize(), + ts_cp.to_string(), i_cp ); @@ -330,13 +332,13 @@ impl<S: BayouState> Bayou<S> { { debug!( "(cp) last checkpoint is too recent: {}, not checkpointing", - last_cp.0.serialize() + last_cp.0.to_string() ); return Ok(()); } } - debug!("(cp) saving checkpoint at {}", ts_cp.serialize()); + debug!("(cp) saving checkpoint at {}", ts_cp.to_string()); // Calculate state at time of checkpoint let mut last_known_state = (0, &self.checkpoint.1); @@ -356,7 +358,7 @@ impl<S: BayouState> Bayou<S> { let mut por = PutObjectRequest::default(); por.bucket = self.bucket.clone(); - por.key = format!("{}/checkpoint/{}", self.path, ts_cp.serialize()); + por.key = format!("{}/checkpoint/{}", self.path, ts_cp.to_string()); por.body = Some(cryptoblob.into()); self.s3.put_object(por).await?; @@ -375,7 +377,7 @@ impl<S: BayouState> Bayou<S> { } // Delete corresponding range of operations - let ts_ser = existing_checkpoints[last_to_keep].0.serialize(); + let ts_ser = existing_checkpoints[last_to_keep].0.to_string(); self.k2v .delete_batch(&[BatchDeleteOp { partition_key: &self.path, @@ -414,7 +416,7 @@ impl<S: BayouState> Bayou<S> { for object in checkpoints_res.contents.unwrap_or_default() { if let Some(key) = object.key { if let Some(ckid) = key.strip_prefix(&prefix) { - if let Some(ts) = Timestamp::parse(ckid) { + if let Ok(ts) = ckid.parse::<Timestamp>() { checkpoints.push((ts, key)); } } @@ -451,20 +453,26 @@ impl Timestamp { pub fn zero() -> Self { Self { msec: 0, rand: 0 } } +} - pub fn serialize(&self) -> String { +impl ToString for Timestamp { + fn to_string(&self) -> String { let mut bytes = [0u8; 16]; bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec)); bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand)); hex::encode(&bytes) } +} - pub fn parse(v: &str) -> Option<Self> { - let bytes = hex::decode(v).ok()?; +impl FromStr for Timestamp { + type Err = &'static str; + + fn from_str(s: &str) -> Result<Timestamp, &'static str> { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; if bytes.len() != 16 { - return None; + return Err("bad length"); } - Some(Self { + Ok(Self { msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()), rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), }) diff --git a/src/command.rs b/src/command.rs index 2c61227..4a2723d 100644 --- a/src/command.rs +++ b/src/command.rs @@ -8,7 +8,6 @@ use imap_codec::types::response::{Capability, Data}; use imap_codec::types::sequence::SequenceSet; use crate::mailbox::Mailbox; -use crate::mailstore::Mailstore; use crate::session; pub struct Command<'a> { @@ -33,7 +32,7 @@ impl<'a> Command<'a> { let (u, p) = (String::try_from(username)?, String::try_from(password)?); tracing::info!(user = %u, "command.login"); - let creds = match self.session.mailstore.login_provider.login(&u, &p).await { + let creds = match self.session.login_provider.login(&u, &p).await { Err(_) => { return Ok(Response::no( "[AUTHENTICATIONFAILED] Authentication failed.", diff --git a/src/config.rs b/src/config.rs index 3ffc553..5afcabd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::io::Read; +use std::net::SocketAddr; use std::path::PathBuf; use anyhow::Result; @@ -13,6 +14,8 @@ pub struct Config { pub login_static: Option<LoginStaticConfig>, pub login_ldap: Option<LoginLdapConfig>, + + pub lmtp: Option<LmtpConfig>, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -23,6 +26,8 @@ pub struct LoginStaticConfig { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct LoginStaticUser { + #[serde(default)] + pub email_addresses: Vec<String>, pub password: String, pub aws_access_key_id: String, @@ -60,6 +65,12 @@ pub struct LoginLdapConfig { pub bucket_attr: Option<String>, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LmtpConfig { + pub bind_addr: SocketAddr, + pub hostname: String, +} + pub fn read_config(config_file: PathBuf) -> Result<Config> { let mut file = std::fs::OpenOptions::new() .read(true) diff --git a/src/lmtp.rs b/src/lmtp.rs new file mode 100644 index 0000000..049e119 --- /dev/null +++ b/src/lmtp.rs @@ -0,0 +1,263 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::{pin::Pin, sync::Arc}; + +use anyhow::{bail, Result}; +use async_trait::async_trait; +use duplexify::Duplex; +use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite}; +use futures::{stream, stream::FuturesUnordered, StreamExt}; +use log::*; +use rusoto_s3::{PutObjectRequest, S3Client, S3}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::select; +use tokio::sync::watch; +use tokio_util::compat::*; + +use smtp_message::{Email, EscapedDataReader, Reply, ReplyCode}; +use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata, Protocol}; + +use crate::config::*; +use crate::cryptoblob::*; +use crate::login::*; +use crate::mail_ident::*; + +pub struct LmtpServer { + bind_addr: SocketAddr, + hostname: String, + login_provider: Arc<dyn LoginProvider + Send + Sync>, +} + +impl LmtpServer { + pub fn new( + config: LmtpConfig, + login_provider: Arc<dyn LoginProvider + Send + Sync>, + ) -> Arc<Self> { + Arc::new(Self { + bind_addr: config.bind_addr, + hostname: config.hostname, + login_provider, + }) + } + + pub async fn run(self: &Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<()> { + let tcp = TcpListener::bind(self.bind_addr).await?; + let mut connections = FuturesUnordered::new(); + + while !*must_exit.borrow() { + let wait_conn_finished = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + let (socket, remote_addr) = select! { + a = tcp.accept() => a?, + _ = wait_conn_finished => continue, + _ = must_exit.changed() => continue, + }; + + let conn = tokio::spawn(smtp_server::interact( + socket.compat(), + smtp_server::IsAlreadyTls::No, + Conn { remote_addr }, + self.clone(), + )); + + connections.push(conn); + } + drop(tcp); + + info!("LMTP server shutting down, draining remaining connections..."); + while connections.next().await.is_some() {} + + Ok(()) + } +} + +// ---- + +pub struct Conn { + remote_addr: SocketAddr, +} + +pub struct Message { + to: Vec<PublicCredentials>, +} + +#[async_trait] +impl Config for LmtpServer { + const PROTOCOL: Protocol = Protocol::Lmtp; + + type ConnectionUserMeta = Conn; + type MailUserMeta = Message; + + fn hostname(&self, _conn_meta: &ConnectionMetadata<Conn>) -> &str { + &self.hostname + } + + async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<Conn>) -> Message { + Message { to: vec![] } + } + + async fn tls_accept<IO>( + &self, + _io: IO, + _conn_meta: &mut ConnectionMetadata<Conn>, + ) -> io::Result<Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>> + where + IO: Send + AsyncRead + AsyncWrite, + { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "TLS not implemented for LMTP server", + )) + } + + async fn filter_from( + &self, + from: Option<Email>, + meta: &mut MailMetadata<Message>, + _conn_meta: &mut ConnectionMetadata<Conn>, + ) -> Decision<Option<Email>> { + Decision::Accept { + reply: reply::okay_from().convert(), + res: from, + } + } + + async fn filter_to( + &self, + to: Email, + meta: &mut MailMetadata<Message>, + _conn_meta: &mut ConnectionMetadata<Conn>, + ) -> Decision<Email> { + let to_str = match to.hostname.as_ref() { + Some(h) => format!("{}@{}", to.localpart, h), + None => to.localpart.to_string(), + }; + match self.login_provider.public_login(&to_str).await { + Ok(creds) => { + meta.user.to.push(creds); + Decision::Accept { + reply: reply::okay_to().convert(), + res: to, + } + } + Err(e) => Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], + }, + }, + } + } + + async fn handle_mail<'a, R>( + &self, + reader: &mut EscapedDataReader<'a, R>, + _mail: MailMetadata<Message>, + _conn_meta: &mut ConnectionMetadata<Conn>, + ) -> Decision<()> + where + R: Send + Unpin + AsyncRead, + { + unreachable!(); + } + + async fn handle_mail_multi<'a, 'slife0, 'slife1, 'stream, R>( + &'slife0 self, + reader: &mut EscapedDataReader<'a, R>, + meta: MailMetadata<Message>, + conn_meta: &'slife1 mut ConnectionMetadata<Conn>, + ) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'stream>> + where + R: Send + Unpin + AsyncRead, + 'slife0: 'stream, + 'slife1: 'stream, + Self: 'stream, + { + let err_response_stream = |meta: MailMetadata<Message>, msg: String| { + Box::pin( + stream::iter(meta.user.to.into_iter()).map(move |_| Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(msg.clone())], + }, + }), + ) + }; + + let mut text = Vec::new(); + if reader.read_to_end(&mut text).await.is_err() { + return err_response_stream(meta, "io error".into()); + } + reader.complete(); + + let encrypted_message = match EncryptedMessage::new(text) { + Ok(x) => Arc::new(x), + Err(e) => return err_response_stream(meta, e.to_string()), + }; + + Box::pin(stream::iter(meta.user.to.into_iter()).then(move |creds| { + let encrypted_message = encrypted_message.clone(); + async move { + match encrypted_message.deliver_to(creds).await { + Ok(()) => Decision::Accept { + reply: reply::okay_mail().convert(), + res: (), + }, + Err(e) => Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], + }, + }, + } + } + })) + } +} + +// ---- + +struct EncryptedMessage { + key: Key, + encrypted_body: Vec<u8>, +} + +impl EncryptedMessage { + fn new(body: Vec<u8>) -> Result<Self> { + let key = gen_key(); + let encrypted_body = seal(&body, &key)?; + Ok(Self { + key, + encrypted_body, + }) + } + + async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> { + let s3_client = creds.storage.s3_client()?; + + let encrypted_key = + sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); + let key_header = base64::encode(&encrypted_key); + + let mut por = PutObjectRequest::default(); + por.bucket = creds.storage.bucket.clone(); + por.key = format!("incoming/{}", gen_ident().to_string()); + por.metadata = Some( + [("Message-Key".to_string(), key_header)] + .into_iter() + .collect::<HashMap<_, _>>(), + ); + por.body = Some(self.encrypted_body.clone().into()); + s3_client.put_object(por).await?; + + Ok(()) + } +} diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index c9d23a0..9310e55 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -84,11 +84,30 @@ impl LdapLoginProvider { bucket_source, }) } + + fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<StorageCredentials> { + let aws_access_key_id = get_attr(user, &self.aws_access_key_id_attr)?; + let aws_secret_access_key = get_attr(user, &self.aws_secret_access_key_attr)?; + let bucket = match &self.bucket_source { + BucketSource::Constant(b) => b.clone(), + BucketSource::Attr(a) => get_attr(user, a)?, + }; + + Ok(StorageCredentials { + k2v_region: self.k2v_region.clone(), + s3_region: self.s3_region.clone(), + aws_access_key_id, + aws_secret_access_key, + bucket, + }) + } } #[async_trait] impl LoginProvider for LdapLoginProvider { async fn login(&self, username: &str, password: &str) -> Result<Credentials> { + check_identifier(username)?; + let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?; ldap3::drive!(conn); @@ -97,13 +116,6 @@ impl LoginProvider for LdapLoginProvider { ldap.simple_bind(dn, pw).await?.success()?; } - let username_is_ok = username - .chars() - .all(|c| c.is_alphanumeric() || "-+_.@".contains(c)); - if !username_is_ok { - bail!("Invalid username, must contain only a-z A-Z 0-9 - + _ . @"); - } - let (matches, _res) = ldap .search( &self.search_base, @@ -137,32 +149,9 @@ impl LoginProvider for LdapLoginProvider { .context("Invalid password")?; debug!("Ldap login with user name {} successfull", username); - let get_attr = |attr: &str| -> Result<String> { - Ok(user - .attrs - .get(attr) - .ok_or(anyhow!("Missing attr: {}", attr))? - .iter() - .next() - .ok_or(anyhow!("No value for attr: {}", attr))? - .clone()) - }; - let aws_access_key_id = get_attr(&self.aws_access_key_id_attr)?; - let aws_secret_access_key = get_attr(&self.aws_secret_access_key_attr)?; - let bucket = match &self.bucket_source { - BucketSource::Constant(b) => b.clone(), - BucketSource::Attr(a) => get_attr(a)?, - }; + let storage = self.storage_creds_from_ldap_user(&user)?; - let storage = StorageCredentials { - k2v_region: self.k2v_region.clone(), - s3_region: self.s3_region.clone(), - aws_access_key_id, - aws_secret_access_key, - bucket, - }; - - let user_secret = get_attr(&self.user_secret_attr)?; + let user_secret = get_attr(&user, &self.user_secret_attr)?; let alternate_user_secrets = match &self.alternate_user_secrets_attr { None => vec![], Some(a) => user.attrs.get(a).cloned().unwrap_or_default(), @@ -178,4 +167,71 @@ impl LoginProvider for LdapLoginProvider { Ok(Credentials { storage, keys }) } + + async fn public_login(&self, email: &str) -> Result<PublicCredentials> { + check_identifier(email)?; + + let (dn, pw) = match self.bind_dn_and_pw.as_ref() { + Some(x) => x, + None => bail!("Missing bind_dn and bind_password in LDAP login provider config"), + }; + + let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?; + ldap3::drive!(conn); + ldap.simple_bind(dn, pw).await?.success()?; + + let (matches, _res) = ldap + .search( + &self.search_base, + Scope::Subtree, + &format!( + "(&(objectClass=inetOrgPerson)({}={}))", + self.mail_attr, email + ), + &self.attrs_to_retrieve, + ) + .await? + .success()?; + + if matches.is_empty() { + bail!("No such user account"); + } + if matches.len() > 1 { + bail!("Multiple matching user accounts"); + } + let user = SearchEntry::construct(matches.into_iter().next().unwrap()); + debug!("Found matching LDAP user for email {}: {}", email, user.dn); + + let storage = self.storage_creds_from_ldap_user(&user)?; + drop(ldap); + + let k2v_client = storage.k2v_client()?; + let (_, public_key) = CryptoKeys::load_salt_and_public(&k2v_client).await?; + + Ok(PublicCredentials { + storage, + public_key, + }) + } +} + +fn get_attr(user: &SearchEntry, attr: &str) -> Result<String> { + Ok(user + .attrs + .get(attr) + .ok_or(anyhow!("Missing attr: {}", attr))? + .iter() + .next() + .ok_or(anyhow!("No value for attr: {}", attr))? + .clone()) +} + +fn check_identifier(id: &str) -> Result<()> { + let is_ok = id + .chars() + .all(|c| c.is_alphanumeric() || "-+_.@".contains(c)); + if !is_ok { + bail!("Invalid username/email address, must contain only a-z A-Z 0-9 - + _ . @"); + } + Ok(()) } diff --git a/src/login/mod.rs b/src/login/mod.rs index 2640a58..c0e9032 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -24,6 +24,9 @@ pub trait LoginProvider { /// The login method takes an account's password as an input to decypher /// decryption keys and obtain full access to the user's account. async fn login(&self, username: &str, password: &str) -> Result<Credentials>; + /// The public_login method takes an account's email address and returns + /// public credentials for adding mails to the user's inbox. + async fn public_login(&self, email: &str) -> Result<PublicCredentials>; } /// The struct Credentials represent all of the necessary information to interact @@ -36,6 +39,13 @@ pub struct Credentials { pub keys: CryptoKeys, } +#[derive(Clone, Debug)] +pub struct PublicCredentials { + /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V) + pub storage: StorageCredentials, + pub public_key: PublicKey, +} + /// The struct StorageCredentials contains access key to an S3 and K2V bucket #[derive(Clone, Debug)] pub struct StorageCredentials { @@ -396,7 +406,7 @@ impl CryptoKeys { Ok((salt_ct, public_ct)) } - async fn load_salt_and_public(k2v: &K2vClient) -> Result<([u8; 32], PublicKey)> { + pub async fn load_salt_and_public(k2v: &K2vClient) -> Result<([u8; 32], PublicKey)> { let mut params = k2v .read_batch(&[ k2v_read_single_key("keys", "salt", false), diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index a95ab24..6bbc717 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; @@ -10,16 +11,34 @@ use crate::login::*; pub struct StaticLoginProvider { default_bucket: Option<String>, - users: HashMap<String, LoginStaticUser>, + users: HashMap<String, Arc<LoginStaticUser>>, + users_by_email: HashMap<String, Arc<LoginStaticUser>>, + k2v_region: Region, s3_region: Region, } impl StaticLoginProvider { pub fn new(config: LoginStaticConfig, k2v_region: Region, s3_region: Region) -> Result<Self> { + let users = config + .users + .into_iter() + .map(|(k, v)| (k, Arc::new(v))) + .collect::<HashMap<_, _>>(); + let mut users_by_email = HashMap::new(); + for (_, u) in users.iter() { + for m in u.email_addresses.iter() { + if users_by_email.contains_key(m) { + bail!("Several users have same email address: {}", m); + } + users_by_email.insert(m.clone(), u.clone()); + } + } + Ok(Self { default_bucket: config.default_bucket, - users: config.users, + users, + users_by_email, k2v_region, s3_region, }) @@ -30,54 +49,87 @@ impl StaticLoginProvider { impl LoginProvider for StaticLoginProvider { async fn login(&self, username: &str, password: &str) -> Result<Credentials> { tracing::debug!(user=%username, "login"); - match self.users.get(username) { + let user = match self.users.get(username) { None => bail!("User {} does not exist", username), - Some(u) => { - tracing::debug!(user=%username, "verify password"); - if !verify_password(password, &u.password)? { - bail!("Wrong password"); - } - tracing::debug!(user=%username, "fetch bucket"); - let bucket = u - .bucket - .clone() - .or_else(|| self.default_bucket.clone()) - .ok_or(anyhow!( - "No bucket configured and no default bucket specieid" - ))?; - - tracing::debug!(user=%username, "fetch configuration"); - let storage = StorageCredentials { - k2v_region: self.k2v_region.clone(), - s3_region: self.s3_region.clone(), - aws_access_key_id: u.aws_access_key_id.clone(), - aws_secret_access_key: u.aws_secret_access_key.clone(), - bucket, - }; + Some(u) => u, + }; - tracing::debug!(user=%username, "fetch keys"); - let keys = match (&u.master_key, &u.secret_key) { - (Some(m), Some(s)) => { - let master_key = Key::from_slice(&base64::decode(m)?) - .ok_or(anyhow!("Invalid master key"))?; - let secret_key = SecretKey::from_slice(&base64::decode(s)?) - .ok_or(anyhow!("Invalid secret key"))?; - CryptoKeys::open_without_password(&storage, &master_key, &secret_key).await? - } - (None, None) => { - let user_secrets = UserSecrets { - user_secret: u.user_secret.clone(), - alternate_user_secrets: u.alternate_user_secrets.clone(), - }; - CryptoKeys::open(&storage, &user_secrets, password).await? - } - _ => bail!("Either both master and secret key or none of them must be specified for user"), - }; + tracing::debug!(user=%username, "verify password"); + if !verify_password(password, &user.password)? { + bail!("Wrong password"); + } + + tracing::debug!(user=%username, "fetch bucket"); + let bucket = user + .bucket + .clone() + .or_else(|| self.default_bucket.clone()) + .ok_or(anyhow!( + "No bucket configured and no default bucket specieid" + ))?; + + tracing::debug!(user=%username, "fetch keys"); + let storage = StorageCredentials { + k2v_region: self.k2v_region.clone(), + s3_region: self.s3_region.clone(), + aws_access_key_id: user.aws_access_key_id.clone(), + aws_secret_access_key: user.aws_secret_access_key.clone(), + bucket, + }; - tracing::debug!(user=%username, "logged"); - Ok(Credentials { storage, keys }) + let keys = match (&user.master_key, &user.secret_key) { + (Some(m), Some(s)) => { + let master_key = + Key::from_slice(&base64::decode(m)?).ok_or(anyhow!("Invalid master key"))?; + let secret_key = SecretKey::from_slice(&base64::decode(s)?) + .ok_or(anyhow!("Invalid secret key"))?; + CryptoKeys::open_without_password(&storage, &master_key, &secret_key).await? } - } + (None, None) => { + let user_secrets = UserSecrets { + user_secret: user.user_secret.clone(), + alternate_user_secrets: user.alternate_user_secrets.clone(), + }; + CryptoKeys::open(&storage, &user_secrets, password).await? + } + _ => bail!( + "Either both master and secret key or none of them must be specified for user" + ), + }; + + tracing::debug!(user=%username, "logged"); + Ok(Credentials { storage, keys }) + } + + async fn public_login(&self, email: &str) -> Result<PublicCredentials> { + let user = match self.users_by_email.get(email) { + None => bail!("No user for email address {}", email), + Some(u) => u, + }; + + let bucket = user + .bucket + .clone() + .or_else(|| self.default_bucket.clone()) + .ok_or(anyhow!( + "No bucket configured and no default bucket specieid" + ))?; + + let storage = StorageCredentials { + k2v_region: self.k2v_region.clone(), + s3_region: self.s3_region.clone(), + aws_access_key_id: user.aws_access_key_id.clone(), + aws_secret_access_key: user.aws_secret_access_key.clone(), + bucket, + }; + + let k2v_client = storage.k2v_client()?; + let (_, public_key) = CryptoKeys::load_salt_and_public(&k2v_client).await?; + + Ok(PublicCredentials { + storage, + public_key, + }) } } diff --git a/src/mail_ident.rs b/src/mail_ident.rs new file mode 100644 index 0000000..07e053a --- /dev/null +++ b/src/mail_ident.rs @@ -0,0 +1,95 @@ +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; + +use lazy_static::lazy_static; +use rand::prelude::*; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; + +use crate::time::now_msec; + +/// An internal Mail Identifier is composed of two components: +/// - a process identifier, 128 bits, itself composed of: +/// - the timestamp of when the process started, 64 bits +/// - a 64-bit random number +/// - a sequence number, 64 bits +/// They are not part of the protocol but an internal representation +/// required by Mailrage/Aerogramme. +/// Their main property is to be unique without having to rely +/// on synchronization between IMAP processes. +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)] +pub struct MailIdent(pub [u8; 24]); + +struct IdentGenerator { + pid: u128, + sn: AtomicU64, +} + +impl IdentGenerator { + fn new() -> Self { + let time = now_msec() as u128; + let rand = thread_rng().gen::<u64>() as u128; + Self { + pid: (time << 64) | rand, + sn: AtomicU64::new(0), + } + } + + fn gen(&self) -> MailIdent { + let sn = self.sn.fetch_add(1, Ordering::Relaxed); + let mut res = [0u8; 24]; + res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); + res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); + MailIdent(res) + } +} + +lazy_static! { + static ref GENERATOR: IdentGenerator = IdentGenerator::new(); +} + +pub fn gen_ident() -> MailIdent { + GENERATOR.gen() +} + +// -- serde -- + +impl<'de> Deserialize<'de> for MailIdent { + fn deserialize<D>(d: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + let v = String::deserialize(d)?; + MailIdent::from_str(&v).map_err(D::Error::custom) + } +} + +impl Serialize for MailIdent { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl ToString for MailIdent { + fn to_string(&self) -> String { + hex::encode(self.0) + } +} + +impl FromStr for MailIdent { + type Err = &'static str; + + fn from_str(s: &str) -> Result<MailIdent, &'static str> { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; + + if bytes.len() != 24 { + return Err("bad length"); + } + + let mut tmp = [0u8; 24]; + tmp[..].copy_from_slice(&bytes); + Ok(MailIdent(tmp)) + } +} diff --git a/src/mailbox.rs b/src/mailbox.rs index 349a13b..249d329 100644 --- a/src/mailbox.rs +++ b/src/mailbox.rs @@ -1,11 +1,11 @@ use anyhow::Result; use k2v_client::K2vClient; -use rand::prelude::*; use rusoto_s3::S3Client; use crate::bayou::Bayou; use crate::cryptoblob::Key; use crate::login::Credentials; +use crate::mail_ident::*; use crate::uidindex::*; pub struct Summary { @@ -64,19 +64,17 @@ impl Mailbox { dump(&self.uid_index); - let mut rand_id = [0u8; 24]; - rand_id[..16].copy_from_slice(&u128::to_be_bytes(thread_rng().gen())); let add_mail_op = self .uid_index .state() - .op_mail_add(MailIdent(rand_id), vec!["\\Unseen".into()]); + .op_mail_add(gen_ident(), vec!["\\Unseen".into()]); self.uid_index.push(add_mail_op).await?; dump(&self.uid_index); if self.uid_index.state().idx_by_uid.len() > 6 { for i in 0..2 { - let (_, uuid) = self + let (_, ident) = self .uid_index .state() .idx_by_uid @@ -84,7 +82,7 @@ impl Mailbox { .skip(3 + i) .next() .unwrap(); - let del_mail_op = self.uid_index.state().op_mail_del(*uuid); + let del_mail_op = self.uid_index.state().op_mail_del(*ident); self.uid_index.push(del_mail_op).await?; dump(&self.uid_index); diff --git a/src/main.rs b/src/main.rs index 1391e7a..9ec5af0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,9 @@ mod bayou; mod command; mod config; mod cryptoblob; +mod lmtp; mod login; +mod mail_ident; mod mailbox; mod mailstore; mod server; @@ -38,6 +40,11 @@ enum Command { #[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")] config_file: PathBuf, }, + /// TEST TEST TEST + Test { + #[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")] + config_file: PathBuf, + }, /// Initializes key pairs for a user and adds a key decryption password FirstLogin { #[clap(flatten)] @@ -129,6 +136,12 @@ async fn main() -> Result<()> { let server = Server::new(config).await?; server.run().await?; } + Command::Test { config_file } => { + let config = read_config(config_file)?; + + let server = Server::new(config).await?; + //server.test().await?; + } Command::FirstLogin { creds, user_secrets, diff --git a/src/server.rs b/src/server.rs index 365bc0f..3abdfd1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,40 +1,107 @@ -use anyhow::Result; use std::sync::Arc; -use crate::config::*; -use crate::mailstore; -use crate::service; + use boitalettres::server::accept::addr::AddrIncoming; +use boitalettres::server::accept::addr::AddrStream; use boitalettres::server::Server as ImapServer; +use anyhow::{bail, Result}; +use futures::{try_join, StreamExt}; +use log::*; +use rusoto_signature::Region; +use tokio::sync::watch; +use tower::Service; + +use crate::mailstore; +use crate::service; +use crate::lmtp::*; +use crate::config::*; +use crate::login::{ldap_provider::*, static_provider::*, *}; +use crate::mailbox::Mailbox; + pub struct Server { - pub incoming: AddrIncoming, - pub mailstore: Arc<mailstore::Mailstore>, + lmtp_server: Option<Arc<LmtpServer>>, + imap_server: ImapServer<AddrIncoming, service::Instance>, } + impl Server { pub async fn new(config: Config) -> Result<Self> { + let lmtp_config = config.lmtp.clone(); //@FIXME + let login = authenticator(config)?; + + let lmtp = lmtp_config.map(|cfg| LmtpServer::new(cfg, login.clone())); + + let incoming = AddrIncoming::new("127.0.0.1:4567").await?; + let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone())); + Ok(Self { - incoming: AddrIncoming::new("127.0.0.1:4567").await?, - mailstore: mailstore::Mailstore::new(config)?, + lmtp_server: lmtp, + imap_server: imap, }) } - pub async fn run(self: Self) -> Result<()> { - tracing::info!("Starting server on {:#}", self.incoming.local_addr); - /*let creds = self - .mailstore - .login_provider - .login("quentin", "poupou") - .await?;*/ - //let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?; - //mailbox.test().await?; + pub async fn run(self) -> Result<()> { + //tracing::info!("Starting server on {:#}", self.imap.incoming.local_addr); + tracing::info!("Starting Aerogramme..."); + + let (exit_signal, provoke_exit) = watch_ctrl_c(); + let exit_on_err = move |err: anyhow::Error| { + error!("Error: {}", err); + let _ = provoke_exit.send(true); + }; + + + try_join!(async { + match self.lmtp_server.as_ref() { + None => Ok(()), + Some(s) => s.run(exit_signal.clone()).await, + } + }, + //@FIXME handle ctrl + c + async { + self.imap_server.await?; + Ok(()) + } + )?; - let server = - ImapServer::new(self.incoming).serve(service::Instance::new(self.mailstore.clone())); - let _ = server.await?; Ok(()) } } + +fn authenticator(config: Config) -> Result<Arc<dyn LoginProvider + Send + Sync>> { + let s3_region = Region::Custom { + name: config.aws_region.clone(), + endpoint: config.s3_endpoint, + }; + let k2v_region = Region::Custom { + name: config.aws_region, + endpoint: config.k2v_endpoint, + }; + + let lp: Arc<dyn LoginProvider + Send + Sync> = match (config.login_static, config.login_ldap) { + (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), + (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), + (Some(_), Some(_)) => { + bail!("A single login provider must be set up in config file") + } + (None, None) => bail!("No login provider is set up in config file"), + }; + Ok(lp) +} + +pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) { + let (send_cancel, watch_cancel) = watch::channel(false); + let send_cancel = Arc::new(send_cancel); + let send_cancel_2 = send_cancel.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + info!("Received CTRL+C, shutting down."); + send_cancel.send(true).unwrap(); + }); + (watch_cancel, send_cancel_2) +} diff --git a/src/service.rs b/src/service.rs index f99ba7a..ce272a3 100644 --- a/src/service.rs +++ b/src/service.rs @@ -9,15 +9,15 @@ use futures::future::BoxFuture; use futures::future::FutureExt; use tower::Service; -use crate::mailstore::Mailstore; use crate::session; +use crate::LoginProvider; pub struct Instance { - pub mailstore: Arc<Mailstore>, + login_provider: Arc<dyn LoginProvider + Send + Sync>, } impl Instance { - pub fn new(mailstore: Arc<Mailstore>) -> Self { - Self { mailstore } + pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self { + Self { login_provider } } } impl<'a> Service<&'a AddrStream> for Instance { @@ -31,8 +31,8 @@ impl<'a> Service<&'a AddrStream> for Instance { fn call(&mut self, addr: &'a AddrStream) -> Self::Future { tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept"); - let ms = self.mailstore.clone(); - async { Ok(Connection::new(ms)) }.boxed() + let lp = self.login_provider.clone(); + async { Ok(Connection::new(lp)) }.boxed() } } @@ -40,9 +40,9 @@ pub struct Connection { session: session::Manager, } impl Connection { - pub fn new(mailstore: Arc<Mailstore>) -> Self { + pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self { Self { - session: session::Manager::new(mailstore), + session: session::Manager::new(login_provider), } } } diff --git a/src/session.rs b/src/session.rs index 8ad44dd..a3e4e24 100644 --- a/src/session.rs +++ b/src/session.rs @@ -11,7 +11,7 @@ use tokio::sync::{mpsc, oneshot}; use crate::command; use crate::login::Credentials; use crate::mailbox::Mailbox; -use crate::mailstore::Mailstore; +use crate::LoginProvider; /* This constant configures backpressure in the system, * or more specifically, how many pipelined messages are allowed @@ -30,10 +30,10 @@ pub struct Manager { //@FIXME we should garbage collect the Instance when the Manager is destroyed. impl Manager { - pub fn new(mailstore: Arc<Mailstore>) -> Self { + pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self { let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); tokio::spawn(async move { - let mut instance = Instance::new(mailstore, rx); + let mut instance = Instance::new(login_provider, rx); instance.start().await; }); Self { tx } @@ -79,14 +79,14 @@ pub struct User { pub struct Instance { rx: mpsc::Receiver<Message>, - pub mailstore: Arc<Mailstore>, + pub login_provider: Arc<dyn LoginProvider + Send + Sync>, pub selected: Option<Mailbox>, pub user: Option<User>, } impl Instance { - fn new(mailstore: Arc<Mailstore>, rx: mpsc::Receiver<Message>) -> Self { + fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>, rx: mpsc::Receiver<Message>) -> Self { Self { - mailstore, + login_provider, rx, selected: None, user: None, diff --git a/src/test.rs b/src/test.rs deleted file mode 100644 index 29db928..0000000 --- a/src/test.rs +++ /dev/null @@ -1,32 +0,0 @@ -mod config; - -use serde::Serialize; -use std::collections::HashMap; - -fn main() { - let config = config::Config { - s3_endpoint: "http://127.0.0.1:3900".to_string(), - k2v_endpoint: "http://127.0.0.1:3904".to_string(), - aws_region: "garage".to_string(), - login_static: Some(config::LoginStaticConfig { - default_bucket: Some("mailrage".to_string()), - users: HashMap::from([( - "quentin".to_string(), - config::LoginStaticUser { - password: "toto".to_string(), - aws_access_key_id: "GKxxx".to_string(), - aws_secret_access_key: "ffff".to_string(), - bucket: Some("mailrage-quentin".to_string()), - user_secret: "xxx".to_string(), - alternate_user_secrets: vec![], - master_key: None, - secret_key: None, - }, - )]), - }), - login_ldap: None, - }; - - let ser = toml::to_string(&config).unwrap(); - println!("{}", ser); -} diff --git a/src/uidindex.rs b/src/uidindex.rs index d7c8285..8e4a189 100644 --- a/src/uidindex.rs +++ b/src/uidindex.rs @@ -2,21 +2,12 @@ use im::{HashMap, HashSet, OrdMap, OrdSet}; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use crate::bayou::*; +use crate::mail_ident::MailIdent; pub type ImapUid = u32; pub type ImapUidvalidity = u32; pub type Flag = String; -/// Mail Identifier (MailIdent) are composed of two components: -/// - a process identifier, 128 bits -/// - a sequence number, 64 bits -/// They are not part of the protocol but an internal representation -/// required by Mailrage/Aerogramme. -/// Their main property is to be unique without having to rely -/// on synchronization between IMAP processes. -#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)] -pub struct MailIdent(pub [u8; 24]); - #[derive(Clone)] /// A UidIndex handles the mutable part of a mailbox /// It is built by running the event log on it @@ -244,33 +235,6 @@ impl Serialize for UidIndex { } } -impl<'de> Deserialize<'de> for MailIdent { - fn deserialize<D>(d: D) -> Result<Self, D::Error> - where - D: Deserializer<'de>, - { - let v = String::deserialize(d)?; - let bytes = hex::decode(v).map_err(|_| D::Error::custom("invalid hex"))?; - - if bytes.len() != 24 { - return Err(D::Error::custom("bad length")); - } - - let mut tmp = [0u8; 24]; - tmp[..].copy_from_slice(&bytes); - Ok(Self(tmp)) - } -} - -impl Serialize for MailIdent { - fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> - where - S: Serializer, - { - serializer.serialize_str(&hex::encode(self.0)) - } -} - // ---- TESTS ---- #[cfg(test)] |