diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/.server.rs.swo | bin | 0 -> 12288 bytes | |||
-rw-r--r-- | src/.service.rs.swo | bin | 0 -> 12288 bytes | |||
-rw-r--r-- | src/.session.rs.swo | bin | 0 -> 20480 bytes | |||
-rw-r--r-- | src/command.rs | 112 | ||||
-rw-r--r-- | src/config.rs | 12 | ||||
-rw-r--r-- | src/lmtp.rs | 4 | ||||
-rw-r--r-- | src/login/static_provider.rs | 6 | ||||
-rw-r--r-- | src/mail_ident.rs (renamed from src/mail_uuid.rs) | 38 | ||||
-rw-r--r-- | src/mailbox.rs | 52 | ||||
-rw-r--r-- | src/mailstore.rs | 33 | ||||
-rw-r--r-- | src/main.rs | 17 | ||||
-rw-r--r-- | src/server.rs | 93 | ||||
-rw-r--r-- | src/service.rs | 62 | ||||
-rw-r--r-- | src/session.rs | 145 | ||||
-rw-r--r-- | src/uidindex.rs | 281 |
15 files changed, 711 insertions, 144 deletions
diff --git a/src/.server.rs.swo b/src/.server.rs.swo Binary files differnew file mode 100644 index 0000000..9e99bb3 --- /dev/null +++ b/src/.server.rs.swo diff --git a/src/.service.rs.swo b/src/.service.rs.swo Binary files differnew file mode 100644 index 0000000..a69e975 --- /dev/null +++ b/src/.service.rs.swo diff --git a/src/.session.rs.swo b/src/.session.rs.swo Binary files differnew file mode 100644 index 0000000..a6de20e --- /dev/null +++ b/src/.session.rs.swo diff --git a/src/command.rs b/src/command.rs new file mode 100644 index 0000000..4a2723d --- /dev/null +++ b/src/command.rs @@ -0,0 +1,112 @@ +use anyhow::Result; +use boitalettres::errors::Error as BalError; +use boitalettres::proto::{Request, Response}; +use imap_codec::types::core::{AString, Tag}; +use imap_codec::types::fetch_attributes::MacroOrFetchAttributes; +use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; +use imap_codec::types::response::{Capability, Data}; +use imap_codec::types::sequence::SequenceSet; + +use crate::mailbox::Mailbox; +use crate::session; + +pub struct Command<'a> { + tag: Tag, + session: &'a mut session::Instance, +} + +impl<'a> Command<'a> { + pub fn new(tag: Tag, session: &'a mut session::Instance) -> Self { + Self { tag, session } + } + + pub async fn capability(&self) -> Result<Response> { + let capabilities = vec![Capability::Imap4Rev1, Capability::Idle]; + let body = vec![Data::Capability(capabilities)]; + let r = Response::ok("Pre-login capabilities listed, post-login capabilities have more.")? + .with_body(body); + Ok(r) + } + + pub async fn login(&mut self, username: AString, password: AString) -> Result<Response> { + let (u, p) = (String::try_from(username)?, String::try_from(password)?); + tracing::info!(user = %u, "command.login"); + + let creds = match self.session.login_provider.login(&u, &p).await { + Err(_) => { + return Ok(Response::no( + "[AUTHENTICATIONFAILED] Authentication failed.", + )?) + } + Ok(c) => c, + }; + + self.session.user = Some(session::User { + creds, + name: u.clone(), + }); + + tracing::info!(username=%u, "connected"); + Ok(Response::ok("Logged in")?) + } + + pub async fn lsub( + &self, + reference: MailboxCodec, + mailbox_wildcard: ListMailbox, + ) -> Result<Response> { + Ok(Response::bad("Not implemented")?) + } + + pub async fn list( + &self, + reference: MailboxCodec, + mailbox_wildcard: ListMailbox, + ) -> Result<Response> { + Ok(Response::bad("Not implemented")?) + } + + /* + * TRACE BEGIN --- + + + Example: C: A142 SELECT INBOX + S: * 172 EXISTS + S: * 1 RECENT + S: * OK [UNSEEN 12] Message 12 is first unseen + S: * OK [UIDVALIDITY 3857529045] UIDs valid + S: * OK [UIDNEXT 4392] Predicted next UID + S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft) + S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited + S: A142 OK [READ-WRITE] SELECT completed + + * TRACE END --- + */ + pub async fn select(&mut self, mailbox: MailboxCodec) -> Result<Response> { + let name = String::try_from(mailbox)?; + let user = match self.session.user.as_ref() { + Some(u) => u, + _ => return Ok(Response::no("You must be connected to use SELECT")?), + }; + + let mut mb = Mailbox::new(&user.creds, name.clone())?; + tracing::info!(username=%user.name, mailbox=%name, "mailbox.selected"); + + let sum = mb.summary().await?; + tracing::trace!(summary=%sum, "mailbox.summary"); + + let body = vec![Data::Exists(sum.exists.try_into()?), Data::Recent(0)]; + + self.session.selected = Some(mb); + Ok(Response::ok("[READ-WRITE] Select completed")?.with_body(body)) + } + + pub async fn fetch( + &self, + sequence_set: SequenceSet, + attributes: MacroOrFetchAttributes, + uid: bool, + ) -> Result<Response> { + Ok(Response::bad("Not implemented")?) + } +} diff --git a/src/config.rs b/src/config.rs index 9ec0ea1..5afcabd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,9 +4,9 @@ use std::net::SocketAddr; use std::path::PathBuf; use anyhow::Result; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Config { pub s3_endpoint: String, pub k2v_endpoint: String, @@ -18,13 +18,13 @@ pub struct Config { pub lmtp: Option<LmtpConfig>, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LoginStaticConfig { pub default_bucket: Option<String>, pub users: HashMap<String, LoginStaticUser>, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LoginStaticUser { #[serde(default)] pub email_addresses: Vec<String>, @@ -42,7 +42,7 @@ pub struct LoginStaticUser { pub secret_key: Option<String>, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LoginLdapConfig { pub ldap_server: String, @@ -65,7 +65,7 @@ pub struct LoginLdapConfig { pub bucket_attr: Option<String>, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LmtpConfig { pub bind_addr: SocketAddr, pub hostname: String, diff --git a/src/lmtp.rs b/src/lmtp.rs index 4186d69..049e119 100644 --- a/src/lmtp.rs +++ b/src/lmtp.rs @@ -20,7 +20,7 @@ use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata, Pro use crate::config::*; use crate::cryptoblob::*; use crate::login::*; -use crate::mail_uuid::*; +use crate::mail_ident::*; pub struct LmtpServer { bind_addr: SocketAddr, @@ -249,7 +249,7 @@ impl EncryptedMessage { let mut por = PutObjectRequest::default(); por.bucket = creds.storage.bucket.clone(); - por.key = format!("incoming/{}", gen_uuid().to_string()); + por.key = format!("incoming/{}", gen_ident().to_string()); por.metadata = Some( [("Message-Key".to_string(), key_header)] .into_iter() diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index aa5e499..6bbc717 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -48,14 +48,18 @@ impl StaticLoginProvider { #[async_trait] impl LoginProvider for StaticLoginProvider { async fn login(&self, username: &str, password: &str) -> Result<Credentials> { + tracing::debug!(user=%username, "login"); let user = match self.users.get(username) { None => bail!("User {} does not exist", username), Some(u) => u, }; + tracing::debug!(user=%username, "verify password"); if !verify_password(password, &user.password)? { bail!("Wrong password"); } + + tracing::debug!(user=%username, "fetch bucket"); let bucket = user .bucket .clone() @@ -64,6 +68,7 @@ impl LoginProvider for StaticLoginProvider { "No bucket configured and no default bucket specieid" ))?; + tracing::debug!(user=%username, "fetch keys"); let storage = StorageCredentials { k2v_region: self.k2v_region.clone(), s3_region: self.s3_region.clone(), @@ -92,6 +97,7 @@ impl LoginProvider for StaticLoginProvider { ), }; + tracing::debug!(user=%username, "logged"); Ok(Credentials { storage, keys }) } diff --git a/src/mail_uuid.rs b/src/mail_ident.rs index ab76bce..07e053a 100644 --- a/src/mail_uuid.rs +++ b/src/mail_ident.rs @@ -7,20 +7,24 @@ use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use crate::time::now_msec; -/// A Mail UUID is composed of two components: +/// An internal Mail Identifier is composed of two components: /// - a process identifier, 128 bits, itself composed of: /// - the timestamp of when the process started, 64 bits /// - a 64-bit random number /// - a sequence number, 64 bits -#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Debug)] -pub struct MailUuid(pub [u8; 24]); - -struct UuidGenerator { +/// They are not part of the protocol but an internal representation +/// required by Mailrage/Aerogramme. +/// Their main property is to be unique without having to rely +/// on synchronization between IMAP processes. +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)] +pub struct MailIdent(pub [u8; 24]); + +struct IdentGenerator { pid: u128, sn: AtomicU64, } -impl UuidGenerator { +impl IdentGenerator { fn new() -> Self { let time = now_msec() as u128; let rand = thread_rng().gen::<u64>() as u128; @@ -30,36 +34,36 @@ impl UuidGenerator { } } - fn gen(&self) -> MailUuid { + fn gen(&self) -> MailIdent { let sn = self.sn.fetch_add(1, Ordering::Relaxed); let mut res = [0u8; 24]; res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); - MailUuid(res) + MailIdent(res) } } lazy_static! { - static ref GENERATOR: UuidGenerator = UuidGenerator::new(); + static ref GENERATOR: IdentGenerator = IdentGenerator::new(); } -pub fn gen_uuid() -> MailUuid { +pub fn gen_ident() -> MailIdent { GENERATOR.gen() } // -- serde -- -impl<'de> Deserialize<'de> for MailUuid { +impl<'de> Deserialize<'de> for MailIdent { fn deserialize<D>(d: D) -> Result<Self, D::Error> where D: Deserializer<'de>, { let v = String::deserialize(d)?; - MailUuid::from_str(&v).map_err(D::Error::custom) + MailIdent::from_str(&v).map_err(D::Error::custom) } } -impl Serialize for MailUuid { +impl Serialize for MailIdent { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer, @@ -68,16 +72,16 @@ impl Serialize for MailUuid { } } -impl ToString for MailUuid { +impl ToString for MailIdent { fn to_string(&self) -> String { hex::encode(self.0) } } -impl FromStr for MailUuid { +impl FromStr for MailIdent { type Err = &'static str; - fn from_str(s: &str) -> Result<MailUuid, &'static str> { + fn from_str(s: &str) -> Result<MailIdent, &'static str> { let bytes = hex::decode(s).map_err(|_| "invalid hex")?; if bytes.len() != 24 { @@ -86,6 +90,6 @@ impl FromStr for MailUuid { let mut tmp = [0u8; 24]; tmp[..].copy_from_slice(&bytes); - Ok(MailUuid(tmp)) + Ok(MailIdent(tmp)) } } diff --git a/src/mailbox.rs b/src/mailbox.rs index 49d8e56..249d329 100644 --- a/src/mailbox.rs +++ b/src/mailbox.rs @@ -5,12 +5,27 @@ use rusoto_s3::S3Client; use crate::bayou::Bayou; use crate::cryptoblob::Key; use crate::login::Credentials; -use crate::mail_uuid::*; +use crate::mail_ident::*; use crate::uidindex::*; +pub struct Summary { + pub validity: ImapUidvalidity, + pub next: ImapUid, + pub exists: usize, +} +impl std::fmt::Display for Summary { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "uidvalidity: {}, uidnext: {}, exists: {}", + self.validity, self.next, self.exists + ) + } +} + pub struct Mailbox { bucket: String, - name: String, + pub name: String, key: Key, k2v: K2vClient, @@ -20,7 +35,7 @@ pub struct Mailbox { } impl Mailbox { - pub async fn new(creds: &Credentials, name: String) -> Result<Self> { + pub fn new(creds: &Credentials, name: String) -> Result<Self> { let uid_index = Bayou::<UidIndex>::new(creds, name.clone())?; Ok(Self { @@ -33,6 +48,17 @@ impl Mailbox { }) } + pub async fn summary(&mut self) -> Result<Summary> { + self.uid_index.sync().await?; + let state = self.uid_index.state(); + + return Ok(Summary { + validity: state.uidvalidity, + next: state.uidnext, + exists: state.idx_by_uid.len(), + }); + } + pub async fn test(&mut self) -> Result<()> { self.uid_index.sync().await?; @@ -41,22 +67,22 @@ impl Mailbox { let add_mail_op = self .uid_index .state() - .op_mail_add(gen_uuid(), vec!["\\Unseen".into()]); + .op_mail_add(gen_ident(), vec!["\\Unseen".into()]); self.uid_index.push(add_mail_op).await?; dump(&self.uid_index); - if self.uid_index.state().mails_by_uid.len() > 6 { + if self.uid_index.state().idx_by_uid.len() > 6 { for i in 0..2 { - let (_, uuid) = self + let (_, ident) = self .uid_index .state() - .mails_by_uid + .idx_by_uid .iter() .skip(3 + i) .next() .unwrap(); - let del_mail_op = self.uid_index.state().op_mail_del(*uuid); + let del_mail_op = self.uid_index.state().op_mail_del(*ident); self.uid_index.push(del_mail_op).await?; dump(&self.uid_index); @@ -73,16 +99,12 @@ fn dump(uid_index: &Bayou<UidIndex>) { println!("UIDVALIDITY {}", s.uidvalidity); println!("UIDNEXT {}", s.uidnext); println!("INTERNALSEQ {}", s.internalseq); - for (uid, uuid) in s.mails_by_uid.iter() { + for (uid, ident) in s.idx_by_uid.iter() { println!( "{} {} {}", uid, - hex::encode(uuid.0), - s.mail_flags - .get(uuid) - .cloned() - .unwrap_or_default() - .join(", ") + hex::encode(ident.0), + s.table.get(ident).cloned().unwrap_or_default().1.join(", ") ); } println!(""); diff --git a/src/mailstore.rs b/src/mailstore.rs new file mode 100644 index 0000000..2bcc592 --- /dev/null +++ b/src/mailstore.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +use anyhow::{bail, Result}; +use rusoto_signature::Region; + +use crate::config::*; +use crate::login::{ldap_provider::*, static_provider::*, *}; + +pub struct Mailstore { + pub login_provider: Box<dyn LoginProvider + Send + Sync>, +} +impl Mailstore { + pub fn new(config: Config) -> Result<Arc<Self>> { + let s3_region = Region::Custom { + name: config.aws_region.clone(), + endpoint: config.s3_endpoint, + }; + let k2v_region = Region::Custom { + name: config.aws_region, + endpoint: config.k2v_endpoint, + }; + let login_provider: Box<dyn LoginProvider + Send + Sync> = + match (config.login_static, config.login_ldap) { + (Some(st), None) => Box::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), + (None, Some(ld)) => Box::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), + (Some(_), Some(_)) => { + bail!("A single login provider must be set up in config file") + } + (None, None) => bail!("No login provider is set up in config file"), + }; + Ok(Arc::new(Self { login_provider })) + } +} diff --git a/src/main.rs b/src/main.rs index 33d3188..9ec5af0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,15 @@ mod bayou; +mod command; mod config; mod cryptoblob; mod lmtp; mod login; -mod mail_uuid; +mod mail_ident; mod mailbox; +mod mailstore; mod server; +mod service; +mod session; mod time; mod uidindex; @@ -118,9 +122,10 @@ struct UserSecretsArgs { #[tokio::main] async fn main() -> Result<()> { if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "mailrage=info,k2v_client=info") + std::env::set_var("RUST_LOG", "main=info,mailrage=info,k2v_client=info") } - pretty_env_logger::init(); + + tracing_subscriber::fmt::init(); let args = Args::parse(); @@ -128,14 +133,14 @@ async fn main() -> Result<()> { Command::Server { config_file } => { let config = read_config(config_file)?; - let server = Server::new(config)?; + let server = Server::new(config).await?; server.run().await?; } Command::Test { config_file } => { let config = read_config(config_file)?; - let server = Server::new(config)?; - server.test().await?; + let server = Server::new(config).await?; + //server.test().await?; } Command::FirstLogin { creds, diff --git a/src/server.rs b/src/server.rs index 1fd21b4..3abdfd1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,78 +1,97 @@ use std::sync::Arc; + + +use boitalettres::server::accept::addr::AddrIncoming; +use boitalettres::server::accept::addr::AddrStream; +use boitalettres::server::Server as ImapServer; + use anyhow::{bail, Result}; use futures::{try_join, StreamExt}; use log::*; use rusoto_signature::Region; use tokio::sync::watch; +use tower::Service; -use crate::config::*; +use crate::mailstore; +use crate::service; use crate::lmtp::*; +use crate::config::*; use crate::login::{ldap_provider::*, static_provider::*, *}; use crate::mailbox::Mailbox; pub struct Server { - pub login_provider: Arc<dyn LoginProvider + Send + Sync>, - pub lmtp_server: Option<Arc<LmtpServer>>, + lmtp_server: Option<Arc<LmtpServer>>, + imap_server: ImapServer<AddrIncoming, service::Instance>, } impl Server { - pub fn new(config: Config) -> Result<Self> { - let s3_region = Region::Custom { - name: config.aws_region.clone(), - endpoint: config.s3_endpoint, - }; - let k2v_region = Region::Custom { - name: config.aws_region, - endpoint: config.k2v_endpoint, - }; - let login_provider: Arc<dyn LoginProvider + Send + Sync> = - match (config.login_static, config.login_ldap) { - (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), - (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), - (Some(_), Some(_)) => { - bail!("A single login provider must be set up in config file") - } - (None, None) => bail!("No login provider is set up in config file"), - }; - - let lmtp_server = config - .lmtp - .map(|cfg| LmtpServer::new(cfg, login_provider.clone())); + pub async fn new(config: Config) -> Result<Self> { + let lmtp_config = config.lmtp.clone(); //@FIXME + let login = authenticator(config)?; + + let lmtp = lmtp_config.map(|cfg| LmtpServer::new(cfg, login.clone())); + + let incoming = AddrIncoming::new("127.0.0.1:4567").await?; + let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone())); Ok(Self { - login_provider, - lmtp_server, + lmtp_server: lmtp, + imap_server: imap, }) } - pub async fn run(&self) -> Result<()> { + + pub async fn run(self) -> Result<()> { + //tracing::info!("Starting server on {:#}", self.imap.incoming.local_addr); + tracing::info!("Starting Aerogramme..."); + let (exit_signal, provoke_exit) = watch_ctrl_c(); let exit_on_err = move |err: anyhow::Error| { error!("Error: {}", err); let _ = provoke_exit.send(true); }; + try_join!(async { match self.lmtp_server.as_ref() { None => Ok(()), Some(s) => s.run(exit_signal.clone()).await, } - })?; - Ok(()) - } - - pub async fn test(&self) -> Result<()> { - let creds = self.login_provider.login("lx", "plop").await?; + }, + //@FIXME handle ctrl + c + async { + self.imap_server.await?; + Ok(()) + } + )?; - let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?; - - mailbox.test().await?; Ok(()) } } +fn authenticator(config: Config) -> Result<Arc<dyn LoginProvider + Send + Sync>> { + let s3_region = Region::Custom { + name: config.aws_region.clone(), + endpoint: config.s3_endpoint, + }; + let k2v_region = Region::Custom { + name: config.aws_region, + endpoint: config.k2v_endpoint, + }; + + let lp: Arc<dyn LoginProvider + Send + Sync> = match (config.login_static, config.login_ldap) { + (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), + (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), + (Some(_), Some(_)) => { + bail!("A single login provider must be set up in config file") + } + (None, None) => bail!("No login provider is set up in config file"), + }; + Ok(lp) +} + pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) { let (send_cancel, watch_cancel) = watch::channel(false); let send_cancel = Arc::new(send_cancel); diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..ce272a3 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; +use std::task::{Context, Poll}; + +use anyhow::Result; +use boitalettres::errors::Error as BalError; +use boitalettres::proto::{Request, Response}; +use boitalettres::server::accept::addr::AddrStream; +use futures::future::BoxFuture; +use futures::future::FutureExt; +use tower::Service; + +use crate::session; +use crate::LoginProvider; + +pub struct Instance { + login_provider: Arc<dyn LoginProvider + Send + Sync>, +} +impl Instance { + pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self { + Self { login_provider } + } +} +impl<'a> Service<&'a AddrStream> for Instance { + type Response = Connection; + type Error = anyhow::Error; + type Future = BoxFuture<'static, Result<Self::Response>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, addr: &'a AddrStream) -> Self::Future { + tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept"); + let lp = self.login_provider.clone(); + async { Ok(Connection::new(lp)) }.boxed() + } +} + +pub struct Connection { + session: session::Manager, +} +impl Connection { + pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self { + Self { + session: session::Manager::new(login_provider), + } + } +} +impl Service<Request> for Connection { + type Response = Response; + type Error = BalError; + type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + tracing::debug!("Got request: {:#?}", req); + self.session.process(req) + } +} diff --git a/src/session.rs b/src/session.rs new file mode 100644 index 0000000..a3e4e24 --- /dev/null +++ b/src/session.rs @@ -0,0 +1,145 @@ +use std::sync::Arc; + +use boitalettres::errors::Error as BalError; +use boitalettres::proto::{Request, Response}; +use futures::future::BoxFuture; +use futures::future::FutureExt; +use imap_codec::types::command::CommandBody; +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::{mpsc, oneshot}; + +use crate::command; +use crate::login::Credentials; +use crate::mailbox::Mailbox; +use crate::LoginProvider; + +/* This constant configures backpressure in the system, + * or more specifically, how many pipelined messages are allowed + * before refusing them + */ +const MAX_PIPELINED_COMMANDS: usize = 10; + +struct Message { + req: Request, + tx: oneshot::Sender<Result<Response, BalError>>, +} + +pub struct Manager { + tx: mpsc::Sender<Message>, +} + +//@FIXME we should garbage collect the Instance when the Manager is destroyed. +impl Manager { + pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self { + let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); + tokio::spawn(async move { + let mut instance = Instance::new(login_provider, rx); + instance.start().await; + }); + Self { tx } + } + + pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> { + let (tx, rx) = oneshot::channel(); + let msg = Message { req, tx }; + + // We use try_send on a bounded channel to protect the daemons from DoS. + // Pipelining requests in IMAP are a special case: they should not occure often + // and in a limited number (like 3 requests). Someone filling the channel + // will probably be malicious so we "rate limit" them. + match self.tx.try_send(msg) { + Ok(()) => (), + Err(TrySendError::Full(_)) => { + return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed() + } + Err(TrySendError::Closed(_)) => { + return async { Response::bad("The session task has exited") }.boxed() + } + }; + + // @FIXME add a timeout, handle a session that fails. + async { + match rx.await { + Ok(r) => r, + Err(e) => { + tracing::warn!("Got error {:#?}", e); + Response::bad("No response from the session handler") + } + } + } + .boxed() + } +} + +pub struct User { + pub name: String, + pub creds: Credentials, +} + +pub struct Instance { + rx: mpsc::Receiver<Message>, + + pub login_provider: Arc<dyn LoginProvider + Send + Sync>, + pub selected: Option<Mailbox>, + pub user: Option<User>, +} +impl Instance { + fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>, rx: mpsc::Receiver<Message>) -> Self { + Self { + login_provider, + rx, + selected: None, + user: None, + } + } + + //@FIXME add a function that compute the runner's name from its local info + // to ease debug + // fn name(&self) -> String { } + + async fn start(&mut self) { + //@FIXME add more info about the runner + tracing::debug!("starting runner"); + + while let Some(msg) = self.rx.recv().await { + let mut cmd = command::Command::new(msg.req.tag, self); + let res = match msg.req.body { + CommandBody::Capability => cmd.capability().await, + CommandBody::Login { username, password } => cmd.login(username, password).await, + CommandBody::Lsub { + reference, + mailbox_wildcard, + } => cmd.lsub(reference, mailbox_wildcard).await, + CommandBody::List { + reference, + mailbox_wildcard, + } => cmd.list(reference, mailbox_wildcard).await, + CommandBody::Select { mailbox } => cmd.select(mailbox).await, + CommandBody::Fetch { + sequence_set, + attributes, + uid, + } => cmd.fetch(sequence_set, attributes, uid).await, + _ => Response::bad("Error in IMAP command received by server.") + .map_err(anyhow::Error::new), + }; + + let wrapped_res = res.or_else(|e| match e.downcast::<BalError>() { + Ok(be) => Err(be), + Err(ae) => { + tracing::warn!(error=%ae, "internal.error"); + Response::bad("Internal error") + } + }); + + //@FIXME I think we should quit this thread on error and having our manager watch it, + // and then abort the session as it is corrupted. + msg.tx.send(wrapped_res).unwrap_or_else(|e| { + tracing::warn!("failed to send imap response to manager: {:#?}", e) + }); + } + + //@FIXME add more info about the runner + tracing::debug!("exiting runner"); + } +} diff --git a/src/uidindex.rs b/src/uidindex.rs index ecd52ff..8e4a189 100644 --- a/src/uidindex.rs +++ b/src/uidindex.rs @@ -1,19 +1,28 @@ -use im::OrdMap; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use im::{HashMap, HashSet, OrdMap, OrdSet}; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use crate::bayou::*; -use crate::mail_uuid::MailUuid; +use crate::mail_ident::MailIdent; -type ImapUid = u32; -type ImapUidvalidity = u32; +pub type ImapUid = u32; +pub type ImapUidvalidity = u32; +pub type Flag = String; #[derive(Clone)] +/// A UidIndex handles the mutable part of a mailbox +/// It is built by running the event log on it +/// Each applied log generates a new UidIndex by cloning the previous one +/// and applying the event. This is why we use immutable datastructures: +/// they are cheap to clone. pub struct UidIndex { - pub mail_uid: OrdMap<MailUuid, ImapUid>, - pub mail_flags: OrdMap<MailUuid, Vec<String>>, + // Source of trust + pub table: OrdMap<MailIdent, (ImapUid, Vec<Flag>)>, - pub mails_by_uid: OrdMap<ImapUid, MailUuid>, + // Indexes optimized for queries + pub idx_by_uid: OrdMap<ImapUid, MailIdent>, + pub idx_by_flag: FlagIndex, + // Counters pub uidvalidity: ImapUidvalidity, pub uidnext: ImapUid, pub internalseq: ImapUid, @@ -21,40 +30,66 @@ pub struct UidIndex { #[derive(Clone, Serialize, Deserialize, Debug)] pub enum UidIndexOp { - MailAdd(MailUuid, ImapUid, Vec<String>), - MailDel(MailUuid), - FlagAdd(MailUuid, Vec<String>), - FlagDel(MailUuid, Vec<String>), + MailAdd(MailIdent, ImapUid, Vec<Flag>), + MailDel(MailIdent), + FlagAdd(MailIdent, Vec<Flag>), + FlagDel(MailIdent, Vec<Flag>), } impl UidIndex { #[must_use] - pub fn op_mail_add(&self, uuid: MailUuid, flags: Vec<String>) -> UidIndexOp { - UidIndexOp::MailAdd(uuid, self.internalseq, flags) + pub fn op_mail_add(&self, ident: MailIdent, flags: Vec<Flag>) -> UidIndexOp { + UidIndexOp::MailAdd(ident, self.internalseq, flags) } #[must_use] - pub fn op_mail_del(&self, uuid: MailUuid) -> UidIndexOp { - UidIndexOp::MailDel(uuid) + pub fn op_mail_del(&self, ident: MailIdent) -> UidIndexOp { + UidIndexOp::MailDel(ident) } #[must_use] - pub fn op_flag_add(&self, uuid: MailUuid, flags: Vec<String>) -> UidIndexOp { - UidIndexOp::FlagAdd(uuid, flags) + pub fn op_flag_add(&self, ident: MailIdent, flags: Vec<Flag>) -> UidIndexOp { + UidIndexOp::FlagAdd(ident, flags) } #[must_use] - pub fn op_flag_del(&self, uuid: MailUuid, flags: Vec<String>) -> UidIndexOp { - UidIndexOp::FlagDel(uuid, flags) + pub fn op_flag_del(&self, ident: MailIdent, flags: Vec<Flag>) -> UidIndexOp { + UidIndexOp::FlagDel(ident, flags) + } + + // INTERNAL functions to keep state consistent + + fn reg_email(&mut self, ident: MailIdent, uid: ImapUid, flags: &Vec<Flag>) { + // Insert the email in our table + self.table.insert(ident, (uid, flags.clone())); + + // Update the indexes/caches + self.idx_by_uid.insert(uid, ident); + self.idx_by_flag.insert(uid, flags); + } + + fn unreg_email(&mut self, ident: &MailIdent) { + // We do nothing if the mail does not exist + let (uid, flags) = match self.table.get(ident) { + Some(v) => v, + None => return, + }; + + // Delete all cache entries + self.idx_by_uid.remove(uid); + self.idx_by_flag.remove(*uid, flags); + + // Remove from source of trust + self.table.remove(ident); } } impl Default for UidIndex { fn default() -> Self { Self { - mail_flags: OrdMap::new(), - mail_uid: OrdMap::new(), - mails_by_uid: OrdMap::new(), + table: OrdMap::new(), + idx_by_uid: OrdMap::new(), + idx_by_flag: FlagIndex::new(), uidvalidity: 1, uidnext: 1, internalseq: 1, @@ -68,42 +103,53 @@ impl BayouState for UidIndex { fn apply(&self, op: &UidIndexOp) -> Self { let mut new = self.clone(); match op { - UidIndexOp::MailAdd(uuid, uid, flags) => { + UidIndexOp::MailAdd(ident, uid, flags) => { + // Change UIDValidity if there is a conflict if *uid < new.internalseq { new.uidvalidity += new.internalseq - *uid; } + + // Assign the real uid of the email let new_uid = new.internalseq; - if let Some(prev_uid) = new.mail_uid.get(uuid) { - new.mails_by_uid.remove(prev_uid); - } else { - new.mail_flags.insert(*uuid, flags.clone()); - } - new.mails_by_uid.insert(new_uid, *uuid); - new.mail_uid.insert(*uuid, new_uid); + // Delete the previous entry if any. + // Our proof has no assumption on `ident` uniqueness, + // so we must handle this case even it is very unlikely + // In this case, we overwrite the email. + // Note: assigning a new UID is mandatory. + new.unreg_email(ident); + + // We record our email and update ou caches + new.reg_email(*ident, new_uid, flags); + // Update counters new.internalseq += 1; new.uidnext = new.internalseq; } - UidIndexOp::MailDel(uuid) => { - if let Some(uid) = new.mail_uid.get(uuid) { - new.mails_by_uid.remove(uid); - new.mail_uid.remove(uuid); - new.mail_flags.remove(uuid); - } + UidIndexOp::MailDel(ident) => { + // If the email is known locally, we remove its references in all our indexes + new.unreg_email(ident); + + // We update the counter new.internalseq += 1; } - UidIndexOp::FlagAdd(uuid, new_flags) => { - let mail_flags = new.mail_flags.entry(*uuid).or_insert(vec![]); - for flag in new_flags { - if !mail_flags.contains(flag) { - mail_flags.push(flag.to_string()); - } + UidIndexOp::FlagAdd(ident, new_flags) => { + if let Some((uid, existing_flags)) = new.table.get_mut(ident) { + // Add flags to the source of trust and the cache + let mut to_add: Vec<Flag> = new_flags + .iter() + .filter(|f| !existing_flags.contains(f)) + .cloned() + .collect(); + new.idx_by_flag.insert(*uid, &to_add); + existing_flags.append(&mut to_add); } } - UidIndexOp::FlagDel(uuid, rm_flags) => { - if let Some(mail_flags) = new.mail_flags.get_mut(uuid) { - mail_flags.retain(|x| !rm_flags.contains(x)); + UidIndexOp::FlagDel(ident, rm_flags) => { + if let Some((uid, existing_flags)) = new.table.get_mut(ident) { + // Remove flags from the source of trust and the cache + existing_flags.retain(|x| !rm_flags.contains(x)); + new.idx_by_flag.remove(*uid, rm_flags); } } } @@ -111,11 +157,34 @@ impl BayouState for UidIndex { } } +// ---- FlagIndex implementation ---- +#[derive(Clone)] +pub struct FlagIndex(HashMap<Flag, OrdSet<ImapUid>>); + +impl FlagIndex { + fn new() -> Self { + Self(HashMap::new()) + } + fn insert(&mut self, uid: ImapUid, flags: &Vec<Flag>) { + flags.iter().for_each(|flag| { + self.0 + .entry(flag.clone()) + .or_insert(OrdSet::new()) + .insert(uid); + }); + } + fn remove(&mut self, uid: ImapUid, flags: &Vec<Flag>) -> () { + flags.iter().for_each(|flag| { + self.0.get_mut(flag).and_then(|set| set.remove(&uid)); + }); + } +} + // ---- CUSTOM SERIALIZATION AND DESERIALIZATION ---- #[derive(Serialize, Deserialize)] struct UidIndexSerializedRepr { - mails: Vec<(ImapUid, MailUuid, Vec<String>)>, + mails: Vec<(ImapUid, MailIdent, Vec<Flag>)>, uidvalidity: ImapUidvalidity, uidnext: ImapUid, internalseq: ImapUid, @@ -129,19 +198,17 @@ impl<'de> Deserialize<'de> for UidIndex { let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?; let mut uidindex = UidIndex { - mail_flags: OrdMap::new(), - mail_uid: OrdMap::new(), - mails_by_uid: OrdMap::new(), + table: OrdMap::new(), + idx_by_uid: OrdMap::new(), + idx_by_flag: FlagIndex::new(), uidvalidity: val.uidvalidity, uidnext: val.uidnext, internalseq: val.internalseq, }; - for (uid, uuid, flags) in val.mails { - uidindex.mail_flags.insert(uuid, flags); - uidindex.mail_uid.insert(uuid, uid); - uidindex.mails_by_uid.insert(uid, uuid); - } + val.mails + .iter() + .for_each(|(u, i, f)| uidindex.reg_email(*i, *u, f)); Ok(uidindex) } @@ -153,12 +220,8 @@ impl Serialize for UidIndex { S: Serializer, { let mut mails = vec![]; - for (uid, uuid) in self.mails_by_uid.iter() { - mails.push(( - *uid, - *uuid, - self.mail_flags.get(uuid).cloned().unwrap_or_default(), - )); + for (ident, (uid, flags)) in self.table.iter() { + mails.push((*uid, *ident, flags.clone())); } let val = UidIndexSerializedRepr { @@ -171,3 +234,99 @@ impl Serialize for UidIndex { val.serialize(serializer) } } + +// ---- TESTS ---- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_uidindex() { + let mut state = UidIndex::default(); + + // Add message 1 + { + let m = MailIdent([0x01; 24]); + let f = vec!["\\Recent".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + // Early checks + assert_eq!(state.table.len(), 1); + let (uid, flags) = state.table.get(&m).unwrap(); + assert_eq!(*uid, 1); + assert_eq!(flags.len(), 2); + let ident = state.idx_by_uid.get(&1).unwrap(); + assert_eq!(&m, ident); + let recent = state.idx_by_flag.0.get("\\Recent").unwrap(); + assert_eq!(recent.len(), 1); + assert_eq!(recent.iter().next().unwrap(), &1); + assert_eq!(state.uidnext, 2); + assert_eq!(state.uidvalidity, 1); + } + + // Add message 2 + { + let m = MailIdent([0x02; 24]); + let f = vec!["\\Seen".to_string(), "\\Archive".to_string()]; + let ev = state.op_mail_add(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Add flags to message 1 + { + let m = MailIdent([0x01; 24]); + let f = vec!["Important".to_string(), "$cl_1".to_string()]; + let ev = state.op_flag_add(m, f); + state = state.apply(&ev); + } + + // Delete flags from message 1 + { + let m = MailIdent([0x01; 24]); + let f = vec!["\\Recent".to_string()]; + let ev = state.op_flag_del(m, f); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + } + + // Delete message 2 + { + let m = MailIdent([0x02; 24]); + let ev = state.op_mail_del(m); + state = state.apply(&ev); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 1); + } + + // Add a message 3 concurrent to message 1 (trigger a uid validity change) + { + let m = MailIdent([0x03; 24]); + let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; + let ev = UidIndexOp::MailAdd(m, 1, f); + state = state.apply(&ev); + } + + // Checks + { + assert_eq!(state.table.len(), 2); + assert!(state.uidvalidity > 1); + + let (last_uid, ident) = state.idx_by_uid.get_max().unwrap(); + assert_eq!(ident, &MailIdent([0x03; 24])); + + let archive = state.idx_by_flag.0.get("\\Archive").unwrap(); + assert_eq!(archive.len(), 2); + let mut iter = archive.iter(); + assert_eq!(iter.next().unwrap(), &1); + assert_eq!(iter.next().unwrap(), last_uid); + } + } +} |