aboutsummaryrefslogtreecommitdiff
path: root/src/session.rs
blob: d689e72ca1f56596222fd46676cba401ae712e43 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use std::sync::Arc;

use boitalettres::proto::{Request, Response};
use boitalettres::errors::Error as BalError;
use imap_codec::types::command::CommandBody;
use tokio::sync::{oneshot,mpsc};
use tokio::sync::mpsc::error::TrySendError;
use futures::future::BoxFuture;
use futures::future::FutureExt;

use crate::command;
use crate::login::Credentials;
use crate::mailstore::Mailstore;
use crate::mailbox::Mailbox;

/* Backpressure knob: this is the capacity of the bounded channel that
 * `Manager::new` creates between a `Manager` and its session task, i.e. how
 * many pipelined messages may be queued before `Manager::process` starts
 * refusing new ones.
 */
const MAX_PIPELINED_COMMANDS: usize = 10;

/// One unit of work sent from a `Manager` to its session task: a request
/// together with the channel on which the outcome must be reported.
struct Message {
    /// The request to execute.
    req: Request,
    /// One-shot sender used by the session task to hand back the result of
    /// `req` (either a response or a `BalError`).
    tx: oneshot::Sender<Result<Response, BalError>>,
}

/// Front-end handle to a session: it only holds the sending half of the
/// bounded channel whose receiving half is owned by an `Instance`.
pub struct Manager {
    /// Sending half of the request queue consumed by the session task.
    tx: mpsc::Sender<Message>,
}

//@FIXME we should garbage collect the Instance when the Manager is destroyed.
impl Manager {
    /// Creates a manager and spawns the background task that runs its session.
    ///
    /// A bounded channel of capacity `MAX_PIPELINED_COMMANDS` is created. Its
    /// receiving half is moved into a fresh `Instance` that is driven by a
    /// detached `tokio::spawn`ed task; the sending half stays in the manager.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when it is called outside of a Tokio runtime.
    pub fn new(mailstore: Arc<Mailstore>) -> Self {
        // `rx` is moved as-is into the instance, so it does not need `mut`.
        let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
        tokio::spawn(async move {
            let mut instance = Instance::new(mailstore, rx);
            instance.start().await;
        });
        Self { tx }
    }

    /// Queues `req` for the session task and returns a future resolving to
    /// its outcome.
    ///
    /// The request is enqueued synchronously, before the returned future is
    /// polled. If it cannot be enqueued (queue full, or session task gone),
    /// the returned future resolves immediately to a `Response::bad(..)`
    /// value explaining why. Otherwise the future waits on a one-shot channel
    /// for the session task's answer; if that channel is dropped without an
    /// answer, a warning is logged and a `Response::bad(..)` value is
    /// returned instead.
    pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
        let (tx, rx) = oneshot::channel();
        let msg = Message { req, tx };

        // We use try_send on a bounded channel to protect the daemon from DoS.
        // Pipelined requests in IMAP are a special case: they should not occur
        // often, and only in limited numbers (like 3 requests). Someone filling
        // the channel is probably malicious, so we "rate limit" them.
        if let Err(refused) = self.tx.try_send(msg) {
            let reason: &'static str = match refused {
                TrySendError::Full(_) => "Too fast! Send less pipelined requests!",
                TrySendError::Closed(_) => "The session task has exited",
            };
            return async move { Response::bad(reason) }.boxed();
        }

        // @FIXME add a timeout, handle a session that fails.
        async move {
            match rx.await {
                Ok(r) => r,
                Err(e) => {
                    // The sender was dropped before any result was sent.
                    tracing::warn!("Got error {:#?}", e);
                    Response::bad("No response from the session handler")
                }
            }
        }
        .boxed()
    }
}

/// A user attached to a session (stored in `Instance::user`).
pub struct User {
    /// Name of the user.
    // NOTE(review): presumably the login name supplied by the client — confirm
    // against the code that builds `User`.
    pub name: String,
    /// Credentials associated with this user.
    pub creds: Credentials,
}

/// State of one session, plus the queue of requests it has to serve.
pub struct Instance {
    /// Receiving half of the request queue fed by `Manager::process`.
    rx: mpsc::Receiver<Message>,

    /// Shared handle to the mail store given at construction time.
    pub mailstore: Arc<Mailstore>, 
    /// Currently selected mailbox; `None` when the instance is created.
    // NOTE(review): presumably filled in by the SELECT command handler — confirm.
    pub selected: Option<Mailbox>,
    /// User bound to this session; `None` when the instance is created.
    // NOTE(review): presumably filled in by the LOGIN command handler — confirm.
    pub user: Option<User>,
}
impl Instance {
    fn new(mailstore: Arc<Mailstore>, rx: mpsc::Receiver<Message>) -> Self {
        Self { mailstore, rx, selected: None, user: None, }
    }

    //@FIXME add a function that compute the runner's name from its local info
    // to ease debug
    // fn name(&self) -> String { }

    async fn start(&mut self) {
        //@FIXME add more info about the runner
        tracing::debug!("starting runner");

        while let Some(msg) = self.rx.recv().await {
            let mut cmd = command::Command::new(msg.req.tag, self);
            let res = match msg.req.body {
                CommandBody::Capability => cmd.capability().await,
                CommandBody::Login { username, password } => cmd.login(username, password).await,
                CommandBody::Lsub { reference, mailbox_wildcard } => cmd.lsub(reference, mailbox_wildcard).await,
                CommandBody::List { reference, mailbox_wildcard } => cmd.list(reference, mailbox_wildcard).await,
                CommandBody::Select { mailbox } => cmd.select(mailbox).await,
                CommandBody::Fetch { sequence_set, attributes, uid } => cmd.fetch(sequence_set, attributes, uid).await,
                _ => Response::bad("Error in IMAP command received by server.").map_err(anyhow::Error::new),
            };

            let wrapped_res = res.or_else(|e| match e.downcast::<BalError>() {
                Ok(be) => Err(be),
                Err(ae) => {
                    tracing::warn!(error=%ae, "internal.error");
                    Response::bad("Internal error")
                }
            });

            //@FIXME I think we should quit this thread on error and having our manager watch it,
            // and then abort the session as it is corrupted.
            msg.tx.send(wrapped_res).unwrap_or_else(|e| tracing::warn!("failed to send imap response to manager: {:#?}", e));
        }

        //@FIXME add more info about the runner
        tracing::debug!("exiting runner");
    }
}