diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-03-27 10:33:46 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-03-27 10:33:46 +0100 |
commit | 0b57200eeb6780e843c5f798bdc53781eb83d51f (patch) | |
tree | 9db7c91d04d543b8f2683cf53615b1a026bb8a0b | |
parent | bc0f897803cbb9b7537010e9d4714a2a0b2a6872 (diff) | |
download | aerogramme-0b57200eeb6780e843c5f798bdc53781eb83d51f.tar.gz aerogramme-0b57200eeb6780e843c5f798bdc53781eb83d51f.zip |
Dav DAG wip
-rw-r--r-- | aero-collections/src/calendar/mod.rs | 16 | ||||
-rw-r--r-- | aero-collections/src/davdag.rs | 185 | ||||
-rw-r--r-- | aero-collections/src/lib.rs | 1 |
3 files changed, 201 insertions, 1 deletions
diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index d2217b8..6537a4e 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -1,13 +1,19 @@ pub mod namespace; use anyhow::Result; +use tokio::sync::RwLock; +use aero_bayou::Bayou; use aero_user::login::Credentials; +use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; +use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; use crate::unique_ident::*; +use crate::davdag::DavDag; pub struct Calendar { - a: u64, + pub(super) id: UniqueIdent, + internal: RwLock<CalendarInternal>, } impl Calendar { @@ -18,3 +24,11 @@ impl Calendar { todo!(); } } + +struct CalendarInternal { + id: UniqueIdent, + cal_path: String, + encryption_key: Key, + storage: Store, + uid_index: Bayou<DavDag>, +} diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs new file mode 100644 index 0000000..696b985 --- /dev/null +++ b/aero-collections/src/davdag.rs @@ -0,0 +1,185 @@ +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use im::{OrdMap, OrdSet, ordset}; + +use aero_bayou::*; + +use crate::unique_ident::UniqueIdent; + +/// Parents are only persisted in the event log, +/// not in the checkpoints. +pub type Parents = Vec<UniqueIdent>; +pub type Etag = String; +pub type FileName = String; +pub type IndexEntry = (FileName, Etag); + +#[derive(Clone, Default)] +pub struct DavDag { + /// Source of trust + pub table: OrdMap<UniqueIdent, IndexEntry>, + + /// Indexes optimized for queries + pub idx_by_filename: OrdMap<FileName, UniqueIdent>, + + /// Partial synchronization graph + /// parent -> direct children + pub successors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>, + + /// Head nodes + pub heads: OrdSet<UniqueIdent>, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub enum DavDagOp { + /// Merge is a virtual operation run when multiple heads are discovered + Merge(Parents, UniqueIdent), + + /// Add an item to the collection + Put(Parents, UniqueIdent, IndexEntry), + + /// Delete an item from the collection + Delete(Parents, UniqueIdent), +} + +impl DavDag { + pub fn op_merge(&self, ident: UniqueIdent) -> DavDagOp { + DavDagOp::Merge(self.heads_vec(), ident) + } + + pub fn op_put(&self, ident: UniqueIdent, entry: IndexEntry) -> DavDagOp { + DavDagOp::Put(self.heads_vec(), ident, entry) + } + + pub fn op_delete(&self, ident: UniqueIdent) -> DavDagOp { + DavDagOp::Delete(self.heads_vec(), ident) + } + + // HELPER functions + pub fn heads_vec(&self) -> Vec<UniqueIdent> { + self.heads.clone().into_iter().collect() + } + + // INTERNAL functions + fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) { + // Insert item in the source of trust + self.table.insert(ident, entry.clone()); + + // Update the cache + let (filename, _etag) = entry; + self.idx_by_filename.insert(filename, ident); + } + + fn unregister(&mut self, ident: &UniqueIdent) { + // Query the source of truth to get the information we + // need to clean the indexes + let (filename, _etag) = match self.table.get(ident) { + Some(v) => v, + None => return, + }; + self.idx_by_filename.remove(filename); + + // Finally clear item from the source of trust + self.table.remove(ident); + } + + // @FIXME: maybe in case of error we could simply disable the sync graph + // and ask the client to rely on manual sync. For now, we are skipping the event + // which is midly satisfying. + fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool { + // All parents must exist in successors otherwise we can't accept item + // do the check + update successors + let mut try_successors = self.successors.clone(); + for par in parents.iter() { + match try_successors.get_mut(par) { + None => { + tracing::warn!("Unable to push a Dav DAG sync op into the graph, an event is missing, it's a bug"); + return false + }, + Some(v) => v.insert(*child), + }; + } + self.successors = try_successors; + + // Remove from HEADS this event's parents + parents.iter().for_each(|par| { self.heads.remove(par); }); + + // This event becomes a new HEAD in turn + self.heads.insert(*child); + + // This event is also a future successor + self.successors.insert(*child, ordset![]); + + true + } +} + +impl BayouState for DavDag { + type Op = DavDagOp; + + fn apply(&self, op: &Self::Op) -> Self { + let mut new = self.clone(); + + match op { + DavDagOp::Put(parents, ident, entry) => { + if new.sync_dag(ident, parents.as_slice()) { + new.register(*ident, entry.clone()); + } + }, + DavDagOp::Delete(parents, ident) => { + if new.sync_dag(ident, parents.as_slice()) { + new.unregister(ident); + } + }, + DavDagOp::Merge(parents, ident) => { + new.sync_dag(ident, parents.as_slice()); + } + } + + new + } +} + +// CUSTOM SERIALIZATION & DESERIALIZATION +#[derive(Serialize, Deserialize)] +struct DavDagSerializedRepr { + items: Vec<(UniqueIdent, IndexEntry)>, + heads: Vec<UniqueIdent>, +} + +impl<'de> Deserialize<'de> for DavDag { + fn deserialize<D>(d: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + let val: DavDagSerializedRepr = DavDagSerializedRepr::deserialize(d)?; + let mut davdag = DavDag::default(); + + // Build the table + index + val.items.into_iter().for_each(|(ident, entry)| davdag.register(ident, entry)); + + // Initialize the synchronization DAG with its roots + val.heads.into_iter().for_each(|ident| { + davdag.successors.insert(ident, ordset![]); + davdag.heads.insert(ident); + }); + + Ok(davdag) + } +} + +impl Serialize for DavDag { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + // Indexes are rebuilt on the fly, we serialize only the core database + let items = self.table.iter().map(|(ident, entry)| (*ident, entry.clone())).collect(); + + // We keep only the head entries from the sync graph, + // these entries will be used to initialize it back when deserializing + let heads = self.heads_vec(); + + // Finale serialization object + let val = DavDagSerializedRepr { items, heads }; + val.serialize(serializer) + } +} diff --git a/aero-collections/src/lib.rs b/aero-collections/src/lib.rs index 269cd13..ef8b8d8 100644 --- a/aero-collections/src/lib.rs +++ b/aero-collections/src/lib.rs @@ -1,4 +1,5 @@ pub mod unique_ident; +pub mod davdag; pub mod user; pub mod mail; pub mod calendar; |