b0VIM 8.2 B�b�6� ��
quentin lheureduthe ~quentin/Documents/dev/deuxfleurs/mailrage/src/session.rs utf-8
3210 #"! U tp R U r ad � � R � � � � p P # � � � � � � j i 1 �
�
�
�
�
�
�
Q
O
N
9
� � j ,
� � � � ~ } ! �
�
�
}
%
� � n T ( � � � 4 & � � � � w = � � � � � � � � � � k i h R 1 0 � � � � � } n W G + � � z y Y ) � � � � X � � p J � CommandBody::List { } => cmd.lsub(reference, mailbox_wildcard).await, mailbox_wildcard, reference, CommandBody::Lsub { CommandBody::Login { username, password } => cmd.login(username, password).await, CommandBody::Capability => cmd.capability().await, let res = match msg.req.body { let mut cmd = command::Command::new(msg.req.tag, self); while let Some(msg) = self.rx.recv().await { tracing::debug!("starting runner"); //@FIXME add more info about the runner async fn start(&mut self) { // fn name(&self) -> String { } // to ease debug //@FIXME add a function that compute the runner's name from its local info } } user: None, selected: None, rx, mailstore, Self { fn new(mailstore: Arc<Mailstore>, rx: mpsc::Receiver<Message>) -> Self { impl Instance { } pub user pub lo pub login_provider: Arc<dyn LoginProvider + Send + Sync>, rx: mpsc::Receiver<Message>, pub struct Instance { } pub creds: Credentials, pub name: String, pub struct User { } } .boxed() } } } Response::bad("No response from the session handler") tracing::warn!("Got error {:#?}", e); Err(e) => { Ok(r) => r, match rx.await { async { // @FIXME add a timeout, handle a session that fails. }; } return async { Response::bad("The session task has exited") }.boxed() Err(TrySendError::Closed(_)) => { } return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed() Err(TrySendError::Full(_)) => { Ok(()) => (), match self.tx.try_send(msg) { // will probably be malicious so we "rate limit" them. // and in a limited number (like 3 requests). Someone filling the channel // Pipelining requests in IMAP are a special case: they should not occure often // We use try_send on a bounded channel to protect the daemons from DoS. let msg = Message { req, tx }; let (tx, rx) = oneshot::channel(); pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> { } Self { tx } }); instance.start().await; let mut instance = Instance::new(login_provider, rx); tokio::spawn(async move { let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self { impl Manager { //@FIXME we should garbage collect the Instance when the Manager is destroyed. } tx: mpsc::Sender<Message>, pub struct Manager { } tx: oneshot::Sender<Result<Response, BalError>>, req: Request, struct Message { const MAX_PIPELINED_COMMANDS: usize = 10; */ * before refusing them * or more specifically, how many pipelined messages are allowed /* This constant configures backpressure in the system, use crate::LoginProvider; use crate::mailbox::Mailbox; use crate::login::Credentials; use crate::command; use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc::error::TrySendError; use imap_codec::types::command::CommandBody; use futures::future::FutureExt; use futures::future::BoxFuture; use boitalettres::proto::{Request, Response}; use boitalettres::errors::Error as BalError; use std::sync::Arc; ad e
� � y + � � � f �
�
�
�
b
E
� � � � K
� o e d 4 } } tracing::debug!("exiting runner"); //@FIXME add more info about the runner } }); tracing::warn!("failed to send imap response to manager: {:#?}", e) msg.tx.send(wrapped_res).unwrap_or_else(|e| { // and then abort the session as it is corrupted. //@FIXME I think we should quit this thread on error and having our manager watch it, }); } Response::bad("Internal error") tracing::warn!(error=%ae, "internal.error"); Err(ae) => { Ok(be) => Err(be), let wrapped_res = res.or_else(|e| match e.downcast::<BalError>() { }; .map_err(anyhow::Error::new), _ => Response::bad("Error in IMAP command received by server.") } => cmd.fetch(sequence_set, attributes, uid).await, uid, attributes, sequence_set, CommandBody::Fetch { CommandBody::Select { mailbox } => cmd.select(mailbox).await, } => cmd.list(reference, mailbox_wildcard).await, mailbox_wildcard, reference, ad � � � � � G 8 � � � � � x c ? > �
�
�
�
H
� x T 5 � � CommandBody::List { } => cmd.lsub(reference, mailbox_wildcard).await, mailbox_wildcard, reference, CommandBody::Lsub { CommandBody::Login { username, password } => cmd.login(username, password).await, CommandBody::Capability => cmd.capability().await, let res = match msg.req.body { let mut cmd = command::Command::new(msg.req.tag, self); while let Some(msg) = self.rx.recv().await { tracing::debug!("starting runner"); //@FIXME add more info about the runner async fn start(&mut self) { // fn name(&self) -> String { } // to ease debug //@FIXME add a function that compute the runner's name from its local info } } user: None, selected: None, rx, login_provider, Self { fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>, rx: mpsc::Receiver<Message>) -> Self { impl Instance { } pub user: Option<User>, pub selected: Option<Mailbox>,