diff options
-rw-r--r-- | aero-collections/src/calendar/mod.rs | 16 | ||||
-rw-r--r-- | aero-proto/src/dav/controller.rs | 34 | ||||
-rw-r--r-- | aero-proto/src/dav/node.rs | 11 | ||||
-rw-r--r-- | aero-proto/src/dav/resource.rs | 56 | ||||
-rw-r--r-- | aero-user/src/storage/garage.rs | 14 | ||||
-rw-r--r-- | aero-user/src/storage/in_memory.rs | 16 | ||||
-rw-r--r-- | aero-user/src/storage/mod.rs | 2 |
7 files changed, 106 insertions, 43 deletions
diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 127f41b..feae73e 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -71,8 +71,8 @@ impl Calendar { } /// Put a specific event - pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> { - self.internal.write().await.put(entry, evt).await + pub async fn put<'a>(&self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> { + self.internal.write().await.put(name, evt).await } /// Delete a specific event @@ -123,8 +123,9 @@ impl CalendarInternal { cryptoblob::open(&body, &message_key) } - async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> { + async fn put<'a>(&mut self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> { let message_key = gen_key(); + let blob_id = gen_ident(); let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?; let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key); @@ -132,22 +133,23 @@ impl CalendarInternal { // Write event to S3 let message_blob = cryptoblob::seal(evt, &message_key)?; let blob_val = BlobVal::new( - BlobRef(format!("{}/{}", self.cal_path, entry.0)), + BlobRef(format!("{}/{}", self.cal_path, blob_id)), message_blob, ) .with_meta(MESSAGE_KEY.to_string(), key_header); - self.storage + let etag = self.storage .blob_insert(blob_val) .await?; // Add entry to Bayou + let entry: IndexEntry = (blob_id, name.to_string(), etag); let davstate = self.davdag.state(); - let put_op = davstate.op_put(entry); + let put_op = davstate.op_put(entry.clone()); let token = put_op.token(); self.davdag.push(put_op).await?; - Ok(token) + Ok((token, entry)) } async fn delete(&mut self, blob_id: BlobId) -> Result<Token> { diff --git a/aero-proto/src/dav/controller.rs b/aero-proto/src/dav/controller.rs index 243a455..c8432dd 100644 --- a/aero-proto/src/dav/controller.rs +++ b/aero-proto/src/dav/controller.rs @@ -2,7 +2,8 @@ use anyhow::Result; use http_body_util::combinators::BoxBody; use hyper::body::Incoming; use hyper::{Request, Response, body::Bytes}; -use http_body_util::StreamBody; +use http_body_util::BodyStream; +use futures::stream::{StreamExt, TryStreamExt}; use aero_collections::user::User; use aero_dav::types as dav; @@ -10,7 +11,7 @@ use aero_dav::realization::All; use aero_dav::caltypes as cal; use crate::dav::codec::{serialize, deserialize, depth, text_body}; -use crate::dav::node::DavNode; +use crate::dav::node::{DavNode, PutPolicy}; use crate::dav::resource::RootNode; use crate::dav::codec; @@ -65,10 +66,7 @@ impl Controller { .status(404) .body(codec::text_body(""))?) }, - "PUT" => { - let to_create = path_segments.last().expect("Bound checked earlier in this fx"); - ctrl.put(to_create).await - }, + "PUT" => ctrl.put().await, "DELETE" => { todo!(); }, @@ -177,8 +175,28 @@ impl Controller { serialize(status, Self::multistatus(&self.user, nodes, not_found, propname)) } - async fn put(self, child: &str) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { - todo!() + async fn put(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { + //@FIXME temporary, look at If-None-Match & If-Match headers + let put_policy = PutPolicy::CreateOnly; + + let stream_of_frames = BodyStream::new(self.req.into_body()); + let stream_of_bytes = stream_of_frames + .map_ok(|frame| frame.into_data()) + .map(|obj| match obj { + Ok(Ok(v)) => Ok(v), + Ok(Err(_)) => Err(std::io::Error::new(std::io::ErrorKind::Other, "conversion error")), + Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)), + }).boxed(); + + let etag = self.node.put(put_policy, stream_of_bytes).await?; + + let response = Response::builder() + .status(201) + .header("ETag", etag) + //.header("content-type", "application/xml; charset=\"utf-8\"") + .body(text_body(""))?; + + Ok(response) } async fn get(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { diff --git a/aero-proto/src/dav/node.rs b/aero-proto/src/dav/node.rs index 96bd52b..96586ad 100644 --- a/aero-proto/src/dav/node.rs +++ b/aero-proto/src/dav/node.rs @@ -1,17 +1,18 @@ use anyhow::Result; -use futures::Stream; +use futures::stream::{BoxStream, Stream}; use futures::future::BoxFuture; +use hyper::body::Bytes; use aero_dav::types as dav; use aero_dav::realization::All; -use aero_collections::user::User; +use aero_collections::{user::User, davdag::Etag}; type ArcUser = std::sync::Arc<User>; -pub(crate) type Content = Box<dyn Stream<Item=Result<u64>>>; +pub(crate) type Content<'a> = BoxStream<'a, std::result::Result<Bytes, std::io::Error>>; pub(crate) enum PutPolicy { CreateOnly, - ReplaceEtags(String), + ReplaceEtag(String), } /// A DAV node should implement the following methods @@ -31,7 +32,7 @@ pub(crate) trait DavNode: Send { /// Get the values for the given properties fn properties(&self, user: &ArcUser, prop: dav::PropName<All>) -> Vec<dav::AnyProperty<All>>; /// Put an element (create or update) - fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>>; + fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>>; /// Get content //fn content(&self) -> TryStream; diff --git a/aero-proto/src/dav/resource.rs b/aero-proto/src/dav/resource.rs index 2269de2..02a246e 100644 --- a/aero-proto/src/dav/resource.rs +++ b/aero-proto/src/dav/resource.rs @@ -1,17 +1,17 @@ use std::sync::Arc; type ArcUser = std::sync::Arc<User>; -use anyhow::{anyhow, Result}; -use futures::stream::{TryStream, StreamExt}; +use anyhow::{anyhow, bail, Result}; +use futures::stream::{TryStream, TryStreamExt, StreamExt}; +use futures::io::AsyncReadExt; use futures::{future::BoxFuture, future::FutureExt}; -use aero_collections::{user::User, calendar::Calendar, davdag::BlobId}; +use aero_collections::{user::User, calendar::Calendar, davdag::{BlobId, IndexEntry, Etag}}; use aero_dav::types as dav; use aero_dav::caltypes as cal; use aero_dav::acltypes as acl; use aero_dav::realization::{All, self as all}; - use crate::dav::node::{DavNode, PutPolicy, Content}; #[derive(Clone)] @@ -61,7 +61,7 @@ impl DavNode for RootNode { }).collect() } - fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> { + fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { todo!() } } @@ -124,7 +124,7 @@ impl DavNode for HomeNode { }).collect() } - fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> { + fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { todo!() } } @@ -197,7 +197,7 @@ impl DavNode for CalendarListNode { }).collect() } - fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> { + fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { todo!() } } @@ -287,7 +287,7 @@ impl DavNode for CalendarNode { }).collect() } - fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> { + fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { todo!() } } @@ -330,6 +330,12 @@ pub(crate) struct EventNode { filename: String, blob_id: BlobId, } +impl EventNode { + async fn etag(&self) -> Result<Etag> { + self.col.dag().await.table.get(&self.blob_id).map(|(_, _, etag)| etag.to_string()).ok_or(anyhow!("Missing blob id in index")) + } +} + impl DavNode for EventNode { fn fetch<'a>(&self, user: &'a ArcUser, path: &'a [&str], create: bool) -> BoxFuture<'a, Result<Box<dyn DavNode>>> { if path.len() == 0 { @@ -362,7 +368,7 @@ impl DavNode for EventNode { dav::PropertyRequest::ResourceType => dav::AnyProperty::Value(dav::Property::ResourceType(vec![])), dav::PropertyRequest::GetContentType => dav::AnyProperty::Value(dav::Property::GetContentType("text/calendar".into())), dav::PropertyRequest::GetEtag => dav::AnyProperty::Value(dav::Property::GetEtag("\"abcdefg\"".into())), - dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarData(req))) => + dav::PropertyRequest::Extension(all::PropertyRequest::Cal(cal::PropertyRequest::CalendarData(_req))) => dav::AnyProperty::Value(dav::Property::Extension(all::Property::Cal(cal::Property::CalendarData(cal::CalendarDataPayload { mime: None, payload: FAKE_ICS.into() @@ -371,8 +377,22 @@ impl DavNode for EventNode { }).collect() } - fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> { - todo!() + fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { + async { + let existing_etag = self.etag().await?; + match policy { + PutPolicy::CreateOnly => bail!("Already existing"), + PutPolicy::ReplaceEtag(etag) if etag != existing_etag.as_str() => bail!("Would overwrite something we don't know"), + _ => () + }; + + //@FIXME for now, our storage interface does not allow for streaming + let mut evt = Vec::new(); + let mut reader = stream.into_async_read(); + reader.read_to_end(&mut evt).await.unwrap(); + let (_token, entry) = self.col.put(self.filename.as_str(), evt.as_ref()).await?; + Ok(entry.2) + }.boxed() } } @@ -408,8 +428,16 @@ impl DavNode for CreateEventNode { vec![] } - fn put(&self, policy: PutPolicy, stream: Content) -> BoxFuture<Result<()>> { - //@TODO write file - todo!() + fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { + //@NOTE: policy might not be needed here: whatever we put, there is no known entries here + + async { + //@FIXME for now, our storage interface does not allow for streaming + let mut evt = Vec::new(); + let mut reader = stream.into_async_read(); + reader.read_to_end(&mut evt).await.unwrap(); + let (_token, entry) = self.col.put(self.filename.as_str(), evt.as_ref()).await?; + Ok(entry.2) + }.boxed() } } diff --git a/aero-user/src/storage/garage.rs b/aero-user/src/storage/garage.rs index 7e930c3..1164839 100644 --- a/aero-user/src/storage/garage.rs +++ b/aero-user/src/storage/garage.rs @@ -426,15 +426,16 @@ impl IStore for GarageStore { tracing::debug!("Fetched {}/{}", self.bucket, blob_ref.0); Ok(bv) } - async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + async fn blob_insert(&self, blob_val: BlobVal) -> Result<String, StorageError> { tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); let streamable_value = s3::primitives::ByteStream::from(blob_val.value); + let obj_key = blob_val.blob_ref.0; let maybe_send = self .s3 .put_object() .bucket(self.bucket.to_string()) - .key(blob_val.blob_ref.0.to_string()) + .key(obj_key.to_string()) .set_metadata(Some(blob_val.meta)) .body(streamable_value) .send() @@ -445,9 +446,12 @@ impl IStore for GarageStore { tracing::error!("unable to send object: {}", e); Err(StorageError::Internal) } - Ok(_) => { - tracing::debug!("Inserted {}/{}", self.bucket, blob_val.blob_ref.0); - Ok(()) + Ok(put_output) => { + tracing::debug!("Inserted {}/{}", self.bucket, obj_key); + Ok(put_output + .e_tag() + .map(|v| format!("\"{}\"", v)) + .unwrap_or(format!("W/\"{}\"", obj_key))) } } } diff --git a/aero-user/src/storage/in_memory.rs b/aero-user/src/storage/in_memory.rs index a676797..9ef2721 100644 --- a/aero-user/src/storage/in_memory.rs +++ b/aero-user/src/storage/in_memory.rs @@ -1,9 +1,12 @@ -use crate::storage::*; use std::collections::BTreeMap; use std::ops::Bound::{self, Excluded, Included, Unbounded}; use std::sync::RwLock; + +use sodiumoxide::{hex, crypto::hash}; use tokio::sync::Notify; +use crate::storage::*; + /// This implementation is very inneficient, and not completely correct /// Indeed, when the connector is dropped, the memory is freed. /// It means that when a user disconnects, its data are lost. @@ -80,6 +83,12 @@ impl InternalBlobVal { value: self.data.clone(), } } + fn etag(&self) -> String { + let digest = hash::hash(self.data.as_ref()); + let buff = digest.as_ref(); + let hexstr = hex::encode(buff); + format!("\"{}\"", hexstr) + } } type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>; @@ -300,13 +309,14 @@ impl IStore for MemStore { .ok_or(StorageError::NotFound) .map(|v| v.to_blob_val(blob_ref)) } - async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + async fn blob_insert(&self, blob_val: BlobVal) -> Result<String, StorageError> { tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); let mut store = self.blob.write().or(Err(StorageError::Internal))?; let entry = store.entry(blob_val.blob_ref.0.clone()).or_default(); entry.data = blob_val.value.clone(); entry.metadata = blob_val.meta.clone(); - Ok(()) + + Ok(entry.etag()) } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { tracing::trace!(src=%src, dst=%dst, command="blob_copy"); diff --git a/aero-user/src/storage/mod.rs b/aero-user/src/storage/mod.rs index f5eb8d3..527765f 100644 --- a/aero-user/src/storage/mod.rs +++ b/aero-user/src/storage/mod.rs @@ -159,7 +159,7 @@ pub trait IStore { async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>; async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>; - async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError>; + async fn blob_insert(&self, blob_val: BlobVal) -> Result<String, StorageError>; async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>; async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>; async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; |