1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
use std::sync::Arc;
use anyhow::Error;
use boitalettres::errors::Error as BalError;
use boitalettres::proto::{Request, Response};
use futures::future::BoxFuture;
use futures::future::FutureExt;
use imap_codec::types::command::CommandBody;
use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status};
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
use crate::command;
use crate::login::Credentials;
use crate::mailbox::Mailbox;
use crate::LoginProvider;
/* This constant configures backpressure in the system: it is the capacity
* of the bounded channel created in `Manager::new`, i.e. how many pipelined
* requests may be queued for a session before `Manager::process` starts
* refusing further ones with a BAD status.
*/
const MAX_PIPELINED_COMMANDS: usize = 10;
/// One unit of work handed from `Manager::process` to the session task:
/// a request together with the channel on which its outcome is sent back.
struct Message {
/// The request to execute; the session task reads its `tag` and `body`.
req: Request,
/// One-shot reply channel; `Instance::start` sends exactly one result on it
/// per message.
tx: oneshot::Sender<Result<Response, BalError>>,
}
/// Front-end handle to a session task spawned by `Manager::new`.
///
/// It only owns the sending half of the bounded request channel; the
/// session state itself lives in the `Instance` owned by the spawned task.
pub struct Manager {
/// Sending half of the request channel (capacity `MAX_PIPELINED_COMMANDS`).
tx: mpsc::Sender<Message>,
}
//@FIXME we should garbage collect the Instance when the Manager is destroyed.
impl Manager {
pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
tokio::spawn(async move {
let mut instance = Instance::new(login_provider, rx);
instance.start().await;
});
Self { tx }
}
pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
let (tx, rx) = oneshot::channel();
let tag = req.tag.clone();
let msg = Message { req, tx };
// We use try_send on a bounded channel to protect the daemons from DoS.
// Pipelining requests in IMAP are a special case: they should not occure often
// and in a limited number (like 3 requests). Someone filling the channel
// will probably be malicious so we "rate limit" them.
match self.tx.try_send(msg) {
Ok(()) => (),
Err(TrySendError::Full(_)) => {
return async {
Status::bad(Some(tag), None, "Too fast! Send less pipelined requests!")
.map(|s| vec![ImapRes::Status(s)])
.map_err(|e| BalError::Text(e.to_string()))
}
.boxed()
}
Err(TrySendError::Closed(_)) => {
return async {
Status::bad(Some(tag), None, "The session task has exited")
.map(|s| vec![ImapRes::Status(s)])
.map_err(|e| BalError::Text(e.to_string()))
}
.boxed()
}
};
// @FIXME add a timeout, handle a session that fails.
async {
match rx.await {
Ok(r) => r,
Err(e) => {
tracing::warn!("Got error {:#?}", e);
Status::bad(Some(tag), None, "No response from the session handler")
.map(|s| vec![ImapRes::Status(s)])
.map_err(|e| BalError::Text(e.to_string()))
}
}
}
.boxed()
}
}
/// A user attached to a session (see `Instance::user`).
///
/// NOTE(review): nothing in this file constructs a `User`; presumably it is
/// filled in by the login command handler — confirm in `crate::command`.
pub struct User {
/// Presumably the account's login name — TODO confirm against the code that builds it.
pub name: String,
/// Credentials associated with the user, as defined in `crate::login`.
pub creds: Credentials,
}
/// State of one session, owned by the task spawned in `Manager::new`.
///
/// `start` passes `self` to every `command::Command` it creates, so the
/// public fields below are reachable from the command handlers.
pub struct Instance {
/// Receiving half of the request channel fed by `Manager::process`.
rx: mpsc::Receiver<Message>,
/// Login backend handed over by the creator of the `Manager`. It is not used
/// directly in this file; presumably the command handlers use it — verify.
pub login_provider: Arc<dyn LoginProvider + Send + Sync>,
/// Starts as `None`. Presumably the mailbox currently selected by the
/// client — confirm against the SELECT handler.
pub selected: Option<Mailbox>,
/// Starts as `None`. Presumably set once a login succeeds — confirm against
/// the LOGIN handler.
pub user: Option<User>,
}
impl Instance {
fn new(
login_provider: Arc<dyn LoginProvider + Send + Sync>,
rx: mpsc::Receiver<Message>,
) -> Self {
Self {
login_provider,
rx,
selected: None,
user: None,
}
}
//@FIXME add a function that compute the runner's name from its local info
// to ease debug
// fn name(&self) -> String { }
async fn start(&mut self) {
//@FIXME add more info about the runner
tracing::debug!("starting runner");
while let Some(msg) = self.rx.recv().await {
let mut cmd = command::Command::new(msg.req.tag.clone(), self);
let res = match msg.req.body {
CommandBody::Capability => cmd.capability().await,
CommandBody::Login { username, password } => cmd.login(username, password).await,
CommandBody::Lsub {
reference,
mailbox_wildcard,
} => cmd.lsub(reference, mailbox_wildcard).await,
CommandBody::List {
reference,
mailbox_wildcard,
} => cmd.list(reference, mailbox_wildcard).await,
CommandBody::Select { mailbox } => cmd.select(mailbox).await,
CommandBody::Fetch {
sequence_set,
attributes,
uid,
} => cmd.fetch(sequence_set, attributes, uid).await,
_ => Status::bad(Some(msg.req.tag.clone()), None, "Unknown command")
.map(|s| vec![ImapRes::Status(s)])
.map_err(Error::msg),
};
let wrapped_res = res.or_else(|e| match e.downcast::<BalError>() {
Ok(be) => Err(be),
Err(ae) => {
tracing::warn!(error=%ae, "internal.error");
Status::bad(Some(msg.req.tag.clone()), None, "Internal error")
.map(|s| vec![ImapRes::Status(s)])
.map_err(|e| BalError::Text(e.to_string()))
}
});
//@FIXME I think we should quit this thread on error and having our manager watch it,
// and then abort the session as it is corrupted.
msg.tx.send(wrapped_res).unwrap_or_else(|e| {
tracing::warn!("failed to send imap response to manager: {:#?}", e)
});
}
//@FIXME add more info about the runner
tracing::debug!("exiting runner");
}
}
|