diff options
Diffstat (limited to 'src/service.rs')
-rw-r--r-- | src/service.rs | 56 |
1 files changed, 6 insertions, 50 deletions
diff --git a/src/service.rs b/src/service.rs index 051e065..35c753b 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; use anyhow::Result; @@ -7,17 +7,10 @@ use boitalettres::errors::Error as BalError; use boitalettres::proto::{Request, Response}; use futures::future::BoxFuture; use futures::future::FutureExt; -use imap_codec::types::command::CommandBody; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; use tower::Service; -use crate::command; -use crate::login::Credentials; use crate::mailstore::Mailstore; -use crate::mailbox::Mailbox; - -const MAX_PIPELINED_COMMANDS: usize = 10; +use crate::session; pub struct Instance { pub mailstore: Arc<Mailstore>, @@ -39,21 +32,16 @@ impl<'a> Service<&'a AddrStream> for Instance { fn call(&mut self, addr: &'a AddrStream) -> Self::Future { tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept"); let ms = self.mailstore.clone(); - Box::pin(async { Ok(Connection::new(ms)) }) + async { Ok(Connection::new(ms)) }.boxed() } } pub struct Connection { - pub tx: mpsc::Sender<Request>, + session: session::Manager, } impl Connection { pub fn new(mailstore: Arc<Mailstore>) -> Self { - let (tx, mut rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); - tokio::spawn(async move { - let mut session = Session::new(mailstore, rx); - session.run().await; - }); - Self { tx } + Self { session: session::Manager::new(mailstore) } } } impl Service<Request> for Connection { @@ -67,41 +55,9 @@ impl Service<Request> for Connection { fn call(&mut self, req: Request) -> Self::Future { tracing::debug!("Got request: {:#?}", req); - match self.tx.try_send(req) { - Ok(()) => return async { Response::ok("Ok") }.boxed(), - Err(TrySendError::Full(_)) => return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed(), - Err(TrySendError::Closed(_)) => return async { Response::bad("The session task has exited") }.boxed(), - } - - // send a future that await here later a oneshot command + self.session.process(req) } } -pub struct Session { - pub mailstore: Arc<Mailstore>, - pub creds: Option<Credentials>, - pub selected: Option<Mailbox>, - rx: mpsc::Receiver<Request>, -} - -impl Session { - pub fn new(mailstore: Arc<Mailstore>, rx: mpsc::Receiver<Request>) -> Self { - Self { mailstore, rx, creds: None, selected: None, } - } - pub async fn run(&mut self) { - while let Some(req) = self.rx.recv().await { - let mut cmd = command::Command::new(req.tag, self); - let _ = match req.body { - CommandBody::Capability => cmd.capability().await, - CommandBody::Login { username, password } => cmd.login(username, password).await, - CommandBody::Lsub { reference, mailbox_wildcard } => cmd.lsub(reference, mailbox_wildcard).await, - CommandBody::List { reference, mailbox_wildcard } => cmd.list(reference, mailbox_wildcard).await, - CommandBody::Select { mailbox } => cmd.select(mailbox).await, - CommandBody::Fetch { sequence_set, attributes, uid } => cmd.fetch(sequence_set, attributes, uid).await, - _ => Response::bad("Error in IMAP command received by server."), - }; - } - } -} |