aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/.server.rs.swobin0 -> 12288 bytes
-rw-r--r--src/.service.rs.swobin0 -> 12288 bytes
-rw-r--r--src/.session.rs.swobin0 -> 20480 bytes
-rw-r--r--src/command.rs112
-rw-r--r--src/config.rs12
-rw-r--r--src/lmtp.rs4
-rw-r--r--src/login/static_provider.rs6
-rw-r--r--src/mail_ident.rs (renamed from src/mail_uuid.rs)38
-rw-r--r--src/mailbox.rs52
-rw-r--r--src/mailstore.rs33
-rw-r--r--src/main.rs17
-rw-r--r--src/server.rs93
-rw-r--r--src/service.rs62
-rw-r--r--src/session.rs145
-rw-r--r--src/uidindex.rs281
15 files changed, 711 insertions, 144 deletions
diff --git a/src/.server.rs.swo b/src/.server.rs.swo
new file mode 100644
index 0000000..9e99bb3
--- /dev/null
+++ b/src/.server.rs.swo
Binary files differ
diff --git a/src/.service.rs.swo b/src/.service.rs.swo
new file mode 100644
index 0000000..a69e975
--- /dev/null
+++ b/src/.service.rs.swo
Binary files differ
diff --git a/src/.session.rs.swo b/src/.session.rs.swo
new file mode 100644
index 0000000..a6de20e
--- /dev/null
+++ b/src/.session.rs.swo
Binary files differ
diff --git a/src/command.rs b/src/command.rs
new file mode 100644
index 0000000..4a2723d
--- /dev/null
+++ b/src/command.rs
@@ -0,0 +1,112 @@
+use anyhow::Result;
+use boitalettres::errors::Error as BalError;
+use boitalettres::proto::{Request, Response};
+use imap_codec::types::core::{AString, Tag};
+use imap_codec::types::fetch_attributes::MacroOrFetchAttributes;
+use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
+use imap_codec::types::response::{Capability, Data};
+use imap_codec::types::sequence::SequenceSet;
+
+use crate::mailbox::Mailbox;
+use crate::session;
+
+pub struct Command<'a> {
+ tag: Tag,
+ session: &'a mut session::Instance,
+}
+
+impl<'a> Command<'a> {
+ pub fn new(tag: Tag, session: &'a mut session::Instance) -> Self {
+ Self { tag, session }
+ }
+
+ pub async fn capability(&self) -> Result<Response> {
+ let capabilities = vec![Capability::Imap4Rev1, Capability::Idle];
+ let body = vec![Data::Capability(capabilities)];
+ let r = Response::ok("Pre-login capabilities listed, post-login capabilities have more.")?
+ .with_body(body);
+ Ok(r)
+ }
+
+ pub async fn login(&mut self, username: AString, password: AString) -> Result<Response> {
+ let (u, p) = (String::try_from(username)?, String::try_from(password)?);
+ tracing::info!(user = %u, "command.login");
+
+ let creds = match self.session.login_provider.login(&u, &p).await {
+ Err(_) => {
+ return Ok(Response::no(
+ "[AUTHENTICATIONFAILED] Authentication failed.",
+ )?)
+ }
+ Ok(c) => c,
+ };
+
+ self.session.user = Some(session::User {
+ creds,
+ name: u.clone(),
+ });
+
+ tracing::info!(username=%u, "connected");
+ Ok(Response::ok("Logged in")?)
+ }
+
+ pub async fn lsub(
+ &self,
+ reference: MailboxCodec,
+ mailbox_wildcard: ListMailbox,
+ ) -> Result<Response> {
+ Ok(Response::bad("Not implemented")?)
+ }
+
+ pub async fn list(
+ &self,
+ reference: MailboxCodec,
+ mailbox_wildcard: ListMailbox,
+ ) -> Result<Response> {
+ Ok(Response::bad("Not implemented")?)
+ }
+
+ /*
+ * TRACE BEGIN ---
+
+
+ Example: C: A142 SELECT INBOX
+ S: * 172 EXISTS
+ S: * 1 RECENT
+ S: * OK [UNSEEN 12] Message 12 is first unseen
+ S: * OK [UIDVALIDITY 3857529045] UIDs valid
+ S: * OK [UIDNEXT 4392] Predicted next UID
+ S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft)
+ S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited
+ S: A142 OK [READ-WRITE] SELECT completed
+
+ * TRACE END ---
+ */
+ pub async fn select(&mut self, mailbox: MailboxCodec) -> Result<Response> {
+ let name = String::try_from(mailbox)?;
+ let user = match self.session.user.as_ref() {
+ Some(u) => u,
+ _ => return Ok(Response::no("You must be connected to use SELECT")?),
+ };
+
+ let mut mb = Mailbox::new(&user.creds, name.clone())?;
+ tracing::info!(username=%user.name, mailbox=%name, "mailbox.selected");
+
+ let sum = mb.summary().await?;
+ tracing::trace!(summary=%sum, "mailbox.summary");
+
+ let body = vec![Data::Exists(sum.exists.try_into()?), Data::Recent(0)];
+
+ self.session.selected = Some(mb);
+ Ok(Response::ok("[READ-WRITE] Select completed")?.with_body(body))
+ }
+
+ pub async fn fetch(
+ &self,
+ sequence_set: SequenceSet,
+ attributes: MacroOrFetchAttributes,
+ uid: bool,
+ ) -> Result<Response> {
+ Ok(Response::bad("Not implemented")?)
+ }
+}
diff --git a/src/config.rs b/src/config.rs
index 9ec0ea1..5afcabd 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -4,9 +4,9 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use anyhow::Result;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
-#[derive(Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Config {
pub s3_endpoint: String,
pub k2v_endpoint: String,
@@ -18,13 +18,13 @@ pub struct Config {
pub lmtp: Option<LmtpConfig>,
}
-#[derive(Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LoginStaticConfig {
pub default_bucket: Option<String>,
pub users: HashMap<String, LoginStaticUser>,
}
-#[derive(Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LoginStaticUser {
#[serde(default)]
pub email_addresses: Vec<String>,
@@ -42,7 +42,7 @@ pub struct LoginStaticUser {
pub secret_key: Option<String>,
}
-#[derive(Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LoginLdapConfig {
pub ldap_server: String,
@@ -65,7 +65,7 @@ pub struct LoginLdapConfig {
pub bucket_attr: Option<String>,
}
-#[derive(Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LmtpConfig {
pub bind_addr: SocketAddr,
pub hostname: String,
diff --git a/src/lmtp.rs b/src/lmtp.rs
index 4186d69..049e119 100644
--- a/src/lmtp.rs
+++ b/src/lmtp.rs
@@ -20,7 +20,7 @@ use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata, Pro
use crate::config::*;
use crate::cryptoblob::*;
use crate::login::*;
-use crate::mail_uuid::*;
+use crate::mail_ident::*;
pub struct LmtpServer {
bind_addr: SocketAddr,
@@ -249,7 +249,7 @@ impl EncryptedMessage {
let mut por = PutObjectRequest::default();
por.bucket = creds.storage.bucket.clone();
- por.key = format!("incoming/{}", gen_uuid().to_string());
+ por.key = format!("incoming/{}", gen_ident().to_string());
por.metadata = Some(
[("Message-Key".to_string(), key_header)]
.into_iter()
diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs
index aa5e499..6bbc717 100644
--- a/src/login/static_provider.rs
+++ b/src/login/static_provider.rs
@@ -48,14 +48,18 @@ impl StaticLoginProvider {
#[async_trait]
impl LoginProvider for StaticLoginProvider {
async fn login(&self, username: &str, password: &str) -> Result<Credentials> {
+ tracing::debug!(user=%username, "login");
let user = match self.users.get(username) {
None => bail!("User {} does not exist", username),
Some(u) => u,
};
+ tracing::debug!(user=%username, "verify password");
if !verify_password(password, &user.password)? {
bail!("Wrong password");
}
+
+ tracing::debug!(user=%username, "fetch bucket");
let bucket = user
.bucket
.clone()
@@ -64,6 +68,7 @@ impl LoginProvider for StaticLoginProvider {
"No bucket configured and no default bucket specieid"
))?;
+ tracing::debug!(user=%username, "fetch keys");
let storage = StorageCredentials {
k2v_region: self.k2v_region.clone(),
s3_region: self.s3_region.clone(),
@@ -92,6 +97,7 @@ impl LoginProvider for StaticLoginProvider {
),
};
+ tracing::debug!(user=%username, "logged");
Ok(Credentials { storage, keys })
}
diff --git a/src/mail_uuid.rs b/src/mail_ident.rs
index ab76bce..07e053a 100644
--- a/src/mail_uuid.rs
+++ b/src/mail_ident.rs
@@ -7,20 +7,24 @@ use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use crate::time::now_msec;
-/// A Mail UUID is composed of two components:
+/// An internal Mail Identifier is composed of two components:
/// - a process identifier, 128 bits, itself composed of:
/// - the timestamp of when the process started, 64 bits
/// - a 64-bit random number
/// - a sequence number, 64 bits
-#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Debug)]
-pub struct MailUuid(pub [u8; 24]);
-
-struct UuidGenerator {
+/// They are not part of the protocol but an internal representation
+/// required by Mailrage/Aerogramme.
+/// Their main property is to be unique without having to rely
+/// on synchronization between IMAP processes.
+#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)]
+pub struct MailIdent(pub [u8; 24]);
+
+struct IdentGenerator {
pid: u128,
sn: AtomicU64,
}
-impl UuidGenerator {
+impl IdentGenerator {
fn new() -> Self {
let time = now_msec() as u128;
let rand = thread_rng().gen::<u64>() as u128;
@@ -30,36 +34,36 @@ impl UuidGenerator {
}
}
- fn gen(&self) -> MailUuid {
+ fn gen(&self) -> MailIdent {
let sn = self.sn.fetch_add(1, Ordering::Relaxed);
let mut res = [0u8; 24];
res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid));
res[16..24].copy_from_slice(&u64::to_be_bytes(sn));
- MailUuid(res)
+ MailIdent(res)
}
}
lazy_static! {
- static ref GENERATOR: UuidGenerator = UuidGenerator::new();
+ static ref GENERATOR: IdentGenerator = IdentGenerator::new();
}
-pub fn gen_uuid() -> MailUuid {
+pub fn gen_ident() -> MailIdent {
GENERATOR.gen()
}
// -- serde --
-impl<'de> Deserialize<'de> for MailUuid {
+impl<'de> Deserialize<'de> for MailIdent {
fn deserialize<D>(d: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let v = String::deserialize(d)?;
- MailUuid::from_str(&v).map_err(D::Error::custom)
+ MailIdent::from_str(&v).map_err(D::Error::custom)
}
}
-impl Serialize for MailUuid {
+impl Serialize for MailIdent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
@@ -68,16 +72,16 @@ impl Serialize for MailUuid {
}
}
-impl ToString for MailUuid {
+impl ToString for MailIdent {
fn to_string(&self) -> String {
hex::encode(self.0)
}
}
-impl FromStr for MailUuid {
+impl FromStr for MailIdent {
type Err = &'static str;
- fn from_str(s: &str) -> Result<MailUuid, &'static str> {
+ fn from_str(s: &str) -> Result<MailIdent, &'static str> {
let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
if bytes.len() != 24 {
@@ -86,6 +90,6 @@ impl FromStr for MailUuid {
let mut tmp = [0u8; 24];
tmp[..].copy_from_slice(&bytes);
- Ok(MailUuid(tmp))
+ Ok(MailIdent(tmp))
}
}
diff --git a/src/mailbox.rs b/src/mailbox.rs
index 49d8e56..249d329 100644
--- a/src/mailbox.rs
+++ b/src/mailbox.rs
@@ -5,12 +5,27 @@ use rusoto_s3::S3Client;
use crate::bayou::Bayou;
use crate::cryptoblob::Key;
use crate::login::Credentials;
-use crate::mail_uuid::*;
+use crate::mail_ident::*;
use crate::uidindex::*;
+pub struct Summary {
+ pub validity: ImapUidvalidity,
+ pub next: ImapUid,
+ pub exists: usize,
+}
+impl std::fmt::Display for Summary {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(
+ f,
+ "uidvalidity: {}, uidnext: {}, exists: {}",
+ self.validity, self.next, self.exists
+ )
+ }
+}
+
pub struct Mailbox {
bucket: String,
- name: String,
+ pub name: String,
key: Key,
k2v: K2vClient,
@@ -20,7 +35,7 @@ pub struct Mailbox {
}
impl Mailbox {
- pub async fn new(creds: &Credentials, name: String) -> Result<Self> {
+ pub fn new(creds: &Credentials, name: String) -> Result<Self> {
let uid_index = Bayou::<UidIndex>::new(creds, name.clone())?;
Ok(Self {
@@ -33,6 +48,17 @@ impl Mailbox {
})
}
+ pub async fn summary(&mut self) -> Result<Summary> {
+ self.uid_index.sync().await?;
+ let state = self.uid_index.state();
+
+ return Ok(Summary {
+ validity: state.uidvalidity,
+ next: state.uidnext,
+ exists: state.idx_by_uid.len(),
+ });
+ }
+
pub async fn test(&mut self) -> Result<()> {
self.uid_index.sync().await?;
@@ -41,22 +67,22 @@ impl Mailbox {
let add_mail_op = self
.uid_index
.state()
- .op_mail_add(gen_uuid(), vec!["\\Unseen".into()]);
+ .op_mail_add(gen_ident(), vec!["\\Unseen".into()]);
self.uid_index.push(add_mail_op).await?;
dump(&self.uid_index);
- if self.uid_index.state().mails_by_uid.len() > 6 {
+ if self.uid_index.state().idx_by_uid.len() > 6 {
for i in 0..2 {
- let (_, uuid) = self
+ let (_, ident) = self
.uid_index
.state()
- .mails_by_uid
+ .idx_by_uid
.iter()
.skip(3 + i)
.next()
.unwrap();
- let del_mail_op = self.uid_index.state().op_mail_del(*uuid);
+ let del_mail_op = self.uid_index.state().op_mail_del(*ident);
self.uid_index.push(del_mail_op).await?;
dump(&self.uid_index);
@@ -73,16 +99,12 @@ fn dump(uid_index: &Bayou<UidIndex>) {
println!("UIDVALIDITY {}", s.uidvalidity);
println!("UIDNEXT {}", s.uidnext);
println!("INTERNALSEQ {}", s.internalseq);
- for (uid, uuid) in s.mails_by_uid.iter() {
+ for (uid, ident) in s.idx_by_uid.iter() {
println!(
"{} {} {}",
uid,
- hex::encode(uuid.0),
- s.mail_flags
- .get(uuid)
- .cloned()
- .unwrap_or_default()
- .join(", ")
+ hex::encode(ident.0),
+ s.table.get(ident).cloned().unwrap_or_default().1.join(", ")
);
}
println!("");
diff --git a/src/mailstore.rs b/src/mailstore.rs
new file mode 100644
index 0000000..2bcc592
--- /dev/null
+++ b/src/mailstore.rs
@@ -0,0 +1,33 @@
+use std::sync::Arc;
+
+use anyhow::{bail, Result};
+use rusoto_signature::Region;
+
+use crate::config::*;
+use crate::login::{ldap_provider::*, static_provider::*, *};
+
+pub struct Mailstore {
+ pub login_provider: Box<dyn LoginProvider + Send + Sync>,
+}
+impl Mailstore {
+ pub fn new(config: Config) -> Result<Arc<Self>> {
+ let s3_region = Region::Custom {
+ name: config.aws_region.clone(),
+ endpoint: config.s3_endpoint,
+ };
+ let k2v_region = Region::Custom {
+ name: config.aws_region,
+ endpoint: config.k2v_endpoint,
+ };
+ let login_provider: Box<dyn LoginProvider + Send + Sync> =
+ match (config.login_static, config.login_ldap) {
+ (Some(st), None) => Box::new(StaticLoginProvider::new(st, k2v_region, s3_region)?),
+ (None, Some(ld)) => Box::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?),
+ (Some(_), Some(_)) => {
+ bail!("A single login provider must be set up in config file")
+ }
+ (None, None) => bail!("No login provider is set up in config file"),
+ };
+ Ok(Arc::new(Self { login_provider }))
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 33d3188..9ec5af0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,11 +1,15 @@
mod bayou;
+mod command;
mod config;
mod cryptoblob;
mod lmtp;
mod login;
-mod mail_uuid;
+mod mail_ident;
mod mailbox;
+mod mailstore;
mod server;
+mod service;
+mod session;
mod time;
mod uidindex;
@@ -118,9 +122,10 @@ struct UserSecretsArgs {
#[tokio::main]
async fn main() -> Result<()> {
if std::env::var("RUST_LOG").is_err() {
- std::env::set_var("RUST_LOG", "mailrage=info,k2v_client=info")
+ std::env::set_var("RUST_LOG", "main=info,mailrage=info,k2v_client=info")
}
- pretty_env_logger::init();
+
+ tracing_subscriber::fmt::init();
let args = Args::parse();
@@ -128,14 +133,14 @@ async fn main() -> Result<()> {
Command::Server { config_file } => {
let config = read_config(config_file)?;
- let server = Server::new(config)?;
+ let server = Server::new(config).await?;
server.run().await?;
}
Command::Test { config_file } => {
let config = read_config(config_file)?;
- let server = Server::new(config)?;
- server.test().await?;
+ let server = Server::new(config).await?;
+ //server.test().await?;
}
Command::FirstLogin {
creds,
diff --git a/src/server.rs b/src/server.rs
index 1fd21b4..3abdfd1 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,78 +1,97 @@
use std::sync::Arc;
+
+
+use boitalettres::server::accept::addr::AddrIncoming;
+use boitalettres::server::accept::addr::AddrStream;
+use boitalettres::server::Server as ImapServer;
+
use anyhow::{bail, Result};
use futures::{try_join, StreamExt};
use log::*;
use rusoto_signature::Region;
use tokio::sync::watch;
+use tower::Service;
-use crate::config::*;
+use crate::mailstore;
+use crate::service;
use crate::lmtp::*;
+use crate::config::*;
use crate::login::{ldap_provider::*, static_provider::*, *};
use crate::mailbox::Mailbox;
pub struct Server {
- pub login_provider: Arc<dyn LoginProvider + Send + Sync>,
- pub lmtp_server: Option<Arc<LmtpServer>>,
+ lmtp_server: Option<Arc<LmtpServer>>,
+ imap_server: ImapServer<AddrIncoming, service::Instance>,
}
impl Server {
- pub fn new(config: Config) -> Result<Self> {
- let s3_region = Region::Custom {
- name: config.aws_region.clone(),
- endpoint: config.s3_endpoint,
- };
- let k2v_region = Region::Custom {
- name: config.aws_region,
- endpoint: config.k2v_endpoint,
- };
- let login_provider: Arc<dyn LoginProvider + Send + Sync> =
- match (config.login_static, config.login_ldap) {
- (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?),
- (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?),
- (Some(_), Some(_)) => {
- bail!("A single login provider must be set up in config file")
- }
- (None, None) => bail!("No login provider is set up in config file"),
- };
-
- let lmtp_server = config
- .lmtp
- .map(|cfg| LmtpServer::new(cfg, login_provider.clone()));
+ pub async fn new(config: Config) -> Result<Self> {
+ let lmtp_config = config.lmtp.clone(); //@FIXME
+ let login = authenticator(config)?;
+
+ let lmtp = lmtp_config.map(|cfg| LmtpServer::new(cfg, login.clone()));
+
+ let incoming = AddrIncoming::new("127.0.0.1:4567").await?;
+ let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone()));
Ok(Self {
- login_provider,
- lmtp_server,
+ lmtp_server: lmtp,
+ imap_server: imap,
})
}
- pub async fn run(&self) -> Result<()> {
+
+ pub async fn run(self) -> Result<()> {
+ //tracing::info!("Starting server on {:#}", self.imap.incoming.local_addr);
+ tracing::info!("Starting Aerogramme...");
+
let (exit_signal, provoke_exit) = watch_ctrl_c();
let exit_on_err = move |err: anyhow::Error| {
error!("Error: {}", err);
let _ = provoke_exit.send(true);
};
+
try_join!(async {
match self.lmtp_server.as_ref() {
None => Ok(()),
Some(s) => s.run(exit_signal.clone()).await,
}
- })?;
- Ok(())
- }
-
- pub async fn test(&self) -> Result<()> {
- let creds = self.login_provider.login("lx", "plop").await?;
+ },
+ //@FIXME handle ctrl + c
+ async {
+ self.imap_server.await?;
+ Ok(())
+ }
+ )?;
- let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?;
-
- mailbox.test().await?;
Ok(())
}
}
+fn authenticator(config: Config) -> Result<Arc<dyn LoginProvider + Send + Sync>> {
+ let s3_region = Region::Custom {
+ name: config.aws_region.clone(),
+ endpoint: config.s3_endpoint,
+ };
+ let k2v_region = Region::Custom {
+ name: config.aws_region,
+ endpoint: config.k2v_endpoint,
+ };
+
+ let lp: Arc<dyn LoginProvider + Send + Sync> = match (config.login_static, config.login_ldap) {
+ (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?),
+ (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?),
+ (Some(_), Some(_)) => {
+ bail!("A single login provider must be set up in config file")
+ }
+ (None, None) => bail!("No login provider is set up in config file"),
+ };
+ Ok(lp)
+}
+
pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) {
let (send_cancel, watch_cancel) = watch::channel(false);
let send_cancel = Arc::new(send_cancel);
diff --git a/src/service.rs b/src/service.rs
new file mode 100644
index 0000000..ce272a3
--- /dev/null
+++ b/src/service.rs
@@ -0,0 +1,62 @@
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use anyhow::Result;
+use boitalettres::errors::Error as BalError;
+use boitalettres::proto::{Request, Response};
+use boitalettres::server::accept::addr::AddrStream;
+use futures::future::BoxFuture;
+use futures::future::FutureExt;
+use tower::Service;
+
+use crate::session;
+use crate::LoginProvider;
+
+pub struct Instance {
+ login_provider: Arc<dyn LoginProvider + Send + Sync>,
+}
+impl Instance {
+ pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
+ Self { login_provider }
+ }
+}
+impl<'a> Service<&'a AddrStream> for Instance {
+ type Response = Connection;
+ type Error = anyhow::Error;
+ type Future = BoxFuture<'static, Result<Self::Response>>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, addr: &'a AddrStream) -> Self::Future {
+ tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept");
+ let lp = self.login_provider.clone();
+ async { Ok(Connection::new(lp)) }.boxed()
+ }
+}
+
+pub struct Connection {
+ session: session::Manager,
+}
+impl Connection {
+ pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
+ Self {
+ session: session::Manager::new(login_provider),
+ }
+ }
+}
+impl Service<Request> for Connection {
+ type Response = Response;
+ type Error = BalError;
+ type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request) -> Self::Future {
+ tracing::debug!("Got request: {:#?}", req);
+ self.session.process(req)
+ }
+}
diff --git a/src/session.rs b/src/session.rs
new file mode 100644
index 0000000..a3e4e24
--- /dev/null
+++ b/src/session.rs
@@ -0,0 +1,145 @@
+use std::sync::Arc;
+
+use boitalettres::errors::Error as BalError;
+use boitalettres::proto::{Request, Response};
+use futures::future::BoxFuture;
+use futures::future::FutureExt;
+use imap_codec::types::command::CommandBody;
+use tokio::sync::mpsc::error::TrySendError;
+use tokio::sync::{mpsc, oneshot};
+
+use crate::command;
+use crate::login::Credentials;
+use crate::mailbox::Mailbox;
+use crate::LoginProvider;
+
+/* This constant configures backpressure in the system,
+ * or more specifically, how many pipelined messages are allowed
+ * before refusing them
+ */
+const MAX_PIPELINED_COMMANDS: usize = 10;
+
+struct Message {
+ req: Request,
+ tx: oneshot::Sender<Result<Response, BalError>>,
+}
+
+pub struct Manager {
+ tx: mpsc::Sender<Message>,
+}
+
+//@FIXME we should garbage collect the Instance when the Manager is destroyed.
+impl Manager {
+ pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
+ let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
+ tokio::spawn(async move {
+ let mut instance = Instance::new(login_provider, rx);
+ instance.start().await;
+ });
+ Self { tx }
+ }
+
+ pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
+ let (tx, rx) = oneshot::channel();
+ let msg = Message { req, tx };
+
+ // We use try_send on a bounded channel to protect the daemons from DoS.
+ // Pipelining requests in IMAP are a special case: they should not occure often
+ // and in a limited number (like 3 requests). Someone filling the channel
+ // will probably be malicious so we "rate limit" them.
+ match self.tx.try_send(msg) {
+ Ok(()) => (),
+ Err(TrySendError::Full(_)) => {
+ return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed()
+ }
+ Err(TrySendError::Closed(_)) => {
+ return async { Response::bad("The session task has exited") }.boxed()
+ }
+ };
+
+ // @FIXME add a timeout, handle a session that fails.
+ async {
+ match rx.await {
+ Ok(r) => r,
+ Err(e) => {
+ tracing::warn!("Got error {:#?}", e);
+ Response::bad("No response from the session handler")
+ }
+ }
+ }
+ .boxed()
+ }
+}
+
+pub struct User {
+ pub name: String,
+ pub creds: Credentials,
+}
+
+pub struct Instance {
+ rx: mpsc::Receiver<Message>,
+
+ pub login_provider: Arc<dyn LoginProvider + Send + Sync>,
+ pub selected: Option<Mailbox>,
+ pub user: Option<User>,
+}
+impl Instance {
+ fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>, rx: mpsc::Receiver<Message>) -> Self {
+ Self {
+ login_provider,
+ rx,
+ selected: None,
+ user: None,
+ }
+ }
+
+ //@FIXME add a function that compute the runner's name from its local info
+ // to ease debug
+ // fn name(&self) -> String { }
+
+ async fn start(&mut self) {
+ //@FIXME add more info about the runner
+ tracing::debug!("starting runner");
+
+ while let Some(msg) = self.rx.recv().await {
+ let mut cmd = command::Command::new(msg.req.tag, self);
+ let res = match msg.req.body {
+ CommandBody::Capability => cmd.capability().await,
+ CommandBody::Login { username, password } => cmd.login(username, password).await,
+ CommandBody::Lsub {
+ reference,
+ mailbox_wildcard,
+ } => cmd.lsub(reference, mailbox_wildcard).await,
+ CommandBody::List {
+ reference,
+ mailbox_wildcard,
+ } => cmd.list(reference, mailbox_wildcard).await,
+ CommandBody::Select { mailbox } => cmd.select(mailbox).await,
+ CommandBody::Fetch {
+ sequence_set,
+ attributes,
+ uid,
+ } => cmd.fetch(sequence_set, attributes, uid).await,
+ _ => Response::bad("Error in IMAP command received by server.")
+ .map_err(anyhow::Error::new),
+ };
+
+ let wrapped_res = res.or_else(|e| match e.downcast::<BalError>() {
+ Ok(be) => Err(be),
+ Err(ae) => {
+ tracing::warn!(error=%ae, "internal.error");
+ Response::bad("Internal error")
+ }
+ });
+
+ //@FIXME I think we should quit this thread on error and having our manager watch it,
+ // and then abort the session as it is corrupted.
+ msg.tx.send(wrapped_res).unwrap_or_else(|e| {
+ tracing::warn!("failed to send imap response to manager: {:#?}", e)
+ });
+ }
+
+ //@FIXME add more info about the runner
+ tracing::debug!("exiting runner");
+ }
+}
diff --git a/src/uidindex.rs b/src/uidindex.rs
index ecd52ff..8e4a189 100644
--- a/src/uidindex.rs
+++ b/src/uidindex.rs
@@ -1,19 +1,28 @@
-use im::OrdMap;
-use serde::{Deserialize, Deserializer, Serialize, Serializer};
+use im::{HashMap, HashSet, OrdMap, OrdSet};
+use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use crate::bayou::*;
-use crate::mail_uuid::MailUuid;
+use crate::mail_ident::MailIdent;
-type ImapUid = u32;
-type ImapUidvalidity = u32;
+pub type ImapUid = u32;
+pub type ImapUidvalidity = u32;
+pub type Flag = String;
#[derive(Clone)]
+/// A UidIndex handles the mutable part of a mailbox
+/// It is built by running the event log on it
+/// Each applied log generates a new UidIndex by cloning the previous one
+/// and applying the event. This is why we use immutable datastructures:
+/// they are cheap to clone.
pub struct UidIndex {
- pub mail_uid: OrdMap<MailUuid, ImapUid>,
- pub mail_flags: OrdMap<MailUuid, Vec<String>>,
+ // Source of trust
+ pub table: OrdMap<MailIdent, (ImapUid, Vec<Flag>)>,
- pub mails_by_uid: OrdMap<ImapUid, MailUuid>,
+ // Indexes optimized for queries
+ pub idx_by_uid: OrdMap<ImapUid, MailIdent>,
+ pub idx_by_flag: FlagIndex,
+ // Counters
pub uidvalidity: ImapUidvalidity,
pub uidnext: ImapUid,
pub internalseq: ImapUid,
@@ -21,40 +30,66 @@ pub struct UidIndex {
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum UidIndexOp {
- MailAdd(MailUuid, ImapUid, Vec<String>),
- MailDel(MailUuid),
- FlagAdd(MailUuid, Vec<String>),
- FlagDel(MailUuid, Vec<String>),
+ MailAdd(MailIdent, ImapUid, Vec<Flag>),
+ MailDel(MailIdent),
+ FlagAdd(MailIdent, Vec<Flag>),
+ FlagDel(MailIdent, Vec<Flag>),
}
impl UidIndex {
#[must_use]
- pub fn op_mail_add(&self, uuid: MailUuid, flags: Vec<String>) -> UidIndexOp {
- UidIndexOp::MailAdd(uuid, self.internalseq, flags)
+ pub fn op_mail_add(&self, ident: MailIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::MailAdd(ident, self.internalseq, flags)
}
#[must_use]
- pub fn op_mail_del(&self, uuid: MailUuid) -> UidIndexOp {
- UidIndexOp::MailDel(uuid)
+ pub fn op_mail_del(&self, ident: MailIdent) -> UidIndexOp {
+ UidIndexOp::MailDel(ident)
}
#[must_use]
- pub fn op_flag_add(&self, uuid: MailUuid, flags: Vec<String>) -> UidIndexOp {
- UidIndexOp::FlagAdd(uuid, flags)
+ pub fn op_flag_add(&self, ident: MailIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::FlagAdd(ident, flags)
}
#[must_use]
- pub fn op_flag_del(&self, uuid: MailUuid, flags: Vec<String>) -> UidIndexOp {
- UidIndexOp::FlagDel(uuid, flags)
+ pub fn op_flag_del(&self, ident: MailIdent, flags: Vec<Flag>) -> UidIndexOp {
+ UidIndexOp::FlagDel(ident, flags)
+ }
+
+ // INTERNAL functions to keep state consistent
+
+ fn reg_email(&mut self, ident: MailIdent, uid: ImapUid, flags: &Vec<Flag>) {
+ // Insert the email in our table
+ self.table.insert(ident, (uid, flags.clone()));
+
+ // Update the indexes/caches
+ self.idx_by_uid.insert(uid, ident);
+ self.idx_by_flag.insert(uid, flags);
+ }
+
+ fn unreg_email(&mut self, ident: &MailIdent) {
+ // We do nothing if the mail does not exist
+ let (uid, flags) = match self.table.get(ident) {
+ Some(v) => v,
+ None => return,
+ };
+
+ // Delete all cache entries
+ self.idx_by_uid.remove(uid);
+ self.idx_by_flag.remove(*uid, flags);
+
+ // Remove from source of trust
+ self.table.remove(ident);
}
}
impl Default for UidIndex {
fn default() -> Self {
Self {
- mail_flags: OrdMap::new(),
- mail_uid: OrdMap::new(),
- mails_by_uid: OrdMap::new(),
+ table: OrdMap::new(),
+ idx_by_uid: OrdMap::new(),
+ idx_by_flag: FlagIndex::new(),
uidvalidity: 1,
uidnext: 1,
internalseq: 1,
@@ -68,42 +103,53 @@ impl BayouState for UidIndex {
fn apply(&self, op: &UidIndexOp) -> Self {
let mut new = self.clone();
match op {
- UidIndexOp::MailAdd(uuid, uid, flags) => {
+ UidIndexOp::MailAdd(ident, uid, flags) => {
+ // Change UIDValidity if there is a conflict
if *uid < new.internalseq {
new.uidvalidity += new.internalseq - *uid;
}
+
+ // Assign the real uid of the email
let new_uid = new.internalseq;
- if let Some(prev_uid) = new.mail_uid.get(uuid) {
- new.mails_by_uid.remove(prev_uid);
- } else {
- new.mail_flags.insert(*uuid, flags.clone());
- }
- new.mails_by_uid.insert(new_uid, *uuid);
- new.mail_uid.insert(*uuid, new_uid);
+ // Delete the previous entry if any.
+ // Our proof has no assumption on `ident` uniqueness,
+ // so we must handle this case even it is very unlikely
+ // In this case, we overwrite the email.
+ // Note: assigning a new UID is mandatory.
+ new.unreg_email(ident);
+
+ // We record our email and update ou caches
+ new.reg_email(*ident, new_uid, flags);
+ // Update counters
new.internalseq += 1;
new.uidnext = new.internalseq;
}
- UidIndexOp::MailDel(uuid) => {
- if let Some(uid) = new.mail_uid.get(uuid) {
- new.mails_by_uid.remove(uid);
- new.mail_uid.remove(uuid);
- new.mail_flags.remove(uuid);
- }
+ UidIndexOp::MailDel(ident) => {
+ // If the email is known locally, we remove its references in all our indexes
+ new.unreg_email(ident);
+
+ // We update the counter
new.internalseq += 1;
}
- UidIndexOp::FlagAdd(uuid, new_flags) => {
- let mail_flags = new.mail_flags.entry(*uuid).or_insert(vec![]);
- for flag in new_flags {
- if !mail_flags.contains(flag) {
- mail_flags.push(flag.to_string());
- }
+ UidIndexOp::FlagAdd(ident, new_flags) => {
+ if let Some((uid, existing_flags)) = new.table.get_mut(ident) {
+ // Add flags to the source of trust and the cache
+ let mut to_add: Vec<Flag> = new_flags
+ .iter()
+ .filter(|f| !existing_flags.contains(f))
+ .cloned()
+ .collect();
+ new.idx_by_flag.insert(*uid, &to_add);
+ existing_flags.append(&mut to_add);
}
}
- UidIndexOp::FlagDel(uuid, rm_flags) => {
- if let Some(mail_flags) = new.mail_flags.get_mut(uuid) {
- mail_flags.retain(|x| !rm_flags.contains(x));
+ UidIndexOp::FlagDel(ident, rm_flags) => {
+ if let Some((uid, existing_flags)) = new.table.get_mut(ident) {
+ // Remove flags from the source of trust and the cache
+ existing_flags.retain(|x| !rm_flags.contains(x));
+ new.idx_by_flag.remove(*uid, rm_flags);
}
}
}
@@ -111,11 +157,34 @@ impl BayouState for UidIndex {
}
}
+// ---- FlagIndex implementation ----
+#[derive(Clone)]
+pub struct FlagIndex(HashMap<Flag, OrdSet<ImapUid>>);
+
+impl FlagIndex {
+ fn new() -> Self {
+ Self(HashMap::new())
+ }
+ fn insert(&mut self, uid: ImapUid, flags: &Vec<Flag>) {
+ flags.iter().for_each(|flag| {
+ self.0
+ .entry(flag.clone())
+ .or_insert(OrdSet::new())
+ .insert(uid);
+ });
+ }
+ fn remove(&mut self, uid: ImapUid, flags: &Vec<Flag>) -> () {
+ flags.iter().for_each(|flag| {
+ self.0.get_mut(flag).and_then(|set| set.remove(&uid));
+ });
+ }
+}
+
// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ----
#[derive(Serialize, Deserialize)]
struct UidIndexSerializedRepr {
- mails: Vec<(ImapUid, MailUuid, Vec<String>)>,
+ mails: Vec<(ImapUid, MailIdent, Vec<Flag>)>,
uidvalidity: ImapUidvalidity,
uidnext: ImapUid,
internalseq: ImapUid,
@@ -129,19 +198,17 @@ impl<'de> Deserialize<'de> for UidIndex {
let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?;
let mut uidindex = UidIndex {
- mail_flags: OrdMap::new(),
- mail_uid: OrdMap::new(),
- mails_by_uid: OrdMap::new(),
+ table: OrdMap::new(),
+ idx_by_uid: OrdMap::new(),
+ idx_by_flag: FlagIndex::new(),
uidvalidity: val.uidvalidity,
uidnext: val.uidnext,
internalseq: val.internalseq,
};
- for (uid, uuid, flags) in val.mails {
- uidindex.mail_flags.insert(uuid, flags);
- uidindex.mail_uid.insert(uuid, uid);
- uidindex.mails_by_uid.insert(uid, uuid);
- }
+ val.mails
+ .iter()
+ .for_each(|(u, i, f)| uidindex.reg_email(*i, *u, f));
Ok(uidindex)
}
@@ -153,12 +220,8 @@ impl Serialize for UidIndex {
S: Serializer,
{
let mut mails = vec![];
- for (uid, uuid) in self.mails_by_uid.iter() {
- mails.push((
- *uid,
- *uuid,
- self.mail_flags.get(uuid).cloned().unwrap_or_default(),
- ));
+ for (ident, (uid, flags)) in self.table.iter() {
+ mails.push((*uid, *ident, flags.clone()));
}
let val = UidIndexSerializedRepr {
@@ -171,3 +234,99 @@ impl Serialize for UidIndex {
val.serialize(serializer)
}
}
+
+// ---- TESTS ----
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_uidindex() {
+ let mut state = UidIndex::default();
+
+ // Add message 1
+ {
+ let m = MailIdent([0x01; 24]);
+ let f = vec!["\\Recent".to_string(), "\\Archive".to_string()];
+ let ev = state.op_mail_add(m, f);
+ state = state.apply(&ev);
+
+ // Early checks
+ assert_eq!(state.table.len(), 1);
+ let (uid, flags) = state.table.get(&m).unwrap();
+ assert_eq!(*uid, 1);
+ assert_eq!(flags.len(), 2);
+ let ident = state.idx_by_uid.get(&1).unwrap();
+ assert_eq!(&m, ident);
+ let recent = state.idx_by_flag.0.get("\\Recent").unwrap();
+ assert_eq!(recent.len(), 1);
+ assert_eq!(recent.iter().next().unwrap(), &1);
+ assert_eq!(state.uidnext, 2);
+ assert_eq!(state.uidvalidity, 1);
+ }
+
+ // Add message 2
+ {
+ let m = MailIdent([0x02; 24]);
+ let f = vec!["\\Seen".to_string(), "\\Archive".to_string()];
+ let ev = state.op_mail_add(m, f);
+ state = state.apply(&ev);
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 2);
+ }
+
+ // Add flags to message 1
+ {
+ let m = MailIdent([0x01; 24]);
+ let f = vec!["Important".to_string(), "$cl_1".to_string()];
+ let ev = state.op_flag_add(m, f);
+ state = state.apply(&ev);
+ }
+
+ // Delete flags from message 1
+ {
+ let m = MailIdent([0x01; 24]);
+ let f = vec!["\\Recent".to_string()];
+ let ev = state.op_flag_del(m, f);
+ state = state.apply(&ev);
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 2);
+ }
+
+ // Delete message 2
+ {
+ let m = MailIdent([0x02; 24]);
+ let ev = state.op_mail_del(m);
+ state = state.apply(&ev);
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 1);
+ }
+
+ // Add a message 3 concurrent to message 1 (trigger a uid validity change)
+ {
+ let m = MailIdent([0x03; 24]);
+ let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
+ let ev = UidIndexOp::MailAdd(m, 1, f);
+ state = state.apply(&ev);
+ }
+
+ // Checks
+ {
+ assert_eq!(state.table.len(), 2);
+ assert!(state.uidvalidity > 1);
+
+ let (last_uid, ident) = state.idx_by_uid.get_max().unwrap();
+ assert_eq!(ident, &MailIdent([0x03; 24]));
+
+ let archive = state.idx_by_flag.0.get("\\Archive").unwrap();
+ assert_eq!(archive.len(), 2);
+ let mut iter = archive.iter();
+ assert_eq!(iter.next().unwrap(), &1);
+ assert_eq!(iter.next().unwrap(), last_uid);
+ }
+ }
+}