From 1e4d5eb8f1fe1d7abe726572b9ee063d91f72a98 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 9 Jun 2022 10:43:38 +0200 Subject: Compiling with a MPSC channel --- src/command.rs | 34 +++++++++++----------------------- src/service.rs | 53 ++++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 51 insertions(+), 36 deletions(-) diff --git a/src/command.rs b/src/command.rs index 9b2f12d..4c2ec41 100644 --- a/src/command.rs +++ b/src/command.rs @@ -12,15 +12,14 @@ use crate::mailstore::Mailstore; use crate::mailbox::Mailbox; use crate::service::Session; -pub struct Command { +pub struct Command<'a> { tag: Tag, - mailstore: Arc, - session: Arc>, + session: &'a mut Session, } -impl Command { - pub fn new(tag: Tag, mailstore: Arc, session: Arc>) -> Self { - Self { tag, mailstore, session } +impl<'a> Command<'a> { + pub fn new(tag: Tag, session: &'a mut Session) -> Self { + Self { tag, session } } pub async fn capability(&self) -> Result { @@ -31,24 +30,19 @@ impl Command { Ok(r) } - pub async fn login(&self, username: AString, password: AString) -> Result { + pub async fn login(&mut self, username: AString, password: AString) -> Result { let (u, p) = match (String::try_from(username), String::try_from(password)) { (Ok(u), Ok(p)) => (u, p), _ => return Response::bad("Invalid characters"), }; tracing::debug!(user = %u, "command.login"); - let creds = match self.mailstore.login_provider.login(&u, &p).await { + let creds = match self.session.mailstore.login_provider.login(&u, &p).await { Err(_) => return Response::no("[AUTHENTICATIONFAILED] Authentication failed."), Ok(c) => c, }; - let mut session = match self.session.lock() { - Err(_) => return Response::bad("[AUTHENTICATIONFAILED] Unable to acquire lock on session."), - Ok(s) => s, - }; - session.creds = Some(creds); - drop(session); + self.session.creds = Some(creds); Response::ok("Logged in") } @@ -61,16 +55,10 @@ impl Command { Response::bad("Not implemented") } - pub async fn select(&self, mailbox: MailboxCodec) -> Result { - - let mut session = match self.session.lock() { - Err(_) => return Response::no("[SELECTFAILED] Unable to acquire lock on session."), - Ok(s) => s, - }; + pub async fn select(&mut self, mailbox: MailboxCodec) -> Result { - let mb = Mailbox::new(session.creds.as_ref().unwrap(), "TestMailbox".to_string()).unwrap(); - session.selected = Some(mb); - drop(session); + let mb = Mailbox::new(self.session.creds.as_ref().unwrap(), "TestMailbox".to_string()).unwrap(); + self.session.selected = Some(mb); Response::bad("Not implemented") } diff --git a/src/service.rs b/src/service.rs index a2fca4d..051e065 100644 --- a/src/service.rs +++ b/src/service.rs @@ -6,7 +6,10 @@ use boitalettres::server::accept::addr::AddrStream; use boitalettres::errors::Error as BalError; use boitalettres::proto::{Request, Response}; use futures::future::BoxFuture; +use futures::future::FutureExt; use imap_codec::types::command::CommandBody; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tower::Service; use crate::command; @@ -14,6 +17,8 @@ use crate::login::Credentials; use crate::mailstore::Mailstore; use crate::mailbox::Mailbox; +const MAX_PIPELINED_COMMANDS: usize = 10; + pub struct Instance { pub mailstore: Arc, } @@ -38,18 +43,17 @@ impl<'a> Service<&'a AddrStream> for Instance { } } -pub struct Session { - pub creds: Option, - pub selected: Option, -} - pub struct Connection { - pub mailstore: Arc, - pub session: Arc>, + pub tx: mpsc::Sender, } impl Connection { pub fn new(mailstore: Arc) -> Self { - Self { mailstore, session: Arc::new(Mutex::new(Session { creds: None, selected: None, })) } + let (tx, mut rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); + tokio::spawn(async move { + let mut session = Session::new(mailstore, rx); + session.run().await; + }); + Self { tx } } } impl Service for Connection { @@ -63,9 +67,32 @@ impl Service for Connection { fn call(&mut self, req: Request) -> Self::Future { tracing::debug!("Got request: {:#?}", req); - let cmd = command::Command::new(req.tag, self.mailstore.clone(), self.session.clone()); - Box::pin(async move { - match req.body { + match self.tx.try_send(req) { + Ok(()) => return async { Response::ok("Ok") }.boxed(), + Err(TrySendError::Full(_)) => return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed(), + Err(TrySendError::Closed(_)) => return async { Response::bad("The session task has exited") }.boxed(), + } + + // send a future that await here later a oneshot command + } +} + +pub struct Session { + pub mailstore: Arc, + pub creds: Option, + pub selected: Option, + rx: mpsc::Receiver, +} + +impl Session { + pub fn new(mailstore: Arc, rx: mpsc::Receiver) -> Self { + Self { mailstore, rx, creds: None, selected: None, } + } + + pub async fn run(&mut self) { + while let Some(req) = self.rx.recv().await { + let mut cmd = command::Command::new(req.tag, self); + let _ = match req.body { CommandBody::Capability => cmd.capability().await, CommandBody::Login { username, password } => cmd.login(username, password).await, CommandBody::Lsub { reference, mailbox_wildcard } => cmd.lsub(reference, mailbox_wildcard).await, @@ -73,8 +100,8 @@ impl Service for Connection { CommandBody::Select { mailbox } => cmd.select(mailbox).await, CommandBody::Fetch { sequence_set, attributes, uid } => cmd.fetch(sequence_set, attributes, uid).await, _ => Response::bad("Error in IMAP command received by server."), - } - }) + }; + } } } -- cgit v1.2.3