aboutsummaryrefslogtreecommitdiff
path: root/aero-collections
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-03-27 15:09:18 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-03-27 15:09:18 +0100
commita146a0babc25547f269c784e090e308fa831ab32 (patch)
tree2a57238c3dde7859c73839a06203dda6562ddd0d /aero-collections
parent0b57200eeb6780e843c5f798bdc53781eb83d51f (diff)
downloadaerogramme-a146a0babc25547f269c784e090e308fa831ab32.tar.gz
aerogramme-a146a0babc25547f269c784e090e308fa831ab32.zip
Sync algorithm
Diffstat (limited to 'aero-collections')
-rw-r--r--aero-collections/src/davdag.rs82
1 files changed, 77 insertions, 5 deletions
diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs
index 696b985..59dcc7b 100644
--- a/aero-collections/src/davdag.rs
+++ b/aero-collections/src/davdag.rs
@@ -1,3 +1,4 @@
+use anyhow::{bail, Result};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use im::{OrdMap, OrdSet, ordset};
@@ -23,9 +24,14 @@ pub struct DavDag {
/// Partial synchronization graph
/// parent -> direct children
pub successors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>,
-
+ pub ancestors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>,
+
+ /// All nodes
+ pub all_nodes: OrdSet<UniqueIdent>,
/// Head nodes
pub heads: OrdSet<UniqueIdent>,
+ /// Origin nodes
+ pub origins: OrdSet<UniqueIdent>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
@@ -54,11 +60,57 @@ impl DavDag {
}
// HELPER functions
+
+ /// All HEAD events
pub fn heads_vec(&self) -> Vec<UniqueIdent> {
self.heads.clone().into_iter().collect()
}
+ /// Resolve a sync token
+ pub fn resolve(&self, known: UniqueIdent) -> Result<OrdSet<UniqueIdent>> {
+ let already_known = self.all_ancestors(known);
+
+ // We can't capture all missing events if we are not connected
+ // to all sinks of the graph, ie. if we don't already know all the sinks.
+ if !self.origins.is_subset(already_known.clone()) {
+ bail!("Not enough history to produce a correct diff, a full resync is needed");
+ }
+
+ // Missing items are all existing graph items from which
+ // we removed all items known by the given node.
+ // In other words, all values in all_nodes that are not in already_known.
+ Ok(self.all_nodes.clone().relative_complement(already_known))
+ }
+
+ /// Find all ancestors of a given
+ fn all_ancestors(&self, known: UniqueIdent) -> OrdSet<UniqueIdent> {
+ let mut all_known: OrdSet<UniqueIdent> = OrdSet::new();
+ let mut to_collect = vec![known];
+ loop {
+ let cursor = match to_collect.pop() {
+ // Loop stops here
+ None => break,
+ Some(v) => v,
+ };
+
+ if all_known.insert(cursor).is_some() {
+ // Item already processed
+ continue
+ }
+
+ // Collect parents
+ let parents = match self.ancestors.get(&cursor) {
+ None => continue,
+ Some(c) => c,
+ };
+ to_collect.extend(parents.iter());
+ }
+ all_known
+ }
+
// INTERNAL functions
+
+ /// Register a WebDAV item (put, copy, move)
fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) {
// Insert item in the source of trust
self.table.insert(ident, entry.clone());
@@ -68,6 +120,7 @@ impl DavDag {
self.idx_by_filename.insert(filename, ident);
}
+ /// Unregister a WebDAV item (delete, move)
fn unregister(&mut self, ident: &UniqueIdent) {
// Query the source of truth to get the information we
// need to clean the indexes
@@ -84,8 +137,11 @@ impl DavDag {
// @FIXME: maybe in case of error we could simply disable the sync graph
// and ask the client to rely on manual sync. For now, we are skipping the event
// which is midly satisfying.
+
+ /// When an event is processed, update the synchronization DAG
fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool {
- // All parents must exist in successors otherwise we can't accept item
+ // --- Update SUCCESSORS
+ // All parents must exist in successors otherwise we can't accept item:
// do the check + update successors
let mut try_successors = self.successors.clone();
for par in parents.iter() {
@@ -99,15 +155,29 @@ impl DavDag {
}
self.successors = try_successors;
+ // This event is also a future successor
+ self.successors.insert(*child, ordset![]);
+
+ // --- Update ANCESTORS
+ // We register ancestors as it is required for the sync algorithm
+ self.ancestors.insert(*child, parents.iter().fold(ordset![], |mut acc, p| {
+ acc.insert(*p);
+ acc
+ }));
+
+ // --- Update ORIGINS
+ // If this event has no parents, it's an origin
+ if parents.is_empty() {
+ self.origins.insert(*child);
+ }
+
+ // --- Update HEADS
// Remove from HEADS this event's parents
parents.iter().for_each(|par| { self.heads.remove(par); });
// This event becomes a new HEAD in turn
self.heads.insert(*child);
- // This event is also a future successor
- self.successors.insert(*child, ordset![]);
-
true
}
}
@@ -160,6 +230,8 @@ impl<'de> Deserialize<'de> for DavDag {
val.heads.into_iter().for_each(|ident| {
davdag.successors.insert(ident, ordset![]);
davdag.heads.insert(ident);
+ davdag.origins.insert(ident);
+ davdag.all_nodes.insert(ident);
});
Ok(davdag)