aboutsummaryrefslogtreecommitdiff
path: root/aero-proto/src
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-05-16 17:38:34 +0200
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-05-16 17:38:34 +0200
commit32dfd25f570b7a55bf43752684d286be0f6b2dc2 (patch)
treedd77871cda851bb5795743a3f04be61cf4c3ad61 /aero-proto/src
parent6b9542088cd1b66af46e95b787493b601accb495 (diff)
downloadaerogramme-32dfd25f570b7a55bf43752684d286be0f6b2dc2.tar.gz
aerogramme-32dfd25f570b7a55bf43752684d286be0f6b2dc2.zip
format + WIP calendar-query
Diffstat (limited to 'aero-proto/src')
-rw-r--r--aero-proto/src/dav/codec.rs71
-rw-r--r--aero-proto/src/dav/controller.rs200
-rw-r--r--aero-proto/src/dav/middleware.rs28
-rw-r--r--aero-proto/src/dav/mod.rs61
-rw-r--r--aero-proto/src/dav/node.rs68
-rw-r--r--aero-proto/src/dav/resource.rs565
-rw-r--r--aero-proto/src/imap/command/anonymous.rs2
-rw-r--r--aero-proto/src/imap/command/authenticated.rs4
-rw-r--r--aero-proto/src/imap/mod.rs6
-rw-r--r--aero-proto/src/lmtp.rs6
-rw-r--r--aero-proto/src/sasl.rs2
11 files changed, 677 insertions, 336 deletions
diff --git a/aero-proto/src/dav/codec.rs b/aero-proto/src/dav/codec.rs
index 57c3808..a441e7e 100644
--- a/aero-proto/src/dav/codec.rs
+++ b/aero-proto/src/dav/codec.rs
@@ -1,26 +1,30 @@
use anyhow::{bail, Result};
-use hyper::{Request, Response, body::Bytes};
-use hyper::body::Incoming;
-use http_body_util::Full;
+use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
+use http_body_util::combinators::UnsyncBoxBody;
+use http_body_util::BodyExt;
use http_body_util::BodyStream;
+use http_body_util::Full;
use http_body_util::StreamBody;
-use http_body_util::combinators::UnsyncBoxBody;
use hyper::body::Frame;
-use tokio_util::sync::PollSender;
+use hyper::body::Incoming;
+use hyper::{body::Bytes, Request, Response};
use std::io::{Error, ErrorKind};
-use futures::sink::SinkExt;
-use tokio_util::io::{SinkWriter, CopyToBytes};
-use http_body_util::BodyExt;
+use tokio_util::io::{CopyToBytes, SinkWriter};
+use tokio_util::sync::PollSender;
-use aero_dav::types as dav;
-use aero_dav::xml as dxml;
use super::controller::HttpResponse;
use super::node::PutPolicy;
+use aero_dav::types as dav;
+use aero_dav::xml as dxml;
pub(crate) fn depth(req: &Request<impl hyper::body::Body>) -> dav::Depth {
- match req.headers().get("Depth").map(hyper::header::HeaderValue::to_str) {
+ match req
+ .headers()
+ .get("Depth")
+ .map(hyper::header::HeaderValue::to_str)
+ {
Some(Ok("0")) => dav::Depth::Zero,
Some(Ok("1")) => dav::Depth::One,
Some(Ok("Infinity")) => dav::Depth::Infinity,
@@ -29,20 +33,28 @@ pub(crate) fn depth(req: &Request<impl hyper::body::Body>) -> dav::Depth {
}
pub(crate) fn put_policy(req: &Request<impl hyper::body::Body>) -> Result<PutPolicy> {
- if let Some(maybe_txt_etag) = req.headers().get("If-Match").map(hyper::header::HeaderValue::to_str) {
+ if let Some(maybe_txt_etag) = req
+ .headers()
+ .get("If-Match")
+ .map(hyper::header::HeaderValue::to_str)
+ {
let etag = maybe_txt_etag?;
let dquote_count = etag.chars().filter(|c| *c == '"').count();
if dquote_count != 2 {
bail!("Either If-Match value is invalid or it's not supported (only single etag is supported)");
}
- return Ok(PutPolicy::ReplaceEtag(etag.into()))
+ return Ok(PutPolicy::ReplaceEtag(etag.into()));
}
- if let Some(maybe_txt_etag) = req.headers().get("If-None-Match").map(hyper::header::HeaderValue::to_str) {
+ if let Some(maybe_txt_etag) = req
+ .headers()
+ .get("If-None-Match")
+ .map(hyper::header::HeaderValue::to_str)
+ {
let etag = maybe_txt_etag?;
if etag == "*" {
- return Ok(PutPolicy::CreateOnly)
+ return Ok(PutPolicy::CreateOnly);
}
bail!("Either If-None-Match value is invalid or it's not supported (only asterisk is supported)")
}
@@ -54,7 +66,10 @@ pub(crate) fn text_body(txt: &'static str) -> UnsyncBoxBody<Bytes, std::io::Erro
UnsyncBoxBody::new(Full::new(Bytes::from(txt)).map_err(|e| match e {}))
}
-pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::StatusCode, elem: T) -> Result<HttpResponse> {
+pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(
+ status_ok: hyper::StatusCode,
+ elem: T,
+) -> Result<HttpResponse> {
let (tx, rx) = tokio::sync::mpsc::channel::<Bytes>(1);
// Build the writer
@@ -62,10 +77,21 @@ pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::Stat
let sink = PollSender::new(tx).sink_map_err(|_| Error::from(ErrorKind::BrokenPipe));
let mut writer = SinkWriter::new(CopyToBytes::new(sink));
let q = quick_xml::writer::Writer::new_with_indent(&mut writer, b' ', 4);
- let ns_to_apply = vec![ ("xmlns:D".into(), "DAV:".into()), ("xmlns:C".into(), "urn:ietf:params:xml:ns:caldav".into()) ];
+ let ns_to_apply = vec![
+ ("xmlns:D".into(), "DAV:".into()),
+ ("xmlns:C".into(), "urn:ietf:params:xml:ns:caldav".into()),
+ ];
let mut qwriter = dxml::Writer { q, ns_to_apply };
- let decl = quick_xml::events::BytesDecl::from_start(quick_xml::events::BytesStart::from_content("xml version=\"1.0\" encoding=\"utf-8\"", 0));
- match qwriter.q.write_event_async(quick_xml::events::Event::Decl(decl)).await {
+ let decl =
+ quick_xml::events::BytesDecl::from_start(quick_xml::events::BytesStart::from_content(
+ "xml version=\"1.0\" encoding=\"utf-8\"",
+ 0,
+ ));
+ match qwriter
+ .q
+ .write_event_async(quick_xml::events::Event::Decl(decl))
+ .await
+ {
Ok(_) => (),
Err(e) => tracing::error!(err=?e, "unable to write XML declaration <?xml ... >"),
}
@@ -75,7 +101,6 @@ pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::Stat
}
});
-
// Build the reader
let recv = tokio_stream::wrappers::ReceiverStream::new(rx);
let stream = StreamBody::new(recv.map(|v| Ok(Frame::data(v))));
@@ -89,7 +114,6 @@ pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::Stat
Ok(response)
}
-
/// Deserialize a request body to an XML request
pub(crate) async fn deserialize<T: dxml::Node<T>>(req: Request<Incoming>) -> Result<T> {
let stream_of_frames = BodyStream::new(req.into_body());
@@ -97,7 +121,10 @@ pub(crate) async fn deserialize<T: dxml::Node<T>>(req: Request<Incoming>) -> Res
.map_ok(|frame| frame.into_data())
.map(|obj| match obj {
Ok(Ok(v)) => Ok(v),
- Ok(Err(_)) => Err(std::io::Error::new(std::io::ErrorKind::Other, "conversion error")),
+ Ok(Err(_)) => Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "conversion error",
+ )),
Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
});
let async_read = tokio_util::io::StreamReader::new(stream_of_bytes);
diff --git a/aero-proto/src/dav/controller.rs b/aero-proto/src/dav/controller.rs
index 885828f..0bf7a7d 100644
--- a/aero-proto/src/dav/controller.rs
+++ b/aero-proto/src/dav/controller.rs
@@ -1,21 +1,21 @@
use anyhow::Result;
-use http_body_util::combinators::{UnsyncBoxBody, BoxBody};
-use hyper::body::Incoming;
-use hyper::{Request, Response, body::Bytes};
+use futures::stream::{StreamExt, TryStreamExt};
+use http_body_util::combinators::{BoxBody, UnsyncBoxBody};
use http_body_util::BodyStream;
use http_body_util::StreamBody;
use hyper::body::Frame;
-use futures::stream::{StreamExt, TryStreamExt};
+use hyper::body::Incoming;
+use hyper::{body::Bytes, Request, Response};
use aero_collections::user::User;
-use aero_dav::types as dav;
-use aero_dav::realization::All;
use aero_dav::caltypes as cal;
+use aero_dav::realization::All;
+use aero_dav::types as dav;
-use crate::dav::codec::{serialize, deserialize, depth, text_body};
+use crate::dav::codec;
+use crate::dav::codec::{depth, deserialize, serialize, text_body};
use crate::dav::node::{DavNode, PutPolicy};
use crate::dav::resource::RootNode;
-use crate::dav::codec;
pub(super) type ArcUser = std::sync::Arc<User>;
pub(super) type HttpResponse = Response<UnsyncBoxBody<Bytes, std::io::Error>>;
@@ -39,19 +39,22 @@ pub(crate) struct Controller {
req: Request<Incoming>,
}
impl Controller {
- pub(crate) async fn route(user: std::sync::Arc<User>, req: Request<Incoming>) -> Result<HttpResponse> {
+ pub(crate) async fn route(
+ user: std::sync::Arc<User>,
+ req: Request<Incoming>,
+ ) -> Result<HttpResponse> {
let path = req.uri().path().to_string();
let path_segments: Vec<_> = path.split("/").filter(|s| *s != "").collect();
let method = req.method().as_str().to_uppercase();
let can_create = matches!(method.as_str(), "PUT" | "MKCOL" | "MKCALENDAR");
- let node = match (RootNode {}).fetch(&user, &path_segments, can_create).await{
+ let node = match (RootNode {}).fetch(&user, &path_segments, can_create).await {
Ok(v) => v,
Err(e) => {
tracing::warn!(err=?e, "dav node fetch failed");
return Ok(Response::builder()
.status(404)
- .body(codec::text_body("Resource not found"))?)
+ .body(codec::text_body("Resource not found"))?);
}
};
@@ -80,7 +83,6 @@ impl Controller {
}
}
-
// --- Per-method functions ---
/// REPORT has been first described in the "Versioning Extension" of WebDAV
@@ -89,7 +91,7 @@ impl Controller {
/// Note: current implementation is not generic at all, it is heavily tied to CalDAV.
/// A rewrite would be required to make it more generic (with the extension system that has
/// been introduced in aero-dav)
- async fn report(self) -> Result<HttpResponse> {
+ async fn report(self) -> Result<HttpResponse> {
let status = hyper::StatusCode::from_u16(207)?;
let report = match deserialize::<cal::Report<All>>(self.req).await {
@@ -97,54 +99,75 @@ impl Controller {
Err(e) => {
tracing::error!(err=?e, "unable to decode REPORT body");
return Ok(Response::builder()
- .status(400)
- .body(text_body("Bad request"))?)
+ .status(400)
+ .body(text_body("Bad request"))?);
}
};
- // Multiget is really like a propfind where Depth: 0|1|Infinity is replaced by an arbitrary
- // list of URLs
- // @FIXME
- let multiget = match report {
- cal::Report::Multiget(m) => m,
- cal::Report::Query(q) => todo!(),
- cal::Report::FreeBusy(_) => return Ok(Response::builder()
- .status(501)
- .body(text_body("Not implemented"))?),
- };
-
- // Getting the list of nodes
+ // Internal representation that will handle processed request
let (mut ok_node, mut not_found) = (Vec::new(), Vec::new());
- for h in multiget.href.into_iter() {
- let maybe_collected_node = match Path::new(h.0.as_str()) {
- Ok(Path::Abs(p)) => RootNode{}.fetch(&self.user, p.as_slice(), false).await.or(Err(h)),
- Ok(Path::Rel(p)) => self.node.fetch(&self.user, p.as_slice(), false).await.or(Err(h)),
- Err(_) => Err(h),
- };
-
- match maybe_collected_node {
- Ok(v) => ok_node.push(v),
- Err(h) => not_found.push(h),
- };
- }
+ let calprop: Option<cal::CalendarSelector<All>>;
+
+ // Extracting request information
+ match report {
+ cal::Report::Multiget(m) => {
+ // Multiget is really like a propfind where Depth: 0|1|Infinity is replaced by an arbitrary
+ // list of URLs
+ // Getting the list of nodes
+ for h in m.href.into_iter() {
+ let maybe_collected_node = match Path::new(h.0.as_str()) {
+ Ok(Path::Abs(p)) => RootNode {}
+ .fetch(&self.user, p.as_slice(), false)
+ .await
+ .or(Err(h)),
+ Ok(Path::Rel(p)) => self
+ .node
+ .fetch(&self.user, p.as_slice(), false)
+ .await
+ .or(Err(h)),
+ Err(_) => Err(h),
+ };
+
+ match maybe_collected_node {
+ Ok(v) => ok_node.push(v),
+ Err(h) => not_found.push(h),
+ };
+ }
+ calprop = m.selector;
+ }
+ cal::Report::Query(q) => {
+ calprop = q.selector;
+ ok_node = apply_filter(&self.user, self.node.children(&self.user).await, q.filter)
+ .try_collect()
+ .await?;
+ }
+ cal::Report::FreeBusy(_) => {
+ return Ok(Response::builder()
+ .status(501)
+ .body(text_body("Not implemented"))?)
+ }
+ };
// Getting props
- let props = match multiget.selector {
+ let props = match calprop {
None | Some(cal::CalendarSelector::AllProp) => Some(dav::PropName(ALLPROP.to_vec())),
Some(cal::CalendarSelector::PropName) => None,
Some(cal::CalendarSelector::Prop(inner)) => Some(inner),
};
- serialize(status, Self::multistatus(&self.user, ok_node, not_found, props).await)
+ serialize(
+ status,
+ Self::multistatus(&self.user, ok_node, not_found, props).await,
+ )
}
/// PROPFIND is the standard way to fetch WebDAV properties
- async fn propfind(self) -> Result<HttpResponse> {
+ async fn propfind(self) -> Result<HttpResponse> {
let depth = depth(&self.req);
if matches!(depth, dav::Depth::Infinity) {
return Ok(Response::builder()
.status(501)
- .body(text_body("Depth: Infinity not implemented"))?)
+ .body(text_body("Depth: Infinity not implemented"))?);
}
let status = hyper::StatusCode::from_u16(207)?;
@@ -153,7 +176,9 @@ impl Controller {
// request body MUST be treated as if it were an 'allprop' request.
// @FIXME here we handle any invalid data as an allprop, an empty request is thus correctly
// handled, but corrupted requests are also silently handled as allprop.
- let propfind = deserialize::<dav::PropFind<All>>(self.req).await.unwrap_or_else(|_| dav::PropFind::<All>::AllProp(None));
+ let propfind = deserialize::<dav::PropFind<All>>(self.req)
+ .await
+ .unwrap_or_else(|_| dav::PropFind::<All>::AllProp(None));
tracing::debug!(recv=?propfind, "inferred propfind request");
// Collect nodes as PROPFIND is not limited to the targeted node
@@ -170,29 +195,36 @@ impl Controller {
dav::PropFind::AllProp(Some(dav::Include(mut include))) => {
include.extend_from_slice(&ALLPROP);
Some(dav::PropName(include))
- },
+ }
dav::PropFind::Prop(inner) => Some(inner),
};
// Not Found is currently impossible considering the way we designed this function
let not_found = vec![];
- serialize(status, Self::multistatus(&self.user, nodes, not_found, propname).await)
+ serialize(
+ status,
+ Self::multistatus(&self.user, nodes, not_found, propname).await,
+ )
}
- async fn put(self) -> Result<HttpResponse> {
+ async fn put(self) -> Result<HttpResponse> {
let put_policy = codec::put_policy(&self.req)?;
let stream_of_frames = BodyStream::new(self.req.into_body());
let stream_of_bytes = stream_of_frames
- .map_ok(|frame| frame.into_data())
- .map(|obj| match obj {
- Ok(Ok(v)) => Ok(v),
- Ok(Err(_)) => Err(std::io::Error::new(std::io::ErrorKind::Other, "conversion error")),
- Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
- }).boxed();
+ .map_ok(|frame| frame.into_data())
+ .map(|obj| match obj {
+ Ok(Ok(v)) => Ok(v),
+ Ok(Err(_)) => Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "conversion error",
+ )),
+ Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
+ })
+ .boxed();
let etag = self.node.put(put_policy, stream_of_bytes).await?;
-
+
let response = Response::builder()
.status(201)
.header("ETag", etag)
@@ -202,7 +234,7 @@ impl Controller {
Ok(response)
}
- async fn get(self) -> Result<HttpResponse> {
+ async fn get(self) -> Result<HttpResponse> {
let stream_body = StreamBody::new(self.node.content().map_ok(|v| Frame::data(v)));
let boxed_body = UnsyncBoxBody::new(stream_body);
@@ -227,17 +259,33 @@ impl Controller {
// --- Common utility functions ---
/// Build a multistatus response from a list of DavNodes
- async fn multistatus(user: &ArcUser, nodes: Vec<Box<dyn DavNode>>, not_found: Vec<dav::Href>, props: Option<dav::PropName<All>>) -> dav::Multistatus<All> {
+ async fn multistatus(
+ user: &ArcUser,
+ nodes: Vec<Box<dyn DavNode>>,
+ not_found: Vec<dav::Href>,
+ props: Option<dav::PropName<All>>,
+ ) -> dav::Multistatus<All> {
// Collect properties on existing objects
let mut responses: Vec<dav::Response<All>> = match props {
- Some(props) => futures::stream::iter(nodes).then(|n| n.response_props(user, props.clone())).collect().await,
- None => nodes.into_iter().map(|n| n.response_propname(user)).collect(),
+ Some(props) => {
+ futures::stream::iter(nodes)
+ .then(|n| n.response_props(user, props.clone()))
+ .collect()
+ .await
+ }
+ None => nodes
+ .into_iter()
+ .map(|n| n.response_propname(user))
+ .collect(),
};
// Register not found objects only if relevant
if !not_found.is_empty() {
responses.push(dav::Response {
- status_or_propstat: dav::StatusOrPropstat::Status(not_found, dav::Status(hyper::StatusCode::NOT_FOUND)),
+ status_or_propstat: dav::StatusOrPropstat::Status(
+ not_found,
+ dav::Status(hyper::StatusCode::NOT_FOUND),
+ ),
error: None,
location: None,
responsedescription: None,
@@ -252,7 +300,6 @@ impl Controller {
}
}
-
/// Path is a voluntarily feature limited
/// compared to the expressiveness of a UNIX path
/// For example getting parent with ../ is not supported, scheme is not supported, etc.
@@ -271,8 +318,39 @@ impl<'a> Path<'a> {
let path_segments: Vec<_> = path.split("/").filter(|s| *s != "" && *s != ".").collect();
if path.starts_with("/") {
- return Ok(Path::Abs(path_segments))
+ return Ok(Path::Abs(path_segments));
}
Ok(Path::Rel(path_segments))
}
}
+
+//@FIXME move somewhere else
+//@FIXME naive implementation, must be refactored later
+use futures::stream::Stream;
+use icalendar;
+fn apply_filter(
+ user: &ArcUser,
+ nodes: Vec<Box<dyn DavNode>>,
+ filter: cal::Filter,
+) -> impl Stream<Item = std::result::Result<Box<dyn DavNode>, std::io::Error>> {
+ futures::stream::iter(nodes).filter_map(|single_node| async move {
+ // Get ICS
+ let chunks: Vec<_> = match single_node.content().try_collect().await {
+ Ok(v) => v,
+ Err(e) => return Some(Err(e)),
+ };
+ let raw_ics = chunks.iter().fold(String::new(), |mut acc, single_chunk| {
+ let str_fragment = std::str::from_utf8(single_chunk.as_ref());
+ acc.extend(str_fragment);
+ acc
+ });
+
+ // Parse ICS
+ let ics = icalendar::parser::read_calendar(&raw_ics).unwrap();
+
+ // Do checks
+
+ // Object has been kept
+ Some(Ok(single_node))
+ })
+}
diff --git a/aero-proto/src/dav/middleware.rs b/aero-proto/src/dav/middleware.rs
index e19ce14..8964699 100644
--- a/aero-proto/src/dav/middleware.rs
+++ b/aero-proto/src/dav/middleware.rs
@@ -1,10 +1,10 @@
use anyhow::{anyhow, Result};
use base64::Engine;
-use hyper::{Request, Response};
use hyper::body::Incoming;
+use hyper::{Request, Response};
-use aero_user::login::ArcLoginProvider;
use aero_collections::user::User;
+use aero_user::login::ArcLoginProvider;
use super::codec::text_body;
use super::controller::HttpResponse;
@@ -13,7 +13,7 @@ type ArcUser = std::sync::Arc<User>;
pub(super) async fn auth<'a>(
login: ArcLoginProvider,
- req: Request<Incoming>,
+ req: Request<Incoming>,
next: impl Fn(ArcUser, Request<Incoming>) -> futures::future::BoxFuture<'a, Result<HttpResponse>>,
) -> Result<HttpResponse> {
let auth_val = match req.headers().get(hyper::header::AUTHORIZATION) {
@@ -23,8 +23,8 @@ pub(super) async fn auth<'a>(
return Ok(Response::builder()
.status(401)
.header("WWW-Authenticate", "Basic realm=\"Aerogramme\"")
- .body(text_body("Missing Authorization field"))?)
- },
+ .body(text_body("Missing Authorization field"))?);
+ }
};
let b64_creds_maybe_padded = match auth_val.split_once(" ") {
@@ -33,8 +33,8 @@ pub(super) async fn auth<'a>(
tracing::info!("Unsupported authorization field");
return Ok(Response::builder()
.status(400)
- .body(text_body("Unsupported Authorization field"))?)
- },
+ .body(text_body("Unsupported Authorization field"))?);
+ }
};
// base64urlencoded may have trailing equals, base64urlsafe has not
@@ -44,22 +44,22 @@ pub(super) async fn auth<'a>(
// Decode base64
let creds = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64_creds_clean)?;
let str_creds = std::str::from_utf8(&creds)?;
-
+
// Split username and password
- let (username, password) = str_creds
- .split_once(':')
- .ok_or(anyhow!("Missing colon in Authorization, can't split decoded value into a username/password pair"))?;
+ let (username, password) = str_creds.split_once(':').ok_or(anyhow!(
+ "Missing colon in Authorization, can't split decoded value into a username/password pair"
+ ))?;
// Call login provider
let creds = match login.login(username, password).await {
Ok(c) => c,
Err(_) => {
- tracing::info!(user=username, "Wrong credentials");
+ tracing::info!(user = username, "Wrong credentials");
return Ok(Response::builder()
.status(401)
.header("WWW-Authenticate", "Basic realm=\"Aerogramme\"")
- .body(text_body("Wrong credentials"))?)
- },
+ .body(text_body("Wrong credentials"))?);
+ }
};
// Build a user
diff --git a/aero-proto/src/dav/mod.rs b/aero-proto/src/dav/mod.rs
index de2e690..43de3a5 100644
--- a/aero-proto/src/dav/mod.rs
+++ b/aero-proto/src/dav/mod.rs
@@ -1,6 +1,6 @@
-mod middleware;
-mod controller;
mod codec;
+mod controller;
+mod middleware;
mod node;
mod resource;
@@ -8,19 +8,19 @@ use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::Result;
+use futures::future::FutureExt;
+use futures::stream::{FuturesUnordered, StreamExt};
+use hyper::rt::{Read, Write};
+use hyper::server::conn::http1 as http;
use hyper::service::service_fn;
use hyper::{Request, Response};
-use hyper::server::conn::http1 as http;
-use hyper::rt::{Read, Write};
use hyper_util::rt::TokioIo;
-use futures::stream::{FuturesUnordered, StreamExt};
+use rustls_pemfile::{certs, private_key};
+use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
+use tokio::net::TcpStream;
use tokio::sync::watch;
use tokio_rustls::TlsAcceptor;
-use tokio::net::TcpStream;
-use futures::future::FutureExt;
-use tokio::io::{AsyncRead, AsyncWrite};
-use rustls_pemfile::{certs, private_key};
use aero_user::config::{DavConfig, DavUnsecureConfig};
use aero_user::login::ArcLoginProvider;
@@ -90,7 +90,7 @@ impl Server {
Ok(v) => v,
Err(e) => {
tracing::error!(err=?e, "TLS acceptor failed");
- continue
+ continue;
}
};
@@ -100,21 +100,31 @@ impl Server {
//abitrarily bound
//@FIXME replace with a handler supporting http2 and TLS
- match http::Builder::new().serve_connection(stream, service_fn(|req: Request<hyper::body::Incoming>| {
- let login = login.clone();
- tracing::info!("{:?} {:?}", req.method(), req.uri());
- async {
- match middleware::auth(login, req, |user, request| async { Controller::route(user, request).await }.boxed()).await {
- Ok(v) => Ok(v),
- Err(e) => {
- tracing::error!(err=?e, "internal error");
- Response::builder()
- .status(500)
- .body(codec::text_body("Internal error"))
- },
- }
- }
- })).await {
+ match http::Builder::new()
+ .serve_connection(
+ stream,
+ service_fn(|req: Request<hyper::body::Incoming>| {
+ let login = login.clone();
+ tracing::info!("{:?} {:?}", req.method(), req.uri());
+ async {
+ match middleware::auth(login, req, |user, request| {
+ async { Controller::route(user, request).await }.boxed()
+ })
+ .await
+ {
+ Ok(v) => Ok(v),
+ Err(e) => {
+ tracing::error!(err=?e, "internal error");
+ Response::builder()
+ .status(500)
+ .body(codec::text_body("Internal error"))
+ }
+ }
+ }
+ }),
+ )
+ .await
+ {
Err(e) => tracing::warn!(err=?e, "connection failed"),
Ok(()) => tracing::trace!("connection terminated with success"),
}
@@ -149,7 +159,6 @@ impl Server {
// </D:prop>
// </D:propfind>
-
// <D:propfind xmlns:D='DAV:' xmlns:A='http://apple.com/ns/ical/' xmlns:C='urn:ietf:params:xml:ns:caldav'>
// <D:prop>
// <D:resourcetype/>
diff --git a/aero-proto/src/dav/node.rs b/aero-proto/src/dav/node.rs
index d246280..877342a 100644
--- a/aero-proto/src/dav/node.rs
+++ b/aero-proto/src/dav/node.rs
@@ -1,16 +1,17 @@
use anyhow::Result;
-use futures::stream::{BoxStream, StreamExt};
use futures::future::{BoxFuture, FutureExt};
+use futures::stream::{BoxStream, StreamExt};
use hyper::body::Bytes;
-use aero_dav::types as dav;
-use aero_dav::realization::All;
use aero_collections::davdag::Etag;
+use aero_dav::realization::All;
+use aero_dav::types as dav;
use super::controller::ArcUser;
pub(crate) type Content<'a> = BoxStream<'a, std::result::Result<Bytes, std::io::Error>>;
-pub(crate) type PropertyStream<'a> = BoxStream<'a, std::result::Result<dav::Property<All>, dav::PropertyRequest<All>>>;
+pub(crate) type PropertyStream<'a> =
+ BoxStream<'a, std::result::Result<dav::Property<All>, dav::PropertyRequest<All>>>;
pub(crate) enum PutPolicy {
OverwriteAll,
@@ -25,7 +26,12 @@ pub(crate) trait DavNode: Send {
/// This node direct children
fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>>;
/// Recursively fetch a child (progress inside the filesystem hierarchy)
- fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result<Box<dyn DavNode>>>;
+ fn fetch<'a>(
+ &self,
+ user: &'a ArcUser,
+ path: &'a [&str],
+ create: bool,
+ ) -> BoxFuture<'a, Result<Box<dyn DavNode>>>;
// node properties
/// Get the path
@@ -36,13 +42,17 @@ pub(crate) trait DavNode: Send {
fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static>;
//fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> Vec<dav::AnyProperty<All>>;
/// Put an element (create or update)
- fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>>;
+ fn put<'a>(
+ &'a self,
+ policy: PutPolicy,
+ stream: Content<'a>,
+ ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>>;
/// Content type of the element
fn content_type(&self) -> &str;
/// Get ETag
fn etag(&self) -> BoxFuture<Option<Etag>>;
/// Get content
- fn content(&self) -> Content<'static>;
+ fn content<'a>(&self) -> Content<'a>;
/// Delete
fn delete(&self) -> BoxFuture<std::result::Result<(), std::io::Error>>;
@@ -52,24 +62,32 @@ pub(crate) trait DavNode: Send {
fn response_propname(&self, user: &ArcUser) -> dav::Response<All> {
dav::Response {
status_or_propstat: dav::StatusOrPropstat::PropStat(
- dav::Href(self.path(user)),
- vec![
- dav::PropStat {
- status: dav::Status(hyper::StatusCode::OK),
- prop: dav::AnyProp(self.supported_properties(user).0.into_iter().map(dav::AnyProperty::Request).collect()),
- error: None,
- responsedescription: None,
- }
- ],
+ dav::Href(self.path(user)),
+ vec![dav::PropStat {
+ status: dav::Status(hyper::StatusCode::OK),
+ prop: dav::AnyProp(
+ self.supported_properties(user)
+ .0
+ .into_iter()
+ .map(dav::AnyProperty::Request)
+ .collect(),
+ ),
+ error: None,
+ responsedescription: None,
+ }],
),
error: None,
location: None,
- responsedescription: None
+ responsedescription: None,
}
}
/// Utility function to get a prop response from a node & a list of propname
- fn response_props(&self, user: &ArcUser, props: dav::PropName<All>) -> BoxFuture<'static, dav::Response<All>> {
+ fn response_props(
+ &self,
+ user: &ArcUser,
+ props: dav::PropName<All>,
+ ) -> BoxFuture<'static, dav::Response<All>> {
//@FIXME we should make the DAV parsed object a stream...
let mut result_stream = self.properties(user, props);
let path = self.path(user);
@@ -87,8 +105,8 @@ pub(crate) trait DavNode: Send {
// If at least one property has been found on this object, adding a HTTP 200 propstat to
// the response
if !found.is_empty() {
- prop_desc.push(dav::PropStat {
- status: dav::Status(hyper::StatusCode::OK),
+ prop_desc.push(dav::PropStat {
+ status: dav::Status(hyper::StatusCode::OK),
prop: dav::AnyProp(found),
error: None,
responsedescription: None,
@@ -98,8 +116,8 @@ pub(crate) trait DavNode: Send {
// If at least one property can't be found on this object, adding a HTTP 404 propstat to
// the response
if !not_found.is_empty() {
- prop_desc.push(dav::PropStat {
- status: dav::Status(hyper::StatusCode::NOT_FOUND),
+ prop_desc.push(dav::PropStat {
+ status: dav::Status(hyper::StatusCode::NOT_FOUND),
prop: dav::AnyProp(not_found),
error: None,
responsedescription: None,
@@ -111,9 +129,9 @@ pub(crate) trait DavNode: Send {
status_or_propstat: dav::StatusOrPropstat::PropStat(dav::Href(path), prop_desc),
error: None,
location: None,
- responsedescription: None
+ responsedescription: None,
}
- }.boxed()
+ }
+ .boxed()
}
}
-
diff --git a/aero-proto/src/dav/resource.rs b/aero-proto/src/dav/resource.rs
index 944c6c8..d65ce38 100644
--- a/aero-proto/src/dav/resource.rs
+++ b/aero-proto/src/dav/resource.rs
@@ -2,23 +2,32 @@ use std::sync::Arc;
type ArcUser = std::sync::Arc<User>;
use anyhow::{anyhow, Result};
-use futures::stream::{TryStreamExt, StreamExt};
use futures::io::AsyncReadExt;
+use futures::stream::{StreamExt, TryStreamExt};
use futures::{future::BoxFuture, future::FutureExt};
-use aero_collections::{user::User, calendar::Calendar, davdag::{BlobId, Etag}};
-use aero_dav::types as dav;
-use aero_dav::caltypes as cal;
+use aero_collections::{
+ calendar::Calendar,
+ davdag::{BlobId, Etag},
+ user::User,
+};
use aero_dav::acltypes as acl;
-use aero_dav::realization::{All, self as all};
+use aero_dav::caltypes as cal;
+use aero_dav::realization::{self as all, All};
+use aero_dav::types as dav;
-use crate::dav::node::{DavNode, PutPolicy, Content};
use super::node::PropertyStream;
+use crate::dav::node::{Content, DavNode, PutPolicy};
#[derive(Clone)]
pub(crate) struct RootNode {}
impl DavNode for RootNode {
- fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
+ fn fetch<'a>(
+ &self,
+ user: &'a ArcUser,
+ path: &'a [&str],
+ create: bool,
+ ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
if path.len() == 0 {
let this = self.clone();
return async { Ok(Box::new(this) as Box<dyn DavNode>) }.boxed();
@@ -34,7 +43,7 @@ impl DavNode for RootNode {
}
fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> {
- async { vec![Box::new(HomeNode { }) as Box<dyn DavNode>] }.boxed()
+ async { vec![Box::new(HomeNode {}) as Box<dyn DavNode>] }.boxed()
}
fn path(&self, user: &ArcUser) -> String {
@@ -46,33 +55,53 @@ impl DavNode for RootNode {
dav::PropertyRequest::DisplayName,
dav::PropertyRequest::ResourceType,
dav::PropertyRequest::GetContentType,
- dav::PropertyRequest::Extension(all::PropertyRequest::Acl(acl::PropertyRequest::CurrentUserPrincipal)),
+ dav::PropertyRequest::Extension(all::PropertyRequest::Acl(
+ acl::PropertyRequest::CurrentUserPrincipal,
+ )),
])
}
fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> {
let user = user.clone();
- futures::stream::iter(prop.0).map(move |n| {
- let prop = match n {
- dav::PropertyRequest::DisplayName => dav::Property::DisplayName("DAV Root".to_string()),
- dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![
- dav::ResourceType::Collection,
- ]),
- dav::PropertyRequest::GetContentType => dav::Property::GetContentType("httpd/unix-directory".into()),
- dav::PropertyRequest::Extension(all::PropertyRequest::Acl(acl::PropertyRequest::CurrentUserPrincipal)) =>
- dav::Property::Extension(all::Property::Acl(acl::Property::CurrentUserPrincipal(acl::User::Authenticated(dav::Href(HomeNode{}.path(&user)))))),
- v => return Err(v),
- };
- Ok(prop)
- }).boxed()
+ futures::stream::iter(prop.0)
+ .map(move |n| {
+ let prop = match n {
+ dav::PropertyRequest::DisplayName => {
+ dav::Property::DisplayName("DAV Root".to_string())
+ }
+ dav::PropertyRequest::ResourceType => {
+ dav::Property::ResourceType(vec![dav::ResourceType::Collection])
+ }
+ dav::PropertyRequest::GetContentType => {
+ dav::Property::GetContentType("httpd/unix-directory".into())
+ }
+ dav::PropertyRequest::Extension(all::PropertyRequest::Acl(
+ acl::PropertyRequest::CurrentUserPrincipal,
+ )) => dav::Property::Extension(all::Property::Acl(
+ acl::Property::CurrentUserPrincipal(acl::User::Authenticated(dav::Href(
+ HomeNode {}.path(&user),
+ ))),
+ )),
+ v => return Err(v),
+ };
+ Ok(prop)
+ })
+ .boxed()
}
- fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
+ fn put<'a>(
+ &'a self,
+ _policy: PutPolicy,
+ stream: Content<'a>,
+ ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed()
}
- fn content(&self) -> Content<'static> {
- futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
+ fn content<'a>(&self) -> Content<'a> {
+ futures::stream::once(futures::future::err(std::io::Error::from(
+ std::io::ErrorKind::Unsupported,
+ )))
+ .boxed()
}
fn content_type(&self) -> &str {
@@ -91,29 +120,37 @@ impl DavNode for RootNode {
#[derive(Clone)]
pub(crate) struct HomeNode {}
impl DavNode for HomeNode {
- fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
+ fn fetch<'a>(
+ &self,
+ user: &'a ArcUser,
+ path: &'a [&str],
+ create: bool,
+ ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
if path.len() == 0 {
let node = Box::new(self.clone()) as Box<dyn DavNode>;
- return async { Ok(node) }.boxed()
+ return async { Ok(node) }.boxed();
}
if path[0] == "calendar" {
return async move {
let child = Box::new(CalendarListNode::new(user).await?);
child.fetch(user, &path[1..], create).await
- }.boxed();
+ }
+ .boxed();
}
-
+
//@NOTE: we can't create a node at this level
async { Err(anyhow!("Not found")) }.boxed()
}
fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> {
- async {
- CalendarListNode::new(user).await
+ async {
+ CalendarListNode::new(user)
+ .await
.map(|c| vec![Box::new(c) as Box<dyn DavNode>])
- .unwrap_or(vec![])
- }.boxed()
+ .unwrap_or(vec![])
+ }
+ .boxed()
}
fn path(&self, user: &ArcUser) -> String {
@@ -125,38 +162,58 @@ impl DavNode for HomeNode {
dav::PropertyRequest::DisplayName,
dav::PropertyRequest::ResourceType,
dav::PropertyRequest::GetContentType,
- dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarHomeSet)),
+ dav::PropertyRequest::Extension(all::PropertyRequest::Cal(
+ cal::PropertyRequest::CalendarHomeSet,
+ )),
])
}
fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> {
let user = user.clone();
- futures::stream::iter(prop.0).map(move |n| {
- let prop = match n {
- dav::PropertyRequest::DisplayName => dav::Property::DisplayName(format!("{} home", user.username)),
- dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![
- dav::ResourceType::Collection,
- dav::ResourceType::Extension(all::ResourceType::Acl(acl::ResourceType::Principal)),
- ]),
- dav::PropertyRequest::GetContentType => dav::Property::GetContentType("httpd/unix-directory".into()),
- dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarHomeSet)) =>
- dav::Property::Extension(all::Property::Cal(cal::Property::CalendarHomeSet(dav::Href(
- //@FIXME we are hardcoding the calendar path, instead we would want to use
- //objects
- format!("/{}/calendar/", user.username)
- )))),
- v => return Err(v),
- };
- Ok(prop)
- }).boxed()
+ futures::stream::iter(prop.0)
+ .map(move |n| {
+ let prop = match n {
+ dav::PropertyRequest::DisplayName => {
+ dav::Property::DisplayName(format!("{} home", user.username))
+ }
+ dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![
+ dav::ResourceType::Collection,
+ dav::ResourceType::Extension(all::ResourceType::Acl(
+ acl::ResourceType::Principal,
+ )),
+ ]),
+ dav::PropertyRequest::GetContentType => {
+ dav::Property::GetContentType("httpd/unix-directory".into())
+ }
+ dav::PropertyRequest::Extension(all::PropertyRequest::Cal(
+ cal::PropertyRequest::CalendarHomeSet,
+ )) => dav::Property::Extension(all::Property::Cal(
+ cal::Property::CalendarHomeSet(dav::Href(
+ //@FIXME we are hardcoding the calendar path, instead we would want to use
+ //objects
+ format!("/{}/calendar/", user.username),
+ )),
+ )),
+ v => return Err(v),
+ };
+ Ok(prop)
+ })
+ .boxed()
}
- fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
+ fn put<'a>(
+ &'a self,
+ _policy: PutPolicy,
+ stream: Content<'a>,
+ ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed()
}
-
- fn content(&self) -> Content<'static> {
- futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
+
+ fn content<'a>(&self) -> Content<'a> {
+ futures::stream::once(futures::future::err(std::io::Error::from(
+ std::io::ErrorKind::Unsupported,
+ )))
+ .boxed()
}
fn content_type(&self) -> &str {
@@ -183,7 +240,12 @@ impl CalendarListNode {
}
}
impl DavNode for CalendarListNode {
- fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
+ fn fetch<'a>(
+ &self,
+ user: &'a ArcUser,
+ path: &'a [&str],
+ create: bool,
+ ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
if path.len() == 0 {
let node = Box::new(self.clone()) as Box<dyn DavNode>;
return async { Ok(node) }.boxed();
@@ -191,13 +253,18 @@ impl DavNode for CalendarListNode {
async move {
//@FIXME: we should create a node if the open returns a "not found".
- let cal = user.calendars.open(user, path[0]).await?.ok_or(anyhow!("Not found"))?;
- let child = Box::new(CalendarNode {
+ let cal = user
+ .calendars
+ .open(user, path[0])
+ .await?
+ .ok_or(anyhow!("Not found"))?;
+ let child = Box::new(CalendarNode {
col: cal,
- calname: path[0].to_string()
+ calname: path[0].to_string(),
});
child.fetch(user, &path[1..], create).await
- }.boxed()
+ }
+ .boxed()
}
fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> {
@@ -206,18 +273,23 @@ impl DavNode for CalendarListNode {
//@FIXME maybe we want to be lazy here?!
futures::stream::iter(list.iter())
.filter_map(|name| async move {
- user.calendars.open(user, name).await
+ user.calendars
+ .open(user, name)
+ .await
.ok()
.flatten()
.map(|v| (name, v))
})
- .map(|(name, cal)| Box::new(CalendarNode {
- col: cal,
- calname: name.to_string(),
- }) as Box<dyn DavNode>)
+ .map(|(name, cal)| {
+ Box::new(CalendarNode {
+ col: cal,
+ calname: name.to_string(),
+ }) as Box<dyn DavNode>
+ })
.collect::<Vec<Box<dyn DavNode>>>()
.await
- }.boxed()
+ }
+ .boxed()
}
fn path(&self, user: &ArcUser) -> String {
@@ -234,23 +306,38 @@ impl DavNode for CalendarListNode {
fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> {
let user = user.clone();
- futures::stream::iter(prop.0).map(move |n| {
- let prop = match n {
- dav::PropertyRequest::DisplayName => dav::Property::DisplayName(format!("{} calendars", user.username)),
- dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![dav::ResourceType::Collection]),
- dav::PropertyRequest::GetContentType => dav::Property::GetContentType("httpd/unix-directory".into()),
- v => return Err(v),
- };
- Ok(prop)
- }).boxed()
+ futures::stream::iter(prop.0)
+ .map(move |n| {
+ let prop = match n {
+ dav::PropertyRequest::DisplayName => {
+ dav::Property::DisplayName(format!("{} calendars", user.username))
+ }
+ dav::PropertyRequest::ResourceType => {
+ dav::Property::ResourceType(vec![dav::ResourceType::Collection])
+ }
+ dav::PropertyRequest::GetContentType => {
+ dav::Property::GetContentType("httpd/unix-directory".into())
+ }
+ v => return Err(v),
+ };
+ Ok(prop)
+ })
+ .boxed()
}
- fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
+ fn put<'a>(
+ &'a self,
+ _policy: PutPolicy,
+ stream: Content<'a>,
+ ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed()
}
- fn content(&self) -> Content<'static> {
- futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
+ fn content<'a>(&self) -> Content<'a> {
+ futures::stream::once(futures::future::err(std::io::Error::from(
+ std::io::ErrorKind::Unsupported,
+ )))
+ .boxed()
}
fn content_type(&self) -> &str {
@@ -272,17 +359,22 @@ pub(crate) struct CalendarNode {
calname: String,
}
impl DavNode for CalendarNode {
- fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
+ fn fetch<'a>(
+ &self,
+ user: &'a ArcUser,
+ path: &'a [&str],
+ create: bool,
+ ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
if path.len() == 0 {
let node = Box::new(self.clone()) as Box<dyn DavNode>;
- return async { Ok(node) }.boxed()
+ return async { Ok(node) }.boxed();
}
let col = self.col.clone();
let calname = self.calname.clone();
async move {
match (col.dag().await.idx_by_filename.get(path[0]), create) {
- (Some(blob_id), _) => {
+ (Some(blob_id), _) => {
let child = Box::new(EventNode {
col: col.clone(),
calname,
@@ -290,7 +382,7 @@ impl DavNode for CalendarNode {
blob_id: *blob_id,
});
child.fetch(user, &path[1..], create).await
- },
+ }
(None, true) => {
let child = Box::new(CreateEventNode {
col: col.clone(),
@@ -298,11 +390,11 @@ impl DavNode for CalendarNode {
filename: path[0].to_string(),
});
child.fetch(user, &path[1..], create).await
- },
+ }
_ => Err(anyhow!("Not found")),
}
-
- }.boxed()
+ }
+ .boxed()
}
fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> {
@@ -310,15 +402,21 @@ impl DavNode for CalendarNode {
let calname = self.calname.clone();
async move {
- col.dag().await.idx_by_filename.iter().map(|(filename, blob_id)| {
- Box::new(EventNode {
- col: col.clone(),
- calname: calname.clone(),
- filename: filename.to_string(),
- blob_id: *blob_id,
- }) as Box<dyn DavNode>
- }).collect()
- }.boxed()
+ col.dag()
+ .await
+ .idx_by_filename
+ .iter()
+ .map(|(filename, blob_id)| {
+ Box::new(EventNode {
+ col: col.clone(),
+ calname: calname.clone(),
+ filename: filename.to_string(),
+ blob_id: *blob_id,
+ }) as Box<dyn DavNode>
+ })
+ .collect()
+ }
+ .boxed()
}
fn path(&self, user: &ArcUser) -> String {
@@ -330,38 +428,58 @@ impl DavNode for CalendarNode {
dav::PropertyRequest::DisplayName,
dav::PropertyRequest::ResourceType,
dav::PropertyRequest::GetContentType,
- dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::SupportedCalendarComponentSet)),
+ dav::PropertyRequest::Extension(all::PropertyRequest::Cal(
+ cal::PropertyRequest::SupportedCalendarComponentSet,
+ )),
])
}
fn properties(&self, _user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> {
let calname = self.calname.to_string();
- futures::stream::iter(prop.0).map(move |n| {
- let prop = match n {
- dav::PropertyRequest::DisplayName => dav::Property::DisplayName(format!("{} calendar", calname)),
- dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![
- dav::ResourceType::Collection,
- dav::ResourceType::Extension(all::ResourceType::Cal(cal::ResourceType::Calendar)),
- ]),
- //dav::PropertyRequest::GetContentType => dav::AnyProperty::Value(dav::Property::GetContentType("httpd/unix-directory".into())),
- //@FIXME seems wrong but seems to be what Thunderbird expects...
- dav::PropertyRequest::GetContentType => dav::Property::GetContentType("text/calendar".into()),
- dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::SupportedCalendarComponentSet))
- => dav::Property::Extension(all::Property::Cal(cal::Property::SupportedCalendarComponentSet(vec![
- cal::CompSupport(cal::Component::VEvent),
- ]))),
- v => return Err(v),
- };
- Ok(prop)
- }).boxed()
+ futures::stream::iter(prop.0)
+ .map(move |n| {
+ let prop = match n {
+ dav::PropertyRequest::DisplayName => {
+ dav::Property::DisplayName(format!("{} calendar", calname))
+ }
+ dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![
+ dav::ResourceType::Collection,
+ dav::ResourceType::Extension(all::ResourceType::Cal(
+ cal::ResourceType::Calendar,
+ )),
+ ]),
+ //dav::PropertyRequest::GetContentType => dav::AnyProperty::Value(dav::Property::GetContentType("httpd/unix-directory".into())),
+ //@FIXME seems wrong but seems to be what Thunderbird expects...
+ dav::PropertyRequest::GetContentType => {
+ dav::Property::GetContentType("text/calendar".into())
+ }
+ dav::PropertyRequest::Extension(all::PropertyRequest::Cal(
+ cal::PropertyRequest::SupportedCalendarComponentSet,
+ )) => dav::Property::Extension(all::Property::Cal(
+ cal::Property::SupportedCalendarComponentSet(vec![cal::CompSupport(
+ cal::Component::VEvent,
+ )]),
+ )),
+ v => return Err(v),
+ };
+ Ok(prop)
+ })
+ .boxed()
}
- fn put<'a>(&'a self, _policy: PutPolicy, _stream: Content<'a>) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
+ fn put<'a>(
+ &'a self,
+ _policy: PutPolicy,
+ _stream: Content<'a>,
+ ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed()
}
- fn content<'a>(&'a self) -> Content<'static> {
- futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
+ fn content<'a>(&self) -> Content<'a> {
+ futures::stream::once(futures::future::err(std::io::Error::from(
+ std::io::ErrorKind::Unsupported,
+ )))
+ .boxed()
}
fn content_type(&self) -> &str {
@@ -386,13 +504,23 @@ pub(crate) struct EventNode {
}
impl DavNode for EventNode {
- fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
+ fn fetch<'a>(
+ &self,
+ user: &'a ArcUser,
+ path: &'a [&str],
+ create: bool,
+ ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
if path.len() == 0 {
let node = Box::new(self.clone()) as Box<dyn DavNode>;
- return async { Ok(node) }.boxed()
+ return async { Ok(node) }.boxed();
}
- async { Err(anyhow!("Not supported: can't create a child on an event node")) }.boxed()
+ async {
+ Err(anyhow!(
+ "Not supported: can't create a child on an event node"
+ ))
+ }
+ .boxed()
}
fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> {
@@ -400,7 +528,10 @@ impl DavNode for EventNode {
}
fn path(&self, user: &ArcUser) -> String {
- format!("/{}/calendar/{}/{}", user.username, self.calname, self.filename)
+ format!(
+ "/{}/calendar/{}/{}",
+ user.username, self.calname, self.filename
+ )
}
fn supported_properties(&self, user: &ArcUser) -> dav::PropName<All> {
@@ -408,66 +539,106 @@ impl DavNode for EventNode {
dav::PropertyRequest::DisplayName,
dav::PropertyRequest::ResourceType,
dav::PropertyRequest::GetEtag,
- dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarData(cal::CalendarDataRequest::default()))),
+ dav::PropertyRequest::Extension(all::PropertyRequest::Cal(
+ cal::PropertyRequest::CalendarData(cal::CalendarDataRequest::default()),
+ )),
])
}
fn properties(&self, _user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> {
let this = self.clone();
- futures::stream::iter(prop.0).then(move |n| {
- let this = this.clone();
-
- async move {
- let prop = match &n {
- dav::PropertyRequest::DisplayName => dav::Property::DisplayName(format!("{} event", this.filename)),
- dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![]),
- dav::PropertyRequest::GetContentType => dav::Property::GetContentType("text/calendar".into()),
- dav::PropertyRequest::GetEtag => {
- let etag = this.etag().await.ok_or(n.clone())?;
- dav::Property::GetEtag(etag)
- },
- dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarData(_req))) => {
- let ics = String::from_utf8(this.col.get(this.blob_id).await.or(Err(n.clone()))?).or(Err(n.clone()))?;
-
- dav::Property::Extension(all::Property::Cal(cal::Property::CalendarData(cal::CalendarDataPayload {
- mime: None,
- payload: ics,
- })))
- },
- _ => return Err(n),
- };
- Ok(prop)
- }
- }).boxed()
- }
-
- fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
+ futures::stream::iter(prop.0)
+ .then(move |n| {
+ let this = this.clone();
+
+ async move {
+ let prop = match &n {
+ dav::PropertyRequest::DisplayName => {
+ dav::Property::DisplayName(format!("{} event", this.filename))
+ }
+ dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![]),
+ dav::PropertyRequest::GetContentType => {
+ dav::Property::GetContentType("text/calendar".into())
+ }
+ dav::PropertyRequest::GetEtag => {
+ let etag = this.etag().await.ok_or(n.clone())?;
+ dav::Property::GetEtag(etag)
+ }
+ dav::PropertyRequest::Extension(all::PropertyRequest::Cal(
+ cal::PropertyRequest::CalendarData(_req),
+ )) => {
+ let ics = String::from_utf8(
+ this.col.get(this.blob_id).await.or(Err(n.clone()))?,
+ )
+ .or(Err(n.clone()))?;
+
+ dav::Property::Extension(all::Property::Cal(
+ cal::Property::CalendarData(cal::CalendarDataPayload {
+ mime: None,
+ payload: ics,
+ }),
+ ))
+ }
+ _ => return Err(n),
+ };
+ Ok(prop)
+ }
+ })
+ .boxed()
+ }
+
+ fn put<'a>(
+ &'a self,
+ policy: PutPolicy,
+ stream: Content<'a>,
+ ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
async {
- let existing_etag = self.etag().await.ok_or(std::io::Error::new(std::io::ErrorKind::Other, "Etag error"))?;
+ let existing_etag = self
+ .etag()
+ .await
+ .ok_or(std::io::Error::new(std::io::ErrorKind::Other, "Etag error"))?;
match policy {
- PutPolicy::CreateOnly => return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)),
- PutPolicy::ReplaceEtag(etag) if etag != existing_etag.as_str() => return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)),
- _ => ()
+ PutPolicy::CreateOnly => {
+ return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists))
+ }
+ PutPolicy::ReplaceEtag(etag) if etag != existing_etag.as_str() => {
+ return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists))
+ }
+ _ => (),
};
//@FIXME for now, our storage interface does not allow streaming,
// so we load everything in memory
let mut evt = Vec::new();
let mut reader = stream.into_async_read();
- reader.read_to_end(&mut evt).await.or(Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)))?;
- let (_token, entry) = self.col.put(self.filename.as_str(), evt.as_ref()).await.or(Err(std::io::ErrorKind::Interrupted))?;
- self.col.opportunistic_sync().await.or(Err(std::io::ErrorKind::ConnectionReset))?;
+ reader
+ .read_to_end(&mut evt)
+ .await
+ .or(Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)))?;
+ let (_token, entry) = self
+ .col
+ .put(self.filename.as_str(), evt.as_ref())
+ .await
+ .or(Err(std::io::ErrorKind::Interrupted))?;
+ self.col
+ .opportunistic_sync()
+ .await
+ .or(Err(std::io::ErrorKind::ConnectionReset))?;
Ok(entry.2)
- }.boxed()
+ }
+ .boxed()
}
- fn content<'a>(&'a self) -> Content<'static> {
+ fn content<'a>(&self) -> Content<'a> {
//@FIXME for now, our storage interface does not allow streaming,
// so we load everything in memory
let calendar = self.col.clone();
let blob_id = self.blob_id.clone();
let r = async move {
- let content = calendar.get(blob_id).await.or(Err(std::io::Error::from(std::io::ErrorKind::Interrupted)));
+ let content = calendar
+ .get(blob_id)
+ .await
+ .or(Err(std::io::Error::from(std::io::ErrorKind::Interrupted)));
Ok(hyper::body::Bytes::from(content?))
};
futures::stream::once(Box::pin(r)).boxed()
@@ -481,8 +652,14 @@ impl DavNode for EventNode {
let calendar = self.col.clone();
async move {
- calendar.dag().await.table.get(&self.blob_id).map(|(_, _, etag)| etag.to_string())
- }.boxed()
+ calendar
+ .dag()
+ .await
+ .table
+ .get(&self.blob_id)
+ .map(|(_, _, etag)| etag.to_string())
+ }
+ .boxed()
}
fn delete(&self) -> BoxFuture<std::result::Result<(), std::io::Error>> {
@@ -494,12 +671,16 @@ impl DavNode for EventNode {
Ok(v) => v,
Err(e) => {
tracing::error!(err=?e, "delete event node");
- return Err(std::io::Error::from(std::io::ErrorKind::Interrupted))
- },
+ return Err(std::io::Error::from(std::io::ErrorKind::Interrupted));
+ }
};
- calendar.opportunistic_sync().await.or(Err(std::io::ErrorKind::ConnectionReset))?;
+ calendar
+ .opportunistic_sync()
+ .await
+ .or(Err(std::io::ErrorKind::ConnectionReset))?;
Ok(())
- }.boxed()
+ }
+ .boxed()
}
}
@@ -510,13 +691,23 @@ pub(crate) struct CreateEventNode {
filename: String,
}
impl DavNode for CreateEventNode {
- fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
+ fn fetch<'a>(
+ &self,
+ user: &'a ArcUser,
+ path: &'a [&str],
+ create: bool,
+ ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
if path.len() == 0 {
let node = Box::new(self.clone()) as Box<dyn DavNode>;
- return async { Ok(node) }.boxed()
+ return async { Ok(node) }.boxed();
}
- async { Err(anyhow!("Not supported: can't create a child on an event node")) }.boxed()
+ async {
+ Err(anyhow!(
+ "Not supported: can't create a child on an event node"
+ ))
+ }
+ .boxed()
}
fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> {
@@ -524,33 +715,51 @@ impl DavNode for CreateEventNode {
}
fn path(&self, user: &ArcUser) -> String {
- format!("/{}/calendar/{}/{}", user.username, self.calname, self.filename)
+ format!(
+ "/{}/calendar/{}/{}",
+ user.username, self.calname, self.filename
+ )
}
fn supported_properties(&self, user: &ArcUser) -> dav::PropName<All> {
dav::PropName(vec![])
}
- fn properties(&self, _user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> {
+ fn properties(&self, _user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> {
futures::stream::iter(vec![]).boxed()
}
- fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
+ fn put<'a>(
+ &'a self,
+ _policy: PutPolicy,
+ stream: Content<'a>,
+ ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> {
//@NOTE: policy might not be needed here: whatever we put, there is no known entries here
-
+
async {
//@FIXME for now, our storage interface does not allow for streaming
let mut evt = Vec::new();
let mut reader = stream.into_async_read();
reader.read_to_end(&mut evt).await.unwrap();
- let (_token, entry) = self.col.put(self.filename.as_str(), evt.as_ref()).await.or(Err(std::io::ErrorKind::Interrupted))?;
- self.col.opportunistic_sync().await.or(Err(std::io::ErrorKind::ConnectionReset))?;
+ let (_token, entry) = self
+ .col
+ .put(self.filename.as_str(), evt.as_ref())
+ .await
+ .or(Err(std::io::ErrorKind::Interrupted))?;
+ self.col
+ .opportunistic_sync()
+ .await
+ .or(Err(std::io::ErrorKind::ConnectionReset))?;
Ok(entry.2)
- }.boxed()
+ }
+ .boxed()
}
- fn content(&self) -> Content<'static> {
- futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
+ fn content<'a>(&self) -> Content<'a> {
+ futures::stream::once(futures::future::err(std::io::Error::from(
+ std::io::ErrorKind::Unsupported,
+ )))
+ .boxed()
}
fn content_type(&self) -> &str {
diff --git a/aero-proto/src/imap/command/anonymous.rs b/aero-proto/src/imap/command/anonymous.rs
index 2848c30..f23ec17 100644
--- a/aero-proto/src/imap/command/anonymous.rs
+++ b/aero-proto/src/imap/command/anonymous.rs
@@ -4,8 +4,8 @@ use imap_codec::imap_types::core::AString;
use imap_codec::imap_types::response::Code;
use imap_codec::imap_types::secret::Secret;
-use aero_user::login::ArcLoginProvider;
use aero_collections::user::User;
+use aero_user::login::ArcLoginProvider;
use crate::imap::capability::ServerCapability;
use crate::imap::command::anystate;
diff --git a/aero-proto/src/imap/command/authenticated.rs b/aero-proto/src/imap/command/authenticated.rs
index 4c8d8c1..5bd34cb 100644
--- a/aero-proto/src/imap/command/authenticated.rs
+++ b/aero-proto/src/imap/command/authenticated.rs
@@ -14,10 +14,10 @@ use imap_codec::imap_types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
use imap_codec::imap_types::response::{Code, CodeOther, Data};
use imap_codec::imap_types::status::{StatusDataItem, StatusDataItemName};
+use aero_collections::mail::namespace::MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW;
use aero_collections::mail::uidindex::*;
-use aero_collections::user::User;
use aero_collections::mail::IMF;
-use aero_collections::mail::namespace::MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW;
+use aero_collections::user::User;
use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, MailboxName};
diff --git a/aero-proto/src/imap/mod.rs b/aero-proto/src/imap/mod.rs
index 7183a78..6a768b0 100644
--- a/aero-proto/src/imap/mod.rs
+++ b/aero-proto/src/imap/mod.rs
@@ -17,14 +17,14 @@ use std::net::SocketAddr;
use anyhow::{anyhow, bail, Result};
use futures::stream::{FuturesUnordered, StreamExt};
-use tokio::net::TcpListener;
-use tokio::sync::mpsc;
-use tokio::sync::watch;
use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status};
use imap_codec::imap_types::{core::Text, response::Greeting};
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream;
use rustls_pemfile::{certs, private_key};
+use tokio::net::TcpListener;
+use tokio::sync::mpsc;
+use tokio::sync::watch;
use tokio_rustls::TlsAcceptor;
use aero_user::config::{ImapConfig, ImapUnsecureConfig};
diff --git a/aero-proto/src/lmtp.rs b/aero-proto/src/lmtp.rs
index 9d40296..a82a783 100644
--- a/aero-proto/src/lmtp.rs
+++ b/aero-proto/src/lmtp.rs
@@ -10,16 +10,16 @@ use futures::{
stream::{FuturesOrdered, FuturesUnordered},
StreamExt,
};
+use smtp_message::{DataUnescaper, Email, EscapedDataReader, Reply, ReplyCode};
+use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata};
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::watch;
use tokio_util::compat::*;
-use smtp_message::{DataUnescaper, Email, EscapedDataReader, Reply, ReplyCode};
-use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata};
+use aero_collections::mail::incoming::EncryptedMessage;
use aero_user::config::*;
use aero_user::login::*;
-use aero_collections::mail::incoming::EncryptedMessage;
pub struct LmtpServer {
bind_addr: SocketAddr,
diff --git a/aero-proto/src/sasl.rs b/aero-proto/src/sasl.rs
index dae89eb..48c0815 100644
--- a/aero-proto/src/sasl.rs
+++ b/aero-proto/src/sasl.rs
@@ -8,9 +8,9 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::sync::watch;
use tokio_util::bytes::BytesMut;
+use aero_sasl::{decode::client_command, encode::Encode, flow::State};
use aero_user::config::AuthConfig;
use aero_user::login::ArcLoginProvider;
-use aero_sasl::{flow::State, decode::client_command, encode::Encode};
pub struct AuthServer {
login_provider: ArcLoginProvider,