aboutsummaryrefslogtreecommitdiff
path: root/src/session.rs
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2022-06-17 18:39:36 +0200
committerQuentin Dufour <quentin@deuxfleurs.fr>2022-06-17 18:39:36 +0200
commit5dd5ae8bcd6f88703bc483d7f8d5882fefad4e7e (patch)
tree312708a97fcfcd273481f41b8bd6c878030573f4 /src/session.rs
parent41f1b02171cee36706d30cf24329ff12780d47fd (diff)
downloadaerogramme-5dd5ae8bcd6f88703bc483d7f8d5882fefad4e7e.tar.gz
aerogramme-5dd5ae8bcd6f88703bc483d7f8d5882fefad4e7e.zip
WIP Refactor, code is broken
Diffstat (limited to 'src/session.rs')
-rw-r--r--src/session.rs166
1 files changed, 0 insertions, 166 deletions
diff --git a/src/session.rs b/src/session.rs
deleted file mode 100644
index c74d3cd..0000000
--- a/src/session.rs
+++ /dev/null
@@ -1,166 +0,0 @@
-use std::sync::Arc;
-
-use anyhow::Error;
-use boitalettres::errors::Error as BalError;
-use boitalettres::proto::{Request, Response};
-use futures::future::BoxFuture;
-use futures::future::FutureExt;
-use imap_codec::types::command::CommandBody;
-use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status};
-use tokio::sync::mpsc::error::TrySendError;
-use tokio::sync::{mpsc, oneshot};
-
-use crate::command;
-use crate::login::Credentials;
-use crate::mailbox::Mailbox;
-use crate::LoginProvider;
-
-/* This constant configures backpressure in the system,
- * or more specifically, how many pipelined messages are allowed
- * before refusing them
- */
-const MAX_PIPELINED_COMMANDS: usize = 10;
-
-struct Message {
- req: Request,
- tx: oneshot::Sender<Result<Response, BalError>>,
-}
-
-pub struct Manager {
- tx: mpsc::Sender<Message>,
-}
-
-//@FIXME we should garbage collect the Instance when the Manager is destroyed.
-impl Manager {
- pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
- let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
- tokio::spawn(async move {
- let mut instance = Instance::new(login_provider, rx);
- instance.start().await;
- });
- Self { tx }
- }
-
- pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
- let (tx, rx) = oneshot::channel();
- let tag = req.tag.clone();
- let msg = Message { req, tx };
-
- // We use try_send on a bounded channel to protect the daemons from DoS.
- // Pipelining requests in IMAP are a special case: they should not occure often
- // and in a limited number (like 3 requests). Someone filling the channel
- // will probably be malicious so we "rate limit" them.
- match self.tx.try_send(msg) {
- Ok(()) => (),
- Err(TrySendError::Full(_)) => {
- return async {
- Status::bad(Some(tag), None, "Too fast! Send less pipelined requests!")
- .map(|s| vec![ImapRes::Status(s)])
- .map_err(|e| BalError::Text(e.to_string()))
- }
- .boxed()
- }
- Err(TrySendError::Closed(_)) => {
- return async {
- Status::bad(Some(tag), None, "The session task has exited")
- .map(|s| vec![ImapRes::Status(s)])
- .map_err(|e| BalError::Text(e.to_string()))
- }
- .boxed()
- }
- };
-
- // @FIXME add a timeout, handle a session that fails.
- async {
- match rx.await {
- Ok(r) => r,
- Err(e) => {
- tracing::warn!("Got error {:#?}", e);
- Status::bad(Some(tag), None, "No response from the session handler")
- .map(|s| vec![ImapRes::Status(s)])
- .map_err(|e| BalError::Text(e.to_string()))
- }
- }
- }
- .boxed()
- }
-}
-
-pub struct User {
- pub name: String,
- pub creds: Credentials,
-}
-
-pub struct Instance {
- rx: mpsc::Receiver<Message>,
-
- pub login_provider: Arc<dyn LoginProvider + Send + Sync>,
- pub selected: Option<Mailbox>,
- pub user: Option<User>,
-}
-impl Instance {
- fn new(
- login_provider: Arc<dyn LoginProvider + Send + Sync>,
- rx: mpsc::Receiver<Message>,
- ) -> Self {
- Self {
- login_provider,
- rx,
- selected: None,
- user: None,
- }
- }
-
- //@FIXME add a function that compute the runner's name from its local info
- // to ease debug
- // fn name(&self) -> String { }
-
- async fn start(&mut self) {
- //@FIXME add more info about the runner
- tracing::debug!("starting runner");
-
- while let Some(msg) = self.rx.recv().await {
- let mut cmd = command::Command::new(msg.req.tag.clone(), self);
- let res = match msg.req.body {
- CommandBody::Capability => cmd.capability().await,
- CommandBody::Login { username, password } => cmd.login(username, password).await,
- CommandBody::Lsub {
- reference,
- mailbox_wildcard,
- } => cmd.lsub(reference, mailbox_wildcard).await,
- CommandBody::List {
- reference,
- mailbox_wildcard,
- } => cmd.list(reference, mailbox_wildcard).await,
- CommandBody::Select { mailbox } => cmd.select(mailbox).await,
- CommandBody::Fetch {
- sequence_set,
- attributes,
- uid,
- } => cmd.fetch(sequence_set, attributes, uid).await,
- _ => Status::bad(Some(msg.req.tag.clone()), None, "Unknown command")
- .map(|s| vec![ImapRes::Status(s)])
- .map_err(Error::msg),
- };
-
- let wrapped_res = res.or_else(|e| match e.downcast::<BalError>() {
- Ok(be) => Err(be),
- Err(ae) => {
- tracing::warn!(error=%ae, "internal.error");
- Status::bad(Some(msg.req.tag.clone()), None, "Internal error")
- .map(|s| vec![ImapRes::Status(s)])
- .map_err(|e| BalError::Text(e.to_string()))
- }
- });
-
- //@FIXME I think we should quit this thread on error and having our manager watch it,
- // and then abort the session as it is corrupted.
- msg.tx.send(wrapped_res).unwrap_or_else(|e| {
- tracing::warn!("failed to send imap response to manager: {:#?}", e)
- });
- }
-
- //@FIXME add more info about the runner
- tracing::debug!("exiting runner");
- }
-}