diff options
Diffstat (limited to 'src/session.rs')
-rw-r--r-- | src/session.rs | 61 |
1 files changed, 42 insertions, 19 deletions
diff --git a/src/session.rs b/src/session.rs index d689e72..8ad44dd 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,21 +1,21 @@ use std::sync::Arc; -use boitalettres::proto::{Request, Response}; use boitalettres::errors::Error as BalError; -use imap_codec::types::command::CommandBody; -use tokio::sync::{oneshot,mpsc}; -use tokio::sync::mpsc::error::TrySendError; +use boitalettres::proto::{Request, Response}; use futures::future::BoxFuture; use futures::future::FutureExt; +use imap_codec::types::command::CommandBody; +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::{mpsc, oneshot}; use crate::command; use crate::login::Credentials; -use crate::mailstore::Mailstore; use crate::mailbox::Mailbox; +use crate::mailstore::Mailstore; /* This constant configures backpressure in the system, * or more specifically, how many pipelined messages are allowed - * before refusing them + * before refusing them */ const MAX_PIPELINED_COMMANDS: usize = 10; @@ -31,8 +31,8 @@ pub struct Manager { //@FIXME we should garbage collect the Instance when the Manager is destroyed. impl Manager { pub fn new(mailstore: Arc<Mailstore>) -> Self { - let (tx, mut rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); - tokio::spawn(async move { + let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); + tokio::spawn(async move { let mut instance = Instance::new(mailstore, rx); instance.start().await; }); @@ -49,8 +49,12 @@ impl Manager { // will probably be malicious so we "rate limit" them. match self.tx.try_send(msg) { Ok(()) => (), - Err(TrySendError::Full(_)) => return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed(), - Err(TrySendError::Closed(_)) => return async { Response::bad("The session task has exited") }.boxed(), + Err(TrySendError::Full(_)) => { + return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed() + } + Err(TrySendError::Closed(_)) => { + return async { Response::bad("The session task has exited") }.boxed() + } }; // @FIXME add a timeout, handle a session that fails. @@ -60,9 +64,10 @@ impl Manager { Err(e) => { tracing::warn!("Got error {:#?}", e); Response::bad("No response from the session handler") - }, + } } - }.boxed() + } + .boxed() } } @@ -74,13 +79,18 @@ pub struct User { pub struct Instance { rx: mpsc::Receiver<Message>, - pub mailstore: Arc<Mailstore>, + pub mailstore: Arc<Mailstore>, pub selected: Option<Mailbox>, pub user: Option<User>, } impl Instance { fn new(mailstore: Arc<Mailstore>, rx: mpsc::Receiver<Message>) -> Self { - Self { mailstore, rx, selected: None, user: None, } + Self { + mailstore, + rx, + selected: None, + user: None, + } } //@FIXME add a function that compute the runner's name from its local info @@ -96,11 +106,22 @@ impl Instance { let res = match msg.req.body { CommandBody::Capability => cmd.capability().await, CommandBody::Login { username, password } => cmd.login(username, password).await, - CommandBody::Lsub { reference, mailbox_wildcard } => cmd.lsub(reference, mailbox_wildcard).await, - CommandBody::List { reference, mailbox_wildcard } => cmd.list(reference, mailbox_wildcard).await, + CommandBody::Lsub { + reference, + mailbox_wildcard, + } => cmd.lsub(reference, mailbox_wildcard).await, + CommandBody::List { + reference, + mailbox_wildcard, + } => cmd.list(reference, mailbox_wildcard).await, CommandBody::Select { mailbox } => cmd.select(mailbox).await, - CommandBody::Fetch { sequence_set, attributes, uid } => cmd.fetch(sequence_set, attributes, uid).await, - _ => Response::bad("Error in IMAP command received by server.").map_err(anyhow::Error::new), + CommandBody::Fetch { + sequence_set, + attributes, + uid, + } => cmd.fetch(sequence_set, attributes, uid).await, + _ => Response::bad("Error in IMAP command received by server.") + .map_err(anyhow::Error::new), }; let wrapped_res = res.or_else(|e| match e.downcast::<BalError>() { @@ -113,7 +134,9 @@ impl Instance { //@FIXME I think we should quit this thread on error and having our manager watch it, // and then abort the session as it is corrupted. - msg.tx.send(wrapped_res).unwrap_or_else(|e| tracing::warn!("failed to send imap response to manager: {:#?}", e)); + msg.tx.send(wrapped_res).unwrap_or_else(|e| { + tracing::warn!("failed to send imap response to manager: {:#?}", e) + }); } //@FIXME add more info about the runner |