aboutsummaryrefslogtreecommitdiff
path: root/src/imap/session.rs
blob: 15141d30fab69e3f3237502654606ac85c0f179c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
use anyhow::Error;
use boitalettres::errors::Error as BalError;
use boitalettres::proto::{Request, Response};
use futures::future::BoxFuture;
use futures::future::FutureExt;

use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};

use crate::imap::command::{anonymous, authenticated, examined, selected};
use crate::imap::flow;
use crate::login::ArcLoginProvider;

/* This constant configures backpressure in the system,
 * or more specifically, how many pipelined messages are allowed
 * before refusing them
 */
const MAX_PIPELINED_COMMANDS: usize = 10;

struct Message {
    req: Request,
    tx: oneshot::Sender<Result<Response, BalError>>,
}

//-----

pub struct Manager {
    tx: mpsc::Sender<Message>,
}

impl Manager {
    pub fn new(login_provider: ArcLoginProvider) -> Self {
        let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
        tokio::spawn(async move {
            let instance = Instance::new(login_provider, rx);
            instance.start().await;
        });
        Self { tx }
    }

    pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
        let (tx, rx) = oneshot::channel();
        let msg = Message { req, tx };

        // We use try_send on a bounded channel to protect the daemons from DoS.
        // Pipelining requests in IMAP are a special case: they should not occure often
        // and in a limited number (like 3 requests). Someone filling the channel
        // will probably be malicious so we "rate limit" them.
        match self.tx.try_send(msg) {
            Ok(()) => (),
            Err(TrySendError::Full(_)) => {
                return async { Response::bad("Too fast! Send less pipelined requests.") }.boxed()
            }
            Err(TrySendError::Closed(_)) => {
                return async { Err(BalError::Text("Terminated session".to_string())) }.boxed()
            }
        };

        // @FIXME add a timeout, handle a session that fails.
        async {
            match rx.await {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!("Got error {:#?}", e);
                    Response::bad("No response from the session handler")
                }
            }
        }
        .boxed()
    }
}

//-----

pub struct Instance {
    rx: mpsc::Receiver<Message>,

    pub login_provider: ArcLoginProvider,
    pub state: flow::State,
}
impl Instance {
    fn new(login_provider: ArcLoginProvider, rx: mpsc::Receiver<Message>) -> Self {
        Self {
            login_provider,
            rx,
            state: flow::State::NotAuthenticated,
        }
    }

    //@FIXME add a function that compute the runner's name from its local info
    // to ease debug
    // fn name(&self) -> String { }

    async fn start(mut self) {
        //@FIXME add more info about the runner
        tracing::debug!("starting runner");

        while let Some(msg) = self.rx.recv().await {
            // Command behavior is modulated by the state.
            // To prevent state error, we handle the same command in separate code paths.
            let ctrl = match &mut self.state {
                flow::State::NotAuthenticated => {
                    let ctx = anonymous::AnonymousContext {
                        req: &msg.req,
                        login_provider: Some(&self.login_provider),
                    };
                    anonymous::dispatch(ctx).await
                }
                flow::State::Authenticated(ref user) => {
                    let ctx = authenticated::AuthenticatedContext {
                        req: &msg.req,
                        user,
                    };
                    authenticated::dispatch(ctx).await
                }
                flow::State::Selected(ref user, ref mut mailbox) => {
                    let ctx = selected::SelectedContext {
                        req: &msg.req,
                        user,
                        mailbox,
                    };
                    selected::dispatch(ctx).await
                }
                flow::State::Examined(ref user, ref mut mailbox) => {
                    let ctx = examined::ExaminedContext {
                        req: &msg.req,
                        user,
                        mailbox,
                    };
                    examined::dispatch(ctx).await
                }
                flow::State::Logout => {
                    Response::bad("No commands are allowed in the LOGOUT state.")
                        .map(|r| (r, flow::Transition::None))
                        .map_err(Error::msg)
                }
            };

            // Process result
            let res = match ctrl {
                Ok((res, tr)) => {
                    //@FIXME remove unwrap
                    self.state = match self.state.apply(tr) {
                        Ok(new_state) => new_state,
                        Err(e) => {
                            tracing::error!("Invalid transition: {}, exiting", e);
                            break;
                        }
                    };

                    //@FIXME enrich here the command with some global status

                    Ok(res)
                }
                // Cast from anyhow::Error to Bal::Error
                // @FIXME proper error handling would be great
                Err(e) => match e.downcast::<BalError>() {
                    Ok(be) => Err(be),
                    Err(e) => {
                        tracing::warn!(error=%e, "internal.error");
                        Response::bad("Internal error")
                    }
                },
            };

            //@FIXME I think we should quit this thread on error and having our manager watch it,
            // and then abort the session as it is corrupted.
            msg.tx.send(res).unwrap_or_else(|e| {
                tracing::warn!("failed to send imap response to manager: {:#?}", e)
            });

            if let flow::State::Logout = &self.state {
                break;
            }
        }

        //@FIXME add more info about the runner
        tracing::debug!("exiting runner");
    }
}