aboutsummaryrefslogtreecommitdiff
path: root/src/mail
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-02-22 11:35:39 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-02-22 11:35:39 +0100
commit4d501b6947497d7705c64dcaf093e5e5a09a9235 (patch)
tree841c86683256802e928bf6ca162b2ee18bb348f1 /src/mail
parentde5717a020ed741ef206ebb649bac2cd678572fa (diff)
downloadaerogramme-4d501b6947497d7705c64dcaf093e5e5a09a9235.tar.gz
aerogramme-4d501b6947497d7705c64dcaf093e5e5a09a9235.zip
Compile streams
Diffstat (limited to 'src/mail')
-rw-r--r--src/mail/mailbox.rs2
-rw-r--r--src/mail/query.rs68
2 files changed, 32 insertions, 38 deletions
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index c20d815..9190883 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -498,7 +498,7 @@ fn dump(uid_index: &Bayou<UidIndex>) {
/// The metadata of a message that is stored in K2V
/// at pk = mail/<mailbox uuid>, sk = <message uuid>
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MailMeta {
/// INTERNALDATE field (milliseconds since epoch)
pub internaldate: u64,
diff --git a/src/mail/query.rs b/src/mail/query.rs
index a183c5a..cbc5fe6 100644
--- a/src/mail/query.rs
+++ b/src/mail/query.rs
@@ -2,7 +2,8 @@ use super::mailbox::MailMeta;
use super::snapshot::FrozenMailbox;
use super::unique_ident::UniqueIdent;
use anyhow::Result;
-use futures::stream::{FuturesOrdered, StreamExt};
+use futures::stream::{Stream, StreamExt, BoxStream};
+use futures::future::FutureExt;
/// Query is in charge of fetching efficiently
/// requested data for a list of emails
@@ -28,64 +29,57 @@ impl QueryScope {
}
}
+//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>;
+
impl<'a, 'b> Query<'a, 'b> {
- pub async fn fetch(&self) -> Result<Vec<QueryResult>> {
+ pub fn fetch(&self) -> BoxStream<Result<QueryResult>> {
match self.scope {
- QueryScope::Index => Ok(self
- .emails
- .iter()
- .map(|&uuid| QueryResult::IndexResult { uuid })
- .collect()),
- QueryScope::Partial => self.partial().await,
- QueryScope::Full => self.full().await,
+ QueryScope::Index => Box::pin(futures::stream::iter(self.emails).map(|&uuid| Ok(QueryResult::IndexResult { uuid }))),
+ QueryScope::Partial => Box::pin(self.partial()),
+ QueryScope::Full => Box::pin(self.full()),
}
}
// --- functions below are private *for reasons*
+ fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
+ async move {
+ let maybe_meta_list: Result<Vec<MailMeta>> = self.frozen.mailbox.fetch_meta(self.emails).await;
+ let list_res = maybe_meta_list
+ .map(|meta_list| meta_list
+ .into_iter()
+ .zip(self.emails)
+ .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata }))
+ .collect()
+ )
+ .unwrap_or_else(|e| vec![Err(e)]);
- async fn partial(&self) -> Result<Vec<QueryResult>> {
- let meta = self.frozen.mailbox.fetch_meta(self.emails).await?;
- let result = meta
- .into_iter()
- .zip(self.emails.iter())
- .map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata })
- .collect::<Vec<_>>();
-
- Ok(result)
+ futures::stream::iter(list_res)
+ }.flatten_stream()
}
- /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY
- /// AND GENERATE SO MUCH NETWORK TRAFFIC.
- /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH
- /// SOMETHING LIKE AN ITERATOR
- async fn full(&self) -> Result<Vec<QueryResult>> {
- let meta_list = self.partial().await?;
- meta_list
- .into_iter()
- .map(|meta| async move {
+ fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
+ self.partial()
+ .then(move |maybe_meta| async move {
+ let meta = maybe_meta?;
+
let content = self
.frozen
.mailbox
.fetch_full(
*meta.uuid(),
&meta
- .metadata()
- .expect("meta to be PartialResult")
- .message_key,
- )
+ .metadata()
+ .expect("meta to be PartialResult")
+ .message_key,
+ )
.await?;
Ok(meta.into_full(content).expect("meta to be PartialResult"))
})
- .collect::<FuturesOrdered<_>>()
- .collect::<Vec<_>>()
- .await
- .into_iter()
- .collect::<Result<Vec<_>, _>>()
}
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum QueryResult {
IndexResult {
uuid: UniqueIdent,