From 32dfd25f570b7a55bf43752684d286be0f6b2dc2 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 16 May 2024 17:38:34 +0200 Subject: format + WIP calendar-query --- aero-proto/src/dav/codec.rs | 71 ++-- aero-proto/src/dav/controller.rs | 200 +++++++--- aero-proto/src/dav/middleware.rs | 28 +- aero-proto/src/dav/mod.rs | 61 +-- aero-proto/src/dav/node.rs | 68 ++-- aero-proto/src/dav/resource.rs | 565 ++++++++++++++++++--------- aero-proto/src/imap/command/anonymous.rs | 2 +- aero-proto/src/imap/command/authenticated.rs | 4 +- aero-proto/src/imap/mod.rs | 6 +- aero-proto/src/lmtp.rs | 6 +- aero-proto/src/sasl.rs | 2 +- 11 files changed, 677 insertions(+), 336 deletions(-) (limited to 'aero-proto/src') diff --git a/aero-proto/src/dav/codec.rs b/aero-proto/src/dav/codec.rs index 57c3808..a441e7e 100644 --- a/aero-proto/src/dav/codec.rs +++ b/aero-proto/src/dav/codec.rs @@ -1,26 +1,30 @@ use anyhow::{bail, Result}; -use hyper::{Request, Response, body::Bytes}; -use hyper::body::Incoming; -use http_body_util::Full; +use futures::sink::SinkExt; use futures::stream::StreamExt; use futures::stream::TryStreamExt; +use http_body_util::combinators::UnsyncBoxBody; +use http_body_util::BodyExt; use http_body_util::BodyStream; +use http_body_util::Full; use http_body_util::StreamBody; -use http_body_util::combinators::UnsyncBoxBody; use hyper::body::Frame; -use tokio_util::sync::PollSender; +use hyper::body::Incoming; +use hyper::{body::Bytes, Request, Response}; use std::io::{Error, ErrorKind}; -use futures::sink::SinkExt; -use tokio_util::io::{SinkWriter, CopyToBytes}; -use http_body_util::BodyExt; +use tokio_util::io::{CopyToBytes, SinkWriter}; +use tokio_util::sync::PollSender; -use aero_dav::types as dav; -use aero_dav::xml as dxml; use super::controller::HttpResponse; use super::node::PutPolicy; +use aero_dav::types as dav; +use aero_dav::xml as dxml; pub(crate) fn depth(req: &Request) -> dav::Depth { - match req.headers().get("Depth").map(hyper::header::HeaderValue::to_str) { + match req + .headers() + .get("Depth") + .map(hyper::header::HeaderValue::to_str) + { Some(Ok("0")) => dav::Depth::Zero, Some(Ok("1")) => dav::Depth::One, Some(Ok("Infinity")) => dav::Depth::Infinity, @@ -29,20 +33,28 @@ pub(crate) fn depth(req: &Request) -> dav::Depth { } pub(crate) fn put_policy(req: &Request) -> Result { - if let Some(maybe_txt_etag) = req.headers().get("If-Match").map(hyper::header::HeaderValue::to_str) { + if let Some(maybe_txt_etag) = req + .headers() + .get("If-Match") + .map(hyper::header::HeaderValue::to_str) + { let etag = maybe_txt_etag?; let dquote_count = etag.chars().filter(|c| *c == '"').count(); if dquote_count != 2 { bail!("Either If-Match value is invalid or it's not supported (only single etag is supported)"); } - return Ok(PutPolicy::ReplaceEtag(etag.into())) + return Ok(PutPolicy::ReplaceEtag(etag.into())); } - if let Some(maybe_txt_etag) = req.headers().get("If-None-Match").map(hyper::header::HeaderValue::to_str) { + if let Some(maybe_txt_etag) = req + .headers() + .get("If-None-Match") + .map(hyper::header::HeaderValue::to_str) + { let etag = maybe_txt_etag?; if etag == "*" { - return Ok(PutPolicy::CreateOnly) + return Ok(PutPolicy::CreateOnly); } bail!("Either If-None-Match value is invalid or it's not supported (only asterisk is supported)") } @@ -54,7 +66,10 @@ pub(crate) fn text_body(txt: &'static str) -> UnsyncBoxBody(status_ok: hyper::StatusCode, elem: T) -> Result { +pub(crate) fn serialize( + status_ok: hyper::StatusCode, + elem: T, +) -> Result { let (tx, rx) = tokio::sync::mpsc::channel::(1); // Build the writer @@ -62,10 +77,21 @@ pub(crate) fn serialize(status_ok: hyper::Stat let sink = PollSender::new(tx).sink_map_err(|_| Error::from(ErrorKind::BrokenPipe)); let mut writer = SinkWriter::new(CopyToBytes::new(sink)); let q = quick_xml::writer::Writer::new_with_indent(&mut writer, b' ', 4); - let ns_to_apply = vec![ ("xmlns:D".into(), "DAV:".into()), ("xmlns:C".into(), "urn:ietf:params:xml:ns:caldav".into()) ]; + let ns_to_apply = vec![ + ("xmlns:D".into(), "DAV:".into()), + ("xmlns:C".into(), "urn:ietf:params:xml:ns:caldav".into()), + ]; let mut qwriter = dxml::Writer { q, ns_to_apply }; - let decl = quick_xml::events::BytesDecl::from_start(quick_xml::events::BytesStart::from_content("xml version=\"1.0\" encoding=\"utf-8\"", 0)); - match qwriter.q.write_event_async(quick_xml::events::Event::Decl(decl)).await { + let decl = + quick_xml::events::BytesDecl::from_start(quick_xml::events::BytesStart::from_content( + "xml version=\"1.0\" encoding=\"utf-8\"", + 0, + )); + match qwriter + .q + .write_event_async(quick_xml::events::Event::Decl(decl)) + .await + { Ok(_) => (), Err(e) => tracing::error!(err=?e, "unable to write XML declaration "), } @@ -75,7 +101,6 @@ pub(crate) fn serialize(status_ok: hyper::Stat } }); - // Build the reader let recv = tokio_stream::wrappers::ReceiverStream::new(rx); let stream = StreamBody::new(recv.map(|v| Ok(Frame::data(v)))); @@ -89,7 +114,6 @@ pub(crate) fn serialize(status_ok: hyper::Stat Ok(response) } - /// Deserialize a request body to an XML request pub(crate) async fn deserialize>(req: Request) -> Result { let stream_of_frames = BodyStream::new(req.into_body()); @@ -97,7 +121,10 @@ pub(crate) async fn deserialize>(req: Request) -> Res .map_ok(|frame| frame.into_data()) .map(|obj| match obj { Ok(Ok(v)) => Ok(v), - Ok(Err(_)) => Err(std::io::Error::new(std::io::ErrorKind::Other, "conversion error")), + Ok(Err(_)) => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "conversion error", + )), Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)), }); let async_read = tokio_util::io::StreamReader::new(stream_of_bytes); diff --git a/aero-proto/src/dav/controller.rs b/aero-proto/src/dav/controller.rs index 885828f..0bf7a7d 100644 --- a/aero-proto/src/dav/controller.rs +++ b/aero-proto/src/dav/controller.rs @@ -1,21 +1,21 @@ use anyhow::Result; -use http_body_util::combinators::{UnsyncBoxBody, BoxBody}; -use hyper::body::Incoming; -use hyper::{Request, Response, body::Bytes}; +use futures::stream::{StreamExt, TryStreamExt}; +use http_body_util::combinators::{BoxBody, UnsyncBoxBody}; use http_body_util::BodyStream; use http_body_util::StreamBody; use hyper::body::Frame; -use futures::stream::{StreamExt, TryStreamExt}; +use hyper::body::Incoming; +use hyper::{body::Bytes, Request, Response}; use aero_collections::user::User; -use aero_dav::types as dav; -use aero_dav::realization::All; use aero_dav::caltypes as cal; +use aero_dav::realization::All; +use aero_dav::types as dav; -use crate::dav::codec::{serialize, deserialize, depth, text_body}; +use crate::dav::codec; +use crate::dav::codec::{depth, deserialize, serialize, text_body}; use crate::dav::node::{DavNode, PutPolicy}; use crate::dav::resource::RootNode; -use crate::dav::codec; pub(super) type ArcUser = std::sync::Arc; pub(super) type HttpResponse = Response>; @@ -39,19 +39,22 @@ pub(crate) struct Controller { req: Request, } impl Controller { - pub(crate) async fn route(user: std::sync::Arc, req: Request) -> Result { + pub(crate) async fn route( + user: std::sync::Arc, + req: Request, + ) -> Result { let path = req.uri().path().to_string(); let path_segments: Vec<_> = path.split("/").filter(|s| *s != "").collect(); let method = req.method().as_str().to_uppercase(); let can_create = matches!(method.as_str(), "PUT" | "MKCOL" | "MKCALENDAR"); - let node = match (RootNode {}).fetch(&user, &path_segments, can_create).await{ + let node = match (RootNode {}).fetch(&user, &path_segments, can_create).await { Ok(v) => v, Err(e) => { tracing::warn!(err=?e, "dav node fetch failed"); return Ok(Response::builder() .status(404) - .body(codec::text_body("Resource not found"))?) + .body(codec::text_body("Resource not found"))?); } }; @@ -80,7 +83,6 @@ impl Controller { } } - // --- Per-method functions --- /// REPORT has been first described in the "Versioning Extension" of WebDAV @@ -89,7 +91,7 @@ impl Controller { /// Note: current implementation is not generic at all, it is heavily tied to CalDAV. /// A rewrite would be required to make it more generic (with the extension system that has /// been introduced in aero-dav) - async fn report(self) -> Result { + async fn report(self) -> Result { let status = hyper::StatusCode::from_u16(207)?; let report = match deserialize::>(self.req).await { @@ -97,54 +99,75 @@ impl Controller { Err(e) => { tracing::error!(err=?e, "unable to decode REPORT body"); return Ok(Response::builder() - .status(400) - .body(text_body("Bad request"))?) + .status(400) + .body(text_body("Bad request"))?); } }; - // Multiget is really like a propfind where Depth: 0|1|Infinity is replaced by an arbitrary - // list of URLs - // @FIXME - let multiget = match report { - cal::Report::Multiget(m) => m, - cal::Report::Query(q) => todo!(), - cal::Report::FreeBusy(_) => return Ok(Response::builder() - .status(501) - .body(text_body("Not implemented"))?), - }; - - // Getting the list of nodes + // Internal representation that will handle processed request let (mut ok_node, mut not_found) = (Vec::new(), Vec::new()); - for h in multiget.href.into_iter() { - let maybe_collected_node = match Path::new(h.0.as_str()) { - Ok(Path::Abs(p)) => RootNode{}.fetch(&self.user, p.as_slice(), false).await.or(Err(h)), - Ok(Path::Rel(p)) => self.node.fetch(&self.user, p.as_slice(), false).await.or(Err(h)), - Err(_) => Err(h), - }; - - match maybe_collected_node { - Ok(v) => ok_node.push(v), - Err(h) => not_found.push(h), - }; - } + let calprop: Option>; + + // Extracting request information + match report { + cal::Report::Multiget(m) => { + // Multiget is really like a propfind where Depth: 0|1|Infinity is replaced by an arbitrary + // list of URLs + // Getting the list of nodes + for h in m.href.into_iter() { + let maybe_collected_node = match Path::new(h.0.as_str()) { + Ok(Path::Abs(p)) => RootNode {} + .fetch(&self.user, p.as_slice(), false) + .await + .or(Err(h)), + Ok(Path::Rel(p)) => self + .node + .fetch(&self.user, p.as_slice(), false) + .await + .or(Err(h)), + Err(_) => Err(h), + }; + + match maybe_collected_node { + Ok(v) => ok_node.push(v), + Err(h) => not_found.push(h), + }; + } + calprop = m.selector; + } + cal::Report::Query(q) => { + calprop = q.selector; + ok_node = apply_filter(&self.user, self.node.children(&self.user).await, q.filter) + .try_collect() + .await?; + } + cal::Report::FreeBusy(_) => { + return Ok(Response::builder() + .status(501) + .body(text_body("Not implemented"))?) + } + }; // Getting props - let props = match multiget.selector { + let props = match calprop { None | Some(cal::CalendarSelector::AllProp) => Some(dav::PropName(ALLPROP.to_vec())), Some(cal::CalendarSelector::PropName) => None, Some(cal::CalendarSelector::Prop(inner)) => Some(inner), }; - serialize(status, Self::multistatus(&self.user, ok_node, not_found, props).await) + serialize( + status, + Self::multistatus(&self.user, ok_node, not_found, props).await, + ) } /// PROPFIND is the standard way to fetch WebDAV properties - async fn propfind(self) -> Result { + async fn propfind(self) -> Result { let depth = depth(&self.req); if matches!(depth, dav::Depth::Infinity) { return Ok(Response::builder() .status(501) - .body(text_body("Depth: Infinity not implemented"))?) + .body(text_body("Depth: Infinity not implemented"))?); } let status = hyper::StatusCode::from_u16(207)?; @@ -153,7 +176,9 @@ impl Controller { // request body MUST be treated as if it were an 'allprop' request. // @FIXME here we handle any invalid data as an allprop, an empty request is thus correctly // handled, but corrupted requests are also silently handled as allprop. - let propfind = deserialize::>(self.req).await.unwrap_or_else(|_| dav::PropFind::::AllProp(None)); + let propfind = deserialize::>(self.req) + .await + .unwrap_or_else(|_| dav::PropFind::::AllProp(None)); tracing::debug!(recv=?propfind, "inferred propfind request"); // Collect nodes as PROPFIND is not limited to the targeted node @@ -170,29 +195,36 @@ impl Controller { dav::PropFind::AllProp(Some(dav::Include(mut include))) => { include.extend_from_slice(&ALLPROP); Some(dav::PropName(include)) - }, + } dav::PropFind::Prop(inner) => Some(inner), }; // Not Found is currently impossible considering the way we designed this function let not_found = vec![]; - serialize(status, Self::multistatus(&self.user, nodes, not_found, propname).await) + serialize( + status, + Self::multistatus(&self.user, nodes, not_found, propname).await, + ) } - async fn put(self) -> Result { + async fn put(self) -> Result { let put_policy = codec::put_policy(&self.req)?; let stream_of_frames = BodyStream::new(self.req.into_body()); let stream_of_bytes = stream_of_frames - .map_ok(|frame| frame.into_data()) - .map(|obj| match obj { - Ok(Ok(v)) => Ok(v), - Ok(Err(_)) => Err(std::io::Error::new(std::io::ErrorKind::Other, "conversion error")), - Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)), - }).boxed(); + .map_ok(|frame| frame.into_data()) + .map(|obj| match obj { + Ok(Ok(v)) => Ok(v), + Ok(Err(_)) => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "conversion error", + )), + Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)), + }) + .boxed(); let etag = self.node.put(put_policy, stream_of_bytes).await?; - + let response = Response::builder() .status(201) .header("ETag", etag) @@ -202,7 +234,7 @@ impl Controller { Ok(response) } - async fn get(self) -> Result { + async fn get(self) -> Result { let stream_body = StreamBody::new(self.node.content().map_ok(|v| Frame::data(v))); let boxed_body = UnsyncBoxBody::new(stream_body); @@ -227,17 +259,33 @@ impl Controller { // --- Common utility functions --- /// Build a multistatus response from a list of DavNodes - async fn multistatus(user: &ArcUser, nodes: Vec>, not_found: Vec, props: Option>) -> dav::Multistatus { + async fn multistatus( + user: &ArcUser, + nodes: Vec>, + not_found: Vec, + props: Option>, + ) -> dav::Multistatus { // Collect properties on existing objects let mut responses: Vec> = match props { - Some(props) => futures::stream::iter(nodes).then(|n| n.response_props(user, props.clone())).collect().await, - None => nodes.into_iter().map(|n| n.response_propname(user)).collect(), + Some(props) => { + futures::stream::iter(nodes) + .then(|n| n.response_props(user, props.clone())) + .collect() + .await + } + None => nodes + .into_iter() + .map(|n| n.response_propname(user)) + .collect(), }; // Register not found objects only if relevant if !not_found.is_empty() { responses.push(dav::Response { - status_or_propstat: dav::StatusOrPropstat::Status(not_found, dav::Status(hyper::StatusCode::NOT_FOUND)), + status_or_propstat: dav::StatusOrPropstat::Status( + not_found, + dav::Status(hyper::StatusCode::NOT_FOUND), + ), error: None, location: None, responsedescription: None, @@ -252,7 +300,6 @@ impl Controller { } } - /// Path is a voluntarily feature limited /// compared to the expressiveness of a UNIX path /// For example getting parent with ../ is not supported, scheme is not supported, etc. @@ -271,8 +318,39 @@ impl<'a> Path<'a> { let path_segments: Vec<_> = path.split("/").filter(|s| *s != "" && *s != ".").collect(); if path.starts_with("/") { - return Ok(Path::Abs(path_segments)) + return Ok(Path::Abs(path_segments)); } Ok(Path::Rel(path_segments)) } } + +//@FIXME move somewhere else +//@FIXME naive implementation, must be refactored later +use futures::stream::Stream; +use icalendar; +fn apply_filter( + user: &ArcUser, + nodes: Vec>, + filter: cal::Filter, +) -> impl Stream, std::io::Error>> { + futures::stream::iter(nodes).filter_map(|single_node| async move { + // Get ICS + let chunks: Vec<_> = match single_node.content().try_collect().await { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }; + let raw_ics = chunks.iter().fold(String::new(), |mut acc, single_chunk| { + let str_fragment = std::str::from_utf8(single_chunk.as_ref()); + acc.extend(str_fragment); + acc + }); + + // Parse ICS + let ics = icalendar::parser::read_calendar(&raw_ics).unwrap(); + + // Do checks + + // Object has been kept + Some(Ok(single_node)) + }) +} diff --git a/aero-proto/src/dav/middleware.rs b/aero-proto/src/dav/middleware.rs index e19ce14..8964699 100644 --- a/aero-proto/src/dav/middleware.rs +++ b/aero-proto/src/dav/middleware.rs @@ -1,10 +1,10 @@ use anyhow::{anyhow, Result}; use base64::Engine; -use hyper::{Request, Response}; use hyper::body::Incoming; +use hyper::{Request, Response}; -use aero_user::login::ArcLoginProvider; use aero_collections::user::User; +use aero_user::login::ArcLoginProvider; use super::codec::text_body; use super::controller::HttpResponse; @@ -13,7 +13,7 @@ type ArcUser = std::sync::Arc; pub(super) async fn auth<'a>( login: ArcLoginProvider, - req: Request, + req: Request, next: impl Fn(ArcUser, Request) -> futures::future::BoxFuture<'a, Result>, ) -> Result { let auth_val = match req.headers().get(hyper::header::AUTHORIZATION) { @@ -23,8 +23,8 @@ pub(super) async fn auth<'a>( return Ok(Response::builder() .status(401) .header("WWW-Authenticate", "Basic realm=\"Aerogramme\"") - .body(text_body("Missing Authorization field"))?) - }, + .body(text_body("Missing Authorization field"))?); + } }; let b64_creds_maybe_padded = match auth_val.split_once(" ") { @@ -33,8 +33,8 @@ pub(super) async fn auth<'a>( tracing::info!("Unsupported authorization field"); return Ok(Response::builder() .status(400) - .body(text_body("Unsupported Authorization field"))?) - }, + .body(text_body("Unsupported Authorization field"))?); + } }; // base64urlencoded may have trailing equals, base64urlsafe has not @@ -44,22 +44,22 @@ pub(super) async fn auth<'a>( // Decode base64 let creds = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64_creds_clean)?; let str_creds = std::str::from_utf8(&creds)?; - + // Split username and password - let (username, password) = str_creds - .split_once(':') - .ok_or(anyhow!("Missing colon in Authorization, can't split decoded value into a username/password pair"))?; + let (username, password) = str_creds.split_once(':').ok_or(anyhow!( + "Missing colon in Authorization, can't split decoded value into a username/password pair" + ))?; // Call login provider let creds = match login.login(username, password).await { Ok(c) => c, Err(_) => { - tracing::info!(user=username, "Wrong credentials"); + tracing::info!(user = username, "Wrong credentials"); return Ok(Response::builder() .status(401) .header("WWW-Authenticate", "Basic realm=\"Aerogramme\"") - .body(text_body("Wrong credentials"))?) - }, + .body(text_body("Wrong credentials"))?); + } }; // Build a user diff --git a/aero-proto/src/dav/mod.rs b/aero-proto/src/dav/mod.rs index de2e690..43de3a5 100644 --- a/aero-proto/src/dav/mod.rs +++ b/aero-proto/src/dav/mod.rs @@ -1,6 +1,6 @@ -mod middleware; -mod controller; mod codec; +mod controller; +mod middleware; mod node; mod resource; @@ -8,19 +8,19 @@ use std::net::SocketAddr; use std::sync::Arc; use anyhow::Result; +use futures::future::FutureExt; +use futures::stream::{FuturesUnordered, StreamExt}; +use hyper::rt::{Read, Write}; +use hyper::server::conn::http1 as http; use hyper::service::service_fn; use hyper::{Request, Response}; -use hyper::server::conn::http1 as http; -use hyper::rt::{Read, Write}; use hyper_util::rt::TokioIo; -use futures::stream::{FuturesUnordered, StreamExt}; +use rustls_pemfile::{certs, private_key}; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; +use tokio::net::TcpStream; use tokio::sync::watch; use tokio_rustls::TlsAcceptor; -use tokio::net::TcpStream; -use futures::future::FutureExt; -use tokio::io::{AsyncRead, AsyncWrite}; -use rustls_pemfile::{certs, private_key}; use aero_user::config::{DavConfig, DavUnsecureConfig}; use aero_user::login::ArcLoginProvider; @@ -90,7 +90,7 @@ impl Server { Ok(v) => v, Err(e) => { tracing::error!(err=?e, "TLS acceptor failed"); - continue + continue; } }; @@ -100,21 +100,31 @@ impl Server { //abitrarily bound //@FIXME replace with a handler supporting http2 and TLS - match http::Builder::new().serve_connection(stream, service_fn(|req: Request| { - let login = login.clone(); - tracing::info!("{:?} {:?}", req.method(), req.uri()); - async { - match middleware::auth(login, req, |user, request| async { Controller::route(user, request).await }.boxed()).await { - Ok(v) => Ok(v), - Err(e) => { - tracing::error!(err=?e, "internal error"); - Response::builder() - .status(500) - .body(codec::text_body("Internal error")) - }, - } - } - })).await { + match http::Builder::new() + .serve_connection( + stream, + service_fn(|req: Request| { + let login = login.clone(); + tracing::info!("{:?} {:?}", req.method(), req.uri()); + async { + match middleware::auth(login, req, |user, request| { + async { Controller::route(user, request).await }.boxed() + }) + .await + { + Ok(v) => Ok(v), + Err(e) => { + tracing::error!(err=?e, "internal error"); + Response::builder() + .status(500) + .body(codec::text_body("Internal error")) + } + } + } + }), + ) + .await + { Err(e) => tracing::warn!(err=?e, "connection failed"), Ok(()) => tracing::trace!("connection terminated with success"), } @@ -149,7 +159,6 @@ impl Server { // // - // // // diff --git a/aero-proto/src/dav/node.rs b/aero-proto/src/dav/node.rs index d246280..877342a 100644 --- a/aero-proto/src/dav/node.rs +++ b/aero-proto/src/dav/node.rs @@ -1,16 +1,17 @@ use anyhow::Result; -use futures::stream::{BoxStream, StreamExt}; use futures::future::{BoxFuture, FutureExt}; +use futures::stream::{BoxStream, StreamExt}; use hyper::body::Bytes; -use aero_dav::types as dav; -use aero_dav::realization::All; use aero_collections::davdag::Etag; +use aero_dav::realization::All; +use aero_dav::types as dav; use super::controller::ArcUser; pub(crate) type Content<'a> = BoxStream<'a, std::result::Result>; -pub(crate) type PropertyStream<'a> = BoxStream<'a, std::result::Result, dav::PropertyRequest>>; +pub(crate) type PropertyStream<'a> = + BoxStream<'a, std::result::Result, dav::PropertyRequest>>; pub(crate) enum PutPolicy { OverwriteAll, @@ -25,7 +26,12 @@ pub(crate) trait DavNode: Send { /// This node direct children fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec>>; /// Recursively fetch a child (progress inside the filesystem hierarchy) - fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result>>; + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result>>; // node properties /// Get the path @@ -36,13 +42,17 @@ pub(crate) trait DavNode: Send { fn properties(&self, user: &ArcUser, prop: dav::PropName) -> PropertyStream<'static>; //fn properties(&self, user: &ArcUser, prop: dav::PropName) -> Vec>; /// Put an element (create or update) - fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result>; + fn put<'a>( + &'a self, + policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result>; /// Content type of the element fn content_type(&self) -> &str; /// Get ETag fn etag(&self) -> BoxFuture>; /// Get content - fn content(&self) -> Content<'static>; + fn content<'a>(&self) -> Content<'a>; /// Delete fn delete(&self) -> BoxFuture>; @@ -52,24 +62,32 @@ pub(crate) trait DavNode: Send { fn response_propname(&self, user: &ArcUser) -> dav::Response { dav::Response { status_or_propstat: dav::StatusOrPropstat::PropStat( - dav::Href(self.path(user)), - vec![ - dav::PropStat { - status: dav::Status(hyper::StatusCode::OK), - prop: dav::AnyProp(self.supported_properties(user).0.into_iter().map(dav::AnyProperty::Request).collect()), - error: None, - responsedescription: None, - } - ], + dav::Href(self.path(user)), + vec![dav::PropStat { + status: dav::Status(hyper::StatusCode::OK), + prop: dav::AnyProp( + self.supported_properties(user) + .0 + .into_iter() + .map(dav::AnyProperty::Request) + .collect(), + ), + error: None, + responsedescription: None, + }], ), error: None, location: None, - responsedescription: None + responsedescription: None, } } /// Utility function to get a prop response from a node & a list of propname - fn response_props(&self, user: &ArcUser, props: dav::PropName) -> BoxFuture<'static, dav::Response> { + fn response_props( + &self, + user: &ArcUser, + props: dav::PropName, + ) -> BoxFuture<'static, dav::Response> { //@FIXME we should make the DAV parsed object a stream... let mut result_stream = self.properties(user, props); let path = self.path(user); @@ -87,8 +105,8 @@ pub(crate) trait DavNode: Send { // If at least one property has been found on this object, adding a HTTP 200 propstat to // the response if !found.is_empty() { - prop_desc.push(dav::PropStat { - status: dav::Status(hyper::StatusCode::OK), + prop_desc.push(dav::PropStat { + status: dav::Status(hyper::StatusCode::OK), prop: dav::AnyProp(found), error: None, responsedescription: None, @@ -98,8 +116,8 @@ pub(crate) trait DavNode: Send { // If at least one property can't be found on this object, adding a HTTP 404 propstat to // the response if !not_found.is_empty() { - prop_desc.push(dav::PropStat { - status: dav::Status(hyper::StatusCode::NOT_FOUND), + prop_desc.push(dav::PropStat { + status: dav::Status(hyper::StatusCode::NOT_FOUND), prop: dav::AnyProp(not_found), error: None, responsedescription: None, @@ -111,9 +129,9 @@ pub(crate) trait DavNode: Send { status_or_propstat: dav::StatusOrPropstat::PropStat(dav::Href(path), prop_desc), error: None, location: None, - responsedescription: None + responsedescription: None, } - }.boxed() + } + .boxed() } } - diff --git a/aero-proto/src/dav/resource.rs b/aero-proto/src/dav/resource.rs index 944c6c8..d65ce38 100644 --- a/aero-proto/src/dav/resource.rs +++ b/aero-proto/src/dav/resource.rs @@ -2,23 +2,32 @@ use std::sync::Arc; type ArcUser = std::sync::Arc; use anyhow::{anyhow, Result}; -use futures::stream::{TryStreamExt, StreamExt}; use futures::io::AsyncReadExt; +use futures::stream::{StreamExt, TryStreamExt}; use futures::{future::BoxFuture, future::FutureExt}; -use aero_collections::{user::User, calendar::Calendar, davdag::{BlobId, Etag}}; -use aero_dav::types as dav; -use aero_dav::caltypes as cal; +use aero_collections::{ + calendar::Calendar, + davdag::{BlobId, Etag}, + user::User, +}; use aero_dav::acltypes as acl; -use aero_dav::realization::{All, self as all}; +use aero_dav::caltypes as cal; +use aero_dav::realization::{self as all, All}; +use aero_dav::types as dav; -use crate::dav::node::{DavNode, PutPolicy, Content}; use super::node::PropertyStream; +use crate::dav::node::{Content, DavNode, PutPolicy}; #[derive(Clone)] pub(crate) struct RootNode {} impl DavNode for RootNode { - fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result>> { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result>> { if path.len() == 0 { let this = self.clone(); return async { Ok(Box::new(this) as Box) }.boxed(); @@ -34,7 +43,7 @@ impl DavNode for RootNode { } fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec>> { - async { vec![Box::new(HomeNode { }) as Box] }.boxed() + async { vec![Box::new(HomeNode {}) as Box] }.boxed() } fn path(&self, user: &ArcUser) -> String { @@ -46,33 +55,53 @@ impl DavNode for RootNode { dav::PropertyRequest::DisplayName, dav::PropertyRequest::ResourceType, dav::PropertyRequest::GetContentType, - dav::PropertyRequest::Extension(all::PropertyRequest::Acl(acl::PropertyRequest::CurrentUserPrincipal)), + dav::PropertyRequest::Extension(all::PropertyRequest::Acl( + acl::PropertyRequest::CurrentUserPrincipal, + )), ]) } fn properties(&self, user: &ArcUser, prop: dav::PropName) -> PropertyStream<'static> { let user = user.clone(); - futures::stream::iter(prop.0).map(move |n| { - let prop = match n { - dav::PropertyRequest::DisplayName => dav::Property::DisplayName("DAV Root".to_string()), - dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![ - dav::ResourceType::Collection, - ]), - dav::PropertyRequest::GetContentType => dav::Property::GetContentType("httpd/unix-directory".into()), - dav::PropertyRequest::Extension(all::PropertyRequest::Acl(acl::PropertyRequest::CurrentUserPrincipal)) => - dav::Property::Extension(all::Property::Acl(acl::Property::CurrentUserPrincipal(acl::User::Authenticated(dav::Href(HomeNode{}.path(&user)))))), - v => return Err(v), - }; - Ok(prop) - }).boxed() + futures::stream::iter(prop.0) + .map(move |n| { + let prop = match n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName("DAV Root".to_string()) + } + dav::PropertyRequest::ResourceType => { + dav::Property::ResourceType(vec![dav::ResourceType::Collection]) + } + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("httpd/unix-directory".into()) + } + dav::PropertyRequest::Extension(all::PropertyRequest::Acl( + acl::PropertyRequest::CurrentUserPrincipal, + )) => dav::Property::Extension(all::Property::Acl( + acl::Property::CurrentUserPrincipal(acl::User::Authenticated(dav::Href( + HomeNode {}.path(&user), + ))), + )), + v => return Err(v), + }; + Ok(prop) + }) + .boxed() } - fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result> { + fn put<'a>( + &'a self, + _policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result> { futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed() } - fn content(&self) -> Content<'static> { - futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() } fn content_type(&self) -> &str { @@ -91,29 +120,37 @@ impl DavNode for RootNode { #[derive(Clone)] pub(crate) struct HomeNode {} impl DavNode for HomeNode { - fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result>> { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result>> { if path.len() == 0 { let node = Box::new(self.clone()) as Box; - return async { Ok(node) }.boxed() + return async { Ok(node) }.boxed(); } if path[0] == "calendar" { return async move { let child = Box::new(CalendarListNode::new(user).await?); child.fetch(user, &path[1..], create).await - }.boxed(); + } + .boxed(); } - + //@NOTE: we can't create a node at this level async { Err(anyhow!("Not found")) }.boxed() } fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec>> { - async { - CalendarListNode::new(user).await + async { + CalendarListNode::new(user) + .await .map(|c| vec![Box::new(c) as Box]) - .unwrap_or(vec![]) - }.boxed() + .unwrap_or(vec![]) + } + .boxed() } fn path(&self, user: &ArcUser) -> String { @@ -125,38 +162,58 @@ impl DavNode for HomeNode { dav::PropertyRequest::DisplayName, dav::PropertyRequest::ResourceType, dav::PropertyRequest::GetContentType, - dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarHomeSet)), + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::CalendarHomeSet, + )), ]) } fn properties(&self, user: &ArcUser, prop: dav::PropName) -> PropertyStream<'static> { let user = user.clone(); - futures::stream::iter(prop.0).map(move |n| { - let prop = match n { - dav::PropertyRequest::DisplayName => dav::Property::DisplayName(format!("{} home", user.username)), - dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![ - dav::ResourceType::Collection, - dav::ResourceType::Extension(all::ResourceType::Acl(acl::ResourceType::Principal)), - ]), - dav::PropertyRequest::GetContentType => dav::Property::GetContentType("httpd/unix-directory".into()), - dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarHomeSet)) => - dav::Property::Extension(all::Property::Cal(cal::Property::CalendarHomeSet(dav::Href( - //@FIXME we are hardcoding the calendar path, instead we would want to use - //objects - format!("/{}/calendar/", user.username) - )))), - v => return Err(v), - }; - Ok(prop) - }).boxed() + futures::stream::iter(prop.0) + .map(move |n| { + let prop = match n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName(format!("{} home", user.username)) + } + dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![ + dav::ResourceType::Collection, + dav::ResourceType::Extension(all::ResourceType::Acl( + acl::ResourceType::Principal, + )), + ]), + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("httpd/unix-directory".into()) + } + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::CalendarHomeSet, + )) => dav::Property::Extension(all::Property::Cal( + cal::Property::CalendarHomeSet(dav::Href( + //@FIXME we are hardcoding the calendar path, instead we would want to use + //objects + format!("/{}/calendar/", user.username), + )), + )), + v => return Err(v), + }; + Ok(prop) + }) + .boxed() } - fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result> { + fn put<'a>( + &'a self, + _policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result> { futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed() } - - fn content(&self) -> Content<'static> { - futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() } fn content_type(&self) -> &str { @@ -183,7 +240,12 @@ impl CalendarListNode { } } impl DavNode for CalendarListNode { - fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result>> { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result>> { if path.len() == 0 { let node = Box::new(self.clone()) as Box; return async { Ok(node) }.boxed(); @@ -191,13 +253,18 @@ impl DavNode for CalendarListNode { async move { //@FIXME: we should create a node if the open returns a "not found". - let cal = user.calendars.open(user, path[0]).await?.ok_or(anyhow!("Not found"))?; - let child = Box::new(CalendarNode { + let cal = user + .calendars + .open(user, path[0]) + .await? + .ok_or(anyhow!("Not found"))?; + let child = Box::new(CalendarNode { col: cal, - calname: path[0].to_string() + calname: path[0].to_string(), }); child.fetch(user, &path[1..], create).await - }.boxed() + } + .boxed() } fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec>> { @@ -206,18 +273,23 @@ impl DavNode for CalendarListNode { //@FIXME maybe we want to be lazy here?! futures::stream::iter(list.iter()) .filter_map(|name| async move { - user.calendars.open(user, name).await + user.calendars + .open(user, name) + .await .ok() .flatten() .map(|v| (name, v)) }) - .map(|(name, cal)| Box::new(CalendarNode { - col: cal, - calname: name.to_string(), - }) as Box) + .map(|(name, cal)| { + Box::new(CalendarNode { + col: cal, + calname: name.to_string(), + }) as Box + }) .collect::>>() .await - }.boxed() + } + .boxed() } fn path(&self, user: &ArcUser) -> String { @@ -234,23 +306,38 @@ impl DavNode for CalendarListNode { fn properties(&self, user: &ArcUser, prop: dav::PropName) -> PropertyStream<'static> { let user = user.clone(); - futures::stream::iter(prop.0).map(move |n| { - let prop = match n { - dav::PropertyRequest::DisplayName => dav::Property::DisplayName(format!("{} calendars", user.username)), - dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![dav::ResourceType::Collection]), - dav::PropertyRequest::GetContentType => dav::Property::GetContentType("httpd/unix-directory".into()), - v => return Err(v), - }; - Ok(prop) - }).boxed() + futures::stream::iter(prop.0) + .map(move |n| { + let prop = match n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName(format!("{} calendars", user.username)) + } + dav::PropertyRequest::ResourceType => { + dav::Property::ResourceType(vec![dav::ResourceType::Collection]) + } + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("httpd/unix-directory".into()) + } + v => return Err(v), + }; + Ok(prop) + }) + .boxed() } - fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result> { + fn put<'a>( + &'a self, + _policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result> { futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed() } - fn content(&self) -> Content<'static> { - futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() } fn content_type(&self) -> &str { @@ -272,17 +359,22 @@ pub(crate) struct CalendarNode { calname: String, } impl DavNode for CalendarNode { - fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result>> { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result>> { if path.len() == 0 { let node = Box::new(self.clone()) as Box; - return async { Ok(node) }.boxed() + return async { Ok(node) }.boxed(); } let col = self.col.clone(); let calname = self.calname.clone(); async move { match (col.dag().await.idx_by_filename.get(path[0]), create) { - (Some(blob_id), _) => { + (Some(blob_id), _) => { let child = Box::new(EventNode { col: col.clone(), calname, @@ -290,7 +382,7 @@ impl DavNode for CalendarNode { blob_id: *blob_id, }); child.fetch(user, &path[1..], create).await - }, + } (None, true) => { let child = Box::new(CreateEventNode { col: col.clone(), @@ -298,11 +390,11 @@ impl DavNode for CalendarNode { filename: path[0].to_string(), }); child.fetch(user, &path[1..], create).await - }, + } _ => Err(anyhow!("Not found")), } - - }.boxed() + } + .boxed() } fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec>> { @@ -310,15 +402,21 @@ impl DavNode for CalendarNode { let calname = self.calname.clone(); async move { - col.dag().await.idx_by_filename.iter().map(|(filename, blob_id)| { - Box::new(EventNode { - col: col.clone(), - calname: calname.clone(), - filename: filename.to_string(), - blob_id: *blob_id, - }) as Box - }).collect() - }.boxed() + col.dag() + .await + .idx_by_filename + .iter() + .map(|(filename, blob_id)| { + Box::new(EventNode { + col: col.clone(), + calname: calname.clone(), + filename: filename.to_string(), + blob_id: *blob_id, + }) as Box + }) + .collect() + } + .boxed() } fn path(&self, user: &ArcUser) -> String { @@ -330,38 +428,58 @@ impl DavNode for CalendarNode { dav::PropertyRequest::DisplayName, dav::PropertyRequest::ResourceType, dav::PropertyRequest::GetContentType, - dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::SupportedCalendarComponentSet)), + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::SupportedCalendarComponentSet, + )), ]) } fn properties(&self, _user: &ArcUser, prop: dav::PropName) -> PropertyStream<'static> { let calname = self.calname.to_string(); - futures::stream::iter(prop.0).map(move |n| { - let prop = match n { - dav::PropertyRequest::DisplayName => dav::Property::DisplayName(format!("{} calendar", calname)), - dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![ - dav::ResourceType::Collection, - dav::ResourceType::Extension(all::ResourceType::Cal(cal::ResourceType::Calendar)), - ]), - //dav::PropertyRequest::GetContentType => dav::AnyProperty::Value(dav::Property::GetContentType("httpd/unix-directory".into())), - //@FIXME seems wrong but seems to be what Thunderbird expects... - dav::PropertyRequest::GetContentType => dav::Property::GetContentType("text/calendar".into()), - dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::SupportedCalendarComponentSet)) - => dav::Property::Extension(all::Property::Cal(cal::Property::SupportedCalendarComponentSet(vec![ - cal::CompSupport(cal::Component::VEvent), - ]))), - v => return Err(v), - }; - Ok(prop) - }).boxed() + futures::stream::iter(prop.0) + .map(move |n| { + let prop = match n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName(format!("{} calendar", calname)) + } + dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![ + dav::ResourceType::Collection, + dav::ResourceType::Extension(all::ResourceType::Cal( + cal::ResourceType::Calendar, + )), + ]), + //dav::PropertyRequest::GetContentType => dav::AnyProperty::Value(dav::Property::GetContentType("httpd/unix-directory".into())), + //@FIXME seems wrong but seems to be what Thunderbird expects... + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("text/calendar".into()) + } + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::SupportedCalendarComponentSet, + )) => dav::Property::Extension(all::Property::Cal( + cal::Property::SupportedCalendarComponentSet(vec![cal::CompSupport( + cal::Component::VEvent, + )]), + )), + v => return Err(v), + }; + Ok(prop) + }) + .boxed() } - fn put<'a>(&'a self, _policy: PutPolicy, _stream: Content<'a>) -> BoxFuture<'a, std::result::Result> { + fn put<'a>( + &'a self, + _policy: PutPolicy, + _stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result> { futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed() } - fn content<'a>(&'a self) -> Content<'static> { - futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() } fn content_type(&self) -> &str { @@ -386,13 +504,23 @@ pub(crate) struct EventNode { } impl DavNode for EventNode { - fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result>> { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result>> { if path.len() == 0 { let node = Box::new(self.clone()) as Box; - return async { Ok(node) }.boxed() + return async { Ok(node) }.boxed(); } - async { Err(anyhow!("Not supported: can't create a child on an event node")) }.boxed() + async { + Err(anyhow!( + "Not supported: can't create a child on an event node" + )) + } + .boxed() } fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec>> { @@ -400,7 +528,10 @@ impl DavNode for EventNode { } fn path(&self, user: &ArcUser) -> String { - format!("/{}/calendar/{}/{}", user.username, self.calname, self.filename) + format!( + "/{}/calendar/{}/{}", + user.username, self.calname, self.filename + ) } fn supported_properties(&self, user: &ArcUser) -> dav::PropName { @@ -408,66 +539,106 @@ impl DavNode for EventNode { dav::PropertyRequest::DisplayName, dav::PropertyRequest::ResourceType, dav::PropertyRequest::GetEtag, - dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarData(cal::CalendarDataRequest::default()))), + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::CalendarData(cal::CalendarDataRequest::default()), + )), ]) } fn properties(&self, _user: &ArcUser, prop: dav::PropName) -> PropertyStream<'static> { let this = self.clone(); - futures::stream::iter(prop.0).then(move |n| { - let this = this.clone(); - - async move { - let prop = match &n { - dav::PropertyRequest::DisplayName => dav::Property::DisplayName(format!("{} event", this.filename)), - dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![]), - dav::PropertyRequest::GetContentType => dav::Property::GetContentType("text/calendar".into()), - dav::PropertyRequest::GetEtag => { - let etag = this.etag().await.ok_or(n.clone())?; - dav::Property::GetEtag(etag) - }, - dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarData(_req))) => { - let ics = String::from_utf8(this.col.get(this.blob_id).await.or(Err(n.clone()))?).or(Err(n.clone()))?; - - dav::Property::Extension(all::Property::Cal(cal::Property::CalendarData(cal::CalendarDataPayload { - mime: None, - payload: ics, - }))) - }, - _ => return Err(n), - }; - Ok(prop) - } - }).boxed() - } - - fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result> { + futures::stream::iter(prop.0) + .then(move |n| { + let this = this.clone(); + + async move { + let prop = match &n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName(format!("{} event", this.filename)) + } + dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![]), + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("text/calendar".into()) + } + dav::PropertyRequest::GetEtag => { + let etag = this.etag().await.ok_or(n.clone())?; + dav::Property::GetEtag(etag) + } + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::CalendarData(_req), + )) => { + let ics = String::from_utf8( + this.col.get(this.blob_id).await.or(Err(n.clone()))?, + ) + .or(Err(n.clone()))?; + + dav::Property::Extension(all::Property::Cal( + cal::Property::CalendarData(cal::CalendarDataPayload { + mime: None, + payload: ics, + }), + )) + } + _ => return Err(n), + }; + Ok(prop) + } + }) + .boxed() + } + + fn put<'a>( + &'a self, + policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result> { async { - let existing_etag = self.etag().await.ok_or(std::io::Error::new(std::io::ErrorKind::Other, "Etag error"))?; + let existing_etag = self + .etag() + .await + .ok_or(std::io::Error::new(std::io::ErrorKind::Other, "Etag error"))?; match policy { - PutPolicy::CreateOnly => return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)), - PutPolicy::ReplaceEtag(etag) if etag != existing_etag.as_str() => return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)), - _ => () + PutPolicy::CreateOnly => { + return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)) + } + PutPolicy::ReplaceEtag(etag) if etag != existing_etag.as_str() => { + return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)) + } + _ => (), }; //@FIXME for now, our storage interface does not allow streaming, // so we load everything in memory let mut evt = Vec::new(); let mut reader = stream.into_async_read(); - reader.read_to_end(&mut evt).await.or(Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)))?; - let (_token, entry) = self.col.put(self.filename.as_str(), evt.as_ref()).await.or(Err(std::io::ErrorKind::Interrupted))?; - self.col.opportunistic_sync().await.or(Err(std::io::ErrorKind::ConnectionReset))?; + reader + .read_to_end(&mut evt) + .await + .or(Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)))?; + let (_token, entry) = self + .col + .put(self.filename.as_str(), evt.as_ref()) + .await + .or(Err(std::io::ErrorKind::Interrupted))?; + self.col + .opportunistic_sync() + .await + .or(Err(std::io::ErrorKind::ConnectionReset))?; Ok(entry.2) - }.boxed() + } + .boxed() } - fn content<'a>(&'a self) -> Content<'static> { + fn content<'a>(&self) -> Content<'a> { //@FIXME for now, our storage interface does not allow streaming, // so we load everything in memory let calendar = self.col.clone(); let blob_id = self.blob_id.clone(); let r = async move { - let content = calendar.get(blob_id).await.or(Err(std::io::Error::from(std::io::ErrorKind::Interrupted))); + let content = calendar + .get(blob_id) + .await + .or(Err(std::io::Error::from(std::io::ErrorKind::Interrupted))); Ok(hyper::body::Bytes::from(content?)) }; futures::stream::once(Box::pin(r)).boxed() @@ -481,8 +652,14 @@ impl DavNode for EventNode { let calendar = self.col.clone(); async move { - calendar.dag().await.table.get(&self.blob_id).map(|(_, _, etag)| etag.to_string()) - }.boxed() + calendar + .dag() + .await + .table + .get(&self.blob_id) + .map(|(_, _, etag)| etag.to_string()) + } + .boxed() } fn delete(&self) -> BoxFuture> { @@ -494,12 +671,16 @@ impl DavNode for EventNode { Ok(v) => v, Err(e) => { tracing::error!(err=?e, "delete event node"); - return Err(std::io::Error::from(std::io::ErrorKind::Interrupted)) - }, + return Err(std::io::Error::from(std::io::ErrorKind::Interrupted)); + } }; - calendar.opportunistic_sync().await.or(Err(std::io::ErrorKind::ConnectionReset))?; + calendar + .opportunistic_sync() + .await + .or(Err(std::io::ErrorKind::ConnectionReset))?; Ok(()) - }.boxed() + } + .boxed() } } @@ -510,13 +691,23 @@ pub(crate) struct CreateEventNode { filename: String, } impl DavNode for CreateEventNode { - fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result>> { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result>> { if path.len() == 0 { let node = Box::new(self.clone()) as Box; - return async { Ok(node) }.boxed() + return async { Ok(node) }.boxed(); } - async { Err(anyhow!("Not supported: can't create a child on an event node")) }.boxed() + async { + Err(anyhow!( + "Not supported: can't create a child on an event node" + )) + } + .boxed() } fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec>> { @@ -524,33 +715,51 @@ impl DavNode for CreateEventNode { } fn path(&self, user: &ArcUser) -> String { - format!("/{}/calendar/{}/{}", user.username, self.calname, self.filename) + format!( + "/{}/calendar/{}/{}", + user.username, self.calname, self.filename + ) } fn supported_properties(&self, user: &ArcUser) -> dav::PropName { dav::PropName(vec![]) } - fn properties(&self, _user: &ArcUser, prop: dav::PropName) -> PropertyStream<'static> { + fn properties(&self, _user: &ArcUser, prop: dav::PropName) -> PropertyStream<'static> { futures::stream::iter(vec![]).boxed() } - fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result> { + fn put<'a>( + &'a self, + _policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result> { //@NOTE: policy might not be needed here: whatever we put, there is no known entries here - + async { //@FIXME for now, our storage interface does not allow for streaming let mut evt = Vec::new(); let mut reader = stream.into_async_read(); reader.read_to_end(&mut evt).await.unwrap(); - let (_token, entry) = self.col.put(self.filename.as_str(), evt.as_ref()).await.or(Err(std::io::ErrorKind::Interrupted))?; - self.col.opportunistic_sync().await.or(Err(std::io::ErrorKind::ConnectionReset))?; + let (_token, entry) = self + .col + .put(self.filename.as_str(), evt.as_ref()) + .await + .or(Err(std::io::ErrorKind::Interrupted))?; + self.col + .opportunistic_sync() + .await + .or(Err(std::io::ErrorKind::ConnectionReset))?; Ok(entry.2) - }.boxed() + } + .boxed() } - fn content(&self) -> Content<'static> { - futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() } fn content_type(&self) -> &str { diff --git a/aero-proto/src/imap/command/anonymous.rs b/aero-proto/src/imap/command/anonymous.rs index 2848c30..f23ec17 100644 --- a/aero-proto/src/imap/command/anonymous.rs +++ b/aero-proto/src/imap/command/anonymous.rs @@ -4,8 +4,8 @@ use imap_codec::imap_types::core::AString; use imap_codec::imap_types::response::Code; use imap_codec::imap_types::secret::Secret; -use aero_user::login::ArcLoginProvider; use aero_collections::user::User; +use aero_user::login::ArcLoginProvider; use crate::imap::capability::ServerCapability; use crate::imap::command::anystate; diff --git a/aero-proto/src/imap/command/authenticated.rs b/aero-proto/src/imap/command/authenticated.rs index 4c8d8c1..5bd34cb 100644 --- a/aero-proto/src/imap/command/authenticated.rs +++ b/aero-proto/src/imap/command/authenticated.rs @@ -14,10 +14,10 @@ use imap_codec::imap_types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; use imap_codec::imap_types::response::{Code, CodeOther, Data}; use imap_codec::imap_types::status::{StatusDataItem, StatusDataItemName}; +use aero_collections::mail::namespace::MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW; use aero_collections::mail::uidindex::*; -use aero_collections::user::User; use aero_collections::mail::IMF; -use aero_collections::mail::namespace::MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW; +use aero_collections::user::User; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anystate, MailboxName}; diff --git a/aero-proto/src/imap/mod.rs b/aero-proto/src/imap/mod.rs index 7183a78..6a768b0 100644 --- a/aero-proto/src/imap/mod.rs +++ b/aero-proto/src/imap/mod.rs @@ -17,14 +17,14 @@ use std::net::SocketAddr; use anyhow::{anyhow, bail, Result}; use futures::stream::{FuturesUnordered, StreamExt}; -use tokio::net::TcpListener; -use tokio::sync::mpsc; -use tokio::sync::watch; use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status}; use imap_codec::imap_types::{core::Text, response::Greeting}; use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; use imap_flow::stream::AnyStream; use rustls_pemfile::{certs, private_key}; +use tokio::net::TcpListener; +use tokio::sync::mpsc; +use tokio::sync::watch; use tokio_rustls::TlsAcceptor; use aero_user::config::{ImapConfig, ImapUnsecureConfig}; diff --git a/aero-proto/src/lmtp.rs b/aero-proto/src/lmtp.rs index 9d40296..a82a783 100644 --- a/aero-proto/src/lmtp.rs +++ b/aero-proto/src/lmtp.rs @@ -10,16 +10,16 @@ use futures::{ stream::{FuturesOrdered, FuturesUnordered}, StreamExt, }; +use smtp_message::{DataUnescaper, Email, EscapedDataReader, Reply, ReplyCode}; +use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata}; use tokio::net::TcpListener; use tokio::select; use tokio::sync::watch; use tokio_util::compat::*; -use smtp_message::{DataUnescaper, Email, EscapedDataReader, Reply, ReplyCode}; -use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata}; +use aero_collections::mail::incoming::EncryptedMessage; use aero_user::config::*; use aero_user::login::*; -use aero_collections::mail::incoming::EncryptedMessage; pub struct LmtpServer { bind_addr: SocketAddr, diff --git a/aero-proto/src/sasl.rs b/aero-proto/src/sasl.rs index dae89eb..48c0815 100644 --- a/aero-proto/src/sasl.rs +++ b/aero-proto/src/sasl.rs @@ -8,9 +8,9 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::watch; use tokio_util::bytes::BytesMut; +use aero_sasl::{decode::client_command, encode::Encode, flow::State}; use aero_user::config::AuthConfig; use aero_user::login::ArcLoginProvider; -use aero_sasl::{flow::State, decode::client_command, encode::Encode}; pub struct AuthServer { login_provider: ArcLoginProvider, -- cgit v1.2.3