From f179479c308876f2f41de695cc0375d7fd20b233 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 4 Apr 2024 11:28:15 +0200 Subject: WIP implem storage --- aero-collections/src/calendar/mod.rs | 79 ++++++++++++++++++++++++++++++++++-- aero-collections/src/davdag.rs | 61 +++++++++++++++++----------- 2 files changed, 113 insertions(+), 27 deletions(-) (limited to 'aero-collections') diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 6537a4e..936f8c3 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -9,7 +9,7 @@ use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; use crate::unique_ident::*; -use crate::davdag::DavDag; +use crate::davdag::{DavDag, IndexEntry, Token, BlobId, SyncChange}; pub struct Calendar { pub(super) id: UniqueIdent, @@ -20,8 +20,49 @@ impl Calendar { pub(crate) async fn open( creds: &Credentials, id: UniqueIdent, - ) -> Result { - todo!(); + ) -> Result { + let bayou_path = format!("calendar/dag/{}", id); + let cal_path = format!("calendar/events/{}", id); + + let mut davdag = Bayou::::new(creds, bayou_path).await?; + davdag.sync().await?; + + let internal = RwLock::new(CalendarInternal { + id, + encryption_key: creds.keys.master.clone(), + storage: creds.storage.build().await?, + davdag, + cal_path, + }); + + Ok(Self { id, internal }) + } + + /// Sync data with backing store + pub async fn force_sync(&self) -> Result<()> { + self.internal.write().await.force_sync().await + } + + /// Sync data with backing store only if changes are detected + /// or last sync is too old + pub async fn opportunistic_sync(&self) -> Result<()> { + self.internal.write().await.opportunistic_sync().await + } + + pub async fn get(&self, blob_id: UniqueIdent, message_key: &Key) -> Result> { + self.internal.read().await.get(blob_id, message_key).await + } + + pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec)> { + self.internal.read().await.diff(sync_token).await + } + + pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result { + self.internal.write().await.put(entry, evt).await + } + + pub async fn delete(&self, blob_id: UniqueIdent) -> Result { + self.internal.write().await.delete(blob_id).await } } @@ -30,5 +71,35 @@ struct CalendarInternal { cal_path: String, encryption_key: Key, storage: Store, - uid_index: Bayou, + davdag: Bayou, +} + +impl CalendarInternal { + async fn force_sync(&mut self) -> Result<()> { + self.davdag.sync().await?; + Ok(()) + } + + async fn opportunistic_sync(&mut self) -> Result<()> { + self.davdag.opportunistic_sync().await?; + Ok(()) + } + + async fn get(&self, blob_id: BlobId, message_key: &Key) -> Result> { + todo!() + } + + async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result { + //@TODO write event to S3 + //@TODO add entry into Bayou + todo!(); + } + + async fn delete(&mut self, blob_id: BlobId) -> Result { + todo!(); + } + + async fn diff(&self, sync_token: Token) -> Result<(Token, Vec)> { + todo!(); + } } diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index 63a76a8..f668831 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -20,20 +20,31 @@ pub type IndexEntry = (BlobId, FileName, Etag); #[derive(Clone, Default)] pub struct DavDag { /// Source of trust - pub table: OrdMap, + pub table: OrdMap, /// Indexes optimized for queries - pub idx_by_filename: OrdMap, + pub idx_by_filename: OrdMap, + + // ------------ Below this line, data is ephemeral, ie. not checkpointed /// Partial synchronization graph - pub ancestors: OrdMap>, + pub ancestors: OrdMap>, /// All nodes - pub all_nodes: OrdSet, + pub all_nodes: OrdSet, /// Head nodes - pub heads: OrdSet, + pub heads: OrdSet, /// Origin nodes - pub origins: OrdSet, + pub origins: OrdSet, + + /// File change token by token + pub change: OrdMap, +} + +#[derive(Clone, Debug)] +pub enum SyncChange { + Ok(FileName), + NotFound(FileName), } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -66,8 +77,8 @@ impl DavDag { DavDagOp::Put(self.sync_desc(), entry) } - pub fn op_delete(&self, ident: BlobId) -> DavDagOp { - DavDagOp::Delete(self.sync_desc(), ident) + pub fn op_delete(&self, blob_id: BlobId) -> DavDagOp { + DavDagOp::Delete(self.sync_desc(), blob_id) } // HELPER functions @@ -129,33 +140,41 @@ impl DavDag { // INTERNAL functions /// Register a WebDAV item (put, copy, move) - fn register(&mut self, entry: IndexEntry) { + fn register(&mut self, sync_token: Option, entry: IndexEntry) { let (blob_id, filename, _etag) = entry.clone(); // Insert item in the source of trust self.table.insert(blob_id, entry); // Update the cache - self.idx_by_filename.insert(filename, blob_id); + self.idx_by_filename.insert(filename.to_string(), blob_id); + + // Record the change in the ephemeral synchronization map + if let Some(sync_token) = sync_token { + self.change.insert(sync_token, SyncChange::Ok(filename)); + } } /// Unregister a WebDAV item (delete, move) - fn unregister(&mut self, ident: &UniqueIdent) { + fn unregister(&mut self, sync_token: Token, blob_id: &BlobId) { // Query the source of truth to get the information we // need to clean the indexes - let (_blob_id, filename, _etag) = match self.table.get(ident) { + let (_blob_id, filename, _etag) = match self.table.get(blob_id) { Some(v) => v, // Element does not exist, return early None => return, }; self.idx_by_filename.remove(filename); + // Record the change in the ephemeral synchronization map + self.change.insert(sync_token, SyncChange::NotFound(filename.to_string())); + // Finally clear item from the source of trust - self.table.remove(ident); + self.table.remove(blob_id); } /// When an event is processed, update the synchronization DAG - fn sync_dag(&mut self, sync_desc: &SyncDesc) -> bool { + fn sync_dag(&mut self, sync_desc: &SyncDesc) { let (parents, child) = sync_desc; // --- Update ANCESTORS @@ -180,8 +199,6 @@ impl DavDag { // --- Update ALL NODES self.all_nodes.insert(*child); - - true } } @@ -193,14 +210,12 @@ impl BayouState for DavDag { match op { DavDagOp::Put(sync_desc, entry) => { - if new.sync_dag(sync_desc) { - new.register(entry.clone()); - } + new.sync_dag(sync_desc); + new.register(Some(sync_desc.1), entry.clone()); }, DavDagOp::Delete(sync_desc, blob_id) => { - if new.sync_dag(sync_desc) { - new.unregister(blob_id); - } + new.sync_dag(sync_desc); + new.unregister(sync_desc.1, blob_id); }, DavDagOp::Merge(sync_desc) => { new.sync_dag(sync_desc); @@ -227,7 +242,7 @@ impl<'de> Deserialize<'de> for DavDag { let mut davdag = DavDag::default(); // Build the table + index - val.items.into_iter().for_each(|entry| davdag.register(entry)); + val.items.into_iter().for_each(|entry| davdag.register(None, entry)); // Initialize the synchronization DAG with its roots val.heads.into_iter().for_each(|ident| { -- cgit v1.2.3