diff options
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/crdt/lww.rs | 7 | ||||
-rw-r--r-- | src/util/crdt/lww_map.rs | 62 | ||||
-rw-r--r-- | src/util/time.rs | 5 |
3 files changed, 54 insertions, 20 deletions
diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs index bc686e05..99bd8e7c 100644 --- a/src/util/crdt/lww.rs +++ b/src/util/crdt/lww.rs @@ -61,7 +61,7 @@ where /// /// Compared to new, the CRDT's timestamp is not set to now /// but must be set to the previous, non-compatible, CRDT's timestamp. - pub fn migrate_from_raw(ts: u64, value: T) -> Self { + pub fn raw(ts: u64, value: T) -> Self { Self { ts, v: value } } @@ -77,6 +77,11 @@ where self.v = new_value; } + /// Get the timestamp currently associated with the value + pub fn timestamp(&self) -> u64 { + self.ts + } + /// Get the CRDT value pub fn get(&self) -> &T { &self.v diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs index 21cb6e12..f3a90591 100644 --- a/src/util/crdt/lww_map.rs +++ b/src/util/crdt/lww_map.rs @@ -37,11 +37,12 @@ where pub fn new() -> Self { Self { vals: vec![] } } + /// Used to migrate from a map defined in an incompatible format. This produces /// a map that contains a single item with the specified timestamp (copied from /// the incompatible format). Do this as many times as you have items to migrate, /// and put them all together using the CRDT merge operator. - pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { + pub fn raw_item(k: K, ts: u64, v: V) -> Self { Self { vals: vec![(k, ts, v)], } @@ -74,9 +75,37 @@ where Self { vals: new_vals } } + /// Updates a value in place in the map (this generates + /// a new timestamp) pub fn update_in_place(&mut self, k: K, new_v: V) { self.merge(&self.update_mutator(k, new_v)); } + + /// Updates a value in place in the map, from a + /// (key, timestamp, value) triple, only if the given + /// timestamp is larger than the timestamp currently + /// in the map + pub fn merge_raw(&mut self, k: &K, ts2: u64, v2: &V) { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { + Ok(i) => { + let (_, ts1, _v1) = &self.vals[i]; + match ts2.cmp(ts1) { + Ordering::Greater => { + self.vals[i].1 = ts2; + self.vals[i].2 = v2.clone(); + } + Ordering::Equal => { + self.vals[i].2.merge(v2); + } + Ordering::Less => (), + } + } + Err(i) => { + self.vals.insert(i, (k.clone(), ts2, v2.clone())); + } + } + } + /// Takes all of the values of the map and returns them. The current map is reset to the /// empty map. This is very usefull to produce in-place a new map that contains only a delta /// that modifies a certain value: @@ -103,10 +132,12 @@ where let vals = std::mem::take(&mut self.vals); Self { vals } } + /// Removes all values from the map pub fn clear(&mut self) { self.vals.clear(); } + /// Get a reference to the value assigned to a key pub fn get(&self, k: &K) -> Option<&V> { match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { @@ -114,6 +145,16 @@ where Err(_) => None, } } + + /// Get the timestamp of the value assigned to a key, or 0 if + /// no value is assigned + pub fn get_timestamp(&self, k: &K) -> u64 { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { + Ok(i) => self.vals[i].1, + Err(_) => 0, + } + } + /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. /// In most case you will want to ignore the timestamp (second item of the tuple). pub fn items(&self) -> &[(K, u64, V)] { @@ -138,24 +179,7 @@ where { fn merge(&mut self, other: &Self) { for (k, ts2, v2) in other.vals.iter() { - match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { - Ok(i) => { - let (_, ts1, _v1) = &self.vals[i]; - match ts2.cmp(ts1) { - Ordering::Greater => { - self.vals[i].1 = *ts2; - self.vals[i].2 = v2.clone(); - } - Ordering::Equal => { - self.vals[i].2.merge(v2); - } - Ordering::Less => (), - } - } - Err(i) => { - self.vals.insert(i, (k.clone(), *ts2, v2.clone())); - } - } + self.merge_raw(k, *ts2, v2); } } } diff --git a/src/util/time.rs b/src/util/time.rs index d9192443..257b4d2a 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -15,6 +15,11 @@ pub fn increment_logical_clock(prev: u64) -> u64 { std::cmp::max(prev + 1, now_msec()) } +/// Increment two logical clocks +pub fn increment_logical_clock_2(prev: u64, prev2: u64) -> u64 { + std::cmp::max(prev2 + 1, std::cmp::max(prev + 1, now_msec())) +} + /// Convert a timestamp represented as milliseconds since UNIX Epoch to /// its RFC3339 representation, such as "2021-01-01T12:30:00Z" pub fn msec_to_rfc3339(msecs: u64) -> String { |