1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
|
use std::sync::Arc;
use anyhow::Error;
use boitalettres::errors::Error as BalError;
use boitalettres::proto::{Request, Response};
use futures::future::BoxFuture;
use futures::future::FutureExt;
use imap_codec::types::command::CommandBody;
use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status};
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
use crate::command::{anonymous,authenticated,selected};
use crate::login::Credentials;
use crate::mailbox::Mailbox;
use crate::LoginProvider;
/* This constant configures backpressure in the system,
* or more specifically, how many pipelined messages are allowed
* before refusing them
*/
const MAX_PIPELINED_COMMANDS: usize = 10;
struct Message {
req: Request,
tx: oneshot::Sender<Result<Response, BalError>>,
}
//-----
pub struct Manager {
tx: mpsc::Sender<Message>,
}
impl Manager {
pub fn new(login_provider: ArcLoginProvider) -> Self {
let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
tokio::spawn(async move {
let mut instance = Instance::new(login_provider, rx);
instance.start().await;
});
Self { tx }
}
pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
let (tx, rx) = oneshot::channel();
let tag = req.tag.clone();
let msg = Message { req, tx };
// We use try_send on a bounded channel to protect the daemons from DoS.
// Pipelining requests in IMAP are a special case: they should not occure often
// and in a limited number (like 3 requests). Someone filling the channel
// will probably be malicious so we "rate limit" them.
match self.tx.try_send(msg) {
Ok(()) => (),
Err(TrySendError::Full(_)) => {
return async {
Status::bad(Some(tag), None, "Too fast! Send less pipelined requests!")
.map(|s| vec![ImapRes::Status(s)])
.map_err(|e| BalError::Text(e.to_string()))
}
.boxed()
}
Err(TrySendError::Closed(_)) => {
return async {
Status::bad(Some(tag), None, "The session task has exited")
.map(|s| vec![ImapRes::Status(s)])
.map_err(|e| BalError::Text(e.to_string()))
}
.boxed()
}
};
// @FIXME add a timeout, handle a session that fails.
async {
match rx.await {
Ok(r) => r,
Err(e) => {
tracing::warn!("Got error {:#?}", e);
Status::bad(Some(tag), None, "No response from the session handler")
.map(|s| vec![ImapRes::Status(s)])
.map_err(|e| BalError::Text(e.to_string()))
}
}
}
.boxed()
}
}
//-----
pub struct Context<'a> {
req: &'a Request,
state: &'a mut flow::State,
login: ArcLoginProvider,
}
pub struct Instance {
rx: mpsc::Receiver<Message>,
pub login_provider: ArcLoginProvider,
pub state: flow::State,
}
impl Instance {
fn new(
login_provider: ArcLoginProvider,
rx: mpsc::Receiver<Message>,
) -> Self {
Self {
login_provider,
rx,
state: flow::State::NotAuthenticated,
}
}
//@FIXME add a function that compute the runner's name from its local info
// to ease debug
// fn name(&self) -> String { }
async fn start(&mut self) {
//@FIXME add more info about the runner
tracing::debug!("starting runner");
while let Some(msg) = self.rx.recv().await {
let ctx = Context { req: &msg.req, state: &mut self.state, login: self.login_provider };
// Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code path depending
// on the State.
let cmd_res = match self.state {
flow::State::NotAuthenticated => anonymous::dispatch(ctx).await,
flow::State::Authenticated(user) => authenticated::dispatch(ctx).await,
flow::State::Selected(user, mailbox) => selected::dispatch(ctx).await,
flow::State::Logout => Status::bad(Some(ctx.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.")
.map(|s| vec![ImapRes::Status(s)])
.map_err(Error::msg),
};
/*
match req.body {
CommandBody::Capability => anonymous::capability().await,
CommandBody::Login { username, password } => anonymous::login(self.login_provider, username, password).await.and_then(|(user, response)| {
self.state.authenticate(user)?;
Ok(response)
},
_ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the ANONYMOUS state.")
.map(|s| vec![ImapRes::Status(s)])
.map_err(Error::msg),
},
flow::State::Authenticated(user) => match req.body {
CommandBody::Capability => anonymous::capability().await, // we use the same implem for now
CommandBody::Lsub { reference, mailbox_wildcard, } => authenticated::lsub(reference, mailbox_wildcard).await,
CommandBody::List { reference, mailbox_wildcard, } => authenticated::list(reference, mailbox_wildcard).await,
CommandBody::Select { mailbox } => authenticated::select(user, mailbox).await.and_then(|(mailbox, response)| {
self.state.select(mailbox);
Ok(response)
}),
_ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the AUTHENTICATED state.")
.map(|s| vec![ImapRes::Status(s)])
.map_err(Error::msg),
},
flow::State::Selected(user, mailbox) => match req.body {
CommandBody::Capability => anonymous::capability().await, // we use the same implem for now
CommandBody::Fetch { sequence_set, attributes, uid, } => selected::fetch(sequence_set, attributes, uid).await,
_ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the SELECTED state.")
.map(|s| vec![ImapRes::Status(s)])
.map_err(Error::msg),
},
flow::State::Logout => Status::bad(Some(msg.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.")
.map(|s| vec![ImapRes::Status(s)])
.map_err(Error::msg),
}
*/
let imap_res = match cmd_res {
Ok(new_state, imap_res) => {
self.state = new_state;
Ok(imap_res)
},
Err(e) if Ok(be) = e.downcast::<BalError>() => Err(be),
Err(e) => {
tracing::warn!(error=%e, "internal.error");
Ok(Status::bad(Some(msg.req.tag.clone()), None, "Internal error")
.map(|s| vec![ImapRes::Status(s)])
.map_err(|e| BalError::Text(e.to_string())))
}
};
//@FIXME I think we should quit this thread on error and having our manager watch it,
// and then abort the session as it is corrupted.
msg.tx.send(imap_res).unwrap_or_else(|e| {
tracing::warn!("failed to send imap response to manager: {:#?}", e)
});
}
//@FIXME add more info about the runner
tracing::debug!("exiting runner");
}
}
|