diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-05-29 10:14:51 +0200 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-05-29 10:14:51 +0200 |
commit | b9ce5886033677f6c65a4b873e17574fdb8df31d (patch) | |
tree | 9ed1d721361027d7d6fef0ecad65d7e1b74a7ddb /aero-proto/src/dav | |
parent | 0dcf69f180f5a7b71b6ad2ac67e4cdd81e5154f1 (diff) | |
parent | 5954de6efbb040b8b47daf0c7663a60f3db1da6e (diff) | |
download | aerogramme-b9ce5886033677f6c65a4b873e17574fdb8df31d.tar.gz aerogramme-b9ce5886033677f6c65a4b873e17574fdb8df31d.zip |
Merge branch 'caldav'
Diffstat (limited to 'aero-proto/src/dav')
-rw-r--r-- | aero-proto/src/dav/codec.rs | 135 | ||||
-rw-r--r-- | aero-proto/src/dav/controller.rs | 436 | ||||
-rw-r--r-- | aero-proto/src/dav/middleware.rs | 70 | ||||
-rw-r--r-- | aero-proto/src/dav/mod.rs | 195 | ||||
-rw-r--r-- | aero-proto/src/dav/node.rs | 145 | ||||
-rw-r--r-- | aero-proto/src/dav/resource.rs | 999 |
6 files changed, 1980 insertions, 0 deletions
diff --git a/aero-proto/src/dav/codec.rs b/aero-proto/src/dav/codec.rs new file mode 100644 index 0000000..a441e7e --- /dev/null +++ b/aero-proto/src/dav/codec.rs @@ -0,0 +1,135 @@ +use anyhow::{bail, Result}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use futures::stream::TryStreamExt; +use http_body_util::combinators::UnsyncBoxBody; +use http_body_util::BodyExt; +use http_body_util::BodyStream; +use http_body_util::Full; +use http_body_util::StreamBody; +use hyper::body::Frame; +use hyper::body::Incoming; +use hyper::{body::Bytes, Request, Response}; +use std::io::{Error, ErrorKind}; +use tokio_util::io::{CopyToBytes, SinkWriter}; +use tokio_util::sync::PollSender; + +use super::controller::HttpResponse; +use super::node::PutPolicy; +use aero_dav::types as dav; +use aero_dav::xml as dxml; + +pub(crate) fn depth(req: &Request<impl hyper::body::Body>) -> dav::Depth { + match req + .headers() + .get("Depth") + .map(hyper::header::HeaderValue::to_str) + { + Some(Ok("0")) => dav::Depth::Zero, + Some(Ok("1")) => dav::Depth::One, + Some(Ok("Infinity")) => dav::Depth::Infinity, + _ => dav::Depth::Zero, + } +} + +pub(crate) fn put_policy(req: &Request<impl hyper::body::Body>) -> Result<PutPolicy> { + if let Some(maybe_txt_etag) = req + .headers() + .get("If-Match") + .map(hyper::header::HeaderValue::to_str) + { + let etag = maybe_txt_etag?; + let dquote_count = etag.chars().filter(|c| *c == '"').count(); + if dquote_count != 2 { + bail!("Either If-Match value is invalid or it's not supported (only single etag is supported)"); + } + + return Ok(PutPolicy::ReplaceEtag(etag.into())); + } + + if let Some(maybe_txt_etag) = req + .headers() + .get("If-None-Match") + .map(hyper::header::HeaderValue::to_str) + { + let etag = maybe_txt_etag?; + if etag == "*" { + return Ok(PutPolicy::CreateOnly); + } + bail!("Either If-None-Match value is invalid or it's not supported (only asterisk is supported)") + } + + Ok(PutPolicy::OverwriteAll) +} + +pub(crate) fn text_body(txt: &'static str) -> UnsyncBoxBody<Bytes, std::io::Error> { + UnsyncBoxBody::new(Full::new(Bytes::from(txt)).map_err(|e| match e {})) +} + +pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>( + status_ok: hyper::StatusCode, + elem: T, +) -> Result<HttpResponse> { + let (tx, rx) = tokio::sync::mpsc::channel::<Bytes>(1); + + // Build the writer + tokio::task::spawn(async move { + let sink = PollSender::new(tx).sink_map_err(|_| Error::from(ErrorKind::BrokenPipe)); + let mut writer = SinkWriter::new(CopyToBytes::new(sink)); + let q = quick_xml::writer::Writer::new_with_indent(&mut writer, b' ', 4); + let ns_to_apply = vec![ + ("xmlns:D".into(), "DAV:".into()), + ("xmlns:C".into(), "urn:ietf:params:xml:ns:caldav".into()), + ]; + let mut qwriter = dxml::Writer { q, ns_to_apply }; + let decl = + quick_xml::events::BytesDecl::from_start(quick_xml::events::BytesStart::from_content( + "xml version=\"1.0\" encoding=\"utf-8\"", + 0, + )); + match qwriter + .q + .write_event_async(quick_xml::events::Event::Decl(decl)) + .await + { + Ok(_) => (), + Err(e) => tracing::error!(err=?e, "unable to write XML declaration <?xml ... >"), + } + match elem.qwrite(&mut qwriter).await { + Ok(_) => tracing::debug!("fully serialized object"), + Err(e) => tracing::error!(err=?e, "failed to serialize object"), + } + }); + + // Build the reader + let recv = tokio_stream::wrappers::ReceiverStream::new(rx); + let stream = StreamBody::new(recv.map(|v| Ok(Frame::data(v)))); + let boxed_body = UnsyncBoxBody::new(stream); + + let response = Response::builder() + .status(status_ok) + .header("content-type", "application/xml; charset=\"utf-8\"") + .body(boxed_body)?; + + Ok(response) +} + +/// Deserialize a request body to an XML request +pub(crate) async fn deserialize<T: dxml::Node<T>>(req: Request<Incoming>) -> Result<T> { + let stream_of_frames = BodyStream::new(req.into_body()); + let stream_of_bytes = stream_of_frames + .map_ok(|frame| frame.into_data()) + .map(|obj| match obj { + Ok(Ok(v)) => Ok(v), + Ok(Err(_)) => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "conversion error", + )), + Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)), + }); + let async_read = tokio_util::io::StreamReader::new(stream_of_bytes); + let async_read = std::pin::pin!(async_read); + let mut rdr = dxml::Reader::new(quick_xml::reader::NsReader::from_reader(async_read)).await?; + let parsed = rdr.find::<T>().await?; + Ok(parsed) +} diff --git a/aero-proto/src/dav/controller.rs b/aero-proto/src/dav/controller.rs new file mode 100644 index 0000000..8c53c6b --- /dev/null +++ b/aero-proto/src/dav/controller.rs @@ -0,0 +1,436 @@ +use anyhow::Result; +use futures::stream::{StreamExt, TryStreamExt}; +use http_body_util::combinators::UnsyncBoxBody; +use http_body_util::BodyStream; +use http_body_util::StreamBody; +use hyper::body::Frame; +use hyper::body::Incoming; +use hyper::{body::Bytes, Request, Response}; + +use aero_collections::{davdag::Token, user::User}; +use aero_dav::caltypes as cal; +use aero_dav::realization::{self, All}; +use aero_dav::synctypes as sync; +use aero_dav::types as dav; +use aero_dav::versioningtypes as vers; +use aero_ical::query::is_component_match; + +use crate::dav::codec; +use crate::dav::codec::{depth, deserialize, serialize, text_body}; +use crate::dav::node::DavNode; +use crate::dav::resource::{RootNode, BASE_TOKEN_URI}; + +pub(super) type ArcUser = std::sync::Arc<User>; +pub(super) type HttpResponse = Response<UnsyncBoxBody<Bytes, std::io::Error>>; + +const ALLPROP: [dav::PropertyRequest<All>; 10] = [ + dav::PropertyRequest::CreationDate, + dav::PropertyRequest::DisplayName, + dav::PropertyRequest::GetContentLanguage, + dav::PropertyRequest::GetContentLength, + dav::PropertyRequest::GetContentType, + dav::PropertyRequest::GetEtag, + dav::PropertyRequest::GetLastModified, + dav::PropertyRequest::LockDiscovery, + dav::PropertyRequest::ResourceType, + dav::PropertyRequest::SupportedLock, +]; + +pub(crate) struct Controller { + node: Box<dyn DavNode>, + user: std::sync::Arc<User>, + req: Request<Incoming>, +} +impl Controller { + pub(crate) async fn route( + user: std::sync::Arc<User>, + req: Request<Incoming>, + ) -> Result<HttpResponse> { + let path = req.uri().path().to_string(); + let path_segments: Vec<_> = path.split("/").filter(|s| *s != "").collect(); + let method = req.method().as_str().to_uppercase(); + + let can_create = matches!(method.as_str(), "PUT" | "MKCOL" | "MKCALENDAR"); + let node = match (RootNode {}).fetch(&user, &path_segments, can_create).await { + Ok(v) => v, + Err(e) => { + tracing::warn!(err=?e, "dav node fetch failed"); + return Ok(Response::builder() + .status(404) + .body(codec::text_body("Resource not found"))?); + } + }; + + let dav_hdrs = node.dav_header(); + let ctrl = Self { node, user, req }; + + match method.as_str() { + "OPTIONS" => Ok(Response::builder() + .status(200) + .header("DAV", dav_hdrs) + .header("Allow", "HEAD,GET,PUT,OPTIONS,DELETE,PROPFIND,PROPPATCH,MKCOL,COPY,MOVE,LOCK,UNLOCK,MKCALENDAR,REPORT") + .body(codec::text_body(""))?), + "HEAD" => { + tracing::warn!("HEAD might not correctly implemented: should return ETags & co"); + Ok(Response::builder() + .status(200) + .body(codec::text_body(""))?) + }, + "GET" => ctrl.get().await, + "PUT" => ctrl.put().await, + "DELETE" => ctrl.delete().await, + "PROPFIND" => ctrl.propfind().await, + "REPORT" => ctrl.report().await, + _ => Ok(Response::builder() + .status(501) + .body(codec::text_body("HTTP Method not implemented"))?), + } + } + + // --- Per-method functions --- + + /// REPORT has been first described in the "Versioning Extension" of WebDAV + /// It allows more complex queries compared to PROPFIND + /// + /// Note: current implementation is not generic at all, it is heavily tied to CalDAV. + /// A rewrite would be required to make it more generic (with the extension system that has + /// been introduced in aero-dav) + async fn report(self) -> Result<HttpResponse> { + let status = hyper::StatusCode::from_u16(207)?; + + let cal_report = match deserialize::<vers::Report<All>>(self.req).await { + Ok(v) => v, + Err(e) => { + tracing::error!(err=?e, "unable to decode REPORT body"); + return Ok(Response::builder() + .status(400) + .body(text_body("Bad request"))?); + } + }; + + // Internal representation that will handle processed request + let (mut ok_node, mut not_found) = (Vec::new(), Vec::new()); + let calprop: Option<cal::CalendarSelector<All>>; + let extension: Option<realization::Multistatus>; + + // Extracting request information + match cal_report { + vers::Report::Extension(realization::ReportType::Cal(cal::ReportType::Multiget(m))) => { + // Multiget is really like a propfind where Depth: 0|1|Infinity is replaced by an arbitrary + // list of URLs + // Getting the list of nodes + for h in m.href.into_iter() { + let maybe_collected_node = match Path::new(h.0.as_str()) { + Ok(Path::Abs(p)) => RootNode {} + .fetch(&self.user, p.as_slice(), false) + .await + .or(Err(h)), + Ok(Path::Rel(p)) => self + .node + .fetch(&self.user, p.as_slice(), false) + .await + .or(Err(h)), + Err(_) => Err(h), + }; + + match maybe_collected_node { + Ok(v) => ok_node.push(v), + Err(h) => not_found.push(h), + }; + } + calprop = m.selector; + extension = None; + } + vers::Report::Extension(realization::ReportType::Cal(cal::ReportType::Query(q))) => { + calprop = q.selector; + extension = None; + ok_node = apply_filter(self.node.children(&self.user).await, &q.filter) + .try_collect() + .await?; + } + vers::Report::Extension(realization::ReportType::Sync(sync_col)) => { + calprop = Some(cal::CalendarSelector::Prop(sync_col.prop)); + + if sync_col.limit.is_some() { + tracing::warn!("limit is not supported, ignoring"); + } + if matches!(sync_col.sync_level, sync::SyncLevel::Infinite) { + tracing::debug!("aerogramme calendar collections are not nested"); + } + + let token = match sync_col.sync_token { + sync::SyncTokenRequest::InitialSync => None, + sync::SyncTokenRequest::IncrementalSync(token_raw) => { + // parse token + if token_raw.len() != BASE_TOKEN_URI.len() + 48 { + anyhow::bail!("invalid token length") + } + let token = token_raw[BASE_TOKEN_URI.len()..] + .parse() + .or(Err(anyhow::anyhow!("can't parse token")))?; + Some(token) + } + }; + // do the diff + let new_token: Token; + (new_token, ok_node, not_found) = match self.node.diff(token).await { + Ok(t) => t, + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => return Ok(Response::builder() + .status(410) + .body(text_body("Diff failed, token might be expired"))?), + _ => return Ok(Response::builder() + .status(500) + .body(text_body("Server error, maybe this operation is not supported on this collection"))?), + }, + }; + extension = Some(realization::Multistatus::Sync(sync::Multistatus { + sync_token: sync::SyncToken(format!("{}{}", BASE_TOKEN_URI, new_token)), + })); + } + _ => { + return Ok(Response::builder() + .status(501) + .body(text_body("Not implemented"))?) + } + }; + + // Getting props + let props = match calprop { + None | Some(cal::CalendarSelector::AllProp) => Some(dav::PropName(ALLPROP.to_vec())), + Some(cal::CalendarSelector::PropName) => None, + Some(cal::CalendarSelector::Prop(inner)) => Some(inner), + }; + + serialize( + status, + Self::multistatus(&self.user, ok_node, not_found, props, extension).await, + ) + } + + /// PROPFIND is the standard way to fetch WebDAV properties + async fn propfind(self) -> Result<HttpResponse> { + let depth = depth(&self.req); + if matches!(depth, dav::Depth::Infinity) { + return Ok(Response::builder() + .status(501) + .body(text_body("Depth: Infinity not implemented"))?); + } + + let status = hyper::StatusCode::from_u16(207)?; + + // A client may choose not to submit a request body. An empty PROPFIND + // request body MUST be treated as if it were an 'allprop' request. + // @FIXME here we handle any invalid data as an allprop, an empty request is thus correctly + // handled, but corrupted requests are also silently handled as allprop. + let propfind = deserialize::<dav::PropFind<All>>(self.req) + .await + .unwrap_or_else(|_| dav::PropFind::<All>::AllProp(None)); + tracing::debug!(recv=?propfind, "inferred propfind request"); + + // Collect nodes as PROPFIND is not limited to the targeted node + let mut nodes = vec![]; + if matches!(depth, dav::Depth::One | dav::Depth::Infinity) { + nodes.extend(self.node.children(&self.user).await); + } + nodes.push(self.node); + + // Expand properties request + let propname = match propfind { + dav::PropFind::PropName => None, + dav::PropFind::AllProp(None) => Some(dav::PropName(ALLPROP.to_vec())), + dav::PropFind::AllProp(Some(dav::Include(mut include))) => { + include.extend_from_slice(&ALLPROP); + Some(dav::PropName(include)) + } + dav::PropFind::Prop(inner) => Some(inner), + }; + + // Not Found is currently impossible considering the way we designed this function + let not_found = vec![]; + serialize( + status, + Self::multistatus(&self.user, nodes, not_found, propname, None).await, + ) + } + + async fn put(self) -> Result<HttpResponse> { + let put_policy = codec::put_policy(&self.req)?; + + let stream_of_frames = BodyStream::new(self.req.into_body()); + let stream_of_bytes = stream_of_frames + .map_ok(|frame| frame.into_data()) + .map(|obj| match obj { + Ok(Ok(v)) => Ok(v), + Ok(Err(_)) => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "conversion error", + )), + Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)), + }) + .boxed(); + + let etag = match self.node.put(put_policy, stream_of_bytes).await { + Ok(etag) => etag, + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { + tracing::warn!("put pre-condition failed"); + let response = Response::builder().status(412).body(text_body(""))?; + return Ok(response); + } + Err(e) => Err(e)?, + }; + + let response = Response::builder() + .status(201) + .header("ETag", etag) + //.header("content-type", "application/xml; charset=\"utf-8\"") + .body(text_body(""))?; + + Ok(response) + } + + async fn get(self) -> Result<HttpResponse> { + let stream_body = StreamBody::new(self.node.content().map_ok(|v| Frame::data(v))); + let boxed_body = UnsyncBoxBody::new(stream_body); + + let mut builder = Response::builder().status(200); + builder = builder.header("content-type", self.node.content_type()); + if let Some(etag) = self.node.etag().await { + builder = builder.header("etag", etag); + } + let response = builder.body(boxed_body)?; + + Ok(response) + } + + async fn delete(self) -> Result<HttpResponse> { + self.node.delete().await?; + let response = Response::builder() + .status(204) + //.header("content-type", "application/xml; charset=\"utf-8\"") + .body(text_body(""))?; + Ok(response) + } + + // --- Common utility functions --- + /// Build a multistatus response from a list of DavNodes + async fn multistatus( + user: &ArcUser, + nodes: Vec<Box<dyn DavNode>>, + not_found: Vec<dav::Href>, + props: Option<dav::PropName<All>>, + extension: Option<realization::Multistatus>, + ) -> dav::Multistatus<All> { + // Collect properties on existing objects + let mut responses: Vec<dav::Response<All>> = match props { + Some(props) => { + futures::stream::iter(nodes) + .then(|n| n.response_props(user, props.clone())) + .collect() + .await + } + None => nodes + .into_iter() + .map(|n| n.response_propname(user)) + .collect(), + }; + + // Register not found objects only if relevant + if !not_found.is_empty() { + responses.push(dav::Response { + status_or_propstat: dav::StatusOrPropstat::Status( + not_found, + dav::Status(hyper::StatusCode::NOT_FOUND), + ), + error: None, + location: None, + responsedescription: None, + }); + } + + // Build response + let multistatus = dav::Multistatus::<All> { + responses, + responsedescription: None, + extension, + }; + + tracing::debug!(multistatus=?multistatus, "multistatus response"); + multistatus + } +} + +/// Path is a voluntarily feature limited +/// compared to the expressiveness of a UNIX path +/// For example getting parent with ../ is not supported, scheme is not supported, etc. +/// More complex support could be added later if needed by clients +enum Path<'a> { + Abs(Vec<&'a str>), + Rel(Vec<&'a str>), +} +impl<'a> Path<'a> { + fn new(path: &'a str) -> Result<Self> { + // This check is naive, it does not aim at detecting all fully qualified + // URL or protect from any attack, its only goal is to help debugging. + if path.starts_with("http://") || path.starts_with("https://") { + anyhow::bail!("Full URL are not supported") + } + + let path_segments: Vec<_> = path.split("/").filter(|s| *s != "" && *s != ".").collect(); + if path.starts_with("/") { + return Ok(Path::Abs(path_segments)); + } + Ok(Path::Rel(path_segments)) + } +} + +//@FIXME naive implementation, must be refactored later +use futures::stream::Stream; +fn apply_filter<'a>( + nodes: Vec<Box<dyn DavNode>>, + filter: &'a cal::Filter, +) -> impl Stream<Item = std::result::Result<Box<dyn DavNode>, std::io::Error>> + 'a { + futures::stream::iter(nodes).filter_map(move |single_node| async move { + // Get ICS + let chunks: Vec<_> = match single_node.content().try_collect().await { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }; + let raw_ics = chunks.iter().fold(String::new(), |mut acc, single_chunk| { + let str_fragment = std::str::from_utf8(single_chunk.as_ref()); + acc.extend(str_fragment); + acc + }); + + // Parse ICS + let ics = match icalendar::parser::read_calendar(&raw_ics) { + Ok(v) => v, + Err(e) => { + tracing::warn!(err=?e, "Unable to parse ICS in calendar-query"); + return Some(Err(std::io::Error::from(std::io::ErrorKind::InvalidData))); + } + }; + + // Do checks + // @FIXME: icalendar does not consider VCALENDAR as a component + // but WebDAV does... + // Build a fake VCALENDAR component for icalendar compatibility, it's a hack + let root_filter = &filter.0; + let fake_vcal_component = icalendar::parser::Component { + name: cal::Component::VCalendar.as_str().into(), + properties: ics.properties, + components: ics.components, + }; + tracing::debug!(filter=?root_filter, "calendar-query filter"); + + // Adjust return value according to filter + match is_component_match( + &fake_vcal_component, + &[fake_vcal_component.clone()], + root_filter, + ) { + true => Some(Ok(single_node)), + _ => None, + } + }) +} diff --git a/aero-proto/src/dav/middleware.rs b/aero-proto/src/dav/middleware.rs new file mode 100644 index 0000000..8964699 --- /dev/null +++ b/aero-proto/src/dav/middleware.rs @@ -0,0 +1,70 @@ +use anyhow::{anyhow, Result}; +use base64::Engine; +use hyper::body::Incoming; +use hyper::{Request, Response}; + +use aero_collections::user::User; +use aero_user::login::ArcLoginProvider; + +use super::codec::text_body; +use super::controller::HttpResponse; + +type ArcUser = std::sync::Arc<User>; + +pub(super) async fn auth<'a>( + login: ArcLoginProvider, + req: Request<Incoming>, + next: impl Fn(ArcUser, Request<Incoming>) -> futures::future::BoxFuture<'a, Result<HttpResponse>>, +) -> Result<HttpResponse> { + let auth_val = match req.headers().get(hyper::header::AUTHORIZATION) { + Some(hv) => hv.to_str()?, + None => { + tracing::info!("Missing authorization field"); + return Ok(Response::builder() + .status(401) + .header("WWW-Authenticate", "Basic realm=\"Aerogramme\"") + .body(text_body("Missing Authorization field"))?); + } + }; + + let b64_creds_maybe_padded = match auth_val.split_once(" ") { + Some(("Basic", b64)) => b64, + _ => { + tracing::info!("Unsupported authorization field"); + return Ok(Response::builder() + .status(400) + .body(text_body("Unsupported Authorization field"))?); + } + }; + + // base64urlencoded may have trailing equals, base64urlsafe has not + // theoretically authorization is padded but "be liberal in what you accept" + let b64_creds_clean = b64_creds_maybe_padded.trim_end_matches('='); + + // Decode base64 + let creds = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64_creds_clean)?; + let str_creds = std::str::from_utf8(&creds)?; + + // Split username and password + let (username, password) = str_creds.split_once(':').ok_or(anyhow!( + "Missing colon in Authorization, can't split decoded value into a username/password pair" + ))?; + + // Call login provider + let creds = match login.login(username, password).await { + Ok(c) => c, + Err(_) => { + tracing::info!(user = username, "Wrong credentials"); + return Ok(Response::builder() + .status(401) + .header("WWW-Authenticate", "Basic realm=\"Aerogramme\"") + .body(text_body("Wrong credentials"))?); + } + }; + + // Build a user + let user = User::new(username.into(), creds).await?; + + // Call router with user + next(user, req).await +} diff --git a/aero-proto/src/dav/mod.rs b/aero-proto/src/dav/mod.rs new file mode 100644 index 0000000..a3dd58d --- /dev/null +++ b/aero-proto/src/dav/mod.rs @@ -0,0 +1,195 @@ +mod codec; +mod controller; +mod middleware; +mod node; +mod resource; + +use std::net::SocketAddr; +use std::sync::Arc; + +use anyhow::Result; +use futures::future::FutureExt; +use futures::stream::{FuturesUnordered, StreamExt}; +use hyper::rt::{Read, Write}; +use hyper::server::conn::http1 as http; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use rustls_pemfile::{certs, private_key}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::sync::watch; +use tokio_rustls::TlsAcceptor; + +use aero_user::config::{DavConfig, DavUnsecureConfig}; +use aero_user::login::ArcLoginProvider; + +use crate::dav::controller::Controller; + +pub struct Server { + bind_addr: SocketAddr, + login_provider: ArcLoginProvider, + tls: Option<TlsAcceptor>, +} + +pub fn new_unsecure(config: DavUnsecureConfig, login: ArcLoginProvider) -> Server { + Server { + bind_addr: config.bind_addr, + login_provider: login, + tls: None, + } +} + +pub fn new(config: DavConfig, login: ArcLoginProvider) -> Result<Server> { + let loaded_certs = certs(&mut std::io::BufReader::new(std::fs::File::open( + config.certs, + )?)) + .collect::<Result<Vec<_>, _>>()?; + let loaded_key = private_key(&mut std::io::BufReader::new(std::fs::File::open( + config.key, + )?))? + .unwrap(); + + let tls_config = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(loaded_certs, loaded_key)?; + let acceptor = TlsAcceptor::from(Arc::new(tls_config)); + + Ok(Server { + bind_addr: config.bind_addr, + login_provider: login, + tls: Some(acceptor), + }) +} + +trait Stream: Read + Write + Send + Unpin {} +impl<T: Unpin + AsyncRead + AsyncWrite + Send> Stream for TokioIo<T> {} + +impl Server { + pub async fn run(self: Self, mut must_exit: watch::Receiver<bool>) -> Result<()> { + let tcp = TcpListener::bind(self.bind_addr).await?; + tracing::info!("DAV server listening on {:#}", self.bind_addr); + + let mut connections = FuturesUnordered::new(); + while !*must_exit.borrow() { + let wait_conn_finished = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + let (socket, remote_addr) = tokio::select! { + a = tcp.accept() => a?, + _ = wait_conn_finished => continue, + _ = must_exit.changed() => continue, + }; + tracing::info!("Accepted connection from {}", remote_addr); + let stream = match self.build_stream(socket).await { + Ok(v) => v, + Err(e) => { + tracing::error!(err=?e, "TLS acceptor failed"); + continue; + } + }; + + let login = self.login_provider.clone(); + let conn = tokio::spawn(async move { + //@FIXME should create a generic "public web" server on which "routers" could be + //abitrarily bound + //@FIXME replace with a handler supporting http2 + + match http::Builder::new() + .serve_connection( + stream, + service_fn(|req: Request<hyper::body::Incoming>| { + let login = login.clone(); + tracing::info!("{:?} {:?}", req.method(), req.uri()); + tracing::debug!(req=?req, "full request"); + async { + let response = match middleware::auth(login, req, |user, request| { + async { Controller::route(user, request).await }.boxed() + }) + .await + { + Ok(v) => Ok(v), + Err(e) => { + tracing::error!(err=?e, "internal error"); + Response::builder() + .status(500) + .body(codec::text_body("Internal error")) + } + }; + tracing::debug!(resp=?response, "full response"); + response + } + }), + ) + .await + { + Err(e) => tracing::warn!(err=?e, "connection failed"), + Ok(()) => tracing::trace!("connection terminated with success"), + } + }); + connections.push(conn); + } + drop(tcp); + + tracing::info!("Server shutting down, draining remaining connections..."); + while connections.next().await.is_some() {} + + Ok(()) + } + + async fn build_stream(&self, socket: TcpStream) -> Result<Box<dyn Stream>> { + match self.tls.clone() { + Some(acceptor) => { + let stream = acceptor.accept(socket).await?; + Ok(Box::new(TokioIo::new(stream))) + } + None => Ok(Box::new(TokioIo::new(socket))), + } + } +} + +// <D:propfind xmlns:D='DAV:' xmlns:A='http://apple.com/ns/ical/'> +// <D:prop> +// <D:getcontenttype/> +// <D:resourcetype/> +// <D:displayname/> +// <A:calendar-color/> +// </D:prop> +// </D:propfind> + +// <D:propfind xmlns:D='DAV:' xmlns:A='http://apple.com/ns/ical/' xmlns:C='urn:ietf:params:xml:ns:caldav'> +// <D:prop> +// <D:resourcetype/> +// <D:owner/> +// <D:displayname/> +// <D:current-user-principal/> +// <D:current-user-privilege-set/> +// <A:calendar-color/> +// <C:calendar-home-set/> +// </D:prop> +// </D:propfind> + +// <D:propfind xmlns:D='DAV:' xmlns:C='urn:ietf:params:xml:ns:caldav' xmlns:CS='http://calendarserver.org/ns/'> +// <D:prop> +// <D:resourcetype/> +// <D:owner/> +// <D:current-user-principal/> +// <D:current-user-privilege-set/> +// <D:supported-report-set/> +// <C:supported-calendar-component-set/> +// <CS:getctag/> +// </D:prop> +// </D:propfind> + +// <C:calendar-multiget xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav"> +// <D:prop> +// <D:getetag/> +// <C:calendar-data/> +// </D:prop> +// <D:href>/alice/calendar/personal/something.ics</D:href> +// </C:calendar-multiget> diff --git a/aero-proto/src/dav/node.rs b/aero-proto/src/dav/node.rs new file mode 100644 index 0000000..3af3b81 --- /dev/null +++ b/aero-proto/src/dav/node.rs @@ -0,0 +1,145 @@ +use anyhow::Result; +use futures::future::{BoxFuture, FutureExt}; +use futures::stream::{BoxStream, StreamExt}; +use hyper::body::Bytes; + +use aero_collections::davdag::{Etag, Token}; +use aero_dav::realization::All; +use aero_dav::types as dav; + +use super::controller::ArcUser; + +pub(crate) type Content<'a> = BoxStream<'a, std::result::Result<Bytes, std::io::Error>>; +pub(crate) type PropertyStream<'a> = + BoxStream<'a, std::result::Result<dav::Property<All>, dav::PropertyRequest<All>>>; + +pub(crate) enum PutPolicy { + OverwriteAll, + CreateOnly, + ReplaceEtag(String), +} + +/// A DAV node should implement the following methods +/// @FIXME not satisfied by BoxFutures but I have no better idea currently +pub(crate) trait DavNode: Send { + // recurence, filesystem hierarchy + /// This node direct children + fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>>; + /// Recursively fetch a child (progress inside the filesystem hierarchy) + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result<Box<dyn DavNode>>>; + + // node properties + /// Get the path + fn path(&self, user: &ArcUser) -> String; + /// Get the supported WebDAV properties + fn supported_properties(&self, user: &ArcUser) -> dav::PropName<All>; + /// Get the values for the given properties + fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static>; + /// Get the value of the DAV header to return + fn dav_header(&self) -> String; + + /// Put an element (create or update) + fn put<'a>( + &'a self, + policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>>; + /// Content type of the element + fn content_type(&self) -> &str; + /// Get ETag + fn etag(&self) -> BoxFuture<Option<Etag>>; + /// Get content + fn content<'a>(&self) -> Content<'a>; + /// Delete + fn delete(&self) -> BoxFuture<std::result::Result<(), std::io::Error>>; + /// Sync + fn diff<'a>( + &self, + sync_token: Option<Token>, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec<Box<dyn DavNode>>, Vec<dav::Href>), std::io::Error>, + >; + + /// Utility function to get a propname response from a node + fn response_propname(&self, user: &ArcUser) -> dav::Response<All> { + dav::Response { + status_or_propstat: dav::StatusOrPropstat::PropStat( + dav::Href(self.path(user)), + vec![dav::PropStat { + status: dav::Status(hyper::StatusCode::OK), + prop: dav::AnyProp( + self.supported_properties(user) + .0 + .into_iter() + .map(dav::AnyProperty::Request) + .collect(), + ), + error: None, + responsedescription: None, + }], + ), + error: None, + location: None, + responsedescription: None, + } + } + + /// Utility function to get a prop response from a node & a list of propname + fn response_props( + &self, + user: &ArcUser, + props: dav::PropName<All>, + ) -> BoxFuture<'static, dav::Response<All>> { + //@FIXME we should make the DAV parsed object a stream... + let mut result_stream = self.properties(user, props); + let path = self.path(user); + + async move { + let mut prop_desc = vec![]; + let (mut found, mut not_found) = (vec![], vec![]); + while let Some(maybe_prop) = result_stream.next().await { + match maybe_prop { + Ok(v) => found.push(dav::AnyProperty::Value(v)), + Err(v) => not_found.push(dav::AnyProperty::Request(v)), + } + } + + // If at least one property has been found on this object, adding a HTTP 200 propstat to + // the response + if !found.is_empty() { + prop_desc.push(dav::PropStat { + status: dav::Status(hyper::StatusCode::OK), + prop: dav::AnyProp(found), + error: None, + responsedescription: None, + }); + } + + // If at least one property can't be found on this object, adding a HTTP 404 propstat to + // the response + if !not_found.is_empty() { + prop_desc.push(dav::PropStat { + status: dav::Status(hyper::StatusCode::NOT_FOUND), + prop: dav::AnyProp(not_found), + error: None, + responsedescription: None, + }) + } + + // Build the finale response + dav::Response { + status_or_propstat: dav::StatusOrPropstat::PropStat(dav::Href(path), prop_desc), + error: None, + location: None, + responsedescription: None, + } + } + .boxed() + } +} diff --git a/aero-proto/src/dav/resource.rs b/aero-proto/src/dav/resource.rs new file mode 100644 index 0000000..b5ae029 --- /dev/null +++ b/aero-proto/src/dav/resource.rs @@ -0,0 +1,999 @@ +use std::sync::Arc; +type ArcUser = std::sync::Arc<User>; + +use anyhow::{anyhow, Result}; +use futures::io::AsyncReadExt; +use futures::stream::{StreamExt, TryStreamExt}; +use futures::{future::BoxFuture, future::FutureExt}; + +use aero_collections::{ + calendar::Calendar, + davdag::{BlobId, Etag, SyncChange, Token}, + user::User, +}; +use aero_dav::acltypes as acl; +use aero_dav::caltypes as cal; +use aero_dav::realization::{self as all, All}; +use aero_dav::synctypes as sync; +use aero_dav::types as dav; +use aero_dav::versioningtypes as vers; + +use super::node::PropertyStream; +use crate::dav::node::{Content, DavNode, PutPolicy}; + +/// Why "https://aerogramme.0"? +/// Because tokens must be valid URI. +/// And numeric TLD are ~mostly valid in URI (check the .42 TLD experience) +/// and at the same time, they are not used sold by the ICANN and there is no plan to use them. +/// So I am sure that the URL remains invalid, avoiding leaking requests to an hardcoded URL in the +/// future. +/// The best option would be to make it configurable ofc, so someone can put a domain name +/// that they control, it would probably improve compatibility (maybe some WebDAV spec tells us +/// how to handle/resolve this URI but I am not aware of that...). But that's not the plan for +/// now. So here we are: https://aerogramme.0. +pub const BASE_TOKEN_URI: &str = "https://aerogramme.0/sync/"; + +#[derive(Clone)] +pub(crate) struct RootNode {} +impl DavNode for RootNode { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> { + if path.len() == 0 { + let this = self.clone(); + return async { Ok(Box::new(this) as Box<dyn DavNode>) }.boxed(); + } + + if path[0] == user.username { + let child = Box::new(HomeNode {}); + return child.fetch(user, &path[1..], create); + } + + //@NOTE: We can't create a node at this level + async { Err(anyhow!("Not found")) }.boxed() + } + + fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> { + async { vec![Box::new(HomeNode {}) as Box<dyn DavNode>] }.boxed() + } + + fn path(&self, user: &ArcUser) -> String { + "/".into() + } + + fn supported_properties(&self, user: &ArcUser) -> dav::PropName<All> { + dav::PropName(vec![ + dav::PropertyRequest::DisplayName, + dav::PropertyRequest::ResourceType, + dav::PropertyRequest::GetContentType, + dav::PropertyRequest::Extension(all::PropertyRequest::Acl( + acl::PropertyRequest::CurrentUserPrincipal, + )), + ]) + } + + fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> { + let user = user.clone(); + futures::stream::iter(prop.0) + .map(move |n| { + let prop = match n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName("DAV Root".to_string()) + } + dav::PropertyRequest::ResourceType => { + dav::Property::ResourceType(vec![dav::ResourceType::Collection]) + } + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("httpd/unix-directory".into()) + } + dav::PropertyRequest::Extension(all::PropertyRequest::Acl( + acl::PropertyRequest::CurrentUserPrincipal, + )) => dav::Property::Extension(all::Property::Acl( + acl::Property::CurrentUserPrincipal(acl::User::Authenticated(dav::Href( + HomeNode {}.path(&user), + ))), + )), + v => return Err(v), + }; + Ok(prop) + }) + .boxed() + } + + fn put<'a>( + &'a self, + _policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> { + futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed() + } + + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() + } + + fn content_type(&self) -> &str { + "text/plain" + } + + fn etag(&self) -> BoxFuture<Option<Etag>> { + async { None }.boxed() + } + + fn delete(&self) -> BoxFuture<std::result::Result<(), std::io::Error>> { + async { Err(std::io::Error::from(std::io::ErrorKind::PermissionDenied)) }.boxed() + } + + fn diff<'a>( + &self, + _sync_token: Option<Token>, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec<Box<dyn DavNode>>, Vec<dav::Href>), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } + + fn dav_header(&self) -> String { + "1".into() + } +} + +#[derive(Clone)] +pub(crate) struct HomeNode {} +impl DavNode for HomeNode { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> { + if path.len() == 0 { + let node = Box::new(self.clone()) as Box<dyn DavNode>; + return async { Ok(node) }.boxed(); + } + + if path[0] == "calendar" { + return async move { + let child = Box::new(CalendarListNode::new(user).await?); + child.fetch(user, &path[1..], create).await + } + .boxed(); + } + + //@NOTE: we can't create a node at this level + async { Err(anyhow!("Not found")) }.boxed() + } + + fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> { + async { + CalendarListNode::new(user) + .await + .map(|c| vec![Box::new(c) as Box<dyn DavNode>]) + .unwrap_or(vec![]) + } + .boxed() + } + + fn path(&self, user: &ArcUser) -> String { + format!("/{}/", user.username) + } + + fn supported_properties(&self, user: &ArcUser) -> dav::PropName<All> { + dav::PropName(vec![ + dav::PropertyRequest::DisplayName, + dav::PropertyRequest::ResourceType, + dav::PropertyRequest::GetContentType, + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::CalendarHomeSet, + )), + ]) + } + fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> { + let user = user.clone(); + + futures::stream::iter(prop.0) + .map(move |n| { + let prop = match n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName(format!("{} home", user.username)) + } + dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![ + dav::ResourceType::Collection, + dav::ResourceType::Extension(all::ResourceType::Acl( + acl::ResourceType::Principal, + )), + ]), + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("httpd/unix-directory".into()) + } + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::CalendarHomeSet, + )) => dav::Property::Extension(all::Property::Cal( + cal::Property::CalendarHomeSet(dav::Href( + //@FIXME we are hardcoding the calendar path, instead we would want to use + //objects + format!("/{}/calendar/", user.username), + )), + )), + v => return Err(v), + }; + Ok(prop) + }) + .boxed() + } + + fn put<'a>( + &'a self, + _policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> { + futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed() + } + + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() + } + + fn content_type(&self) -> &str { + "text/plain" + } + + fn etag(&self) -> BoxFuture<Option<Etag>> { + async { None }.boxed() + } + + fn delete(&self) -> BoxFuture<std::result::Result<(), std::io::Error>> { + async { Err(std::io::Error::from(std::io::ErrorKind::PermissionDenied)) }.boxed() + } + fn diff<'a>( + &self, + _sync_token: Option<Token>, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec<Box<dyn DavNode>>, Vec<dav::Href>), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } + + fn dav_header(&self) -> String { + "1, access-control, calendar-access".into() + } +} + +#[derive(Clone)] +pub(crate) struct CalendarListNode { + list: Vec<String>, +} +impl CalendarListNode { + async fn new(user: &ArcUser) -> Result<Self> { + let list = user.calendars.list(user).await?; + Ok(Self { list }) + } +} +impl DavNode for CalendarListNode { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> { + if path.len() == 0 { + let node = Box::new(self.clone()) as Box<dyn DavNode>; + return async { Ok(node) }.boxed(); + } + + async move { + //@FIXME: we should create a node if the open returns a "not found". + let cal = user + .calendars + .open(user, path[0]) + .await? + .ok_or(anyhow!("Not found"))?; + let child = Box::new(CalendarNode { + col: cal, + calname: path[0].to_string(), + }); + child.fetch(user, &path[1..], create).await + } + .boxed() + } + + fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> { + let list = self.list.clone(); + async move { + //@FIXME maybe we want to be lazy here?! + futures::stream::iter(list.iter()) + .filter_map(|name| async move { + user.calendars + .open(user, name) + .await + .ok() + .flatten() + .map(|v| (name, v)) + }) + .map(|(name, cal)| { + Box::new(CalendarNode { + col: cal, + calname: name.to_string(), + }) as Box<dyn DavNode> + }) + .collect::<Vec<Box<dyn DavNode>>>() + .await + } + .boxed() + } + + fn path(&self, user: &ArcUser) -> String { + format!("/{}/calendar/", user.username) + } + + fn supported_properties(&self, user: &ArcUser) -> dav::PropName<All> { + dav::PropName(vec![ + dav::PropertyRequest::DisplayName, + dav::PropertyRequest::ResourceType, + dav::PropertyRequest::GetContentType, + ]) + } + fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> { + let user = user.clone(); + + futures::stream::iter(prop.0) + .map(move |n| { + let prop = match n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName(format!("{} calendars", user.username)) + } + dav::PropertyRequest::ResourceType => { + dav::Property::ResourceType(vec![dav::ResourceType::Collection]) + } + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("httpd/unix-directory".into()) + } + v => return Err(v), + }; + Ok(prop) + }) + .boxed() + } + + fn put<'a>( + &'a self, + _policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> { + futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed() + } + + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() + } + + fn content_type(&self) -> &str { + "text/plain" + } + + fn etag(&self) -> BoxFuture<Option<Etag>> { + async { None }.boxed() + } + + fn delete(&self) -> BoxFuture<std::result::Result<(), std::io::Error>> { + async { Err(std::io::Error::from(std::io::ErrorKind::PermissionDenied)) }.boxed() + } + fn diff<'a>( + &self, + _sync_token: Option<Token>, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec<Box<dyn DavNode>>, Vec<dav::Href>), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } + + fn dav_header(&self) -> String { + "1, access-control, calendar-access".into() + } +} + +#[derive(Clone)] +pub(crate) struct CalendarNode { + col: Arc<Calendar>, + calname: String, +} +impl DavNode for CalendarNode { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> { + if path.len() == 0 { + let node = Box::new(self.clone()) as Box<dyn DavNode>; + return async { Ok(node) }.boxed(); + } + + let col = self.col.clone(); + let calname = self.calname.clone(); + async move { + match (col.dag().await.idx_by_filename.get(path[0]), create) { + (Some(blob_id), _) => { + let child = Box::new(EventNode { + col: col.clone(), + calname, + filename: path[0].to_string(), + blob_id: *blob_id, + }); + child.fetch(user, &path[1..], create).await + } + (None, true) => { + let child = Box::new(CreateEventNode { + col: col.clone(), + calname, + filename: path[0].to_string(), + }); + child.fetch(user, &path[1..], create).await + } + _ => Err(anyhow!("Not found")), + } + } + .boxed() + } + + fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> { + let col = self.col.clone(); + let calname = self.calname.clone(); + + async move { + col.dag() + .await + .idx_by_filename + .iter() + .map(|(filename, blob_id)| { + Box::new(EventNode { + col: col.clone(), + calname: calname.clone(), + filename: filename.to_string(), + blob_id: *blob_id, + }) as Box<dyn DavNode> + }) + .collect() + } + .boxed() + } + + fn path(&self, user: &ArcUser) -> String { + format!("/{}/calendar/{}/", user.username, self.calname) + } + + fn supported_properties(&self, user: &ArcUser) -> dav::PropName<All> { + dav::PropName(vec![ + dav::PropertyRequest::DisplayName, + dav::PropertyRequest::ResourceType, + dav::PropertyRequest::GetContentType, + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::SupportedCalendarComponentSet, + )), + dav::PropertyRequest::Extension(all::PropertyRequest::Sync( + sync::PropertyRequest::SyncToken, + )), + dav::PropertyRequest::Extension(all::PropertyRequest::Vers( + vers::PropertyRequest::SupportedReportSet, + )), + ]) + } + fn properties(&self, _user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> { + let calname = self.calname.to_string(); + let col = self.col.clone(); + + futures::stream::iter(prop.0) + .then(move |n| { + let calname = calname.clone(); + let col = col.clone(); + + async move { + let prop = match n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName(format!("{} calendar", calname)) + } + dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![ + dav::ResourceType::Collection, + dav::ResourceType::Extension(all::ResourceType::Cal( + cal::ResourceType::Calendar, + )), + ]), + //dav::PropertyRequest::GetContentType => dav::AnyProperty::Value(dav::Property::GetContentType("httpd/unix-directory".into())), + //@FIXME seems wrong but seems to be what Thunderbird expects... + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("text/calendar".into()) + } + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::SupportedCalendarComponentSet, + )) => dav::Property::Extension(all::Property::Cal( + cal::Property::SupportedCalendarComponentSet(vec![ + cal::CompSupport(cal::Component::VEvent), + cal::CompSupport(cal::Component::VTodo), + cal::CompSupport(cal::Component::VJournal), + ]), + )), + dav::PropertyRequest::Extension(all::PropertyRequest::Sync( + sync::PropertyRequest::SyncToken, + )) => match col.token().await { + Ok(token) => dav::Property::Extension(all::Property::Sync( + sync::Property::SyncToken(sync::SyncToken(format!( + "{}{}", + BASE_TOKEN_URI, token + ))), + )), + _ => return Err(n.clone()), + }, + dav::PropertyRequest::Extension(all::PropertyRequest::Vers( + vers::PropertyRequest::SupportedReportSet, + )) => dav::Property::Extension(all::Property::Vers( + vers::Property::SupportedReportSet(vec![ + vers::SupportedReport(vers::ReportName::Extension( + all::ReportTypeName::Cal(cal::ReportTypeName::Multiget), + )), + vers::SupportedReport(vers::ReportName::Extension( + all::ReportTypeName::Cal(cal::ReportTypeName::Query), + )), + vers::SupportedReport(vers::ReportName::Extension( + all::ReportTypeName::Sync(sync::ReportTypeName::SyncCollection), + )), + ]), + )), + v => return Err(v), + }; + Ok(prop) + } + }) + .boxed() + } + + fn put<'a>( + &'a self, + _policy: PutPolicy, + _stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> { + futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported)).boxed() + } + + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() + } + + fn content_type(&self) -> &str { + "text/plain" + } + + fn etag(&self) -> BoxFuture<Option<Etag>> { + async { None }.boxed() + } + + fn delete(&self) -> BoxFuture<std::result::Result<(), std::io::Error>> { + async { Err(std::io::Error::from(std::io::ErrorKind::PermissionDenied)) }.boxed() + } + fn diff<'a>( + &self, + sync_token: Option<Token>, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec<Box<dyn DavNode>>, Vec<dav::Href>), std::io::Error>, + > { + let col = self.col.clone(); + let calname = self.calname.clone(); + async move { + let sync_token = match sync_token { + Some(v) => v, + None => { + let token = col + .token() + .await + .or(Err(std::io::Error::from(std::io::ErrorKind::Interrupted)))?; + let ok_nodes = col + .dag() + .await + .idx_by_filename + .iter() + .map(|(filename, blob_id)| { + Box::new(EventNode { + col: col.clone(), + calname: calname.clone(), + filename: filename.to_string(), + blob_id: *blob_id, + }) as Box<dyn DavNode> + }) + .collect(); + + return Ok((token, ok_nodes, vec![])); + } + }; + let (new_token, listed_changes) = match col.diff(sync_token).await { + Ok(v) => v, + Err(e) => { + tracing::info!(err=?e, "token resolution failed, maybe a forgotten token"); + return Err(std::io::Error::from(std::io::ErrorKind::NotFound)); + } + }; + + let mut ok_nodes: Vec<Box<dyn DavNode>> = vec![]; + let mut rm_nodes: Vec<dav::Href> = vec![]; + for change in listed_changes.into_iter() { + match change { + SyncChange::Ok((filename, blob_id)) => { + let child = Box::new(EventNode { + col: col.clone(), + calname: calname.clone(), + filename, + blob_id, + }); + ok_nodes.push(child); + } + SyncChange::NotFound(filename) => { + rm_nodes.push(dav::Href(filename)); + } + } + } + + Ok((new_token, ok_nodes, rm_nodes)) + } + .boxed() + } + fn dav_header(&self) -> String { + "1, access-control, calendar-access".into() + } +} + +#[derive(Clone)] +pub(crate) struct EventNode { + col: Arc<Calendar>, + calname: String, + filename: String, + blob_id: BlobId, +} + +impl DavNode for EventNode { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> { + if path.len() == 0 { + let node = Box::new(self.clone()) as Box<dyn DavNode>; + return async { Ok(node) }.boxed(); + } + + async { + Err(anyhow!( + "Not supported: can't create a child on an event node" + )) + } + .boxed() + } + + fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> { + async { vec![] }.boxed() + } + + fn path(&self, user: &ArcUser) -> String { + format!( + "/{}/calendar/{}/{}", + user.username, self.calname, self.filename + ) + } + + fn supported_properties(&self, user: &ArcUser) -> dav::PropName<All> { + dav::PropName(vec![ + dav::PropertyRequest::DisplayName, + dav::PropertyRequest::ResourceType, + dav::PropertyRequest::GetEtag, + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::CalendarData(cal::CalendarDataRequest::default()), + )), + ]) + } + fn properties(&self, _user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> { + let this = self.clone(); + + futures::stream::iter(prop.0) + .then(move |n| { + let this = this.clone(); + + async move { + let prop = match &n { + dav::PropertyRequest::DisplayName => { + dav::Property::DisplayName(format!("{} event", this.filename)) + } + dav::PropertyRequest::ResourceType => dav::Property::ResourceType(vec![]), + dav::PropertyRequest::GetContentType => { + dav::Property::GetContentType("text/calendar".into()) + } + dav::PropertyRequest::GetEtag => { + let etag = this.etag().await.ok_or(n.clone())?; + dav::Property::GetEtag(etag) + } + dav::PropertyRequest::Extension(all::PropertyRequest::Cal( + cal::PropertyRequest::CalendarData(req), + )) => { + let ics = String::from_utf8( + this.col.get(this.blob_id).await.or(Err(n.clone()))?, + ) + .or(Err(n.clone()))?; + + let new_ics = match &req.comp { + None => ics, + Some(prune_comp) => { + // parse content + let ics = match icalendar::parser::read_calendar(&ics) { + Ok(v) => v, + Err(e) => { + tracing::warn!(err=?e, "Unable to parse ICS in calendar-query"); + return Err(n.clone()) + } + }; + + // build a fake vcal component for caldav compat + let fake_vcal_component = icalendar::parser::Component { + name: cal::Component::VCalendar.as_str().into(), + properties: ics.properties, + components: ics.components, + }; + + // rebuild component + let new_comp = match aero_ical::prune::component(&fake_vcal_component, prune_comp) { + Some(v) => v, + None => return Err(n.clone()), + }; + + // reserialize + format!("{}", icalendar::parser::Calendar { properties: new_comp.properties, components: new_comp.components }) + }, + }; + + + + dav::Property::Extension(all::Property::Cal( + cal::Property::CalendarData(cal::CalendarDataPayload { + mime: None, + payload: new_ics, + }), + )) + } + _ => return Err(n), + }; + Ok(prop) + } + }) + .boxed() + } + + fn put<'a>( + &'a self, + policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> { + async { + let existing_etag = self + .etag() + .await + .ok_or(std::io::Error::new(std::io::ErrorKind::Other, "Etag error"))?; + match policy { + PutPolicy::CreateOnly => { + return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)) + } + PutPolicy::ReplaceEtag(etag) if etag != existing_etag.as_str() => { + return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)) + } + _ => (), + }; + + //@FIXME for now, our storage interface does not allow streaming, + // so we load everything in memory + let mut evt = Vec::new(); + let mut reader = stream.into_async_read(); + reader + .read_to_end(&mut evt) + .await + .or(Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)))?; + let (_token, entry) = self + .col + .put(self.filename.as_str(), evt.as_ref()) + .await + .or(Err(std::io::ErrorKind::Interrupted))?; + self.col + .opportunistic_sync() + .await + .or(Err(std::io::ErrorKind::ConnectionReset))?; + Ok(entry.2) + } + .boxed() + } + + fn content<'a>(&self) -> Content<'a> { + //@FIXME for now, our storage interface does not allow streaming, + // so we load everything in memory + let calendar = self.col.clone(); + let blob_id = self.blob_id.clone(); + let calblob = async move { + let raw_ics = calendar + .get(blob_id) + .await + .or(Err(std::io::Error::from(std::io::ErrorKind::Interrupted)))?; + + Ok(hyper::body::Bytes::from(raw_ics)) + }; + futures::stream::once(Box::pin(calblob)).boxed() + } + + fn content_type(&self) -> &str { + "text/calendar" + } + + fn etag(&self) -> BoxFuture<Option<Etag>> { + let calendar = self.col.clone(); + + async move { + calendar + .dag() + .await + .table + .get(&self.blob_id) + .map(|(_, _, etag)| etag.to_string()) + } + .boxed() + } + + fn delete(&self) -> BoxFuture<std::result::Result<(), std::io::Error>> { + let calendar = self.col.clone(); + let blob_id = self.blob_id.clone(); + + async move { + let _token = match calendar.delete(blob_id).await { + Ok(v) => v, + Err(e) => { + tracing::error!(err=?e, "delete event node"); + return Err(std::io::Error::from(std::io::ErrorKind::Interrupted)); + } + }; + calendar + .opportunistic_sync() + .await + .or(Err(std::io::ErrorKind::ConnectionReset))?; + Ok(()) + } + .boxed() + } + fn diff<'a>( + &self, + _sync_token: Option<Token>, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec<Box<dyn DavNode>>, Vec<dav::Href>), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } + + fn dav_header(&self) -> String { + "1, access-control".into() + } +} + +#[derive(Clone)] +pub(crate) struct CreateEventNode { + col: Arc<Calendar>, + calname: String, + filename: String, +} +impl DavNode for CreateEventNode { + fn fetch<'a>( + &self, + user: &'a ArcUser, + path: &'a [&str], + create: bool, + ) -> BoxFuture<'a, Result<Box<dyn DavNode>>> { + if path.len() == 0 { + let node = Box::new(self.clone()) as Box<dyn DavNode>; + return async { Ok(node) }.boxed(); + } + + async { + Err(anyhow!( + "Not supported: can't create a child on an event node" + )) + } + .boxed() + } + + fn children<'a>(&self, user: &'a ArcUser) -> BoxFuture<'a, Vec<Box<dyn DavNode>>> { + async { vec![] }.boxed() + } + + fn path(&self, user: &ArcUser) -> String { + format!( + "/{}/calendar/{}/{}", + user.username, self.calname, self.filename + ) + } + + fn supported_properties(&self, user: &ArcUser) -> dav::PropName<All> { + dav::PropName(vec![]) + } + + fn properties(&self, _user: &ArcUser, prop: dav::PropName<All>) -> PropertyStream<'static> { + futures::stream::iter(vec![]).boxed() + } + + fn put<'a>( + &'a self, + _policy: PutPolicy, + stream: Content<'a>, + ) -> BoxFuture<'a, std::result::Result<Etag, std::io::Error>> { + //@NOTE: policy might not be needed here: whatever we put, there is no known entries here + + async { + //@FIXME for now, our storage interface does not allow for streaming + let mut evt = Vec::new(); + let mut reader = stream.into_async_read(); + reader.read_to_end(&mut evt).await.unwrap(); + let (_token, entry) = self + .col + .put(self.filename.as_str(), evt.as_ref()) + .await + .or(Err(std::io::ErrorKind::Interrupted))?; + self.col + .opportunistic_sync() + .await + .or(Err(std::io::ErrorKind::ConnectionReset))?; + Ok(entry.2) + } + .boxed() + } + + fn content<'a>(&self) -> Content<'a> { + futures::stream::once(futures::future::err(std::io::Error::from( + std::io::ErrorKind::Unsupported, + ))) + .boxed() + } + + fn content_type(&self) -> &str { + "text/plain" + } + + fn etag(&self) -> BoxFuture<Option<Etag>> { + async { None }.boxed() + } + + fn delete(&self) -> BoxFuture<std::result::Result<(), std::io::Error>> { + // Nothing to delete + async { Ok(()) }.boxed() + } + fn diff<'a>( + &self, + _sync_token: Option<Token>, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec<Box<dyn DavNode>>, Vec<dav::Href>), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } + + fn dav_header(&self) -> String { + "1, access-control".into() + } +} |