aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/model/k2v/causality.rs39
-rw-r--r--src/model/k2v/item_table.rs2
-rw-r--r--src/model/k2v/mod.rs1
-rw-r--r--src/model/k2v/seen.rs85
4 files changed, 117 insertions, 10 deletions
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 9a692870..665b02b2 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -1,3 +1,13 @@
+//! Implements a CausalContext, which is a set of timestamps for each
+//! node -- a vector clock --, indicating that the versions with
+//! timestamps <= these numbers have been seen and can be
+//! overwritten by a subsequent write.
+//!
+//! The textual representation of a CausalContext, which we call a
+//! "causality token", is used in the API and must be sent along with
+//! each write or delete operation to indicate the previously seen
+//! versions that we want to overwrite or delete.
+
use std::collections::BTreeMap;
use std::convert::TryInto;
@@ -9,23 +19,36 @@ use garage_util::data::*;
/// of full Garage node IDs which are 256-bit UUIDs.
pub type K2VNodeId = u64;
+pub type VectorClock = BTreeMap<K2VNodeId, u64>;
+
pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
let mut tmp = [0u8; 8];
tmp.copy_from_slice(&node_id.as_slice()[..8]);
u64::from_be_bytes(tmp)
}
-#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
+pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool {
+ a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0))
+}
+
+pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock {
+ let mut ret = a.clone();
+ for (n, ts) in b.iter() {
+ let ent = ret.entry(*n).or_insert(0);
+ *ent = std::cmp::max(*ts, *ent);
+ }
+ ret
+}
+
+#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)]
pub struct CausalContext {
- pub vector_clock: BTreeMap<K2VNodeId, u64>,
+ pub vector_clock: VectorClock,
}
impl CausalContext {
/// Empty causality context
- pub fn new_empty() -> Self {
- Self {
- vector_clock: BTreeMap::new(),
- }
+ pub fn new() -> Self {
+ Self::default()
}
/// Make binary representation and encode in base64
pub fn serialize(&self) -> String {
@@ -72,9 +95,7 @@ impl CausalContext {
}
/// Check if this causal context contains newer items than another one
pub fn is_newer_than(&self, other: &Self) -> bool {
- self.vector_clock
- .iter()
- .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0))
+ vclock_gt(&self.vector_clock, &other.vector_clock)
}
}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 1ba9bb46..bc2b1aef 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -106,7 +106,7 @@ impl K2VItem {
/// Extract the causality context of a K2V Item
pub fn causal_context(&self) -> CausalContext {
- let mut cc = CausalContext::new_empty();
+ let mut cc = CausalContext::new();
for (node, ent) in self.items.iter() {
cc.vector_clock.insert(*node, ent.max_time());
}
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index 83ad2512..c488b4c6 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,4 +1,5 @@
pub mod causality;
+pub mod seen;
pub mod item_table;
diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs
new file mode 100644
index 00000000..b8f4ff27
--- /dev/null
+++ b/src/model/k2v/seen.rs
@@ -0,0 +1,85 @@
+//! Implements a RangeSeenMarker, a data type used in the PollRange API
+//! to indicate which items in the range have already been seen
+//! and which have not been seen yet.
+//!
+//! It consists of a vector clock that indicates that for each node,
+//! all items produced by that node with timestamps <= the value in the
+//! vector clock has been seen, as well as a set of causal contexts for
+//! individual items.
+
+use std::collections::BTreeMap;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::Uuid;
+use garage_util::encode::{nonversioned_decode, nonversioned_encode};
+use garage_util::error::{Error, OkOrMessage};
+
+use crate::k2v::causality::*;
+use crate::k2v::item_table::*;
+
+#[derive(Debug, Serialize, Deserialize, Default)]
+pub struct RangeSeenMarker {
+ vector_clock: VectorClock,
+ items: BTreeMap<String, VectorClock>,
+}
+
+impl RangeSeenMarker {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
+ &mut self,
+ node: Uuid,
+ items: I,
+ ) {
+ let node = make_node_id(node);
+ for item in items.into_iter() {
+ let cc = item.causal_context();
+
+ if let Some(ts) = cc.vector_clock.get(&node) {
+ let ent = self.vector_clock.entry(node).or_insert(0);
+ *ent = std::cmp::max(*ent, *ts);
+ }
+
+ if vclock_gt(&cc.vector_clock, &self.vector_clock) {
+ match self.items.get_mut(&item.sort_key) {
+ None => {
+ self.items.insert(item.sort_key.clone(), cc.vector_clock);
+ }
+ Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock),
+ }
+ }
+ }
+ }
+
+ pub fn canonicalize(&mut self) {
+ let self_vc = &self.vector_clock;
+ self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc))
+ }
+
+ pub fn encode(&mut self) -> Result<String, Error> {
+ self.canonicalize();
+
+ let bytes = nonversioned_encode(&self)?;
+ let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
+ Ok(base64::encode(&bytes))
+ }
+
+ pub fn decode(s: &str) -> Result<Self, Error> {
+ let bytes = base64::decode(&s).ok_or_message("invalid base64")?;
+ let bytes = zstd::stream::decode_all(&mut &bytes[..])?;
+ Ok(nonversioned_decode(&bytes)?)
+ }
+
+ pub fn is_new_item(&self, item: &K2VItem) -> bool {
+ let cc = item.causal_context();
+ vclock_gt(&cc.vector_clock, &self.vector_clock)
+ && self
+ .items
+ .get(&item.sort_key)
+ .map(|vc| vclock_gt(&cc.vector_clock, &vc))
+ .unwrap_or(true)
+ }
+}