aboutsummaryrefslogtreecommitdiff
path: root/aero-proto
diff options
context:
space:
mode:
Diffstat (limited to 'aero-proto')
-rw-r--r--aero-proto/src/dav/controller.rs34
-rw-r--r--aero-proto/src/dav/node.rs11
-rw-r--r--aero-proto/src/dav/resource.rs56
3 files changed, 74 insertions, 27 deletions
diff --git a/aero-proto/src/dav/controller.rs b/aero-proto/src/dav/controller.rs
index 243a455..c8432dd 100644
--- a/aero-proto/src/dav/controller.rs
+++ b/aero-proto/src/dav/controller.rs
@@ -2,7 +2,8 @@ use anyhow::Result;
use http_body_util::combinators::BoxBody;
use hyper::body::Incoming;
use hyper::{Request, Response, body::Bytes};
-use http_body_util::StreamBody;
+use http_body_util::BodyStream;
+use futures::stream::{StreamExt, TryStreamExt};
use aero_collections::user::User;
use aero_dav::types as dav;
@@ -10,7 +11,7 @@ use aero_dav::realization::All;
use aero_dav::caltypes as cal;
use crate::dav::codec::{serialize, deserialize, depth, text_body};
-use crate::dav::node::DavNode;
+use crate::dav::node::{DavNode, PutPolicy};
use crate::dav::resource::RootNode;
use crate::dav::codec;
@@ -65,10 +66,7 @@ impl Controller {
.status(404)
.body(codec::text_body(""))?)
},
- "PUT" => {
- let to_create = path_segments.last().expect("Bound checked earlier in this fx");
- ctrl.put(to_create).await
- },
+ "PUT" => ctrl.put().await,
"DELETE" => {
todo!();
},
@@ -177,8 +175,28 @@ impl Controller {
serialize(status, Self::multistatus(&self.user, nodes, not_found, propname))
}
- async fn put(self, child: &str) -> Result<Response<BoxBody<Bytes, std::io::Error>>> {
- todo!()
+ async fn put(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> {
+ //@FIXME temporary, look at If-None-Match & If-Match headers
+ let put_policy = PutPolicy::CreateOnly;
+
+ let stream_of_frames = BodyStream::new(self.req.into_body());
+ let stream_of_bytes = stream_of_frames
+ .map_ok(|frame| frame.into_data())
+ .map(|obj| match obj {
+ Ok(Ok(v)) => Ok(v),
+ Ok(Err(_)) => Err(std::io::Error::new(std::io::ErrorKind::Other, "conversion error")),
+ Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
+ }).boxed();
+
+ let etag = self.node.put(put_policy, stream_of_bytes).await?;
+
+ let response = Response::builder()
+ .status(201)
+ .header("ETag", etag)
+ //.header("content-type", "application/xml; charset=\"utf-8\"")
+ .body(text_body(""))?;
+
+ Ok(response)
}
async fn get(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> {
diff --git a/aero-proto/src/dav/node.rs b/aero-proto/src/dav/node.rs
index 96bd52b..96586ad 100644
--- a/aero-proto/src/dav/node.rs
+++ b/aero-proto/src/dav/node.rs
@@ -1,17 +1,18 @@
use anyhow::Result;
-use futures::Stream;
+use futures::stream::{BoxStream, Stream};
use futures::future::BoxFuture;
+use hyper::body::Bytes;
use aero_dav::types as dav;
use aero_dav::realization::All;
-use aero_collections::user::User;
+use aero_collections::{user::User, davdag::Etag};
type ArcUser = std::sync::Arc<User>;
-pub(crate) type Content = Box<dyn Stream<Item=Result<u64>>>;
+pub(crate) type Content<'a> = BoxStream<'a, std::result::Result<Bytes, std::io::Error>>;
pub(crate) enum PutPolicy {
CreateOnly,
- ReplaceEtags(String),
+ ReplaceEtag(String),
}
/// A DAV node should implement the following methods
@@ -31,7 +32,7 @@ pub(crate) trait DavNode: Send {
/// Get the values for the given properties
fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> Vec<dav::AnyProperty<All>>;
/// Put an element (create or update)
- fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>>;
+ fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>>;
/// Get content
//fn content(&self) -> TryStream;
diff --git a/aero-proto/src/dav/resource.rs b/aero-proto/src/dav/resource.rs
index 2269de2..02a246e 100644
--- a/aero-proto/src/dav/resource.rs
+++ b/aero-proto/src/dav/resource.rs
@@ -1,17 +1,17 @@
use std::sync::Arc;
type ArcUser = std::sync::Arc<User>;
-use anyhow::{anyhow, Result};
-use futures::stream::{TryStream, StreamExt};
+use anyhow::{anyhow, bail, Result};
+use futures::stream::{TryStream, TryStreamExt, StreamExt};
+use futures::io::AsyncReadExt;
use futures::{future::BoxFuture, future::FutureExt};
-use aero_collections::{user::User, calendar::Calendar, davdag::BlobId};
+use aero_collections::{user::User, calendar::Calendar, davdag::{BlobId, IndexEntry, Etag}};
use aero_dav::types as dav;
use aero_dav::caltypes as cal;
use aero_dav::acltypes as acl;
use aero_dav::realization::{All, self as all};
-
use crate::dav::node::{DavNode, PutPolicy, Content};
#[derive(Clone)]
@@ -61,7 +61,7 @@ impl DavNode for RootNode {
}).collect()
}
- fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> {
+ fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
todo!()
}
}
@@ -124,7 +124,7 @@ impl DavNode for HomeNode {
}).collect()
}
- fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> {
+ fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
todo!()
}
}
@@ -197,7 +197,7 @@ impl DavNode for CalendarListNode {
}).collect()
}
- fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> {
+ fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
todo!()
}
}
@@ -287,7 +287,7 @@ impl DavNode for CalendarNode {
}).collect()
}
- fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> {
+ fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
todo!()
}
}
@@ -330,6 +330,12 @@ pub(crate) struct EventNode {
filename: String,
blob_id: BlobId,
}
+impl EventNode {
+ async fn etag(&self) -> Result<Etag> {
+ self.col.dag().await.table.get(&self.blob_id).map(|(_, _, etag)| etag.to_string()).ok_or(anyhow!("Missing blob id in index"))
+ }
+}
+
impl DavNode for EventNode {
fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result<Box<dyn DavNode>>> {
if path.len() == 0 {
@@ -362,7 +368,7 @@ impl DavNode for EventNode {
dav::PropertyRequest::ResourceType => dav::AnyProperty::Value(dav::Property::ResourceType(vec![])),
dav::PropertyRequest::GetContentType => dav::AnyProperty::Value(dav::Property::GetContentType("text/calendar".into())),
dav::PropertyRequest::GetEtag => dav::AnyProperty::Value(dav::Property::GetEtag("\"abcdefg\"".into())),
- dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarData(req))) =>
+ dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarData(_req))) =>
dav::AnyProperty::Value(dav::Property::Extension(all::Property::Cal(cal::Property::CalendarData(cal::CalendarDataPayload {
mime: None,
payload: FAKE_ICS.into()
@@ -371,8 +377,22 @@ impl DavNode for EventNode {
}).collect()
}
- fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> {
- todo!()
+ fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
+ async {
+ let existing_etag = self.etag().await?;
+ match policy {
+ PutPolicy::CreateOnly => bail!("Already existing"),
+ PutPolicy::ReplaceEtag(etag) if etag != existing_etag.as_str() => bail!("Would overwrite something we don't know"),
+ _ => ()
+ };
+
+ //@FIXME for now, our storage interface does not allow for streaming
+ let mut evt = Vec::new();
+ let mut reader = stream.into_async_read();
+ reader.read_to_end(&mut evt).await.unwrap();
+ let (_token, entry) = self.col.put(self.filename.as_str(), evt.as_ref()).await?;
+ Ok(entry.2)
+ }.boxed()
}
}
@@ -408,8 +428,16 @@ impl DavNode for CreateEventNode {
vec![]
}
- fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> {
- //@TODO write file
- todo!()
+ fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
+ //@NOTE: policy might not be needed here: whatever we put, there is no known entries here
+
+ async {
+ //@FIXME for now, our storage interface does not allow for streaming
+ let mut evt = Vec::new();
+ let mut reader = stream.into_async_read();
+ reader.read_to_end(&mut evt).await.unwrap();
+ let (_token, entry) = self.col.put(self.filename.as_str(), evt.as_ref()).await?;
+ Ok(entry.2)
+ }.boxed()
}
}