diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-04-23 15:20:29 +0200 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-04-23 15:20:29 +0200 |
commit | 50ce8621c2eaf91c46be0a2a9c2b82b19e66880b (patch) | |
tree | a2a90b18d23b11e92818a0bdced912e33b1d576d | |
parent | 4594e068dbba3d3d704728449fc6ccaaadaa82f1 (diff) | |
download | aerogramme-50ce8621c2eaf91c46be0a2a9c2b82b19e66880b.tar.gz aerogramme-50ce8621c2eaf91c46be0a2a9c2b82b19e66880b.zip |
GET implementation
-rw-r--r-- | aero-proto/src/dav/codec.rs | 11 | ||||
-rw-r--r-- | aero-proto/src/dav/controller.rs | 31 | ||||
-rw-r--r-- | aero-proto/src/dav/middleware.rs | 8 | ||||
-rw-r--r-- | aero-proto/src/dav/node.rs | 7 | ||||
-rw-r--r-- | aero-proto/src/dav/resource.rs | 46 |
5 files changed, 76 insertions, 27 deletions
diff --git a/aero-proto/src/dav/codec.rs b/aero-proto/src/dav/codec.rs index e317a03..9082d0a 100644 --- a/aero-proto/src/dav/codec.rs +++ b/aero-proto/src/dav/codec.rs @@ -6,7 +6,7 @@ use futures::stream::StreamExt; use futures::stream::TryStreamExt; use http_body_util::BodyStream; use http_body_util::StreamBody; -use http_body_util::combinators::BoxBody; +use http_body_util::combinators::UnsyncBoxBody; use hyper::body::Frame; use tokio_util::sync::PollSender; use std::io::{Error, ErrorKind}; @@ -16,6 +16,7 @@ use http_body_util::BodyExt; use aero_dav::types as dav; use aero_dav::xml as dxml; +use super::controller::HttpResponse; pub(crate) fn depth(req: &Request<impl hyper::body::Body>) -> dav::Depth { match req.headers().get("Depth").map(hyper::header::HeaderValue::to_str) { @@ -26,11 +27,11 @@ pub(crate) fn depth(req: &Request<impl hyper::body::Body>) -> dav::Depth { } } -pub(crate) fn text_body(txt: &'static str) -> BoxBody<Bytes, std::io::Error> { - BoxBody::new(Full::new(Bytes::from(txt)).map_err(|e| match e {})) +pub(crate) fn text_body(txt: &'static str) -> UnsyncBoxBody<Bytes, std::io::Error> { + UnsyncBoxBody::new(Full::new(Bytes::from(txt)).map_err(|e| match e {})) } -pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::StatusCode, elem: T) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { +pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::StatusCode, elem: T) -> Result<HttpResponse> { let (tx, rx) = tokio::sync::mpsc::channel::<Bytes>(1); // Build the writer @@ -55,7 +56,7 @@ pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::Stat // Build the reader let recv = tokio_stream::wrappers::ReceiverStream::new(rx); let stream = StreamBody::new(recv.map(|v| Ok(Frame::data(v)))); - let boxed_body = BoxBody::new(stream); + let boxed_body = UnsyncBoxBody::new(stream); let response = Response::builder() .status(status_ok) diff --git a/aero-proto/src/dav/controller.rs b/aero-proto/src/dav/controller.rs index c8432dd..de6403e 100644 --- a/aero-proto/src/dav/controller.rs +++ b/aero-proto/src/dav/controller.rs @@ -1,8 +1,10 @@ use anyhow::Result; -use http_body_util::combinators::BoxBody; +use http_body_util::combinators::{UnsyncBoxBody, BoxBody}; use hyper::body::Incoming; use hyper::{Request, Response, body::Bytes}; use http_body_util::BodyStream; +use http_body_util::StreamBody; +use hyper::body::Frame; use futures::stream::{StreamExt, TryStreamExt}; use aero_collections::user::User; @@ -15,7 +17,8 @@ use crate::dav::node::{DavNode, PutPolicy}; use crate::dav::resource::RootNode; use crate::dav::codec; -type ArcUser = std::sync::Arc<User>; +pub(super) type ArcUser = std::sync::Arc<User>; +pub(super) type HttpResponse = Response<UnsyncBoxBody<Bytes, std::io::Error>>; const ALLPROP: [dav::PropertyRequest<All>; 10] = [ dav::PropertyRequest::CreationDate, @@ -36,7 +39,7 @@ pub(crate) struct Controller { req: Request<Incoming>, } impl Controller { - pub(crate) async fn route(user: std::sync::Arc<User>, req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { + pub(crate) async fn route(user: std::sync::Arc<User>, req: Request<Incoming>) -> Result<HttpResponse> { let path = req.uri().path().to_string(); let path_segments: Vec<_> = path.split("/").filter(|s| *s != "").collect(); let method = req.method().as_str().to_uppercase(); @@ -60,12 +63,13 @@ impl Controller { .header("DAV", "1") .header("Allow", "HEAD,GET,PUT,OPTIONS,DELETE,PROPFIND,PROPPATCH,MKCOL,COPY,MOVE,LOCK,UNLOCK,MKCALENDAR,REPORT") .body(codec::text_body(""))?), - "HEAD" | "GET" => { - tracing::warn!("HEAD+GET not correctly implemented"); + "HEAD" => { + tracing::warn!("HEAD not correctly implemented"); Ok(Response::builder() .status(404) .body(codec::text_body(""))?) }, + "GET" => ctrl.get().await, "PUT" => ctrl.put().await, "DELETE" => { todo!(); @@ -87,7 +91,7 @@ impl Controller { /// Note: current implementation is not generic at all, it is heavily tied to CalDAV. /// A rewrite would be required to make it more generic (with the extension system that has /// been introduced in aero-dav) - async fn report(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { + async fn report(self) -> Result<HttpResponse> { let status = hyper::StatusCode::from_u16(207)?; let report = match deserialize::<cal::Report<All>>(self.req).await { @@ -135,7 +139,7 @@ impl Controller { } /// PROPFIND is the standard way to fetch WebDAV properties - async fn propfind(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { + async fn propfind(self) -> Result<HttpResponse> { let depth = depth(&self.req); if matches!(depth, dav::Depth::Infinity) { return Ok(Response::builder() @@ -175,7 +179,7 @@ impl Controller { serialize(status, Self::multistatus(&self.user, nodes, not_found, propname)) } - async fn put(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { + async fn put(self) -> Result<HttpResponse> { //@FIXME temporary, look at If-None-Match & If-Match headers let put_policy = PutPolicy::CreateOnly; @@ -199,20 +203,19 @@ impl Controller { Ok(response) } - async fn get(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { - todo!() - /*let stream = StreamBody::new(self.node.get().map(|v| Ok(Frame::data(v)))); - let boxed_body = BoxBody::new(stream); + async fn get(self) -> Result<HttpResponse> { + let stream_body = StreamBody::new(self.node.content().await.map_ok(|v| Frame::data(v))); + let boxed_body = UnsyncBoxBody::new(stream_body); let response = Response::builder() .status(200) //.header("content-type", "application/xml; charset=\"utf-8\"") .body(boxed_body)?; - Ok(response)*/ + Ok(response) } - // --- Common utulity functions --- + // --- Common utility functions --- /// Build a multistatus response from a list of DavNodes fn multistatus(user: &ArcUser, nodes: Vec<Box<dyn DavNode>>, not_found: Vec<dav::Href>, props: Option<dav::PropName<All>>) -> dav::Multistatus<All> { // Collect properties on existing objects diff --git a/aero-proto/src/dav/middleware.rs b/aero-proto/src/dav/middleware.rs index c4edbd8..e19ce14 100644 --- a/aero-proto/src/dav/middleware.rs +++ b/aero-proto/src/dav/middleware.rs @@ -1,21 +1,21 @@ use anyhow::{anyhow, Result}; use base64::Engine; -use hyper::{Request, Response, body::Bytes}; +use hyper::{Request, Response}; use hyper::body::Incoming; -use http_body_util::combinators::BoxBody; use aero_user::login::ArcLoginProvider; use aero_collections::user::User; use super::codec::text_body; +use super::controller::HttpResponse; type ArcUser = std::sync::Arc<User>; pub(super) async fn auth<'a>( login: ArcLoginProvider, req: Request<Incoming>, - next: impl Fn(ArcUser, Request<Incoming>) -> futures::future::BoxFuture<'a, Result<Response<BoxBody<Bytes, std::io::Error>>>>, -) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { + next: impl Fn(ArcUser, Request<Incoming>) -> futures::future::BoxFuture<'a, Result<HttpResponse>>, +) -> Result<HttpResponse> { let auth_val = match req.headers().get(hyper::header::AUTHORIZATION) { Some(hv) => hv.to_str()?, None => { diff --git a/aero-proto/src/dav/node.rs b/aero-proto/src/dav/node.rs index 96586ad..e2835e9 100644 --- a/aero-proto/src/dav/node.rs +++ b/aero-proto/src/dav/node.rs @@ -5,9 +5,10 @@ use hyper::body::Bytes; use aero_dav::types as dav; use aero_dav::realization::All; -use aero_collections::{user::User, davdag::Etag}; +use aero_collections::davdag::Etag; + +use super::controller::ArcUser; -type ArcUser = std::sync::Arc<User>; pub(crate) type Content<'a> = BoxStream<'a, std::result::Result<Bytes, std::io::Error>>; pub(crate) enum PutPolicy { @@ -34,7 +35,7 @@ pub(crate) trait DavNode: Send { /// Put an element (create or update) fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>>; /// Get content - //fn content(&self) -> TryStream; + fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>>; //@FIXME maybe add etag, maybe add a way to set content diff --git a/aero-proto/src/dav/resource.rs b/aero-proto/src/dav/resource.rs index 02a246e..bd377fb 100644 --- a/aero-proto/src/dav/resource.rs +++ b/aero-proto/src/dav/resource.rs @@ -64,6 +64,12 @@ impl DavNode for RootNode { fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { todo!() } + + fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> { + async { + futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + }.boxed() + } } #[derive(Clone)] @@ -127,6 +133,12 @@ impl DavNode for HomeNode { fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { todo!() } + + fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> { + async { + futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + }.boxed() + } } #[derive(Clone)] @@ -200,6 +212,12 @@ impl DavNode for CalendarListNode { fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { todo!() } + + fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> { + async { + futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + }.boxed() + } } #[derive(Clone)] @@ -290,6 +308,12 @@ impl DavNode for CalendarNode { fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { todo!() } + + fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> { + async { + futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + }.boxed() + } } const FAKE_ICS: &str = r#"BEGIN:VCALENDAR @@ -386,7 +410,8 @@ impl DavNode for EventNode { _ => () }; - //@FIXME for now, our storage interface does not allow for streaming + //@FIXME for now, our storage interface does not allow streaming, + // so we load everything in memory let mut evt = Vec::new(); let mut reader = stream.into_async_read(); reader.read_to_end(&mut evt).await.unwrap(); @@ -394,6 +419,19 @@ impl DavNode for EventNode { Ok(entry.2) }.boxed() } + + fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> { + async { + //@FIXME for now, our storage interface does not allow streaming, + // so we load everything in memory + let content = self.col.get(self.blob_id).await.or(Err(std::io::Error::from(std::io::ErrorKind::Interrupted))); + let r = async { + Ok(hyper::body::Bytes::from(content?)) + }; + //tokio::pin!(r); + futures::stream::once(Box::pin(r)).boxed() + }.boxed() + } } #[derive(Clone)] @@ -440,4 +478,10 @@ impl DavNode for CreateEventNode { Ok(entry.2) }.boxed() } + + fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> { + async { + futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed() + }.boxed() + } } |