From a2f5b451bd32780d60be69c6412cb351a54b765b Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 28 May 2024 17:21:30 +0200 Subject: initial implementation of sync-collection --- aero-proto/src/dav/controller.rs | 56 ++++++++++++++++++++--- aero-proto/src/dav/node.rs | 12 +++-- aero-proto/src/dav/resource.rs | 96 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 151 insertions(+), 13 deletions(-) (limited to 'aero-proto/src') diff --git a/aero-proto/src/dav/controller.rs b/aero-proto/src/dav/controller.rs index 4bae68a..7e1f416 100644 --- a/aero-proto/src/dav/controller.rs +++ b/aero-proto/src/dav/controller.rs @@ -7,9 +7,10 @@ use hyper::body::Frame; use hyper::body::Incoming; use hyper::{body::Bytes, Request, Response}; -use aero_collections::user::User; +use aero_collections::{davdag::Token, user::User}; use aero_dav::caltypes as cal; use aero_dav::realization::{self, All}; +use aero_dav::synctypes as sync; use aero_dav::types as dav; use aero_dav::versioningtypes as vers; use aero_ical::query::is_component_match; @@ -17,7 +18,7 @@ use aero_ical::query::is_component_match; use crate::dav::codec; use crate::dav::codec::{depth, deserialize, serialize, text_body}; use crate::dav::node::DavNode; -use crate::dav::resource::RootNode; +use crate::dav::resource::{RootNode, BASE_TOKEN_URI}; pub(super) type ArcUser = std::sync::Arc; pub(super) type HttpResponse = Response>; @@ -109,6 +110,7 @@ impl Controller { // Internal representation that will handle processed request let (mut ok_node, mut not_found) = (Vec::new(), Vec::new()); let calprop: Option>; + let extension: Option; // Extracting request information match cal_report { @@ -136,15 +138,54 @@ impl Controller { }; } calprop = m.selector; + extension = None; } vers::Report::Extension(realization::ReportType::Cal(cal::ReportType::Query(q))) => { calprop = q.selector; + extension = None; ok_node = apply_filter(self.node.children(&self.user).await, &q.filter) .try_collect() .await?; } - vers::Report::Extension(realization::ReportType::Sync(_sync_col)) => { - todo!() + vers::Report::Extension(realization::ReportType::Sync(sync_col)) => { + calprop = Some(cal::CalendarSelector::Prop(sync_col.prop)); + + if sync_col.limit.is_some() { + tracing::warn!("limit is not supported, ignoring"); + } + if matches!(sync_col.sync_level, sync::SyncLevel::Infinite) { + tracing::debug!("aerogramme calendar collections are not nested"); + } + + let token = match sync_col.sync_token { + sync::SyncTokenRequest::InitialSync => None, + sync::SyncTokenRequest::IncrementalSync(token_raw) => { + // parse token + if token_raw.len() != BASE_TOKEN_URI.len() + 48 { + anyhow::bail!("invalid token length") + } + let token = token_raw[BASE_TOKEN_URI.len()..] + .parse() + .or(Err(anyhow::anyhow!("can't parse token")))?; + Some(token) + } + }; + // do the diff + let new_token: Token; + (new_token, ok_node, not_found) = match self.node.diff(token).await { + Ok(t) => t, + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => return Ok(Response::builder() + .status(410) + .body(text_body("Diff failed, token might be expired"))?), + _ => return Ok(Response::builder() + .status(500) + .body(text_body("Server error, maybe this operation is not supported on this collection"))?), + }, + }; + extension = Some(realization::Multistatus::Sync(sync::Multistatus { + sync_token: sync::SyncToken(new_token.to_string()), + })); } _ => { return Ok(Response::builder() @@ -162,7 +203,7 @@ impl Controller { serialize( status, - Self::multistatus(&self.user, ok_node, not_found, props).await, + Self::multistatus(&self.user, ok_node, not_found, props, extension).await, ) } @@ -208,7 +249,7 @@ impl Controller { let not_found = vec![]; serialize( status, - Self::multistatus(&self.user, nodes, not_found, propname).await, + Self::multistatus(&self.user, nodes, not_found, propname, None).await, ) } @@ -277,6 +318,7 @@ impl Controller { nodes: Vec>, not_found: Vec, props: Option>, + extension: Option, ) -> dav::Multistatus { // Collect properties on existing objects let mut responses: Vec> = match props { @@ -309,7 +351,7 @@ impl Controller { dav::Multistatus:: { responses, responsedescription: None, - extension: None, + extension, } } } diff --git a/aero-proto/src/dav/node.rs b/aero-proto/src/dav/node.rs index 877342a..0a83f8c 100644 --- a/aero-proto/src/dav/node.rs +++ b/aero-proto/src/dav/node.rs @@ -3,7 +3,7 @@ use futures::future::{BoxFuture, FutureExt}; use futures::stream::{BoxStream, StreamExt}; use hyper::body::Bytes; -use aero_collections::davdag::Etag; +use aero_collections::davdag::{Etag, Token}; use aero_dav::realization::All; use aero_dav::types as dav; @@ -55,8 +55,14 @@ pub(crate) trait DavNode: Send { fn content<'a>(&self) -> Content<'a>; /// Delete fn delete(&self) -> BoxFuture>; - - //@FIXME maybe add etag, maybe add a way to set content + /// Sync + fn diff<'a>( + &self, + sync_token: Option, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec>, Vec), std::io::Error>, + >; /// Utility function to get a propname response from a node fn response_propname(&self, user: &ArcUser) -> dav::Response { diff --git a/aero-proto/src/dav/resource.rs b/aero-proto/src/dav/resource.rs index 1ae766c..297a1c1 100644 --- a/aero-proto/src/dav/resource.rs +++ b/aero-proto/src/dav/resource.rs @@ -8,7 +8,7 @@ use futures::{future::BoxFuture, future::FutureExt}; use aero_collections::{ calendar::Calendar, - davdag::{BlobId, Etag}, + davdag::{BlobId, Etag, SyncChange, Token}, user::User, }; use aero_dav::acltypes as acl; @@ -21,6 +21,8 @@ use aero_dav::versioningtypes as vers; use super::node::PropertyStream; use crate::dav::node::{Content, DavNode, PutPolicy}; +pub const BASE_TOKEN_URI: &str = "https://aerogramme.0/sync/"; + #[derive(Clone)] pub(crate) struct RootNode {} impl DavNode for RootNode { @@ -117,6 +119,16 @@ impl DavNode for RootNode { fn delete(&self) -> BoxFuture> { async { Err(std::io::Error::from(std::io::ErrorKind::PermissionDenied)) }.boxed() } + + fn diff<'a>( + &self, + _sync_token: Option, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec>, Vec), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } } #[derive(Clone)] @@ -229,6 +241,15 @@ impl DavNode for HomeNode { fn delete(&self) -> BoxFuture> { async { Err(std::io::Error::from(std::io::ErrorKind::PermissionDenied)) }.boxed() } + fn diff<'a>( + &self, + _sync_token: Option, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec>, Vec), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } } #[derive(Clone)] @@ -353,6 +374,15 @@ impl DavNode for CalendarListNode { fn delete(&self) -> BoxFuture> { async { Err(std::io::Error::from(std::io::ErrorKind::PermissionDenied)) }.boxed() } + fn diff<'a>( + &self, + _sync_token: Option, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec>, Vec), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } } #[derive(Clone)] @@ -480,8 +510,8 @@ impl DavNode for CalendarNode { )) => match col.token().await { Ok(token) => dav::Property::Extension(all::Property::Sync( sync::Property::SyncToken(sync::SyncToken(format!( - "https://aerogramme.0/sync/{}", - token + "{}{}", + BASE_TOKEN_URI, token ))), )), _ => return Err(n.clone()), @@ -535,6 +565,48 @@ impl DavNode for CalendarNode { fn delete(&self) -> BoxFuture> { async { Err(std::io::Error::from(std::io::ErrorKind::PermissionDenied)) }.boxed() } + fn diff<'a>( + &self, + sync_token: Option, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec>, Vec), std::io::Error>, + > { + let col = self.col.clone(); + let calname = self.calname.clone(); + async move { + let sync_token = sync_token.unwrap(); + let (new_token, listed_changes) = match col.diff(sync_token).await { + Ok(v) => v, + Err(e) => { + tracing::info!(err=?e, "token resolution failed, maybe a forgotten token"); + return Err(std::io::Error::from(std::io::ErrorKind::NotFound)); + } + }; + + let mut ok_nodes: Vec> = vec![]; + let mut rm_nodes: Vec = vec![]; + for change in listed_changes.into_iter() { + match change { + SyncChange::Ok((filename, blob_id)) => { + let child = Box::new(EventNode { + col: col.clone(), + calname: calname.clone(), + filename, + blob_id, + }); + ok_nodes.push(child); + } + SyncChange::NotFound(filename) => { + rm_nodes.push(dav::Href(filename)); + } + } + } + + Ok((new_token, ok_nodes, rm_nodes)) + } + .boxed() + } } #[derive(Clone)] @@ -757,6 +829,15 @@ impl DavNode for EventNode { } .boxed() } + fn diff<'a>( + &self, + _sync_token: Option, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec>, Vec), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } } #[derive(Clone)] @@ -849,4 +930,13 @@ impl DavNode for CreateEventNode { // Nothing to delete async { Ok(()) }.boxed() } + fn diff<'a>( + &self, + _sync_token: Option, + ) -> BoxFuture< + 'a, + std::result::Result<(Token, Vec>, Vec), std::io::Error>, + > { + async { Err(std::io::Error::from(std::io::ErrorKind::Unsupported)) }.boxed() + } } -- cgit v1.2.3