aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/crdt/lww.rs7
-rw-r--r--src/util/crdt/lww_map.rs62
-rw-r--r--src/util/time.rs5
3 files changed, 54 insertions, 20 deletions
diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs
index bc686e05..99bd8e7c 100644
--- a/src/util/crdt/lww.rs
+++ b/src/util/crdt/lww.rs
@@ -61,7 +61,7 @@ where
///
/// Compared to new, the CRDT's timestamp is not set to now
/// but must be set to the previous, non-compatible, CRDT's timestamp.
- pub fn migrate_from_raw(ts: u64, value: T) -> Self {
+ pub fn raw(ts: u64, value: T) -> Self {
Self { ts, v: value }
}
@@ -77,6 +77,11 @@ where
self.v = new_value;
}
+ /// Get the timestamp currently associated with the value
+ pub fn timestamp(&self) -> u64 {
+ self.ts
+ }
+
/// Get the CRDT value
pub fn get(&self) -> &T {
&self.v
diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs
index 21cb6e12..f3a90591 100644
--- a/src/util/crdt/lww_map.rs
+++ b/src/util/crdt/lww_map.rs
@@ -37,11 +37,12 @@ where
pub fn new() -> Self {
Self { vals: vec![] }
}
+
/// Used to migrate from a map defined in an incompatible format. This produces
/// a map that contains a single item with the specified timestamp (copied from
/// the incompatible format). Do this as many times as you have items to migrate,
/// and put them all together using the CRDT merge operator.
- pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
+ pub fn raw_item(k: K, ts: u64, v: V) -> Self {
Self {
vals: vec![(k, ts, v)],
}
@@ -74,9 +75,37 @@ where
Self { vals: new_vals }
}
+ /// Updates a value in place in the map (this generates
+ /// a new timestamp)
pub fn update_in_place(&mut self, k: K, new_v: V) {
self.merge(&self.update_mutator(k, new_v));
}
+
+ /// Updates a value in place in the map, from a
+ /// (key, timestamp, value) triple, only if the given
+ /// timestamp is larger than the timestamp currently
+ /// in the map
+ pub fn merge_raw(&mut self, k: &K, ts2: u64, v2: &V) {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
+ Ok(i) => {
+ let (_, ts1, _v1) = &self.vals[i];
+ match ts2.cmp(ts1) {
+ Ordering::Greater => {
+ self.vals[i].1 = ts2;
+ self.vals[i].2 = v2.clone();
+ }
+ Ordering::Equal => {
+ self.vals[i].2.merge(v2);
+ }
+ Ordering::Less => (),
+ }
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), ts2, v2.clone()));
+ }
+ }
+ }
+
/// Takes all of the values of the map and returns them. The current map is reset to the
/// empty map. This is very usefull to produce in-place a new map that contains only a delta
/// that modifies a certain value:
@@ -103,10 +132,12 @@ where
let vals = std::mem::take(&mut self.vals);
Self { vals }
}
+
/// Removes all values from the map
pub fn clear(&mut self) {
self.vals.clear();
}
+
/// Get a reference to the value assigned to a key
pub fn get(&self, k: &K) -> Option<&V> {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
@@ -114,6 +145,16 @@ where
Err(_) => None,
}
}
+
+ /// Get the timestamp of the value assigned to a key, or 0 if
+ /// no value is assigned
+ pub fn get_timestamp(&self, k: &K) -> u64 {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
+ Ok(i) => self.vals[i].1,
+ Err(_) => 0,
+ }
+ }
+
/// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
/// In most case you will want to ignore the timestamp (second item of the tuple).
pub fn items(&self) -> &[(K, u64, V)] {
@@ -138,24 +179,7 @@ where
{
fn merge(&mut self, other: &Self) {
for (k, ts2, v2) in other.vals.iter() {
- match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
- Ok(i) => {
- let (_, ts1, _v1) = &self.vals[i];
- match ts2.cmp(ts1) {
- Ordering::Greater => {
- self.vals[i].1 = *ts2;
- self.vals[i].2 = v2.clone();
- }
- Ordering::Equal => {
- self.vals[i].2.merge(v2);
- }
- Ordering::Less => (),
- }
- }
- Err(i) => {
- self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
- }
- }
+ self.merge_raw(k, *ts2, v2);
}
}
}
diff --git a/src/util/time.rs b/src/util/time.rs
index d9192443..257b4d2a 100644
--- a/src/util/time.rs
+++ b/src/util/time.rs
@@ -15,6 +15,11 @@ pub fn increment_logical_clock(prev: u64) -> u64 {
std::cmp::max(prev + 1, now_msec())
}
+/// Increment two logical clocks
+pub fn increment_logical_clock_2(prev: u64, prev2: u64) -> u64 {
+ std::cmp::max(prev2 + 1, std::cmp::max(prev + 1, now_msec()))
+}
+
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
pub fn msec_to_rfc3339(msecs: u64) -> String {