diff options
Diffstat (limited to 'aero-collections')
-rw-r--r-- | aero-collections/src/davdag.rs | 174 |
1 files changed, 113 insertions, 61 deletions
diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index 59dcc7b..63a76a8 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -4,14 +4,18 @@ use im::{OrdMap, OrdSet, ordset}; use aero_bayou::*; -use crate::unique_ident::UniqueIdent; +use crate::unique_ident::{gen_ident, UniqueIdent}; /// Parents are only persisted in the event log, /// not in the checkpoints. -pub type Parents = Vec<UniqueIdent>; +pub type Token = UniqueIdent; +pub type Parents = Vec<Token>; +pub type SyncDesc = (Parents, Token); + +pub type BlobId = UniqueIdent; pub type Etag = String; pub type FileName = String; -pub type IndexEntry = (FileName, Etag); +pub type IndexEntry = (BlobId, FileName, Etag); #[derive(Clone, Default)] pub struct DavDag { @@ -22,8 +26,6 @@ pub struct DavDag { pub idx_by_filename: OrdMap<FileName, UniqueIdent>, /// Partial synchronization graph - /// parent -> direct children - pub successors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>, pub ancestors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>, /// All nodes @@ -37,33 +39,46 @@ pub struct DavDag { #[derive(Clone, Serialize, Deserialize, Debug)] pub enum DavDagOp { /// Merge is a virtual operation run when multiple heads are discovered - Merge(Parents, UniqueIdent), + Merge(SyncDesc), /// Add an item to the collection - Put(Parents, UniqueIdent, IndexEntry), + Put(SyncDesc, IndexEntry), /// Delete an item from the collection - Delete(Parents, UniqueIdent), + Delete(SyncDesc, BlobId), +} +impl DavDagOp { + pub fn token(&self) -> Token { + match self { + Self::Merge((_, t)) => *t, + Self::Put((_, t), _) => *t, + Self::Delete((_, t), _) => *t, + } + } } impl DavDag { - pub fn op_merge(&self, ident: UniqueIdent) -> DavDagOp { - DavDagOp::Merge(self.heads_vec(), ident) + pub fn op_merge(&self) -> DavDagOp { + DavDagOp::Merge(self.sync_desc()) } - pub fn op_put(&self, ident: UniqueIdent, entry: IndexEntry) -> DavDagOp { - DavDagOp::Put(self.heads_vec(), ident, entry) + pub fn op_put(&self, entry: IndexEntry) -> DavDagOp { + DavDagOp::Put(self.sync_desc(), entry) } - pub fn op_delete(&self, ident: UniqueIdent) -> DavDagOp { - DavDagOp::Delete(self.heads_vec(), ident) + pub fn op_delete(&self, ident: BlobId) -> DavDagOp { + DavDagOp::Delete(self.sync_desc(), ident) } // HELPER functions - /// All HEAD events - pub fn heads_vec(&self) -> Vec<UniqueIdent> { - self.heads.clone().into_iter().collect() + pub fn heads_vec(&self) -> Vec<Token> { + self.heads.clone().into_iter().collect() + } + + /// A sync descriptor + pub fn sync_desc(&self) -> SyncDesc { + (self.heads_vec(), gen_ident()) } /// Resolve a sync token @@ -71,18 +86,21 @@ impl DavDag { let already_known = self.all_ancestors(known); // We can't capture all missing events if we are not connected - // to all sinks of the graph, ie. if we don't already know all the sinks. + // to all sinks of the graph, + // ie. if we don't already know all the sinks, + // ie. if we are missing so much history that + // the event log has been transformed into a checkpoint if !self.origins.is_subset(already_known.clone()) { bail!("Not enough history to produce a correct diff, a full resync is needed"); } - // Missing items are all existing graph items from which - // we removed all items known by the given node. - // In other words, all values in all_nodes that are not in already_known. + // Missing items are *all existing graph items* from which + // we removed *all items known by the given node*. + // In other words, all values in `all_nodes` that are not in `already_known`. Ok(self.all_nodes.clone().relative_complement(already_known)) } - /// Find all ancestors of a given + /// Find all ancestors of a given node fn all_ancestors(&self, known: UniqueIdent) -> OrdSet<UniqueIdent> { let mut all_known: OrdSet<UniqueIdent> = OrdSet::new(); let mut to_collect = vec![known]; @@ -111,21 +129,23 @@ impl DavDag { // INTERNAL functions /// Register a WebDAV item (put, copy, move) - fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) { + fn register(&mut self, entry: IndexEntry) { + let (blob_id, filename, _etag) = entry.clone(); + // Insert item in the source of trust - self.table.insert(ident, entry.clone()); + self.table.insert(blob_id, entry); // Update the cache - let (filename, _etag) = entry; - self.idx_by_filename.insert(filename, ident); + self.idx_by_filename.insert(filename, blob_id); } /// Unregister a WebDAV item (delete, move) fn unregister(&mut self, ident: &UniqueIdent) { // Query the source of truth to get the information we // need to clean the indexes - let (filename, _etag) = match self.table.get(ident) { + let (_blob_id, filename, _etag) = match self.table.get(ident) { Some(v) => v, + // Element does not exist, return early None => return, }; self.idx_by_filename.remove(filename); @@ -134,29 +154,9 @@ impl DavDag { self.table.remove(ident); } - // @FIXME: maybe in case of error we could simply disable the sync graph - // and ask the client to rely on manual sync. For now, we are skipping the event - // which is midly satisfying. - /// When an event is processed, update the synchronization DAG - fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool { - // --- Update SUCCESSORS - // All parents must exist in successors otherwise we can't accept item: - // do the check + update successors - let mut try_successors = self.successors.clone(); - for par in parents.iter() { - match try_successors.get_mut(par) { - None => { - tracing::warn!("Unable to push a Dav DAG sync op into the graph, an event is missing, it's a bug"); - return false - }, - Some(v) => v.insert(*child), - }; - } - self.successors = try_successors; - - // This event is also a future successor - self.successors.insert(*child, ordset![]); + fn sync_dag(&mut self, sync_desc: &SyncDesc) -> bool { + let (parents, child) = sync_desc; // --- Update ANCESTORS // We register ancestors as it is required for the sync algorithm @@ -177,6 +177,9 @@ impl DavDag { // This event becomes a new HEAD in turn self.heads.insert(*child); + + // --- Update ALL NODES + self.all_nodes.insert(*child); true } @@ -189,18 +192,18 @@ impl BayouState for DavDag { let mut new = self.clone(); match op { - DavDagOp::Put(parents, ident, entry) => { - if new.sync_dag(ident, parents.as_slice()) { - new.register(*ident, entry.clone()); + DavDagOp::Put(sync_desc, entry) => { + if new.sync_dag(sync_desc) { + new.register(entry.clone()); } }, - DavDagOp::Delete(parents, ident) => { - if new.sync_dag(ident, parents.as_slice()) { - new.unregister(ident); + DavDagOp::Delete(sync_desc, blob_id) => { + if new.sync_dag(sync_desc) { + new.unregister(blob_id); } }, - DavDagOp::Merge(parents, ident) => { - new.sync_dag(ident, parents.as_slice()); + DavDagOp::Merge(sync_desc) => { + new.sync_dag(sync_desc); } } @@ -211,7 +214,7 @@ impl BayouState for DavDag { // CUSTOM SERIALIZATION & DESERIALIZATION #[derive(Serialize, Deserialize)] struct DavDagSerializedRepr { - items: Vec<(UniqueIdent, IndexEntry)>, + items: Vec<IndexEntry>, heads: Vec<UniqueIdent>, } @@ -224,11 +227,10 @@ impl<'de> Deserialize<'de> for DavDag { let mut davdag = DavDag::default(); // Build the table + index - val.items.into_iter().for_each(|(ident, entry)| davdag.register(ident, entry)); + val.items.into_iter().for_each(|entry| davdag.register(entry)); // Initialize the synchronization DAG with its roots val.heads.into_iter().for_each(|ident| { - davdag.successors.insert(ident, ordset![]); davdag.heads.insert(ident); davdag.origins.insert(ident); davdag.all_nodes.insert(ident); @@ -244,7 +246,7 @@ impl Serialize for DavDag { S: Serializer, { // Indexes are rebuilt on the fly, we serialize only the core database - let items = self.table.iter().map(|(ident, entry)| (*ident, entry.clone())).collect(); + let items = self.table.iter().map(|(_, entry)| entry.clone()).collect(); // We keep only the head entries from the sync graph, // these entries will be used to initialize it back when deserializing @@ -255,3 +257,53 @@ impl Serialize for DavDag { val.serialize(serializer) } } + +// ---- TESTS ---- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn base() { + let mut state = DavDag::default(); + + // Add item 1 + { + let m = UniqueIdent([0x01; 24]); + let ev = state.op_put((m, "cal.ics".into(), "321-321".into())); + state = state.apply(&ev); + + assert_eq!(state.table.len(), 1); + assert_eq!(state.resolve(ev.token()).unwrap().len(), 0); + } + + // Add 2 concurrent items + let (t1, t2) = { + let blob1 = UniqueIdent([0x02; 24]); + let ev1 = state.op_put((blob1, "cal2.ics".into(), "321-321".into())); + + let blob2 = UniqueIdent([0x01; 24]); + let ev2 = state.op_delete(blob2); + + state = state.apply(&ev1); + state = state.apply(&ev2); + + assert_eq!(state.table.len(), 1); + assert_eq!(state.resolve(ev1.token()).unwrap(), ordset![ev2.token()]); + + (ev1.token(), ev2.token()) + }; + + // Add later a new item + { + let blob3 = UniqueIdent([0x03; 24]); + let ev = state.op_put((blob3, "cal3.ics".into(), "321-321".into())); + + state = state.apply(&ev); + assert_eq!(state.table.len(), 2); + assert_eq!(state.resolve(ev.token()).unwrap().len(), 0); + assert_eq!(state.resolve(t1).unwrap(), ordset![t2, ev.token()]); + } + } +} |