aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-13 11:19:08 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-13 11:19:08 +0200
commitfaca15f164c8e2860d27144f75f2dee05742ec6d (patch)
treee81ef9a4360e20685b0f8b2e14ee024119b69fd7
parenta1ca6d9defc844fee52d966951701a57727050c7 (diff)
downloadaerogramme-faca15f164c8e2860d27144f75f2dee05742ec6d.tar.gz
aerogramme-faca15f164c8e2860d27144f75f2dee05742ec6d.zip
LMTP refactoring, implement EXPUNGE
-rw-r--r--src/imap/mailbox_view.rs16
-rw-r--r--src/lmtp.rs73
-rw-r--r--src/mail/uidindex.rs11
-rw-r--r--src/server.rs2
4 files changed, 63 insertions, 39 deletions
diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs
index f293bfa..371bc50 100644
--- a/src/imap/mailbox_view.rs
+++ b/src/imap/mailbox_view.rs
@@ -160,7 +160,7 @@ impl MailboxView {
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let mails = self.get_mail_ids(sequence_set)?;
- for (i, uid, uuid) in mails.iter() {
+ for (_i, _uid, uuid) in mails.iter() {
match kind {
StoreType::Add => {
self.mailbox.add_flags(*uuid, &flags[..]).await?;
@@ -178,7 +178,19 @@ impl MailboxView {
}
pub async fn expunge(&mut self) -> Result<Vec<Body>> {
- unimplemented!()
+ let deleted_flag = Flag::Deleted.to_string();
+ let msgs = self
+ .known_state
+ .table
+ .iter()
+ .filter(|(_uuid, (_uid, flags))| flags.iter().any(|x| *x == deleted_flag))
+ .map(|(uuid, _)| *uuid);
+
+ for msg in msgs {
+ self.mailbox.delete(msg).await?;
+ }
+
+ self.update().await
}
/// Looks up state changes in the mailbox and produces a set of IMAP
diff --git a/src/lmtp.rs b/src/lmtp.rs
index 28194e3..f88ed8f 100644
--- a/src/lmtp.rs
+++ b/src/lmtp.rs
@@ -5,7 +5,11 @@ use anyhow::Result;
use async_trait::async_trait;
use duplexify::Duplex;
use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite};
-use futures::{stream, stream::FuturesUnordered, StreamExt};
+use futures::{
+ stream,
+ stream::{FuturesOrdered, FuturesUnordered},
+ StreamExt,
+};
use log::*;
use tokio::net::TcpListener;
use tokio::select;
@@ -56,11 +60,12 @@ impl LmtpServer {
_ = wait_conn_finished => continue,
_ = must_exit.changed() => continue,
};
+ info!("LMTP: accepted connection from {}", remote_addr);
let conn = tokio::spawn(smtp_server::interact(
socket.compat(),
smtp_server::IsAlreadyTls::No,
- Conn { remote_addr },
+ (),
self.clone(),
));
@@ -77,10 +82,6 @@ impl LmtpServer {
// ----
-pub struct Conn {
- remote_addr: SocketAddr,
-}
-
pub struct Message {
to: Vec<PublicCredentials>,
}
@@ -89,21 +90,21 @@ pub struct Message {
impl Config for LmtpServer {
type Protocol = smtp_server::protocol::Lmtp;
- type ConnectionUserMeta = Conn;
+ type ConnectionUserMeta = ();
type MailUserMeta = Message;
- fn hostname(&self, _conn_meta: &ConnectionMetadata<Conn>) -> &str {
+ fn hostname(&self, _conn_meta: &ConnectionMetadata<()>) -> &str {
&self.hostname
}
- async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<Conn>) -> Message {
+ async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<()>) -> Message {
Message { to: vec![] }
}
async fn tls_accept<IO>(
&self,
_io: IO,
- _conn_meta: &mut ConnectionMetadata<Conn>,
+ _conn_meta: &mut ConnectionMetadata<()>,
) -> io::Result<Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>>
where
IO: Send + AsyncRead + AsyncWrite,
@@ -118,7 +119,7 @@ impl Config for LmtpServer {
&self,
from: Option<Email>,
_meta: &mut MailMetadata<Message>,
- _conn_meta: &mut ConnectionMetadata<Conn>,
+ _conn_meta: &mut ConnectionMetadata<()>,
) -> Decision<Option<Email>> {
Decision::Accept {
reply: reply::okay_from().convert(),
@@ -130,7 +131,7 @@ impl Config for LmtpServer {
&self,
to: Email,
meta: &mut MailMetadata<Message>,
- _conn_meta: &mut ConnectionMetadata<Conn>,
+ _conn_meta: &mut ConnectionMetadata<()>,
) -> Decision<Email> {
let to_str = match to.hostname.as_ref() {
Some(h) => format!("{}@{}", to.localpart, h),
@@ -158,7 +159,7 @@ impl Config for LmtpServer {
&'resp self,
reader: &mut EscapedDataReader<'_, R>,
meta: MailMetadata<Message>,
- _conn_meta: &'resp mut ConnectionMetadata<Conn>,
+ _conn_meta: &'resp mut ConnectionMetadata<()>,
) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'resp>>
where
R: Send + Unpin + AsyncRead,
@@ -176,8 +177,8 @@ impl Config for LmtpServer {
};
let mut text = Vec::new();
- if reader.read_to_end(&mut text).await.is_err() {
- return err_response_stream(meta, "io error".into());
+ if let Err(e) = reader.read_to_end(&mut text).await {
+ return err_response_stream(meta, format!("io error: {}", e));
}
reader.complete();
@@ -186,23 +187,29 @@ impl Config for LmtpServer {
Err(e) => return err_response_stream(meta, e.to_string()),
};
- Box::pin(stream::iter(meta.user.to.into_iter()).then(move |creds| {
- let encrypted_message = encrypted_message.clone();
- async move {
- match encrypted_message.deliver_to(creds).await {
- Ok(()) => Decision::Accept {
- reply: reply::okay_mail().convert(),
- res: (),
- },
- Err(e) => Decision::Reject {
- reply: Reply {
- code: ReplyCode::POLICY_REASON,
- ecode: None,
- text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
- },
- },
- }
- }
- }))
+ Box::pin(
+ meta.user
+ .to
+ .into_iter()
+ .map(move |creds| {
+ let encrypted_message = encrypted_message.clone();
+ async move {
+ match encrypted_message.deliver_to(creds).await {
+ Ok(()) => Decision::Accept {
+ reply: reply::okay_mail().convert(),
+ res: (),
+ },
+ Err(e) => Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
+ },
+ },
+ }
+ }
+ })
+ .collect::<FuturesOrdered<_>>(),
+ )
}
}
diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs
index 3a3e252..43d6507 100644
--- a/src/mail/uidindex.rs
+++ b/src/mail/uidindex.rs
@@ -214,9 +214,14 @@ impl FlagIndex {
});
}
fn remove(&mut self, uid: ImapUid, flags: &Vec<Flag>) -> () {
- flags.iter().for_each(|flag| {
- self.0.get_mut(flag).and_then(|set| set.remove(&uid));
- });
+ for flag in flags.iter() {
+ if let Some(set) = self.0.get_mut(flag) {
+ set.remove(&uid);
+ if set.is_empty() {
+ self.0.remove(flag);
+ }
+ }
+ }
}
pub fn get(&self, f: &Flag) -> Option<&OrdSet<ImapUid>> {
diff --git a/src/server.rs b/src/server.rs
index 55fa5ba..f0eb35f 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;
use anyhow::{bail, Result};
-use futures::{try_join, StreamExt};
+use futures::try_join;
use log::*;
use tokio::sync::watch;