From 789540ca379efd710f1e2699fd458b4821f36a5b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 11:59:57 +0100 Subject: Type definition for range seen marker --- src/model/k2v/seen.rs | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 src/model/k2v/seen.rs (limited to 'src/model/k2v/seen.rs') diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs new file mode 100644 index 00000000..b8f4ff27 --- /dev/null +++ b/src/model/k2v/seen.rs @@ -0,0 +1,85 @@ +//! Implements a RangeSeenMarker, a data type used in the PollRange API +//! to indicate which items in the range have already been seen +//! and which have not been seen yet. +//! +//! It consists of a vector clock that indicates that for each node, +//! all items produced by that node with timestamps <= the value in the +//! vector clock has been seen, as well as a set of causal contexts for +//! individual items. + +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::Uuid; +use garage_util::encode::{nonversioned_decode, nonversioned_encode}; +use garage_util::error::{Error, OkOrMessage}; + +use crate::k2v::causality::*; +use crate::k2v::item_table::*; + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct RangeSeenMarker { + vector_clock: VectorClock, + items: BTreeMap, +} + +impl RangeSeenMarker { + pub fn new() -> Self { + Self::default() + } + + pub fn mark_seen_node_items<'a, I: IntoIterator>( + &mut self, + node: Uuid, + items: I, + ) { + let node = make_node_id(node); + for item in items.into_iter() { + let cc = item.causal_context(); + + if let Some(ts) = cc.vector_clock.get(&node) { + let ent = self.vector_clock.entry(node).or_insert(0); + *ent = std::cmp::max(*ent, *ts); + } + + if vclock_gt(&cc.vector_clock, &self.vector_clock) { + match self.items.get_mut(&item.sort_key) { + None => { + self.items.insert(item.sort_key.clone(), cc.vector_clock); + } + Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock), + } + } + } + } + + pub fn canonicalize(&mut self) { + let self_vc = &self.vector_clock; + self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc)) + } + + pub fn encode(&mut self) -> Result { + self.canonicalize(); + + let bytes = nonversioned_encode(&self)?; + let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?; + Ok(base64::encode(&bytes)) + } + + pub fn decode(s: &str) -> Result { + let bytes = base64::decode(&s).ok_or_message("invalid base64")?; + let bytes = zstd::stream::decode_all(&mut &bytes[..])?; + Ok(nonversioned_decode(&bytes)?) + } + + pub fn is_new_item(&self, item: &K2VItem) -> bool { + let cc = item.causal_context(); + vclock_gt(&cc.vector_clock, &self.vector_clock) + && self + .items + .get(&item.sort_key) + .map(|vc| vclock_gt(&cc.vector_clock, &vc)) + .unwrap_or(true) + } +} -- cgit v1.2.3 From 43fd6c1526339fb444e5e25021f42d0aee252b27 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 12:54:24 +0100 Subject: PollRange RPC --- src/model/k2v/seen.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'src/model/k2v/seen.rs') diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs index b8f4ff27..d2cd54c7 100644 --- a/src/model/k2v/seen.rs +++ b/src/model/k2v/seen.rs @@ -17,6 +17,7 @@ use garage_util::error::{Error, OkOrMessage}; use crate::k2v::causality::*; use crate::k2v::item_table::*; +use crate::k2v::sub::*; #[derive(Debug, Serialize, Deserialize, Default)] pub struct RangeSeenMarker { @@ -29,6 +30,18 @@ impl RangeSeenMarker { Self::default() } + pub fn restrict(&mut self, range: &PollRange) { + if let Some(start) = &range.start { + self.items = self.items.split_off(start); + } + if let Some(end) = &range.end { + self.items.split_off(end); + } + if let Some(pfx) = &range.prefix { + self.items.retain(|k, _v| k.starts_with(pfx)); + } + } + pub fn mark_seen_node_items<'a, I: IntoIterator>( &mut self, node: Uuid, -- cgit v1.2.3 From bba13f40fc2e411347ea83960935b39cedb0a7c4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 12:27:19 +0100 Subject: Correctly return bad requests when seeh marker is invalid --- src/model/k2v/seen.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'src/model/k2v/seen.rs') diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs index d2cd54c7..314d0f9e 100644 --- a/src/model/k2v/seen.rs +++ b/src/model/k2v/seen.rs @@ -13,8 +13,9 @@ use serde::{Deserialize, Serialize}; use garage_util::data::Uuid; use garage_util::encode::{nonversioned_decode, nonversioned_encode}; -use garage_util::error::{Error, OkOrMessage}; +use garage_util::error::Error; +use crate::helper::error::{Error as HelperError, OkOrBadRequest}; use crate::k2v::causality::*; use crate::k2v::item_table::*; use crate::k2v::sub::*; @@ -80,10 +81,15 @@ impl RangeSeenMarker { Ok(base64::encode(&bytes)) } - pub fn decode(s: &str) -> Result { - let bytes = base64::decode(&s).ok_or_message("invalid base64")?; - let bytes = zstd::stream::decode_all(&mut &bytes[..])?; - Ok(nonversioned_decode(&bytes)?) + /// Decode from msgpack+zstd+b64 representation, returns None on error. + pub fn decode(s: &str) -> Option { + let bytes = base64::decode(&s).ok()?; + let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?; + nonversioned_decode(&bytes).ok() + } + + pub fn decode_helper(s: &str) -> Result { + Self::decode(s).ok_or_bad_request("Invalid causality token") } pub fn is_new_item(&self, item: &K2VItem) -> bool { -- cgit v1.2.3