aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2022-06-15 18:40:39 +0200
committerQuentin Dufour <quentin@deuxfleurs.fr>2022-06-15 18:40:39 +0200
commit6b5b53916efb4897643172fdf461291b4effcf48 (patch)
tree9030270809e8b2fd56bc91b19d82915ea8c85641 /src
parent2bbcb119c437a3d8ba5d747f76898faa5ad32d93 (diff)
parent0700e27127e4644dbd323b9a22d994209143fa2a (diff)
downloadaerogramme-6b5b53916efb4897643172fdf461291b4effcf48.tar.gz
aerogramme-6b5b53916efb4897643172fdf461291b4effcf48.zip
Add LMTP support
Diffstat (limited to 'src')
-rw-r--r--src/.server.rs.swobin0 -> 12288 bytes
-rw-r--r--src/.service.rs.swobin0 -> 12288 bytes
-rw-r--r--src/.session.rs.swobin0 -> 20480 bytes
-rw-r--r--src/bayou.rs38
-rw-r--r--src/command.rs3
-rw-r--r--src/config.rs11
-rw-r--r--src/lmtp.rs263
-rw-r--r--src/login/ldap_provider.rs120
-rw-r--r--src/login/mod.rs12
-rw-r--r--src/login/static_provider.rs144
-rw-r--r--src/mail_ident.rs95
-rw-r--r--src/mailbox.rs10
-rw-r--r--src/main.rs13
-rw-r--r--src/server.rs107
-rw-r--r--src/service.rs16
-rw-r--r--src/session.rs12
-rw-r--r--src/test.rs32
-rw-r--r--src/uidindex.rs38
18 files changed, 709 insertions, 205 deletions
diff --git a/src/.server.rs.swo b/src/.server.rs.swo
new file mode 100644
index 0000000..9e99bb3
--- /dev/null
+++ b/src/.server.rs.swo
Binary files differ
diff --git a/src/.service.rs.swo b/src/.service.rs.swo
new file mode 100644
index 0000000..a69e975
--- /dev/null
+++ b/src/.service.rs.swo
Binary files differ
diff --git a/src/.session.rs.swo b/src/.session.rs.swo
new file mode 100644
index 0000000..a6de20e
--- /dev/null
+++ b/src/.session.rs.swo
Binary files differ
diff --git a/src/bayou.rs b/src/bayou.rs
index c9ae67f..7a76222 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -1,3 +1,4 @@
+use std::str::FromStr;
use std::time::{Duration, Instant};
use anyhow::{anyhow, bail, Result};
@@ -123,7 +124,7 @@ impl<S: BayouState> Bayou<S> {
.collect();
// 3. List all operations starting from checkpoint
- let ts_ser = self.checkpoint.0.serialize();
+ let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser);
let ops_map = self
.k2v
@@ -148,8 +149,9 @@ impl<S: BayouState> Bayou<S> {
let mut ops = vec![];
for (tsstr, val) in ops_map {
- let ts = Timestamp::parse(&tsstr)
- .ok_or(anyhow!("Invalid operation timestamp: {}", tsstr))?;
+ let ts = tsstr
+ .parse::<Timestamp>()
+ .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?;
if val.value.len() != 1 {
bail!("Invalid operation, has {} values", val.value.len());
}
@@ -251,7 +253,7 @@ impl<S: BayouState> Bayou<S> {
self.k2v
.insert_item(
&self.path,
- &ts.serialize(),
+ &ts.to_string(),
seal_serialize(&op, &self.key)?,
None,
)
@@ -316,7 +318,7 @@ impl<S: BayouState> Bayou<S> {
let ts_cp = self.history[i_cp].0;
debug!(
"(cp) we could checkpoint at time {} (index {} in history)",
- ts_cp.serialize(),
+ ts_cp.to_string(),
i_cp
);
@@ -330,13 +332,13 @@ impl<S: BayouState> Bayou<S> {
{
debug!(
"(cp) last checkpoint is too recent: {}, not checkpointing",
- last_cp.0.serialize()
+ last_cp.0.to_string()
);
return Ok(());
}
}
- debug!("(cp) saving checkpoint at {}", ts_cp.serialize());
+ debug!("(cp) saving checkpoint at {}", ts_cp.to_string());
// Calculate state at time of checkpoint
let mut last_known_state = (0, &self.checkpoint.1);
@@ -356,7 +358,7 @@ impl<S: BayouState> Bayou<S> {
let mut por = PutObjectRequest::default();
por.bucket = self.bucket.clone();
- por.key = format!("{}/checkpoint/{}", self.path, ts_cp.serialize());
+ por.key = format!("{}/checkpoint/{}", self.path, ts_cp.to_string());
por.body = Some(cryptoblob.into());
self.s3.put_object(por).await?;
@@ -375,7 +377,7 @@ impl<S: BayouState> Bayou<S> {
}
// Delete corresponding range of operations
- let ts_ser = existing_checkpoints[last_to_keep].0.serialize();
+ let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
self.k2v
.delete_batch(&[BatchDeleteOp {
partition_key: &self.path,
@@ -414,7 +416,7 @@ impl<S: BayouState> Bayou<S> {
for object in checkpoints_res.contents.unwrap_or_default() {
if let Some(key) = object.key {
if let Some(ckid) = key.strip_prefix(&prefix) {
- if let Some(ts) = Timestamp::parse(ckid) {
+ if let Ok(ts) = ckid.parse::<Timestamp>() {
checkpoints.push((ts, key));
}
}
@@ -451,20 +453,26 @@ impl Timestamp {
pub fn zero() -> Self {
Self { msec: 0, rand: 0 }
}
+}
- pub fn serialize(&self) -> String {
+impl ToString for Timestamp {
+ fn to_string(&self) -> String {
let mut bytes = [0u8; 16];
bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec));
bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand));
hex::encode(&bytes)
}
+}
- pub fn parse(v: &str) -> Option<Self> {
- let bytes = hex::decode(v).ok()?;
+impl FromStr for Timestamp {
+ type Err = &'static str;
+
+ fn from_str(s: &str) -> Result<Timestamp, &'static str> {
+ let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
if bytes.len() != 16 {
- return None;
+ return Err("bad length");
}
- Some(Self {
+ Ok(Self {
msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()),
rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
})
diff --git a/src/command.rs b/src/command.rs
index 2c61227..4a2723d 100644
--- a/src/command.rs
+++ b/src/command.rs
@@ -8,7 +8,6 @@ use imap_codec::types::response::{Capability, Data};
use imap_codec::types::sequence::SequenceSet;
use crate::mailbox::Mailbox;
-use crate::mailstore::Mailstore;
use crate::session;
pub struct Command<'a> {
@@ -33,7 +32,7 @@ impl<'a> Command<'a> {
let (u, p) = (String::try_from(username)?, String::try_from(password)?);
tracing::info!(user = %u, "command.login");
- let creds = match self.session.mailstore.login_provider.login(&u, &p).await {
+ let creds = match self.session.login_provider.login(&u, &p).await {
Err(_) => {
return Ok(Response::no(
"[AUTHENTICATIONFAILED] Authentication failed.",
diff --git a/src/config.rs b/src/config.rs
index 3ffc553..5afcabd 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::io::Read;
+use std::net::SocketAddr;
use std::path::PathBuf;
use anyhow::Result;
@@ -13,6 +14,8 @@ pub struct Config {
pub login_static: Option<LoginStaticConfig>,
pub login_ldap: Option<LoginLdapConfig>,
+
+ pub lmtp: Option<LmtpConfig>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -23,6 +26,8 @@ pub struct LoginStaticConfig {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LoginStaticUser {
+ #[serde(default)]
+ pub email_addresses: Vec<String>,
pub password: String,
pub aws_access_key_id: String,
@@ -60,6 +65,12 @@ pub struct LoginLdapConfig {
pub bucket_attr: Option<String>,
}
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct LmtpConfig {
+ pub bind_addr: SocketAddr,
+ pub hostname: String,
+}
+
pub fn read_config(config_file: PathBuf) -> Result<Config> {
let mut file = std::fs::OpenOptions::new()
.read(true)
diff --git a/src/lmtp.rs b/src/lmtp.rs
new file mode 100644
index 0000000..049e119
--- /dev/null
+++ b/src/lmtp.rs
@@ -0,0 +1,263 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::{pin::Pin, sync::Arc};
+
+use anyhow::{bail, Result};
+use async_trait::async_trait;
+use duplexify::Duplex;
+use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite};
+use futures::{stream, stream::FuturesUnordered, StreamExt};
+use log::*;
+use rusoto_s3::{PutObjectRequest, S3Client, S3};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::select;
+use tokio::sync::watch;
+use tokio_util::compat::*;
+
+use smtp_message::{Email, EscapedDataReader, Reply, ReplyCode};
+use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata, Protocol};
+
+use crate::config::*;
+use crate::cryptoblob::*;
+use crate::login::*;
+use crate::mail_ident::*;
+
+pub struct LmtpServer {
+ bind_addr: SocketAddr,
+ hostname: String,
+ login_provider: Arc<dyn LoginProvider + Send + Sync>,
+}
+
+impl LmtpServer {
+ pub fn new(
+ config: LmtpConfig,
+ login_provider: Arc<dyn LoginProvider + Send + Sync>,
+ ) -> Arc<Self> {
+ Arc::new(Self {
+ bind_addr: config.bind_addr,
+ hostname: config.hostname,
+ login_provider,
+ })
+ }
+
+ pub async fn run(self: &Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<()> {
+ let tcp = TcpListener::bind(self.bind_addr).await?;
+ let mut connections = FuturesUnordered::new();
+
+ while !*must_exit.borrow() {
+ let wait_conn_finished = async {
+ if connections.is_empty() {
+ futures::future::pending().await
+ } else {
+ connections.next().await
+ }
+ };
+ let (socket, remote_addr) = select! {
+ a = tcp.accept() => a?,
+ _ = wait_conn_finished => continue,
+ _ = must_exit.changed() => continue,
+ };
+
+ let conn = tokio::spawn(smtp_server::interact(
+ socket.compat(),
+ smtp_server::IsAlreadyTls::No,
+ Conn { remote_addr },
+ self.clone(),
+ ));
+
+ connections.push(conn);
+ }
+ drop(tcp);
+
+ info!("LMTP server shutting down, draining remaining connections...");
+ while connections.next().await.is_some() {}
+
+ Ok(())
+ }
+}
+
+// ----
+
+pub struct Conn {
+ remote_addr: SocketAddr,
+}
+
+pub struct Message {
+ to: Vec<PublicCredentials>,
+}
+
+#[async_trait]
+impl Config for LmtpServer {
+ const PROTOCOL: Protocol = Protocol::Lmtp;
+
+ type ConnectionUserMeta = Conn;
+ type MailUserMeta = Message;
+
+ fn hostname(&self, _conn_meta: &ConnectionMetadata<Conn>) -> &str {
+ &self.hostname
+ }
+
+ async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<Conn>) -> Message {
+ Message { to: vec![] }
+ }
+
+ async fn tls_accept<IO>(
+ &self,
+ _io: IO,
+ _conn_meta: &mut ConnectionMetadata<Conn>,
+ ) -> io::Result<Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>>
+ where
+ IO: Send + AsyncRead + AsyncWrite,
+ {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "TLS not implemented for LMTP server",
+ ))
+ }
+
+ async fn filter_from(
+ &self,
+ from: Option<Email>,
+ meta: &mut MailMetadata<Message>,
+ _conn_meta: &mut ConnectionMetadata<Conn>,
+ ) -> Decision<Option<Email>> {
+ Decision::Accept {
+ reply: reply::okay_from().convert(),
+ res: from,
+ }
+ }
+
+ async fn filter_to(
+ &self,
+ to: Email,
+ meta: &mut MailMetadata<Message>,
+ _conn_meta: &mut ConnectionMetadata<Conn>,
+ ) -> Decision<Email> {
+ let to_str = match to.hostname.as_ref() {
+ Some(h) => format!("{}@{}", to.localpart, h),
+ None => to.localpart.to_string(),
+ };
+ match self.login_provider.public_login(&to_str).await {
+ Ok(creds) => {
+ meta.user.to.push(creds);
+ Decision::Accept {
+ reply: reply::okay_to().convert(),
+ res: to,
+ }
+ }
+ Err(e) => Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
+ },
+ },
+ }
+ }
+
+ async fn handle_mail<'a, R>(
+ &self,
+ reader: &mut EscapedDataReader<'a, R>,
+ _mail: MailMetadata<Message>,
+ _conn_meta: &mut ConnectionMetadata<Conn>,
+ ) -> Decision<()>
+ where
+ R: Send + Unpin + AsyncRead,
+ {
+ unreachable!();
+ }
+
+ async fn handle_mail_multi<'a, 'slife0, 'slife1, 'stream, R>(
+ &'slife0 self,
+ reader: &mut EscapedDataReader<'a, R>,
+ meta: MailMetadata<Message>,
+ conn_meta: &'slife1 mut ConnectionMetadata<Conn>,
+ ) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'stream>>
+ where
+ R: Send + Unpin + AsyncRead,
+ 'slife0: 'stream,
+ 'slife1: 'stream,
+ Self: 'stream,
+ {
+ let err_response_stream = |meta: MailMetadata<Message>, msg: String| {
+ Box::pin(
+ stream::iter(meta.user.to.into_iter()).map(move |_| Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(msg.clone())],
+ },
+ }),
+ )
+ };
+
+ let mut text = Vec::new();
+ if reader.read_to_end(&mut text).await.is_err() {
+ return err_response_stream(meta, "io error".into());
+ }
+ reader.complete();
+
+ let encrypted_message = match EncryptedMessage::new(text) {
+ Ok(x) => Arc::new(x),
+ Err(e) => return err_response_stream(meta, e.to_string()),
+ };
+
+ Box::pin(stream::iter(meta.user.to.into_iter()).then(move |creds| {
+ let encrypted_message = encrypted_message.clone();
+ async move {
+ match encrypted_message.deliver_to(creds).await {
+ Ok(()) => Decision::Accept {
+ reply: reply::okay_mail().convert(),
+ res: (),
+ },
+ Err(e) => Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
+ },
+ },
+ }
+ }
+ }))
+ }
+}
+
+// ----
+
+struct EncryptedMessage {
+ key: Key,
+ encrypted_body: Vec<u8>,
+}
+
+impl EncryptedMessage {
+ fn new(body: Vec<u8>) -> Result<Self> {
+ let key = gen_key();
+ let encrypted_body = seal(&body, &key)?;
+ Ok(Self {
+ key,
+ encrypted_body,
+ })
+ }
+
+ async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
+ let s3_client = creds.storage.s3_client()?;
+
+ let encrypted_key =
+ sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
+ let key_header = base64::encode(&encrypted_key);
+
+ let mut por = PutObjectRequest::default();
+ por.bucket = creds.storage.bucket.clone();
+ por.key = format!("incoming/{}", gen_ident().to_string());
+ por.metadata = Some(
+ [("Message-Key".to_string(), key_header)]
+ .into_iter()
+ .collect::<HashMap<_, _>>(),
+ );
+ por.body = Some(self.encrypted_body.clone().into());
+ s3_client.put_object(por).await?;
+
+ Ok(())
+ }
+}
diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs
index c9d23a0..9310e55 100644
--- a/src/login/ldap_provider.rs
+++ b/src/login/ldap_provider.rs
@@ -84,11 +84,30 @@ impl LdapLoginProvider {
bucket_source,
})
}
+
+ fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<StorageCredentials> {
+ let aws_access_key_id = get_attr(user, &self.aws_access_key_id_attr)?;
+ let aws_secret_access_key = get_attr(user, &self.aws_secret_access_key_attr)?;
+ let bucket = match &self.bucket_source {
+ BucketSource::Constant(b) => b.clone(),
+ BucketSource::Attr(a) => get_attr(user, a)?,
+ };
+
+ Ok(StorageCredentials {
+ k2v_region: self.k2v_region.clone(),
+ s3_region: self.s3_region.clone(),
+ aws_access_key_id,
+ aws_secret_access_key,
+ bucket,
+ })
+ }
}
#[async_trait]
impl LoginProvider for LdapLoginProvider {
async fn login(&self, username: &str, password: &str) -> Result<Credentials> {
+ check_identifier(username)?;
+
let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?;
ldap3::drive!(conn);
@@ -97,13 +116,6 @@ impl LoginProvider for LdapLoginProvider {
ldap.simple_bind(dn, pw).await?.success()?;
}
- let username_is_ok = username
- .chars()
- .all(|c| c.is_alphanumeric() || "-+_.@".contains(c));
- if !username_is_ok {
- bail!("Invalid username, must contain only a-z A-Z 0-9 - + _ . @");
- }
-
let (matches, _res) = ldap
.search(
&self.search_base,
@@ -137,32 +149,9 @@ impl LoginProvider for LdapLoginProvider {
.context("Invalid password")?;
debug!("Ldap login with user name {} successfull", username);
- let get_attr = |attr: &str| -> Result<String> {
- Ok(user
- .attrs
- .get(attr)
- .ok_or(anyhow!("Missing attr: {}", attr))?
- .iter()
- .next()
- .ok_or(anyhow!("No value for attr: {}", attr))?
- .clone())
- };
- let aws_access_key_id = get_attr(&self.aws_access_key_id_attr)?;
- let aws_secret_access_key = get_attr(&self.aws_secret_access_key_attr)?;
- let bucket = match &self.bucket_source {
- BucketSource::Constant(b) => b.clone(),
- BucketSource::Attr(a) => get_attr(a)?,
- };
+ let storage = self.storage_creds_from_ldap_user(&user)?;
- let storage = StorageCredentials {
- k2v_region: self.k2v_region.clone(),
- s3_region: self.s3_region.clone(),
- aws_access_key_id,
- aws_secret_access_key,
- bucket,
- };
-
- let user_secret = get_attr(&self.user_secret_attr)?;
+ let user_secret = get_attr(&user, &self.user_secret_attr)?;
let alternate_user_secrets = match &self.alternate_user_secrets_attr {
None => vec![],
Some(a) => user.attrs.get(a).cloned().unwrap_or_default(),
@@ -178,4 +167,71 @@ impl LoginProvider for LdapLoginProvider {
Ok(Credentials { storage, keys })
}
+
+ async fn public_login(&self, email: &str) -> Result<PublicCredentials> {
+ check_identifier(email)?;
+
+ let (dn, pw) = match self.bind_dn_and_pw.as_ref() {
+ Some(x) => x,
+ None => bail!("Missing bind_dn and bind_password in LDAP login provider config"),
+ };
+
+ let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?;
+ ldap3::drive!(conn);
+ ldap.simple_bind(dn, pw).await?.success()?;
+
+ let (matches, _res) = ldap
+ .search(
+ &self.search_base,
+ Scope::Subtree,
+ &format!(
+ "(&(objectClass=inetOrgPerson)({}={}))",
+ self.mail_attr, email
+ ),
+ &self.attrs_to_retrieve,
+ )
+ .await?
+ .success()?;
+
+ if matches.is_empty() {
+ bail!("No such user account");
+ }
+ if matches.len() > 1 {
+ bail!("Multiple matching user accounts");
+ }
+ let user = SearchEntry::construct(matches.into_iter().next().unwrap());
+ debug!("Found matching LDAP user for email {}: {}", email, user.dn);
+
+ let storage = self.storage_creds_from_ldap_user(&user)?;
+ drop(ldap);
+
+ let k2v_client = storage.k2v_client()?;
+ let (_, public_key) = CryptoKeys::load_salt_and_public(&k2v_client).await?;
+
+ Ok(PublicCredentials {
+ storage,
+ public_key,
+ })
+ }
+}
+
+fn get_attr(user: &SearchEntry, attr: &str) -> Result<String> {
+ Ok(user
+ .attrs
+ .get(attr)
+ .ok_or(anyhow!("Missing attr: {}", attr))?
+ .iter()
+ .next()
+ .ok_or(anyhow!("No value for attr: {}", attr))?
+ .clone())
+}
+
+fn check_identifier(id: &str) -> Result<()> {
+ let is_ok = id
+ .chars()
+ .all(|c| c.is_alphanumeric() || "-+_.@".contains(c));
+ if !is_ok {
+ bail!("Invalid username/email address, must contain only a-z A-Z 0-9 - + _ . @");
+ }
+ Ok(())
}
diff --git a/src/login/mod.rs b/src/login/mod.rs
index 2640a58..c0e9032 100644
--- a/src/login/mod.rs
+++ b/src/login/mod.rs
@@ -24,6 +24,9 @@ pub trait LoginProvider {
/// The login method takes an account's password as an input to decypher
/// decryption keys and obtain full access to the user's account.
async fn login(&self, username: &str, password: &str) -> Result<Credentials>;
+ /// The public_login method takes an account's email address and returns
+ /// public credentials for adding mails to the user's inbox.
+ async fn public_login(&self, email: &str) -> Result<PublicCredentials>;
}
/// The struct Credentials represent all of the necessary information to interact
@@ -36,6 +39,13 @@ pub struct Credentials {
pub keys: CryptoKeys,
}
+#[derive(Clone, Debug)]
+pub struct PublicCredentials {
+ /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V)
+ pub storage: StorageCredentials,
+ pub public_key: PublicKey,
+}
+
/// The struct StorageCredentials contains access key to an S3 and K2V bucket
#[derive(Clone, Debug)]
pub struct StorageCredentials {
@@ -396,7 +406,7 @@ impl CryptoKeys {
Ok((salt_ct, public_ct))
}
- async fn load_salt_and_public(k2v: &K2vClient) -> Result<([u8; 32], PublicKey)> {
+ pub async fn load_salt_and_public(k2v: &K2vClient) -> Result<([u8; 32], PublicKey)> {
let mut params = k2v
.read_batch(&[
k2v_read_single_key("keys", "salt", false),
diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs
index a95ab24..6bbc717 100644
--- a/src/login/static_provider.rs
+++ b/src/login/static_provider.rs
@@ -1,4 +1,5 @@
use std::collections::HashMap;
+use std::sync::Arc;
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
@@ -10,16 +11,34 @@ use crate::login::*;
pub struct StaticLoginProvider {
default_bucket: Option<String>,
- users: HashMap<String, LoginStaticUser>,
+ users: HashMap<String, Arc<LoginStaticUser>>,
+ users_by_email: HashMap<String, Arc<LoginStaticUser>>,
+
k2v_region: Region,
s3_region: Region,
}
impl StaticLoginProvider {
pub fn new(config: LoginStaticConfig, k2v_region: Region, s3_region: Region) -> Result<Self> {
+ let users = config
+ .users
+ .into_iter()
+ .map(|(k, v)| (k, Arc::new(v)))
+ .collect::<HashMap<_, _>>();
+ let mut users_by_email = HashMap::new();
+ for (_, u) in users.iter() {
+ for m in u.email_addresses.iter() {
+ if users_by_email.contains_key(m) {
+ bail!("Several users have same email address: {}", m);
+ }
+ users_by_email.insert(m.clone(), u.clone());
+ }
+ }
+
Ok(Self {
default_bucket: config.default_bucket,
- users: config.users,
+ users,
+ users_by_email,
k2v_region,
s3_region,
})
@@ -30,54 +49,87 @@ impl StaticLoginProvider {
impl LoginProvider for StaticLoginProvider {
async fn login(&self, username: &str, password: &str) -> Result<Credentials> {
tracing::debug!(user=%username, "login");
- match self.users.get(username) {
+ let user = match self.users.get(username) {
None => bail!("User {} does not exist", username),
- Some(u) => {
- tracing::debug!(user=%username, "verify password");
- if !verify_password(password, &u.password)? {
- bail!("Wrong password");
- }
- tracing::debug!(user=%username, "fetch bucket");
- let bucket = u
- .bucket
- .clone()
- .or_else(|| self.default_bucket.clone())
- .ok_or(anyhow!(
- "No bucket configured and no default bucket specieid"
- ))?;
-
- tracing::debug!(user=%username, "fetch configuration");
- let storage = StorageCredentials {
- k2v_region: self.k2v_region.clone(),
- s3_region: self.s3_region.clone(),
- aws_access_key_id: u.aws_access_key_id.clone(),
- aws_secret_access_key: u.aws_secret_access_key.clone(),
- bucket,
- };
+ Some(u) => u,
+ };
- tracing::debug!(user=%username, "fetch keys");
- let keys = match (&u.master_key, &u.secret_key) {
- (Some(m), Some(s)) => {
- let master_key = Key::from_slice(&base64::decode(m)?)
- .ok_or(anyhow!("Invalid master key"))?;
- let secret_key = SecretKey::from_slice(&base64::decode(s)?)
- .ok_or(anyhow!("Invalid secret key"))?;
- CryptoKeys::open_without_password(&storage, &master_key, &secret_key).await?
- }
- (None, None) => {
- let user_secrets = UserSecrets {
- user_secret: u.user_secret.clone(),
- alternate_user_secrets: u.alternate_user_secrets.clone(),
- };
- CryptoKeys::open(&storage, &user_secrets, password).await?
- }
- _ => bail!("Either both master and secret key or none of them must be specified for user"),
- };
+ tracing::debug!(user=%username, "verify password");
+ if !verify_password(password, &user.password)? {
+ bail!("Wrong password");
+ }
+
+ tracing::debug!(user=%username, "fetch bucket");
+ let bucket = user
+ .bucket
+ .clone()
+ .or_else(|| self.default_bucket.clone())
+ .ok_or(anyhow!(
+ "No bucket configured and no default bucket specieid"
+ ))?;
+
+ tracing::debug!(user=%username, "fetch keys");
+ let storage = StorageCredentials {
+ k2v_region: self.k2v_region.clone(),
+ s3_region: self.s3_region.clone(),
+ aws_access_key_id: user.aws_access_key_id.clone(),
+ aws_secret_access_key: user.aws_secret_access_key.clone(),
+ bucket,
+ };
- tracing::debug!(user=%username, "logged");
- Ok(Credentials { storage, keys })
+ let keys = match (&user.master_key, &user.secret_key) {
+ (Some(m), Some(s)) => {
+ let master_key =
+ Key::from_slice(&base64::decode(m)?).ok_or(anyhow!("Invalid master key"))?;
+ let secret_key = SecretKey::from_slice(&base64::decode(s)?)
+ .ok_or(anyhow!("Invalid secret key"))?;
+ CryptoKeys::open_without_password(&storage, &master_key, &secret_key).await?
}
- }
+ (None, None) => {
+ let user_secrets = UserSecrets {
+ user_secret: user.user_secret.clone(),
+ alternate_user_secrets: user.alternate_user_secrets.clone(),
+ };
+ CryptoKeys::open(&storage, &user_secrets, password).await?
+ }
+ _ => bail!(
+ "Either both master and secret key or none of them must be specified for user"
+ ),
+ };
+
+ tracing::debug!(user=%username, "logged");
+ Ok(Credentials { storage, keys })
+ }
+
+ async fn public_login(&self, email: &str) -> Result<PublicCredentials> {
+ let user = match self.users_by_email.get(email) {
+ None => bail!("No user for email address {}", email),
+ Some(u) => u,
+ };
+
+ let bucket = user
+ .bucket
+ .clone()
+ .or_else(|| self.default_bucket.clone())
+ .ok_or(anyhow!(
+ "No bucket configured and no default bucket specieid"
+ ))?;
+
+ let storage = StorageCredentials {
+ k2v_region: self.k2v_region.clone(),
+ s3_region: self.s3_region.clone(),
+ aws_access_key_id: user.aws_access_key_id.clone(),
+ aws_secret_access_key: user.aws_secret_access_key.clone(),
+ bucket,
+ };
+
+ let k2v_client = storage.k2v_client()?;
+ let (_, public_key) = CryptoKeys::load_salt_and_public(&k2v_client).await?;
+
+ Ok(PublicCredentials {
+ storage,
+ public_key,
+ })
}
}
diff --git a/src/mail_ident.rs b/src/mail_ident.rs
new file mode 100644
index 0000000..07e053a
--- /dev/null
+++ b/src/mail_ident.rs
@@ -0,0 +1,95 @@
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use lazy_static::lazy_static;
+use rand::prelude::*;
+use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
+
+use crate::time::now_msec;
+
+/// An internal Mail Identifier is composed of two components:
+/// - a process identifier, 128 bits, itself composed of:
+/// - the timestamp of when the process started, 64 bits
+/// - a 64-bit random number
+/// - a sequence number, 64 bits
+/// They are not part of the protocol but an internal representation
+/// required by Mailrage/Aerogramme.
+/// Their main property is to be unique without having to rely
+/// on synchronization between IMAP processes.
+#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)]
+pub struct MailIdent(pub [u8; 24]);
+
+struct IdentGenerator {
+ pid: u128,
+ sn: AtomicU64,
+}
+
+impl IdentGenerator {
+ fn new() -> Self {
+ let time = now_msec() as u128;
+ let rand = thread_rng().gen::<u64>() as u128;
+ Self {
+ pid: (time << 64) | rand,
+ sn: AtomicU64::new(0),
+ }
+ }
+
+ fn gen(&self) -> MailIdent {
+ let sn = self.sn.fetch_add(1, Ordering::Relaxed);
+ let mut res = [0u8; 24];
+ res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid));
+ res[16..24].copy_from_slice(&u64::to_be_bytes(sn));
+ MailIdent(res)
+ }
+}
+
+lazy_static! {
+ static ref GENERATOR: IdentGenerator = IdentGenerator::new();
+}
+
+pub fn gen_ident() -> MailIdent {
+ GENERATOR.gen()
+}
+
+// -- serde --
+
+impl<'de> Deserialize<'de> for MailIdent {
+ fn deserialize<D>(d: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let v = String::deserialize(d)?;
+ MailIdent::from_str(&v).map_err(D::Error::custom)
+ }
+}
+
+impl Serialize for MailIdent {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ serializer.serialize_str(&self.to_string())
+ }
+}
+
+impl ToString for MailIdent {
+ fn to_string(&self) -> String {
+ hex::encode(self.0)
+ }
+}
+
+impl FromStr for MailIdent {
+ type Err = &'static str;
+
+ fn from_str(s: &str) -> Result<MailIdent, &'static str> {
+ let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
+
+ if bytes.len() != 24 {
+ return Err("bad length");
+ }
+
+ let mut tmp = [0u8; 24];
+ tmp[..].copy_from_slice(&bytes);
+ Ok(MailIdent(tmp))
+ }
+}
diff --git a/src/mailbox.rs b/src/mailbox.rs
index 349a13b..249d329 100644
--- a/src/mailbox.rs
+++ b/src/mailbox.rs
@@ -1,11 +1,11 @@
use anyhow::Result;
use k2v_client::K2vClient;
-use rand::prelude::*;
use rusoto_s3::S3Client;
use crate::bayou::Bayou;
use crate::cryptoblob::Key;
use crate::login::Credentials;
+use crate::mail_ident::*;
use crate::uidindex::*;
pub struct Summary {
@@ -64,19 +64,17 @@ impl Mailbox {
dump(&self.uid_index);
- let mut rand_id = [0u8; 24];
- rand_id[..16].copy_from_slice(&u128::to_be_bytes(thread_rng().gen()));
let add_mail_op = self
.uid_index
.state()
- .op_mail_add(MailIdent(rand_id), vec!["\\Unseen".into()]);
+ .op_mail_add(gen_ident(), vec!["\\Unseen".into()]);
self.uid_index.push(add_mail_op).await?;
dump(&self.uid_index);
if self.uid_index.state().idx_by_uid.len() > 6 {
for i in 0..2 {
- let (_, uuid) = self
+ let (_, ident) = self
.uid_index
.state()
.idx_by_uid
@@ -84,7 +82,7 @@ impl Mailbox {
.skip(3 + i)
.next()
.unwrap();
- let del_mail_op = self.uid_index.state().op_mail_del(*uuid);
+ let del_mail_op = self.uid_index.state().op_mail_del(*ident);
self.uid_index.push(del_mail_op).await?;
dump(&self.uid_index);
diff --git a/src/main.rs b/src/main.rs
index 1391e7a..9ec5af0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,7 +2,9 @@ mod bayou;
mod command;
mod config;
mod cryptoblob;
+mod lmtp;
mod login;
+mod mail_ident;
mod mailbox;
mod mailstore;
mod server;
@@ -38,6 +40,11 @@ enum Command {
#[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")]
config_file: PathBuf,
},
+ /// TEST TEST TEST
+ Test {
+ #[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")]
+ config_file: PathBuf,
+ },
/// Initializes key pairs for a user and adds a key decryption password
FirstLogin {
#[clap(flatten)]
@@ -129,6 +136,12 @@ async fn main() -> Result<()> {
let server = Server::new(config).await?;
server.run().await?;
}
+ Command::Test { config_file } => {
+ let config = read_config(config_file)?;
+
+ let server = Server::new(config).await?;
+ //server.test().await?;
+ }
Command::FirstLogin {
creds,
user_secrets,
diff --git a/src/server.rs b/src/server.rs
index 365bc0f..3abdfd1 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,40 +1,107 @@
-use anyhow::Result;
use std::sync::Arc;
-use crate::config::*;
-use crate::mailstore;
-use crate::service;
+
use boitalettres::server::accept::addr::AddrIncoming;
+use boitalettres::server::accept::addr::AddrStream;
use boitalettres::server::Server as ImapServer;
+use anyhow::{bail, Result};
+use futures::{try_join, StreamExt};
+use log::*;
+use rusoto_signature::Region;
+use tokio::sync::watch;
+use tower::Service;
+
+use crate::mailstore;
+use crate::service;
+use crate::lmtp::*;
+use crate::config::*;
+use crate::login::{ldap_provider::*, static_provider::*, *};
+use crate::mailbox::Mailbox;
+
pub struct Server {
- pub incoming: AddrIncoming,
- pub mailstore: Arc<mailstore::Mailstore>,
+ lmtp_server: Option<Arc<LmtpServer>>,
+ imap_server: ImapServer<AddrIncoming, service::Instance>,
}
+
impl Server {
pub async fn new(config: Config) -> Result<Self> {
+ let lmtp_config = config.lmtp.clone(); //@FIXME
+ let login = authenticator(config)?;
+
+ let lmtp = lmtp_config.map(|cfg| LmtpServer::new(cfg, login.clone()));
+
+ let incoming = AddrIncoming::new("127.0.0.1:4567").await?;
+ let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone()));
+
Ok(Self {
- incoming: AddrIncoming::new("127.0.0.1:4567").await?,
- mailstore: mailstore::Mailstore::new(config)?,
+ lmtp_server: lmtp,
+ imap_server: imap,
})
}
- pub async fn run(self: Self) -> Result<()> {
- tracing::info!("Starting server on {:#}", self.incoming.local_addr);
- /*let creds = self
- .mailstore
- .login_provider
- .login("quentin", "poupou")
- .await?;*/
- //let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?;
- //mailbox.test().await?;
+ pub async fn run(self) -> Result<()> {
+ //tracing::info!("Starting server on {:#}", self.imap.incoming.local_addr);
+ tracing::info!("Starting Aerogramme...");
+
+ let (exit_signal, provoke_exit) = watch_ctrl_c();
+ let exit_on_err = move |err: anyhow::Error| {
+ error!("Error: {}", err);
+ let _ = provoke_exit.send(true);
+ };
+
+
+ try_join!(async {
+ match self.lmtp_server.as_ref() {
+ None => Ok(()),
+ Some(s) => s.run(exit_signal.clone()).await,
+ }
+ },
+ //@FIXME handle ctrl + c
+ async {
+ self.imap_server.await?;
+ Ok(())
+ }
+ )?;
- let server =
- ImapServer::new(self.incoming).serve(service::Instance::new(self.mailstore.clone()));
- let _ = server.await?;
Ok(())
}
}
+
+fn authenticator(config: Config) -> Result<Arc<dyn LoginProvider + Send + Sync>> {
+ let s3_region = Region::Custom {
+ name: config.aws_region.clone(),
+ endpoint: config.s3_endpoint,
+ };
+ let k2v_region = Region::Custom {
+ name: config.aws_region,
+ endpoint: config.k2v_endpoint,
+ };
+
+ let lp: Arc<dyn LoginProvider + Send + Sync> = match (config.login_static, config.login_ldap) {
+ (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?),
+ (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?),
+ (Some(_), Some(_)) => {
+ bail!("A single login provider must be set up in config file")
+ }
+ (None, None) => bail!("No login provider is set up in config file"),
+ };
+ Ok(lp)
+}
+
+pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) {
+ let (send_cancel, watch_cancel) = watch::channel(false);
+ let send_cancel = Arc::new(send_cancel);
+ let send_cancel_2 = send_cancel.clone();
+ tokio::spawn(async move {
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to install CTRL+C signal handler");
+ info!("Received CTRL+C, shutting down.");
+ send_cancel.send(true).unwrap();
+ });
+ (watch_cancel, send_cancel_2)
+}
diff --git a/src/service.rs b/src/service.rs
index f99ba7a..ce272a3 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -9,15 +9,15 @@ use futures::future::BoxFuture;
use futures::future::FutureExt;
use tower::Service;
-use crate::mailstore::Mailstore;
use crate::session;
+use crate::LoginProvider;
pub struct Instance {
- pub mailstore: Arc<Mailstore>,
+ login_provider: Arc<dyn LoginProvider + Send + Sync>,
}
impl Instance {
- pub fn new(mailstore: Arc<Mailstore>) -> Self {
- Self { mailstore }
+ pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
+ Self { login_provider }
}
}
impl<'a> Service<&'a AddrStream> for Instance {
@@ -31,8 +31,8 @@ impl<'a> Service<&'a AddrStream> for Instance {
fn call(&mut self, addr: &'a AddrStream) -> Self::Future {
tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept");
- let ms = self.mailstore.clone();
- async { Ok(Connection::new(ms)) }.boxed()
+ let lp = self.login_provider.clone();
+ async { Ok(Connection::new(lp)) }.boxed()
}
}
@@ -40,9 +40,9 @@ pub struct Connection {
session: session::Manager,
}
impl Connection {
- pub fn new(mailstore: Arc<Mailstore>) -> Self {
+ pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
Self {
- session: session::Manager::new(mailstore),
+ session: session::Manager::new(login_provider),
}
}
}
diff --git a/src/session.rs b/src/session.rs
index 8ad44dd..a3e4e24 100644
--- a/src/session.rs
+++ b/src/session.rs
@@ -11,7 +11,7 @@ use tokio::sync::{mpsc, oneshot};
use crate::command;
use crate::login::Credentials;
use crate::mailbox::Mailbox;
-use crate::mailstore::Mailstore;
+use crate::LoginProvider;
/* This constant configures backpressure in the system,
* or more specifically, how many pipelined messages are allowed
@@ -30,10 +30,10 @@ pub struct Manager {
//@FIXME we should garbage collect the Instance when the Manager is destroyed.
impl Manager {
- pub fn new(mailstore: Arc<Mailstore>) -> Self {
+ pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
tokio::spawn(async move {
- let mut instance = Instance::new(mailstore, rx);
+ let mut instance = Instance::new(login_provider, rx);
instance.start().await;
});
Self { tx }
@@ -79,14 +79,14 @@ pub struct User {
pub struct Instance {
rx: mpsc::Receiver<Message>,
- pub mailstore: Arc<Mailstore>,
+ pub login_provider: Arc<dyn LoginProvider + Send + Sync>,
pub selected: Option<Mailbox>,
pub user: Option<User>,
}
impl Instance {
- fn new(mailstore: Arc<Mailstore>, rx: mpsc::Receiver<Message>) -> Self {
+ fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>, rx: mpsc::Receiver<Message>) -> Self {
Self {
- mailstore,
+ login_provider,
rx,
selected: None,
user: None,
diff --git a/src/test.rs b/src/test.rs
deleted file mode 100644
index 29db928..0000000
--- a/src/test.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-mod config;
-
-use serde::Serialize;
-use std::collections::HashMap;
-
-fn main() {
- let config = config::Config {
- s3_endpoint: "http://127.0.0.1:3900".to_string(),
- k2v_endpoint: "http://127.0.0.1:3904".to_string(),
- aws_region: "garage".to_string(),
- login_static: Some(config::LoginStaticConfig {
- default_bucket: Some("mailrage".to_string()),
- users: HashMap::from([(
- "quentin".to_string(),
- config::LoginStaticUser {
- password: "toto".to_string(),
- aws_access_key_id: "GKxxx".to_string(),
- aws_secret_access_key: "ffff".to_string(),
- bucket: Some("mailrage-quentin".to_string()),
- user_secret: "xxx".to_string(),
- alternate_user_secrets: vec![],
- master_key: None,
- secret_key: None,
- },
- )]),
- }),
- login_ldap: None,
- };
-
- let ser = toml::to_string(&config).unwrap();
- println!("{}", ser);
-}
diff --git a/src/uidindex.rs b/src/uidindex.rs
index d7c8285..8e4a189 100644
--- a/src/uidindex.rs
+++ b/src/uidindex.rs
@@ -2,21 +2,12 @@ use im::{HashMap, HashSet, OrdMap, OrdSet};
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use crate::bayou::*;
+use crate::mail_ident::MailIdent;
pub type ImapUid = u32;
pub type ImapUidvalidity = u32;
pub type Flag = String;
-/// Mail Identifier (MailIdent) are composed of two components:
-/// - a process identifier, 128 bits
-/// - a sequence number, 64 bits
-/// They are not part of the protocol but an internal representation
-/// required by Mailrage/Aerogramme.
-/// Their main property is to be unique without having to rely
-/// on synchronization between IMAP processes.
-#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)]
-pub struct MailIdent(pub [u8; 24]);
-
#[derive(Clone)]
/// A UidIndex handles the mutable part of a mailbox
/// It is built by running the event log on it
@@ -244,33 +235,6 @@ impl Serialize for UidIndex {
}
}
-impl<'de> Deserialize<'de> for MailIdent {
- fn deserialize<D>(d: D) -> Result<Self, D::Error>
- where
- D: Deserializer<'de>,
- {
- let v = String::deserialize(d)?;
- let bytes = hex::decode(v).map_err(|_| D::Error::custom("invalid hex"))?;
-
- if bytes.len() != 24 {
- return Err(D::Error::custom("bad length"));
- }
-
- let mut tmp = [0u8; 24];
- tmp[..].copy_from_slice(&bytes);
- Ok(Self(tmp))
- }
-}
-
-impl Serialize for MailIdent {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: Serializer,
- {
- serializer.serialize_str(&hex::encode(self.0))
- }
-}
-
// ---- TESTS ----
#[cfg(test)]