aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/auth.rs384
-rw-r--r--src/imap/mod.rs34
-rw-r--r--src/lmtp.rs8
-rw-r--r--src/login/ldap_provider.rs3
-rw-r--r--src/main.rs12
-rw-r--r--src/server.rs15
-rw-r--r--src/storage/garage.rs33
7 files changed, 314 insertions, 175 deletions
diff --git a/src/auth.rs b/src/auth.rs
index a3edcbc..81ba496 100644
--- a/src/auth.rs
+++ b/src/auth.rs
@@ -1,6 +1,6 @@
use std::net::SocketAddr;
-use anyhow::{Result, anyhow, bail};
+use anyhow::{anyhow, bail, Result};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::io::BufStream;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
@@ -25,9 +25,9 @@ use crate::login::ArcLoginProvider;
/// C: VERSION 1 2
/// C: CPID 1
///
-/// C: AUTH 2 PLAIN service=smtp
-/// S: CONT 2
-/// C: CONT 2 base64stringFollowingRFC4616==
+/// C: AUTH 2 PLAIN service=smtp
+/// S: CONT 2
+/// C: CONT 2 base64stringFollowingRFC4616==
/// S: OK 2 user=alice@example.tld
///
/// C: AUTH 42 LOGIN service=smtp
@@ -41,7 +41,7 @@ use crate::login::ArcLoginProvider;
/// ## RFC References
///
/// PLAIN SASL - https://datatracker.ietf.org/doc/html/rfc4616
-///
+///
///
/// ## Dovecot References
///
@@ -54,22 +54,20 @@ pub struct AuthServer {
bind_addr: SocketAddr,
}
-
impl AuthServer {
- pub fn new(
- config: AuthConfig,
- login_provider: ArcLoginProvider,
- ) -> Self {
+ pub fn new(config: AuthConfig, login_provider: ArcLoginProvider) -> Self {
Self {
bind_addr: config.bind_addr,
login_provider,
}
}
-
pub async fn run(self: Self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
let tcp = TcpListener::bind(self.bind_addr).await?;
- tracing::info!("SASL Authentication Protocol listening on {:#}", self.bind_addr);
+ tracing::info!(
+ "SASL Authentication Protocol listening on {:#}",
+ self.bind_addr
+ );
let mut connections = FuturesUnordered::new();
@@ -89,8 +87,9 @@ impl AuthServer {
};
tracing::info!("AUTH: accepted connection from {}", remote_addr);
- let conn = tokio::spawn(NetLoop::new(socket, self.login_provider.clone(), must_exit.clone()).run_error());
-
+ let conn = tokio::spawn(
+ NetLoop::new(socket, self.login_provider.clone(), must_exit.clone()).run_error(),
+ );
connections.push(conn);
}
@@ -106,7 +105,7 @@ impl AuthServer {
struct NetLoop {
login: ArcLoginProvider,
stream: BufStream<TcpStream>,
- stop: watch::Receiver<bool>,
+ stop: watch::Receiver<bool>,
state: State,
read_buf: Vec<u8>,
write_buf: BytesMut,
@@ -197,45 +196,53 @@ enum State {
Init,
HandshakePart(Version),
HandshakeDone,
- AuthPlainProgress {
- id: u64,
- },
- AuthDone {
- id: u64,
- res: AuthRes
- },
+ AuthPlainProgress { id: u64 },
+ AuthDone { id: u64, res: AuthRes },
}
const SERVER_MAJOR: u64 = 1;
const SERVER_MINOR: u64 = 2;
impl State {
async fn progress(&mut self, cmd: ClientCommand, login: &ArcLoginProvider) {
-
let new_state = 'state: {
match (std::mem::replace(self, State::Error), cmd) {
(Self::Init, ClientCommand::Version(v)) => Self::HandshakePart(v),
(Self::HandshakePart(version), ClientCommand::Cpid(_cpid)) => {
if version.major != SERVER_MAJOR {
- tracing::error!(client_major=version.major, server_major=SERVER_MAJOR, "Unsupported client major version");
- break 'state Self::Error
+ tracing::error!(
+ client_major = version.major,
+ server_major = SERVER_MAJOR,
+ "Unsupported client major version"
+ );
+ break 'state Self::Error;
}
-
+
Self::HandshakeDone
- },
- (Self::HandshakeDone { .. }, ClientCommand::Auth { id, mech, .. }) |
- (Self::AuthDone { .. }, ClientCommand::Auth { id, mech, ..}) => {
+ }
+ (Self::HandshakeDone { .. }, ClientCommand::Auth { id, mech, .. })
+ | (Self::AuthDone { .. }, ClientCommand::Auth { id, mech, .. }) => {
if mech != Mechanism::Plain {
tracing::error!(mechanism=?mech, "Unsupported Authentication Mechanism");
- break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) }
+ break 'state Self::AuthDone {
+ id,
+ res: AuthRes::Failed(None, None),
+ };
}
- Self::AuthPlainProgress { id }
- },
+ Self::AuthPlainProgress { id }
+ }
(Self::AuthPlainProgress { id }, ClientCommand::Cont { id: cid, data }) => {
// Check that ID matches
if cid != id {
- tracing::error!(auth_id=id, cont_id=cid, "CONT id does not match AUTH id");
- break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) }
+ tracing::error!(
+ auth_id = id,
+ cont_id = cid,
+ "CONT id does not match AUTH id"
+ );
+ break 'state Self::AuthDone {
+ id,
+ res: AuthRes::Failed(None, None),
+ };
}
// Check that we can extract user's login+pass
@@ -243,36 +250,54 @@ impl State {
Ok(([], ([], user, pass))) => (user, pass),
Ok(_) => {
tracing::error!("Impersonating user is not supported");
- break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) }
+ break 'state Self::AuthDone {
+ id,
+ res: AuthRes::Failed(None, None),
+ };
}
Err(e) => {
tracing::error!(err=?e, "Could not parse the SASL PLAIN data chunk");
- break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) }
- },
+ break 'state Self::AuthDone {
+ id,
+ res: AuthRes::Failed(None, None),
+ };
+ }
};
// Try to convert it to UTF-8
- let (user, password) = match (std::str::from_utf8(ubin), std::str::from_utf8(pbin)) {
- (Ok(u), Ok(p)) => (u, p),
- _ => {
- tracing::error!("Username or password contain invalid UTF-8 characters");
- break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) }
- }
- };
+ let (user, password) =
+ match (std::str::from_utf8(ubin), std::str::from_utf8(pbin)) {
+ (Ok(u), Ok(p)) => (u, p),
+ _ => {
+ tracing::error!(
+ "Username or password contain invalid UTF-8 characters"
+ );
+ break 'state Self::AuthDone {
+ id,
+ res: AuthRes::Failed(None, None),
+ };
+ }
+ };
// Try to connect user
match login.login(user, password).await {
- Ok(_) => Self::AuthDone { id, res: AuthRes::Success(user.to_string())},
+ Ok(_) => Self::AuthDone {
+ id,
+ res: AuthRes::Success(user.to_string()),
+ },
Err(e) => {
tracing::warn!(err=?e, "login failed");
- Self::AuthDone { id, res: AuthRes::Failed(Some(user.to_string()), None) }
+ Self::AuthDone {
+ id,
+ res: AuthRes::Failed(Some(user.to_string()), None),
+ }
}
}
- },
+ }
_ => {
tracing::error!("This command is not valid in this context");
Self::Error
- },
+ }
}
};
tracing::debug!(state=?new_state, "Made progress");
@@ -284,7 +309,10 @@ impl State {
match self {
Self::HandshakeDone { .. } => {
- srv_cmd.push(ServerCommand::Version(Version { major: SERVER_MAJOR, minor: SERVER_MINOR }));
+ srv_cmd.push(ServerCommand::Version(Version {
+ major: SERVER_MAJOR,
+ minor: SERVER_MINOR,
+ }));
srv_cmd.push(ServerCommand::Mech {
kind: Mechanism::Plain,
@@ -299,16 +327,34 @@ impl State {
srv_cmd.push(ServerCommand::Cookie(cookie));
srv_cmd.push(ServerCommand::Done);
- },
+ }
Self::AuthPlainProgress { id } => {
- srv_cmd.push(ServerCommand::Cont { id: *id, data: None });
- },
- Self::AuthDone { id, res: AuthRes::Success(user) } => {
- srv_cmd.push(ServerCommand::Ok { id: *id, user_id: Some(user.to_string()), extra_parameters: vec![]});
- },
- Self::AuthDone { id, res: AuthRes::Failed(maybe_user, maybe_failcode) } => {
- srv_cmd.push(ServerCommand::Fail { id: *id, user_id: maybe_user.clone(), code: maybe_failcode.clone(), extra_parameters: vec![]});
- },
+ srv_cmd.push(ServerCommand::Cont {
+ id: *id,
+ data: None,
+ });
+ }
+ Self::AuthDone {
+ id,
+ res: AuthRes::Success(user),
+ } => {
+ srv_cmd.push(ServerCommand::Ok {
+ id: *id,
+ user_id: Some(user.to_string()),
+ extra_parameters: vec![],
+ });
+ }
+ Self::AuthDone {
+ id,
+ res: AuthRes::Failed(maybe_user, maybe_failcode),
+ } => {
+ srv_cmd.push(ServerCommand::Fail {
+ id: *id,
+ user_id: maybe_user.clone(),
+ code: maybe_failcode.clone(),
+ extra_parameters: vec![],
+ });
+ }
_ => (),
};
@@ -316,7 +362,6 @@ impl State {
}
}
-
// -----------------------------------------------------------------
//
// DOVECOT AUTH TYPES
@@ -329,7 +374,6 @@ enum Mechanism {
Login,
}
-
#[derive(Clone, Debug)]
enum AuthOption {
/// Unique session ID. Mainly used for logging.
@@ -343,9 +387,9 @@ enum AuthOption {
/// Remote client port
RemotePort(u16),
/// When Dovecot proxy is used, the real_rip/real_port are the proxy’s IP/port and real_lip/real_lport are the backend’s IP/port where the proxy was connected to.
- RealRemoteIp(String),
- RealLocalIp(String),
- RealLocalPort(u16),
+ RealRemoteIp(String),
+ RealLocalIp(String),
+ RealLocalPort(u16),
RealRemotePort(u16),
/// TLS SNI name
LocalName(String),
@@ -380,8 +424,8 @@ enum AuthOption {
/// An unknown key
UnknownPair(String, Vec<u8>),
UnknownBool(Vec<u8>),
- /// Initial response for authentication mechanism.
- /// NOTE: This must be the last parameter. Everything after it is ignored.
+ /// Initial response for authentication mechanism.
+ /// NOTE: This must be the last parameter. Everything after it is ignored.
/// This is to avoid accidental security holes if user-given data is directly put to base64 string without filtering out tabs.
/// @FIXME: I don't understand this parameter
Resp(Vec<u8>),
@@ -409,14 +453,13 @@ enum ClientCommand {
service: String,
/// All the optional parameters
options: Vec<AuthOption>,
-
},
Cont {
/// The <id> must match the <id> of the AUTH command.
id: u64,
/// Data that will be serialized to / deserialized from base64
data: Vec<u8>,
- }
+ },
}
#[derive(Debug)]
@@ -464,8 +507,8 @@ enum ServerCommand {
parameters: Vec<MechanismParameters>,
},
/// COOKIE returns connection-specific 128 bit cookie in hex. It must be given to REQUEST command. (Protocol v1.1+ / Dovecot v2.0+)
- Cookie([u8;16]),
- /// DONE finishes the handshake from server.
+ Cookie([u8; 16]),
+ /// DONE finishes the handshake from server.
Done,
Fail {
@@ -478,7 +521,7 @@ enum ServerCommand {
id: u64,
data: Option<Vec<u8>>,
},
- /// FAIL and OK may contain multiple unspecified parameters which authentication client may handle specially.
+ /// FAIL and OK may contain multiple unspecified parameters which authentication client may handle specially.
/// The only one specified here is user=<userid> parameter, which should always be sent if the userid is known.
Ok {
id: u64,
@@ -493,26 +536,20 @@ enum ServerCommand {
//
// ------------------------------------------------------------------
+use base64::Engine;
use nom::{
- IResult,
- branch::alt,
- error::{ErrorKind, Error},
- character::complete::{tab, u64, u16},
- bytes::complete::{is_not, tag, tag_no_case, take, take_while, take_while1},
- multi::{many1, separated_list0},
- combinator::{map, opt, recognize, value, rest},
- sequence::{pair, preceded, tuple},
+ branch::alt,
+ bytes::complete::{is_not, tag, tag_no_case, take, take_while, take_while1},
+ character::complete::{tab, u16, u64},
+ combinator::{map, opt, recognize, rest, value},
+ error::{Error, ErrorKind},
+ multi::{many1, separated_list0},
+ sequence::{pair, preceded, tuple},
+ IResult,
};
-use base64::Engine;
fn version_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> {
- let mut parser = tuple((
- tag_no_case(b"VERSION"),
- tab,
- u64,
- tab,
- u64
- ));
+ let mut parser = tuple((tag_no_case(b"VERSION"), tab, u64, tab, u64));
let (input, (_, _, major, _, minor)) = parser(input)?;
Ok((input, ClientCommand::Version(Version { major, minor })))
@@ -521,7 +558,7 @@ fn version_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> {
fn cpid_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> {
preceded(
pair(tag_no_case(b"CPID"), tab),
- map(u64, |v| ClientCommand::Cpid(v))
+ map(u64, |v| ClientCommand::Cpid(v)),
)(input)
}
@@ -541,10 +578,7 @@ fn is_esc<'a>(input: &'a [u8]) -> IResult<&'a [u8], &[u8]> {
}
fn parameter<'a>(input: &'a [u8]) -> IResult<&'a [u8], &[u8]> {
- recognize(many1(alt((
- take_while1(is_not_tab_or_esc_or_lf),
- is_esc
- ))))(input)
+ recognize(many1(alt((take_while1(is_not_tab_or_esc_or_lf), is_esc))))(input)
}
fn parameter_str(input: &[u8]) -> IResult<&[u8], String> {
@@ -568,10 +602,7 @@ fn parameter_name(input: &[u8]) -> IResult<&[u8], String> {
}
fn service<'a>(input: &'a [u8]) -> IResult<&'a [u8], String> {
- preceded(
- tag_no_case("service="),
- parameter_str
- )(input)
+ preceded(tag_no_case("service="), parameter_str)(input)
}
fn auth_option<'a>(input: &'a [u8]) -> IResult<&'a [u8], AuthOption> {
@@ -583,31 +614,74 @@ fn auth_option<'a>(input: &'a [u8]) -> IResult<&'a [u8], AuthOption> {
value(ClientId, tag_no_case(b"client_id")),
value(NoLogin, tag_no_case(b"nologin")),
map(preceded(tag_no_case(b"session="), u64), |id| Session(id)),
- map(preceded(tag_no_case(b"lip="), parameter_str), |ip| LocalIp(ip)),
- map(preceded(tag_no_case(b"rip="), parameter_str), |ip| RemoteIp(ip)),
- map(preceded(tag_no_case(b"lport="), u16), |port| LocalPort(port)),
- map(preceded(tag_no_case(b"rport="), u16), |port| RemotePort(port)),
- map(preceded(tag_no_case(b"real_rip="), parameter_str), |ip| RealRemoteIp(ip)),
- map(preceded(tag_no_case(b"real_lip="), parameter_str), |ip| RealLocalIp(ip)),
- map(preceded(tag_no_case(b"real_lport="), u16), |port| RealLocalPort(port)),
- map(preceded(tag_no_case(b"real_rport="), u16), |port| RealRemotePort(port)),
+ map(preceded(tag_no_case(b"lip="), parameter_str), |ip| {
+ LocalIp(ip)
+ }),
+ map(preceded(tag_no_case(b"rip="), parameter_str), |ip| {
+ RemoteIp(ip)
+ }),
+ map(preceded(tag_no_case(b"lport="), u16), |port| {
+ LocalPort(port)
+ }),
+ map(preceded(tag_no_case(b"rport="), u16), |port| {
+ RemotePort(port)
+ }),
+ map(preceded(tag_no_case(b"real_rip="), parameter_str), |ip| {
+ RealRemoteIp(ip)
+ }),
+ map(preceded(tag_no_case(b"real_lip="), parameter_str), |ip| {
+ RealLocalIp(ip)
+ }),
+ map(preceded(tag_no_case(b"real_lport="), u16), |port| {
+ RealLocalPort(port)
+ }),
+ map(preceded(tag_no_case(b"real_rport="), u16), |port| {
+ RealRemotePort(port)
+ }),
)),
alt((
- map(preceded(tag_no_case(b"local_name="), parameter_str), |name| LocalName(name)),
- map(preceded(tag_no_case(b"forward_views="), parameter), |views| ForwardViews(views.into())),
- map(preceded(tag_no_case(b"secured="), parameter_str), |info| Secured(Some(info))),
+ map(
+ preceded(tag_no_case(b"local_name="), parameter_str),
+ |name| LocalName(name),
+ ),
+ map(
+ preceded(tag_no_case(b"forward_views="), parameter),
+ |views| ForwardViews(views.into()),
+ ),
+ map(preceded(tag_no_case(b"secured="), parameter_str), |info| {
+ Secured(Some(info))
+ }),
value(Secured(None), tag_no_case(b"secured")),
value(CertUsername, tag_no_case(b"cert_username")),
- map(preceded(tag_no_case(b"transport="), parameter_str), |ts| Transport(ts)),
- map(preceded(tag_no_case(b"tls_cipher="), parameter_str), |cipher| TlsCipher(cipher)),
- map(preceded(tag_no_case(b"tls_cipher_bits="), parameter_str), |bits| TlsCipherBits(bits)),
- map(preceded(tag_no_case(b"tls_pfs="), parameter_str), |pfs| TlsPfs(pfs)),
- map(preceded(tag_no_case(b"tls_protocol="), parameter_str), |proto| TlsProtocol(proto)),
- map(preceded(tag_no_case(b"valid-client-cert="), parameter_str), |cert| ValidClientCert(cert)),
+ map(preceded(tag_no_case(b"transport="), parameter_str), |ts| {
+ Transport(ts)
+ }),
+ map(
+ preceded(tag_no_case(b"tls_cipher="), parameter_str),
+ |cipher| TlsCipher(cipher),
+ ),
+ map(
+ preceded(tag_no_case(b"tls_cipher_bits="), parameter_str),
+ |bits| TlsCipherBits(bits),
+ ),
+ map(preceded(tag_no_case(b"tls_pfs="), parameter_str), |pfs| {
+ TlsPfs(pfs)
+ }),
+ map(
+ preceded(tag_no_case(b"tls_protocol="), parameter_str),
+ |proto| TlsProtocol(proto),
+ ),
+ map(
+ preceded(tag_no_case(b"valid-client-cert="), parameter_str),
+ |cert| ValidClientCert(cert),
+ ),
)),
alt((
map(preceded(tag_no_case(b"resp="), base64), |data| Resp(data)),
- map(tuple((parameter_name, tag(b"="), parameter)), |(n, _, v)| UnknownPair(n, v.into())),
+ map(
+ tuple((parameter_name, tag(b"="), parameter)),
+ |(n, _, v)| UnknownPair(n, v.into()),
+ ),
map(parameter, |v| UnknownBool(v.into())),
)),
))(input)
@@ -622,13 +696,20 @@ fn auth_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> {
mechanism,
tab,
service,
- map(
- opt(preceded(tab, separated_list0(tab, auth_option))),
- |o| o.unwrap_or(vec![])
- ),
+ map(opt(preceded(tab, separated_list0(tab, auth_option))), |o| {
+ o.unwrap_or(vec![])
+ }),
));
let (input, (_, _, id, _, mech, _, service, options)) = parser(input)?;
- Ok((input, ClientCommand::Auth { id, mech, service, options }))
+ Ok((
+ input,
+ ClientCommand::Auth {
+ id,
+ mech,
+ service,
+ options,
+ },
+ ))
}
fn is_base64_core(c: u8) -> bool {
@@ -644,10 +725,7 @@ fn is_base64_pad(c: u8) -> bool {
}
fn base64(input: &[u8]) -> IResult<&[u8], Vec<u8>> {
- let (input, (b64, _)) = tuple((
- take_while1(is_base64_core),
- take_while(is_base64_pad),
- ))(input)?;
+ let (input, (b64, _)) = tuple((take_while1(is_base64_core), take_while(is_base64_pad)))(input)?;
let data = base64::engine::general_purpose::STANDARD_NO_PAD
.decode(b64)
@@ -657,26 +735,15 @@ fn base64(input: &[u8]) -> IResult<&[u8], Vec<u8>> {
}
/// @FIXME Dovecot does not say if base64 content must be padded or not
-fn cont_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> {
- let mut parser = tuple((
- tag_no_case(b"CONT"),
- tab,
- u64,
- tab,
- base64
- ));
+fn cont_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> {
+ let mut parser = tuple((tag_no_case(b"CONT"), tab, u64, tab, base64));
let (input, (_, _, id, _, data)) = parser(input)?;
Ok((input, ClientCommand::Cont { id, data }))
}
fn client_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> {
- alt((
- version_command,
- cpid_command,
- auth_command,
- cont_command,
- ))(input)
+ alt((version_command, cpid_command, auth_command, cont_command))(input)
}
/*
@@ -698,7 +765,13 @@ fn not_null(c: u8) -> bool {
// impersonated user, login, password
fn auth_plain<'a>(input: &'a [u8]) -> IResult<&'a [u8], (&'a [u8], &'a [u8], &'a [u8])> {
map(
- tuple((take_while(not_null), take(1usize), take_while(not_null), take(1usize), rest)),
+ tuple((
+ take_while(not_null),
+ take(1usize),
+ take_while(not_null),
+ take(1usize),
+ rest,
+ )),
|(imp, _, user, _, pass)| (imp, user, pass),
)(input)
}
@@ -746,7 +819,6 @@ impl Encode for MechanismParameters {
}
}
-
impl Encode for FailCode {
fn encode(&self, out: &mut BytesMut) -> Result<()> {
match self {
@@ -762,33 +834,32 @@ impl Encode for FailCode {
impl Encode for ServerCommand {
fn encode(&self, out: &mut BytesMut) -> Result<()> {
match self {
- Self::Version (Version { major, minor }) => {
+ Self::Version(Version { major, minor }) => {
out.put(&b"VERSION"[..]);
tab_enc(out);
out.put(major.to_string().as_bytes());
tab_enc(out);
out.put(minor.to_string().as_bytes());
lf_enc(out);
- },
+ }
Self::Spid(pid) => {
out.put(&b"SPID"[..]);
tab_enc(out);
out.put(pid.to_string().as_bytes());
lf_enc(out);
- },
+ }
Self::Cuid(pid) => {
out.put(&b"CUID"[..]);
tab_enc(out);
out.put(pid.to_string().as_bytes());
lf_enc(out);
- },
+ }
Self::Cookie(cval) => {
out.put(&b"COOKIE"[..]);
tab_enc(out);
- out.put(hex::encode(cval).as_bytes());
+ out.put(hex::encode(cval).as_bytes());
lf_enc(out);
-
- },
+ }
Self::Mech { kind, parameters } => {
out.put(&b"MECH"[..]);
tab_enc(out);
@@ -798,11 +869,11 @@ impl Encode for ServerCommand {
p.encode(out)?;
}
lf_enc(out);
- },
+ }
Self::Done => {
out.put(&b"DONE"[..]);
lf_enc(out);
- },
+ }
Self::Cont { id, data } => {
out.put(&b"CONT"[..]);
tab_enc(out);
@@ -813,8 +884,12 @@ impl Encode for ServerCommand {
out.put(b64.as_bytes());
}
lf_enc(out);
- },
- Self::Ok { id, user_id, extra_parameters } => {
+ }
+ Self::Ok {
+ id,
+ user_id,
+ extra_parameters,
+ } => {
out.put(&b"OK"[..]);
tab_enc(out);
out.put(id.to_string().as_bytes());
@@ -828,8 +903,13 @@ impl Encode for ServerCommand {
out.put(&p[..]);
}
lf_enc(out);
- },
- Self::Fail {id, user_id, code, extra_parameters } => {
+ }
+ Self::Fail {
+ id,
+ user_id,
+ code,
+ extra_parameters,
+ } => {
out.put(&b"FAIL"[..]);
tab_enc(out);
out.put(id.to_string().as_bytes());
@@ -848,7 +928,7 @@ impl Encode for ServerCommand {
out.put(&p[..]);
}
lf_enc(out);
- },
+ }
}
Ok(())
}
diff --git a/src/imap/mod.rs b/src/imap/mod.rs
index b08a4ff..6f143d7 100644
--- a/src/imap/mod.rs
+++ b/src/imap/mod.rs
@@ -26,8 +26,8 @@ use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Respons
use imap_codec::imap_types::{core::Text, response::Greeting};
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream;
-use tokio_rustls::TlsAcceptor;
use rustls_pemfile::{certs, private_key};
+use tokio_rustls::TlsAcceptor;
use crate::config::{ImapConfig, ImapUnsecureConfig};
use crate::imap::capability::ServerCapability;
@@ -53,8 +53,14 @@ struct ClientContext {
}
pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Result<Server> {
- let loaded_certs = certs(&mut std::io::BufReader::new(std::fs::File::open(config.certs)?)).collect::<Result<Vec<_>, _>>()?;
- let loaded_key = private_key(&mut std::io::BufReader::new(std::fs::File::open(config.key)?))?.unwrap();
+ let loaded_certs = certs(&mut std::io::BufReader::new(std::fs::File::open(
+ config.certs,
+ )?))
+ .collect::<Result<Vec<_>, _>>()?;
+ let loaded_key = private_key(&mut std::io::BufReader::new(std::fs::File::open(
+ config.key,
+ )?))?
+ .unwrap();
let tls_config = rustls::ServerConfig::builder()
.with_no_client_auth()
@@ -109,7 +115,7 @@ impl Server {
}
};
AnyStream::new(stream)
- },
+ }
None => AnyStream::new(socket),
};
@@ -135,6 +141,8 @@ use std::sync::Arc;
use tokio::sync::mpsc::*;
use tokio::sync::Notify;
use tokio_util::bytes::BytesMut;
+
+#[derive(Debug)]
enum LoopMode {
Quit,
Interactive,
@@ -232,8 +240,10 @@ impl NetLoop {
}
async fn core(mut self) -> Result<()> {
+ tracing::trace!("Starting the core loop");
let mut mode = LoopMode::Interactive;
loop {
+ tracing::trace!(mode=?mode, "Core loop iter");
mode = match mode {
LoopMode::Interactive => self.interactive_mode().await?,
LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?,
@@ -275,6 +285,7 @@ impl NetLoop {
// Managing response generated by Aerogramme
maybe_msg = self.resp_rx.recv() => match maybe_msg {
Some(ResponseOrIdle::Response(response)) => {
+ tracing::trace!("Interactive, server has a response for the client");
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
@@ -284,6 +295,7 @@ impl NetLoop {
self.server.enqueue_status(response.completion);
},
Some(ResponseOrIdle::StartIdle(stop)) => {
+ tracing::trace!("Interactive, server agreed to switch in idle mode");
let cr = CommandContinuationRequest::basic(None, "Idling")?;
self.server.enqueue_continuation(cr);
self.cmd_tx.try_send(Request::Idle)?;
@@ -299,6 +311,7 @@ impl NetLoop {
// When receiving a CTRL+C
_ = self.ctx.must_exit.changed() => {
+ tracing::trace!("Interactive, CTRL+C, exiting");
self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
},
};
@@ -308,6 +321,7 @@ impl NetLoop {
async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> {
// Flush send
loop {
+ tracing::trace!("flush server send");
match self.server.progress_send().await? {
Some(..) => continue,
None => break,
@@ -319,6 +333,7 @@ impl NetLoop {
maybe_msg = self.resp_rx.recv() => match maybe_msg {
// Session decided idle is terminated
Some(ResponseOrIdle::Response(response)) => {
+ tracing::trace!("server imap session said idle is done, sending response done, switching to interactive");
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
@@ -330,6 +345,7 @@ impl NetLoop {
},
// Session has some information for user
Some(ResponseOrIdle::IdleEvent(elems)) => {
+ tracing::trace!("server imap session has some change to communicate to the client");
for body_elem in elems.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
@@ -352,16 +368,21 @@ impl NetLoop {
},
// User is trying to interact with us
- _read_client_bytes = self.server.stream.read(&mut buff) => {
+ read_client_result = self.server.stream.read(&mut buff) => {
+ let _bytes_read = read_client_result?;
use imap_codec::decode::Decoder;
let codec = imap_codec::IdleDoneCodec::new();
+ tracing::trace!("client sent some data for the server IMAP session");
match codec.decode(&buff) {
Ok(([], imap_codec::imap_types::extensions::idle::IdleDone)) => {
// Session will be informed that it must stop idle
// It will generate the "done" message and change the loop mode
+ tracing::trace!("client sent DONE and want to stop IDLE");
stop.notify_one()
},
- Err(_) => (),
+ Err(_) => {
+ tracing::trace!("Unable to decode DONE, maybe not enough data were sent?");
+ },
_ => bail!("Client sent data after terminating the continuation without waiting for the server. This is an unsupported behavior and bug in Aerogramme, quitting."),
};
@@ -370,6 +391,7 @@ impl NetLoop {
// When receiving a CTRL+C
_ = self.ctx.must_exit.changed() => {
+ tracing::trace!("CTRL+C sent, aborting IDLE for this session");
self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
return Ok(LoopMode::Interactive)
},
diff --git a/src/lmtp.rs b/src/lmtp.rs
index f88ed8f..28c4d51 100644
--- a/src/lmtp.rs
+++ b/src/lmtp.rs
@@ -16,7 +16,7 @@ use tokio::select;
use tokio::sync::watch;
use tokio_util::compat::*;
-use smtp_message::{Email, EscapedDataReader, Reply, ReplyCode};
+use smtp_message::{Email, EscapedDataReader, DataUnescaper, Reply, ReplyCode};
use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata};
use crate::config::*;
@@ -181,6 +181,12 @@ impl Config for LmtpServer {
return err_response_stream(meta, format!("io error: {}", e));
}
reader.complete();
+ let raw_size = text.len();
+
+ // Unescape email, shrink it also to remove last dot
+ let unesc_res = DataUnescaper::new(true).unescape(&mut text);
+ text.truncate(unesc_res.written);
+ tracing::debug!(prev_sz=raw_size, new_sz=text.len(), "unescaped");
let encrypted_message = match EncryptedMessage::new(text) {
Ok(x) => Arc::new(x),
diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs
index 81e5879..e73e1dc 100644
--- a/src/login/ldap_provider.rs
+++ b/src/login/ldap_provider.rs
@@ -60,6 +60,9 @@ impl LdapLoginProvider {
let specific = match config.storage {
LdapStorage::InMemory => StorageSpecific::InMemory,
LdapStorage::Garage(grgconf) => {
+ attrs_to_retrieve.push(grgconf.aws_access_key_id_attr.clone());
+ attrs_to_retrieve.push(grgconf.aws_secret_access_key_attr.clone());
+
let bucket_source =
match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) {
(Some(b), None) => BucketSource::Constant(b),
diff --git a/src/main.rs b/src/main.rs
index 72bce83..12a5895 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -34,7 +34,12 @@ struct Args {
#[clap(long)]
dev: bool,
- #[clap(short, long, env = "AEROGRAMME_CONFIG", default_value = "aerogramme.toml")]
+ #[clap(
+ short,
+ long,
+ env = "AEROGRAMME_CONFIG",
+ default_value = "aerogramme.toml"
+ )]
/// Path to the main Aerogramme configuration file
config_file: PathBuf,
}
@@ -187,7 +192,10 @@ async fn main() -> Result<()> {
hostname: "example.tld".to_string(),
}),
auth: Some(AuthConfig {
- bind_addr: SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345),
+ bind_addr: SocketAddr::new(
+ IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
+ 12345,
+ ),
}),
users: UserManagement::Demo,
})
diff --git a/src/server.rs b/src/server.rs
index cf9930a..9899981 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -7,9 +7,9 @@ use futures::try_join;
use log::*;
use tokio::sync::watch;
+use crate::auth;
use crate::config::*;
use crate::imap;
-use crate::auth;
use crate::lmtp::*;
use crate::login::ArcLoginProvider;
use crate::login::{demo_provider::*, ldap_provider::*, static_provider::*};
@@ -47,9 +47,16 @@ impl Server {
};
let lmtp_server = config.lmtp.map(|lmtp| LmtpServer::new(lmtp, login.clone()));
- let imap_unsecure_server = config.imap_unsecure.map(|imap| imap::new_unsecure(imap, login.clone()));
- let imap_server = config.imap.map(|imap| imap::new(imap, login.clone())).transpose()?;
- let auth_server = config.auth.map(|auth| auth::AuthServer::new(auth, login.clone()));
+ let imap_unsecure_server = config
+ .imap_unsecure
+ .map(|imap| imap::new_unsecure(imap, login.clone()));
+ let imap_server = config
+ .imap
+ .map(|imap| imap::new(imap, login.clone()))
+ .transpose()?;
+ let auth_server = config
+ .auth
+ .map(|auth| auth::AuthServer::new(auth, login.clone()));
Ok(Self {
lmtp_server,
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index 90b84d6..709e729 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -105,6 +105,7 @@ fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) ->
#[async_trait]
impl IStore for GarageStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
+ tracing::trace!(select=%select, command="row_fetch");
let (pk_list, batch_op) = match select {
Selector::Range {
shard,
@@ -196,21 +197,26 @@ impl IStore for GarageStore {
}
Ok(v) => v,
};
+ //println!("fetch res -> {:?}", all_raw_res);
- let row_vals = all_raw_res
- .into_iter()
- .fold(vec![], |mut acc, v| {
- acc.extend(v.items);
- acc
- })
- .into_iter()
- .zip(pk_list.into_iter())
- .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
- .collect::<Vec<_>>();
+ let row_vals =
+ all_raw_res
+ .into_iter()
+ .zip(pk_list.into_iter())
+ .fold(vec![], |mut acc, (page, pk)| {
+ page.items
+ .into_iter()
+ .map(|(sk, cv)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
+ .for_each(|rr| acc.push(rr));
+
+ acc
+ });
+ tracing::debug!(fetch_count = row_vals.len(), command = "row_fetch");
Ok(row_vals)
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
+ tracing::trace!(select=%select, command="row_rm");
let del_op = match select {
Selector::Range {
shard,
@@ -280,6 +286,7 @@ impl IStore for GarageStore {
}
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
+ tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
let batch_ops = values
.iter()
.map(|v| k2v_client::BatchInsertOp {
@@ -307,6 +314,7 @@ impl IStore for GarageStore {
}
}
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
+ tracing::trace!(entry=%value, command="row_poll");
loop {
if let Some(ct) = &value.causality {
match self
@@ -343,6 +351,7 @@ impl IStore for GarageStore {
}
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
+ tracing::trace!(entry=%blob_ref, command="blob_fetch");
let maybe_out = self
.s3
.get_object()
@@ -382,6 +391,7 @@ impl IStore for GarageStore {
Ok(bv)
}
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
+ tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
let maybe_send = self
@@ -406,6 +416,7 @@ impl IStore for GarageStore {
}
}
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
+ tracing::trace!(src=%src, dst=%dst, command="blob_copy");
let maybe_copy = self
.s3
.copy_object()
@@ -433,6 +444,7 @@ impl IStore for GarageStore {
}
}
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
+ tracing::trace!(prefix = prefix, command = "blob_list");
let maybe_list = self
.s3
.list_objects_v2()
@@ -462,6 +474,7 @@ impl IStore for GarageStore {
}
}
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
+ tracing::trace!(entry=%blob_ref, command="blob_rm");
let maybe_delete = self
.s3
.delete_object()