diff options
Diffstat (limited to 'aero-collections/src/davdag.rs')
-rw-r--r-- | aero-collections/src/davdag.rs | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs new file mode 100644 index 0000000..74e745f --- /dev/null +++ b/aero-collections/src/davdag.rs @@ -0,0 +1,342 @@ +use anyhow::{bail, Result}; +use im::{ordset, OrdMap, OrdSet}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use aero_bayou::*; + +use crate::unique_ident::{gen_ident, UniqueIdent}; + +/// Parents are only persisted in the event log, +/// not in the checkpoints. +pub type Token = UniqueIdent; +pub type Parents = Vec<Token>; +pub type SyncDesc = (Parents, Token); + +pub type BlobId = UniqueIdent; +pub type Etag = String; +pub type FileName = String; +pub type IndexEntry = (BlobId, FileName, Etag); + +#[derive(Clone, Default)] +pub struct DavDag { + /// Source of trust + pub table: OrdMap<BlobId, IndexEntry>, + + /// Indexes optimized for queries + pub idx_by_filename: OrdMap<FileName, BlobId>, + + // ------------ Below this line, data is ephemeral, ie. not checkpointed + /// Partial synchronization graph + pub ancestors: OrdMap<Token, OrdSet<Token>>, + + /// All nodes + pub all_nodes: OrdSet<Token>, + /// Head nodes + pub heads: OrdSet<Token>, + /// Origin nodes + pub origins: OrdSet<Token>, + + /// File change token by token + pub change: OrdMap<Token, SyncChange>, +} + +#[derive(Clone, Debug)] +pub enum SyncChange { + Ok((FileName, BlobId)), + NotFound(FileName), +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub enum DavDagOp { + /// Merge is a virtual operation run when multiple heads are discovered + Merge(SyncDesc), + + /// Add an item to the collection + Put(SyncDesc, IndexEntry), + + /// Delete an item from the collection + Delete(SyncDesc, BlobId), +} +impl DavDagOp { + pub fn token(&self) -> Token { + match self { + Self::Merge((_, t)) => *t, + Self::Put((_, t), _) => *t, + Self::Delete((_, t), _) => *t, + } + } +} + +impl DavDag { + pub fn op_merge(&self) -> DavDagOp { + DavDagOp::Merge(self.sync_desc()) + } + + pub fn op_put(&self, entry: IndexEntry) -> DavDagOp { + DavDagOp::Put(self.sync_desc(), entry) + } + + pub fn op_delete(&self, blob_id: BlobId) -> DavDagOp { + DavDagOp::Delete(self.sync_desc(), blob_id) + } + + // HELPER functions + + pub fn heads_vec(&self) -> Vec<Token> { + self.heads.clone().into_iter().collect() + } + + /// A sync descriptor + pub fn sync_desc(&self) -> SyncDesc { + (self.heads_vec(), gen_ident()) + } + + /// Resolve a sync token + pub fn resolve(&self, known: Token) -> Result<OrdSet<Token>> { + let already_known = self.all_ancestors(known); + + // We can't capture all missing events if we are not connected + // to all sinks of the graph, + // ie. if we don't already know all the sinks, + // ie. if we are missing so much history that + // the event log has been transformed into a checkpoint + if !self.origins.is_subset(already_known.clone()) { + bail!("Not enough history to produce a correct diff, a full resync is needed"); + } + + // Missing items are *all existing graph items* from which + // we removed *all items known by the given node*. + // In other words, all values in `all_nodes` that are not in `already_known`. + Ok(self.all_nodes.clone().relative_complement(already_known)) + } + + /// Find all ancestors of a given node + fn all_ancestors(&self, known: Token) -> OrdSet<Token> { + let mut all_known: OrdSet<UniqueIdent> = OrdSet::new(); + let mut to_collect = vec![known]; + loop { + let cursor = match to_collect.pop() { + // Loop stops here + None => break, + Some(v) => v, + }; + + if all_known.insert(cursor).is_some() { + // Item already processed + continue; + } + + // Collect parents + let parents = match self.ancestors.get(&cursor) { + None => continue, + Some(c) => c, + }; + to_collect.extend(parents.iter()); + } + all_known + } + + // INTERNAL functions + + /// Register a WebDAV item (put, copy, move) + fn register(&mut self, sync_token: Option<Token>, entry: IndexEntry) { + let (blob_id, filename, _etag) = entry.clone(); + + // Insert item in the source of trust + self.table.insert(blob_id, entry); + + // Update the cache + self.idx_by_filename.insert(filename.to_string(), blob_id); + + // Record the change in the ephemeral synchronization map + if let Some(sync_token) = sync_token { + self.change + .insert(sync_token, SyncChange::Ok((filename, blob_id))); + } + } + + /// Unregister a WebDAV item (delete, move) + fn unregister(&mut self, sync_token: Token, blob_id: &BlobId) { + // Query the source of truth to get the information we + // need to clean the indexes + let (_blob_id, filename, _etag) = match self.table.get(blob_id) { + Some(v) => v, + // Element does not exist, return early + None => return, + }; + self.idx_by_filename.remove(filename); + + // Record the change in the ephemeral synchronization map + self.change + .insert(sync_token, SyncChange::NotFound(filename.to_string())); + + // Finally clear item from the source of trust + self.table.remove(blob_id); + } + + /// When an event is processed, update the synchronization DAG + fn sync_dag(&mut self, sync_desc: &SyncDesc) { + let (parents, child) = sync_desc; + + // --- Update ANCESTORS + // We register ancestors as it is required for the sync algorithm + self.ancestors.insert( + *child, + parents.iter().fold(ordset![], |mut acc, p| { + acc.insert(*p); + acc + }), + ); + + // --- Update ORIGINS + // If this event has no parents, it's an origin + if parents.is_empty() { + self.origins.insert(*child); + } + + // --- Update HEADS + // Remove from HEADS this event's parents + parents.iter().for_each(|par| { + self.heads.remove(par); + }); + + // This event becomes a new HEAD in turn + self.heads.insert(*child); + + // --- Update ALL NODES + self.all_nodes.insert(*child); + } +} + +impl std::fmt::Debug for DavDag { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("DavDag\n")?; + for elem in self.table.iter() { + f.write_fmt(format_args!("\t{:?} => {:?}", elem.0, elem.1))?; + } + Ok(()) + } +} + +impl BayouState for DavDag { + type Op = DavDagOp; + + fn apply(&self, op: &Self::Op) -> Self { + let mut new = self.clone(); + + match op { + DavDagOp::Put(sync_desc, entry) => { + new.sync_dag(sync_desc); + new.register(Some(sync_desc.1), entry.clone()); + } + DavDagOp::Delete(sync_desc, blob_id) => { + new.sync_dag(sync_desc); + new.unregister(sync_desc.1, blob_id); + } + DavDagOp::Merge(sync_desc) => { + new.sync_dag(sync_desc); + } + } + + new + } +} + +// CUSTOM SERIALIZATION & DESERIALIZATION +#[derive(Serialize, Deserialize)] +struct DavDagSerializedRepr { + items: Vec<IndexEntry>, + heads: Vec<UniqueIdent>, +} + +impl<'de> Deserialize<'de> for DavDag { + fn deserialize<D>(d: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + let val: DavDagSerializedRepr = DavDagSerializedRepr::deserialize(d)?; + let mut davdag = DavDag::default(); + + // Build the table + index + val.items + .into_iter() + .for_each(|entry| davdag.register(None, entry)); + + // Initialize the synchronization DAG with its roots + val.heads.into_iter().for_each(|ident| { + davdag.heads.insert(ident); + davdag.origins.insert(ident); + davdag.all_nodes.insert(ident); + }); + + Ok(davdag) + } +} + +impl Serialize for DavDag { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + // Indexes are rebuilt on the fly, we serialize only the core database + let items = self.table.iter().map(|(_, entry)| entry.clone()).collect(); + + // We keep only the head entries from the sync graph, + // these entries will be used to initialize it back when deserializing + let heads = self.heads_vec(); + + // Finale serialization object + let val = DavDagSerializedRepr { items, heads }; + val.serialize(serializer) + } +} + +// ---- TESTS ---- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn base() { + let mut state = DavDag::default(); + + // Add item 1 + { + let m = UniqueIdent([0x01; 24]); + let ev = state.op_put((m, "cal.ics".into(), "321-321".into())); + state = state.apply(&ev); + + assert_eq!(state.table.len(), 1); + assert_eq!(state.resolve(ev.token()).unwrap().len(), 0); + } + + // Add 2 concurrent items + let (t1, t2) = { + let blob1 = UniqueIdent([0x02; 24]); + let ev1 = state.op_put((blob1, "cal2.ics".into(), "321-321".into())); + + let blob2 = UniqueIdent([0x01; 24]); + let ev2 = state.op_delete(blob2); + + state = state.apply(&ev1); + state = state.apply(&ev2); + + assert_eq!(state.table.len(), 1); + assert_eq!(state.resolve(ev1.token()).unwrap(), ordset![ev2.token()]); + + (ev1.token(), ev2.token()) + }; + + // Add later a new item + { + let blob3 = UniqueIdent([0x03; 24]); + let ev = state.op_put((blob3, "cal3.ics".into(), "321-321".into())); + + state = state.apply(&ev); + assert_eq!(state.table.len(), 2); + assert_eq!(state.resolve(ev.token()).unwrap().len(), 0); + assert_eq!(state.resolve(t1).unwrap(), ordset![t2, ev.token()]); + } + } +} |