aboutsummaryrefslogtreecommitdiff
path: root/src/session.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/session.rs')
-rw-r--r--src/session.rs32
1 files changed, 25 insertions, 7 deletions
diff --git a/src/session.rs b/src/session.rs
index a709986..c74d3cd 100644
--- a/src/session.rs
+++ b/src/session.rs
@@ -1,10 +1,12 @@
use std::sync::Arc;
+use anyhow::Error;
use boitalettres::errors::Error as BalError;
use boitalettres::proto::{Request, Response};
use futures::future::BoxFuture;
use futures::future::FutureExt;
use imap_codec::types::command::CommandBody;
+use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status};
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
@@ -41,6 +43,7 @@ impl Manager {
pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
let (tx, rx) = oneshot::channel();
+ let tag = req.tag.clone();
let msg = Message { req, tx };
// We use try_send on a bounded channel to protect the daemons from DoS.
@@ -50,10 +53,20 @@ impl Manager {
match self.tx.try_send(msg) {
Ok(()) => (),
Err(TrySendError::Full(_)) => {
- return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed()
+ return async {
+ Status::bad(Some(tag), None, "Too fast! Send less pipelined requests!")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(|e| BalError::Text(e.to_string()))
+ }
+ .boxed()
}
Err(TrySendError::Closed(_)) => {
- return async { Response::bad("The session task has exited") }.boxed()
+ return async {
+ Status::bad(Some(tag), None, "The session task has exited")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(|e| BalError::Text(e.to_string()))
+ }
+ .boxed()
}
};
@@ -63,7 +76,9 @@ impl Manager {
Ok(r) => r,
Err(e) => {
tracing::warn!("Got error {:#?}", e);
- Response::bad("No response from the session handler")
+ Status::bad(Some(tag), None, "No response from the session handler")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(|e| BalError::Text(e.to_string()))
}
}
}
@@ -105,7 +120,7 @@ impl Instance {
tracing::debug!("starting runner");
while let Some(msg) = self.rx.recv().await {
- let mut cmd = command::Command::new(msg.req.tag, self);
+ let mut cmd = command::Command::new(msg.req.tag.clone(), self);
let res = match msg.req.body {
CommandBody::Capability => cmd.capability().await,
CommandBody::Login { username, password } => cmd.login(username, password).await,
@@ -123,15 +138,18 @@ impl Instance {
attributes,
uid,
} => cmd.fetch(sequence_set, attributes, uid).await,
- _ => Response::bad("Error in IMAP command received by server.")
- .map_err(anyhow::Error::new),
+ _ => Status::bad(Some(msg.req.tag.clone()), None, "Unknown command")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(Error::msg),
};
let wrapped_res = res.or_else(|e| match e.downcast::<BalError>() {
Ok(be) => Err(be),
Err(ae) => {
tracing::warn!(error=%ae, "internal.error");
- Response::bad("Internal error")
+ Status::bad(Some(msg.req.tag.clone()), None, "Internal error")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(|e| BalError::Text(e.to_string()))
}
});