diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-01-31 11:01:18 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-01-31 11:01:18 +0100 |
commit | 22f0eb901ae1f4a38a93bcfc268ebe0f74a6482e (patch) | |
tree | 885d59c3bfb961b058b2ad7a7ec8915750c54a3b | |
parent | c27919a757ac15fe49393ef91d90a525a3e501ee (diff) | |
download | aerogramme-22f0eb901ae1f4a38a93bcfc268ebe0f74a6482e.tar.gz aerogramme-22f0eb901ae1f4a38a93bcfc268ebe0f74a6482e.zip |
format + fix storage bug
-rw-r--r-- | src/auth.rs | 384 | ||||
-rw-r--r-- | src/imap/mod.rs | 14 | ||||
-rw-r--r-- | src/main.rs | 12 | ||||
-rw-r--r-- | src/server.rs | 15 | ||||
-rw-r--r-- | src/storage/garage.rs | 33 |
5 files changed, 286 insertions, 172 deletions
diff --git a/src/auth.rs b/src/auth.rs index a3edcbc..81ba496 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -use anyhow::{Result, anyhow, bail}; +use anyhow::{anyhow, bail, Result}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::io::BufStream; use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; @@ -25,9 +25,9 @@ use crate::login::ArcLoginProvider; /// C: VERSION 1 2 /// C: CPID 1 /// -/// C: AUTH 2 PLAIN service=smtp -/// S: CONT 2 -/// C: CONT 2 base64stringFollowingRFC4616== +/// C: AUTH 2 PLAIN service=smtp +/// S: CONT 2 +/// C: CONT 2 base64stringFollowingRFC4616== /// S: OK 2 user=alice@example.tld /// /// C: AUTH 42 LOGIN service=smtp @@ -41,7 +41,7 @@ use crate::login::ArcLoginProvider; /// ## RFC References /// /// PLAIN SASL - https://datatracker.ietf.org/doc/html/rfc4616 -/// +/// /// /// ## Dovecot References /// @@ -54,22 +54,20 @@ pub struct AuthServer { bind_addr: SocketAddr, } - impl AuthServer { - pub fn new( - config: AuthConfig, - login_provider: ArcLoginProvider, - ) -> Self { + pub fn new(config: AuthConfig, login_provider: ArcLoginProvider) -> Self { Self { bind_addr: config.bind_addr, login_provider, } } - pub async fn run(self: Self, mut must_exit: watch::Receiver<bool>) -> Result<()> { let tcp = TcpListener::bind(self.bind_addr).await?; - tracing::info!("SASL Authentication Protocol listening on {:#}", self.bind_addr); + tracing::info!( + "SASL Authentication Protocol listening on {:#}", + self.bind_addr + ); let mut connections = FuturesUnordered::new(); @@ -89,8 +87,9 @@ impl AuthServer { }; tracing::info!("AUTH: accepted connection from {}", remote_addr); - let conn = tokio::spawn(NetLoop::new(socket, self.login_provider.clone(), must_exit.clone()).run_error()); - + let conn = tokio::spawn( + NetLoop::new(socket, self.login_provider.clone(), must_exit.clone()).run_error(), + ); connections.push(conn); } @@ -106,7 +105,7 @@ impl AuthServer { struct NetLoop { login: ArcLoginProvider, stream: BufStream<TcpStream>, - stop: watch::Receiver<bool>, + stop: watch::Receiver<bool>, state: State, read_buf: Vec<u8>, write_buf: BytesMut, @@ -197,45 +196,53 @@ enum State { Init, HandshakePart(Version), HandshakeDone, - AuthPlainProgress { - id: u64, - }, - AuthDone { - id: u64, - res: AuthRes - }, + AuthPlainProgress { id: u64 }, + AuthDone { id: u64, res: AuthRes }, } const SERVER_MAJOR: u64 = 1; const SERVER_MINOR: u64 = 2; impl State { async fn progress(&mut self, cmd: ClientCommand, login: &ArcLoginProvider) { - let new_state = 'state: { match (std::mem::replace(self, State::Error), cmd) { (Self::Init, ClientCommand::Version(v)) => Self::HandshakePart(v), (Self::HandshakePart(version), ClientCommand::Cpid(_cpid)) => { if version.major != SERVER_MAJOR { - tracing::error!(client_major=version.major, server_major=SERVER_MAJOR, "Unsupported client major version"); - break 'state Self::Error + tracing::error!( + client_major = version.major, + server_major = SERVER_MAJOR, + "Unsupported client major version" + ); + break 'state Self::Error; } - + Self::HandshakeDone - }, - (Self::HandshakeDone { .. }, ClientCommand::Auth { id, mech, .. }) | - (Self::AuthDone { .. }, ClientCommand::Auth { id, mech, ..}) => { + } + (Self::HandshakeDone { .. }, ClientCommand::Auth { id, mech, .. }) + | (Self::AuthDone { .. }, ClientCommand::Auth { id, mech, .. }) => { if mech != Mechanism::Plain { tracing::error!(mechanism=?mech, "Unsupported Authentication Mechanism"); - break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) } + break 'state Self::AuthDone { + id, + res: AuthRes::Failed(None, None), + }; } - Self::AuthPlainProgress { id } - }, + Self::AuthPlainProgress { id } + } (Self::AuthPlainProgress { id }, ClientCommand::Cont { id: cid, data }) => { // Check that ID matches if cid != id { - tracing::error!(auth_id=id, cont_id=cid, "CONT id does not match AUTH id"); - break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) } + tracing::error!( + auth_id = id, + cont_id = cid, + "CONT id does not match AUTH id" + ); + break 'state Self::AuthDone { + id, + res: AuthRes::Failed(None, None), + }; } // Check that we can extract user's login+pass @@ -243,36 +250,54 @@ impl State { Ok(([], ([], user, pass))) => (user, pass), Ok(_) => { tracing::error!("Impersonating user is not supported"); - break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) } + break 'state Self::AuthDone { + id, + res: AuthRes::Failed(None, None), + }; } Err(e) => { tracing::error!(err=?e, "Could not parse the SASL PLAIN data chunk"); - break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) } - }, + break 'state Self::AuthDone { + id, + res: AuthRes::Failed(None, None), + }; + } }; // Try to convert it to UTF-8 - let (user, password) = match (std::str::from_utf8(ubin), std::str::from_utf8(pbin)) { - (Ok(u), Ok(p)) => (u, p), - _ => { - tracing::error!("Username or password contain invalid UTF-8 characters"); - break 'state Self::AuthDone { id, res: AuthRes::Failed(None, None) } - } - }; + let (user, password) = + match (std::str::from_utf8(ubin), std::str::from_utf8(pbin)) { + (Ok(u), Ok(p)) => (u, p), + _ => { + tracing::error!( + "Username or password contain invalid UTF-8 characters" + ); + break 'state Self::AuthDone { + id, + res: AuthRes::Failed(None, None), + }; + } + }; // Try to connect user match login.login(user, password).await { - Ok(_) => Self::AuthDone { id, res: AuthRes::Success(user.to_string())}, + Ok(_) => Self::AuthDone { + id, + res: AuthRes::Success(user.to_string()), + }, Err(e) => { tracing::warn!(err=?e, "login failed"); - Self::AuthDone { id, res: AuthRes::Failed(Some(user.to_string()), None) } + Self::AuthDone { + id, + res: AuthRes::Failed(Some(user.to_string()), None), + } } } - }, + } _ => { tracing::error!("This command is not valid in this context"); Self::Error - }, + } } }; tracing::debug!(state=?new_state, "Made progress"); @@ -284,7 +309,10 @@ impl State { match self { Self::HandshakeDone { .. } => { - srv_cmd.push(ServerCommand::Version(Version { major: SERVER_MAJOR, minor: SERVER_MINOR })); + srv_cmd.push(ServerCommand::Version(Version { + major: SERVER_MAJOR, + minor: SERVER_MINOR, + })); srv_cmd.push(ServerCommand::Mech { kind: Mechanism::Plain, @@ -299,16 +327,34 @@ impl State { srv_cmd.push(ServerCommand::Cookie(cookie)); srv_cmd.push(ServerCommand::Done); - }, + } Self::AuthPlainProgress { id } => { - srv_cmd.push(ServerCommand::Cont { id: *id, data: None }); - }, - Self::AuthDone { id, res: AuthRes::Success(user) } => { - srv_cmd.push(ServerCommand::Ok { id: *id, user_id: Some(user.to_string()), extra_parameters: vec![]}); - }, - Self::AuthDone { id, res: AuthRes::Failed(maybe_user, maybe_failcode) } => { - srv_cmd.push(ServerCommand::Fail { id: *id, user_id: maybe_user.clone(), code: maybe_failcode.clone(), extra_parameters: vec![]}); - }, + srv_cmd.push(ServerCommand::Cont { + id: *id, + data: None, + }); + } + Self::AuthDone { + id, + res: AuthRes::Success(user), + } => { + srv_cmd.push(ServerCommand::Ok { + id: *id, + user_id: Some(user.to_string()), + extra_parameters: vec![], + }); + } + Self::AuthDone { + id, + res: AuthRes::Failed(maybe_user, maybe_failcode), + } => { + srv_cmd.push(ServerCommand::Fail { + id: *id, + user_id: maybe_user.clone(), + code: maybe_failcode.clone(), + extra_parameters: vec![], + }); + } _ => (), }; @@ -316,7 +362,6 @@ impl State { } } - // ----------------------------------------------------------------- // // DOVECOT AUTH TYPES @@ -329,7 +374,6 @@ enum Mechanism { Login, } - #[derive(Clone, Debug)] enum AuthOption { /// Unique session ID. Mainly used for logging. @@ -343,9 +387,9 @@ enum AuthOption { /// Remote client port RemotePort(u16), /// When Dovecot proxy is used, the real_rip/real_port are the proxy’s IP/port and real_lip/real_lport are the backend’s IP/port where the proxy was connected to. - RealRemoteIp(String), - RealLocalIp(String), - RealLocalPort(u16), + RealRemoteIp(String), + RealLocalIp(String), + RealLocalPort(u16), RealRemotePort(u16), /// TLS SNI name LocalName(String), @@ -380,8 +424,8 @@ enum AuthOption { /// An unknown key UnknownPair(String, Vec<u8>), UnknownBool(Vec<u8>), - /// Initial response for authentication mechanism. - /// NOTE: This must be the last parameter. Everything after it is ignored. + /// Initial response for authentication mechanism. + /// NOTE: This must be the last parameter. Everything after it is ignored. /// This is to avoid accidental security holes if user-given data is directly put to base64 string without filtering out tabs. /// @FIXME: I don't understand this parameter Resp(Vec<u8>), @@ -409,14 +453,13 @@ enum ClientCommand { service: String, /// All the optional parameters options: Vec<AuthOption>, - }, Cont { /// The <id> must match the <id> of the AUTH command. id: u64, /// Data that will be serialized to / deserialized from base64 data: Vec<u8>, - } + }, } #[derive(Debug)] @@ -464,8 +507,8 @@ enum ServerCommand { parameters: Vec<MechanismParameters>, }, /// COOKIE returns connection-specific 128 bit cookie in hex. It must be given to REQUEST command. (Protocol v1.1+ / Dovecot v2.0+) - Cookie([u8;16]), - /// DONE finishes the handshake from server. + Cookie([u8; 16]), + /// DONE finishes the handshake from server. Done, Fail { @@ -478,7 +521,7 @@ enum ServerCommand { id: u64, data: Option<Vec<u8>>, }, - /// FAIL and OK may contain multiple unspecified parameters which authentication client may handle specially. + /// FAIL and OK may contain multiple unspecified parameters which authentication client may handle specially. /// The only one specified here is user=<userid> parameter, which should always be sent if the userid is known. Ok { id: u64, @@ -493,26 +536,20 @@ enum ServerCommand { // // ------------------------------------------------------------------ +use base64::Engine; use nom::{ - IResult, - branch::alt, - error::{ErrorKind, Error}, - character::complete::{tab, u64, u16}, - bytes::complete::{is_not, tag, tag_no_case, take, take_while, take_while1}, - multi::{many1, separated_list0}, - combinator::{map, opt, recognize, value, rest}, - sequence::{pair, preceded, tuple}, + branch::alt, + bytes::complete::{is_not, tag, tag_no_case, take, take_while, take_while1}, + character::complete::{tab, u16, u64}, + combinator::{map, opt, recognize, rest, value}, + error::{Error, ErrorKind}, + multi::{many1, separated_list0}, + sequence::{pair, preceded, tuple}, + IResult, }; -use base64::Engine; fn version_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> { - let mut parser = tuple(( - tag_no_case(b"VERSION"), - tab, - u64, - tab, - u64 - )); + let mut parser = tuple((tag_no_case(b"VERSION"), tab, u64, tab, u64)); let (input, (_, _, major, _, minor)) = parser(input)?; Ok((input, ClientCommand::Version(Version { major, minor }))) @@ -521,7 +558,7 @@ fn version_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> { fn cpid_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> { preceded( pair(tag_no_case(b"CPID"), tab), - map(u64, |v| ClientCommand::Cpid(v)) + map(u64, |v| ClientCommand::Cpid(v)), )(input) } @@ -541,10 +578,7 @@ fn is_esc<'a>(input: &'a [u8]) -> IResult<&'a [u8], &[u8]> { } fn parameter<'a>(input: &'a [u8]) -> IResult<&'a [u8], &[u8]> { - recognize(many1(alt(( - take_while1(is_not_tab_or_esc_or_lf), - is_esc - ))))(input) + recognize(many1(alt((take_while1(is_not_tab_or_esc_or_lf), is_esc))))(input) } fn parameter_str(input: &[u8]) -> IResult<&[u8], String> { @@ -568,10 +602,7 @@ fn parameter_name(input: &[u8]) -> IResult<&[u8], String> { } fn service<'a>(input: &'a [u8]) -> IResult<&'a [u8], String> { - preceded( - tag_no_case("service="), - parameter_str - )(input) + preceded(tag_no_case("service="), parameter_str)(input) } fn auth_option<'a>(input: &'a [u8]) -> IResult<&'a [u8], AuthOption> { @@ -583,31 +614,74 @@ fn auth_option<'a>(input: &'a [u8]) -> IResult<&'a [u8], AuthOption> { value(ClientId, tag_no_case(b"client_id")), value(NoLogin, tag_no_case(b"nologin")), map(preceded(tag_no_case(b"session="), u64), |id| Session(id)), - map(preceded(tag_no_case(b"lip="), parameter_str), |ip| LocalIp(ip)), - map(preceded(tag_no_case(b"rip="), parameter_str), |ip| RemoteIp(ip)), - map(preceded(tag_no_case(b"lport="), u16), |port| LocalPort(port)), - map(preceded(tag_no_case(b"rport="), u16), |port| RemotePort(port)), - map(preceded(tag_no_case(b"real_rip="), parameter_str), |ip| RealRemoteIp(ip)), - map(preceded(tag_no_case(b"real_lip="), parameter_str), |ip| RealLocalIp(ip)), - map(preceded(tag_no_case(b"real_lport="), u16), |port| RealLocalPort(port)), - map(preceded(tag_no_case(b"real_rport="), u16), |port| RealRemotePort(port)), + map(preceded(tag_no_case(b"lip="), parameter_str), |ip| { + LocalIp(ip) + }), + map(preceded(tag_no_case(b"rip="), parameter_str), |ip| { + RemoteIp(ip) + }), + map(preceded(tag_no_case(b"lport="), u16), |port| { + LocalPort(port) + }), + map(preceded(tag_no_case(b"rport="), u16), |port| { + RemotePort(port) + }), + map(preceded(tag_no_case(b"real_rip="), parameter_str), |ip| { + RealRemoteIp(ip) + }), + map(preceded(tag_no_case(b"real_lip="), parameter_str), |ip| { + RealLocalIp(ip) + }), + map(preceded(tag_no_case(b"real_lport="), u16), |port| { + RealLocalPort(port) + }), + map(preceded(tag_no_case(b"real_rport="), u16), |port| { + RealRemotePort(port) + }), )), alt(( - map(preceded(tag_no_case(b"local_name="), parameter_str), |name| LocalName(name)), - map(preceded(tag_no_case(b"forward_views="), parameter), |views| ForwardViews(views.into())), - map(preceded(tag_no_case(b"secured="), parameter_str), |info| Secured(Some(info))), + map( + preceded(tag_no_case(b"local_name="), parameter_str), + |name| LocalName(name), + ), + map( + preceded(tag_no_case(b"forward_views="), parameter), + |views| ForwardViews(views.into()), + ), + map(preceded(tag_no_case(b"secured="), parameter_str), |info| { + Secured(Some(info)) + }), value(Secured(None), tag_no_case(b"secured")), value(CertUsername, tag_no_case(b"cert_username")), - map(preceded(tag_no_case(b"transport="), parameter_str), |ts| Transport(ts)), - map(preceded(tag_no_case(b"tls_cipher="), parameter_str), |cipher| TlsCipher(cipher)), - map(preceded(tag_no_case(b"tls_cipher_bits="), parameter_str), |bits| TlsCipherBits(bits)), - map(preceded(tag_no_case(b"tls_pfs="), parameter_str), |pfs| TlsPfs(pfs)), - map(preceded(tag_no_case(b"tls_protocol="), parameter_str), |proto| TlsProtocol(proto)), - map(preceded(tag_no_case(b"valid-client-cert="), parameter_str), |cert| ValidClientCert(cert)), + map(preceded(tag_no_case(b"transport="), parameter_str), |ts| { + Transport(ts) + }), + map( + preceded(tag_no_case(b"tls_cipher="), parameter_str), + |cipher| TlsCipher(cipher), + ), + map( + preceded(tag_no_case(b"tls_cipher_bits="), parameter_str), + |bits| TlsCipherBits(bits), + ), + map(preceded(tag_no_case(b"tls_pfs="), parameter_str), |pfs| { + TlsPfs(pfs) + }), + map( + preceded(tag_no_case(b"tls_protocol="), parameter_str), + |proto| TlsProtocol(proto), + ), + map( + preceded(tag_no_case(b"valid-client-cert="), parameter_str), + |cert| ValidClientCert(cert), + ), )), alt(( map(preceded(tag_no_case(b"resp="), base64), |data| Resp(data)), - map(tuple((parameter_name, tag(b"="), parameter)), |(n, _, v)| UnknownPair(n, v.into())), + map( + tuple((parameter_name, tag(b"="), parameter)), + |(n, _, v)| UnknownPair(n, v.into()), + ), map(parameter, |v| UnknownBool(v.into())), )), ))(input) @@ -622,13 +696,20 @@ fn auth_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> { mechanism, tab, service, - map( - opt(preceded(tab, separated_list0(tab, auth_option))), - |o| o.unwrap_or(vec![]) - ), + map(opt(preceded(tab, separated_list0(tab, auth_option))), |o| { + o.unwrap_or(vec![]) + }), )); let (input, (_, _, id, _, mech, _, service, options)) = parser(input)?; - Ok((input, ClientCommand::Auth { id, mech, service, options })) + Ok(( + input, + ClientCommand::Auth { + id, + mech, + service, + options, + }, + )) } fn is_base64_core(c: u8) -> bool { @@ -644,10 +725,7 @@ fn is_base64_pad(c: u8) -> bool { } fn base64(input: &[u8]) -> IResult<&[u8], Vec<u8>> { - let (input, (b64, _)) = tuple(( - take_while1(is_base64_core), - take_while(is_base64_pad), - ))(input)?; + let (input, (b64, _)) = tuple((take_while1(is_base64_core), take_while(is_base64_pad)))(input)?; let data = base64::engine::general_purpose::STANDARD_NO_PAD .decode(b64) @@ -657,26 +735,15 @@ fn base64(input: &[u8]) -> IResult<&[u8], Vec<u8>> { } /// @FIXME Dovecot does not say if base64 content must be padded or not -fn cont_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> { - let mut parser = tuple(( - tag_no_case(b"CONT"), - tab, - u64, - tab, - base64 - )); +fn cont_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> { + let mut parser = tuple((tag_no_case(b"CONT"), tab, u64, tab, base64)); let (input, (_, _, id, _, data)) = parser(input)?; Ok((input, ClientCommand::Cont { id, data })) } fn client_command<'a>(input: &'a [u8]) -> IResult<&'a [u8], ClientCommand> { - alt(( - version_command, - cpid_command, - auth_command, - cont_command, - ))(input) + alt((version_command, cpid_command, auth_command, cont_command))(input) } /* @@ -698,7 +765,13 @@ fn not_null(c: u8) -> bool { // impersonated user, login, password fn auth_plain<'a>(input: &'a [u8]) -> IResult<&'a [u8], (&'a [u8], &'a [u8], &'a [u8])> { map( - tuple((take_while(not_null), take(1usize), take_while(not_null), take(1usize), rest)), + tuple(( + take_while(not_null), + take(1usize), + take_while(not_null), + take(1usize), + rest, + )), |(imp, _, user, _, pass)| (imp, user, pass), )(input) } @@ -746,7 +819,6 @@ impl Encode for MechanismParameters { } } - impl Encode for FailCode { fn encode(&self, out: &mut BytesMut) -> Result<()> { match self { @@ -762,33 +834,32 @@ impl Encode for FailCode { impl Encode for ServerCommand { fn encode(&self, out: &mut BytesMut) -> Result<()> { match self { - Self::Version (Version { major, minor }) => { + Self::Version(Version { major, minor }) => { out.put(&b"VERSION"[..]); tab_enc(out); out.put(major.to_string().as_bytes()); tab_enc(out); out.put(minor.to_string().as_bytes()); lf_enc(out); - }, + } Self::Spid(pid) => { out.put(&b"SPID"[..]); tab_enc(out); out.put(pid.to_string().as_bytes()); lf_enc(out); - }, + } Self::Cuid(pid) => { out.put(&b"CUID"[..]); tab_enc(out); out.put(pid.to_string().as_bytes()); lf_enc(out); - }, + } Self::Cookie(cval) => { out.put(&b"COOKIE"[..]); tab_enc(out); - out.put(hex::encode(cval).as_bytes()); + out.put(hex::encode(cval).as_bytes()); lf_enc(out); - - }, + } Self::Mech { kind, parameters } => { out.put(&b"MECH"[..]); tab_enc(out); @@ -798,11 +869,11 @@ impl Encode for ServerCommand { p.encode(out)?; } lf_enc(out); - }, + } Self::Done => { out.put(&b"DONE"[..]); lf_enc(out); - }, + } Self::Cont { id, data } => { out.put(&b"CONT"[..]); tab_enc(out); @@ -813,8 +884,12 @@ impl Encode for ServerCommand { out.put(b64.as_bytes()); } lf_enc(out); - }, - Self::Ok { id, user_id, extra_parameters } => { + } + Self::Ok { + id, + user_id, + extra_parameters, + } => { out.put(&b"OK"[..]); tab_enc(out); out.put(id.to_string().as_bytes()); @@ -828,8 +903,13 @@ impl Encode for ServerCommand { out.put(&p[..]); } lf_enc(out); - }, - Self::Fail {id, user_id, code, extra_parameters } => { + } + Self::Fail { + id, + user_id, + code, + extra_parameters, + } => { out.put(&b"FAIL"[..]); tab_enc(out); out.put(id.to_string().as_bytes()); @@ -848,7 +928,7 @@ impl Encode for ServerCommand { out.put(&p[..]); } lf_enc(out); - }, + } } Ok(()) } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index b08a4ff..21b4d58 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -26,8 +26,8 @@ use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Respons use imap_codec::imap_types::{core::Text, response::Greeting}; use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; use imap_flow::stream::AnyStream; -use tokio_rustls::TlsAcceptor; use rustls_pemfile::{certs, private_key}; +use tokio_rustls::TlsAcceptor; use crate::config::{ImapConfig, ImapUnsecureConfig}; use crate::imap::capability::ServerCapability; @@ -53,8 +53,14 @@ struct ClientContext { } pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Result<Server> { - let loaded_certs = certs(&mut std::io::BufReader::new(std::fs::File::open(config.certs)?)).collect::<Result<Vec<_>, _>>()?; - let loaded_key = private_key(&mut std::io::BufReader::new(std::fs::File::open(config.key)?))?.unwrap(); + let loaded_certs = certs(&mut std::io::BufReader::new(std::fs::File::open( + config.certs, + )?)) + .collect::<Result<Vec<_>, _>>()?; + let loaded_key = private_key(&mut std::io::BufReader::new(std::fs::File::open( + config.key, + )?))? + .unwrap(); let tls_config = rustls::ServerConfig::builder() .with_no_client_auth() @@ -109,7 +115,7 @@ impl Server { } }; AnyStream::new(stream) - }, + } None => AnyStream::new(socket), }; diff --git a/src/main.rs b/src/main.rs index 72bce83..12a5895 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,7 +34,12 @@ struct Args { #[clap(long)] dev: bool, - #[clap(short, long, env = "AEROGRAMME_CONFIG", default_value = "aerogramme.toml")] + #[clap( + short, + long, + env = "AEROGRAMME_CONFIG", + default_value = "aerogramme.toml" + )] /// Path to the main Aerogramme configuration file config_file: PathBuf, } @@ -187,7 +192,10 @@ async fn main() -> Result<()> { hostname: "example.tld".to_string(), }), auth: Some(AuthConfig { - bind_addr: SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345), + bind_addr: SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), + 12345, + ), }), users: UserManagement::Demo, }) diff --git a/src/server.rs b/src/server.rs index cf9930a..9899981 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,9 +7,9 @@ use futures::try_join; use log::*; use tokio::sync::watch; +use crate::auth; use crate::config::*; use crate::imap; -use crate::auth; use crate::lmtp::*; use crate::login::ArcLoginProvider; use crate::login::{demo_provider::*, ldap_provider::*, static_provider::*}; @@ -47,9 +47,16 @@ impl Server { }; let lmtp_server = config.lmtp.map(|lmtp| LmtpServer::new(lmtp, login.clone())); - let imap_unsecure_server = config.imap_unsecure.map(|imap| imap::new_unsecure(imap, login.clone())); - let imap_server = config.imap.map(|imap| imap::new(imap, login.clone())).transpose()?; - let auth_server = config.auth.map(|auth| auth::AuthServer::new(auth, login.clone())); + let imap_unsecure_server = config + .imap_unsecure + .map(|imap| imap::new_unsecure(imap, login.clone())); + let imap_server = config + .imap + .map(|imap| imap::new(imap, login.clone())) + .transpose()?; + let auth_server = config + .auth + .map(|auth| auth::AuthServer::new(auth, login.clone())); Ok(Self { lmtp_server, diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 90b84d6..709e729 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -105,6 +105,7 @@ fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> #[async_trait] impl IStore for GarageStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { + tracing::trace!(select=%select, command="row_fetch"); let (pk_list, batch_op) = match select { Selector::Range { shard, @@ -196,21 +197,26 @@ impl IStore for GarageStore { } Ok(v) => v, }; + //println!("fetch res -> {:?}", all_raw_res); - let row_vals = all_raw_res - .into_iter() - .fold(vec![], |mut acc, v| { - acc.extend(v.items); - acc - }) - .into_iter() - .zip(pk_list.into_iter()) - .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) - .collect::<Vec<_>>(); + let row_vals = + all_raw_res + .into_iter() + .zip(pk_list.into_iter()) + .fold(vec![], |mut acc, (page, pk)| { + page.items + .into_iter() + .map(|(sk, cv)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) + .for_each(|rr| acc.push(rr)); + + acc + }); + tracing::debug!(fetch_count = row_vals.len(), command = "row_fetch"); Ok(row_vals) } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { + tracing::trace!(select=%select, command="row_rm"); let del_op = match select { Selector::Range { shard, @@ -280,6 +286,7 @@ impl IStore for GarageStore { } async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { + tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert"); let batch_ops = values .iter() .map(|v| k2v_client::BatchInsertOp { @@ -307,6 +314,7 @@ impl IStore for GarageStore { } } async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { + tracing::trace!(entry=%value, command="row_poll"); loop { if let Some(ct) = &value.causality { match self @@ -343,6 +351,7 @@ impl IStore for GarageStore { } async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_fetch"); let maybe_out = self .s3 .get_object() @@ -382,6 +391,7 @@ impl IStore for GarageStore { Ok(bv) } async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); let streamable_value = s3::primitives::ByteStream::from(blob_val.value); let maybe_send = self @@ -406,6 +416,7 @@ impl IStore for GarageStore { } } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(src=%src, dst=%dst, command="blob_copy"); let maybe_copy = self .s3 .copy_object() @@ -433,6 +444,7 @@ impl IStore for GarageStore { } } async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { + tracing::trace!(prefix = prefix, command = "blob_list"); let maybe_list = self .s3 .list_objects_v2() @@ -462,6 +474,7 @@ impl IStore for GarageStore { } } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_rm"); let maybe_delete = self .s3 .delete_object() |