aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/garage.rs8
-rw-r--r--src/model/k2v/causality.rs62
-rw-r--r--src/model/k2v/item_table.rs11
-rw-r--r--src/model/k2v/mod.rs4
-rw-r--r--src/model/k2v/poll.rs50
-rw-r--r--src/model/k2v/rpc.rs275
-rw-r--r--src/model/k2v/seen.rs104
-rw-r--r--src/model/k2v/sub.rs110
8 files changed, 531 insertions, 93 deletions
diff --git a/src/model/garage.rs b/src/model/garage.rs
index ffa54dc5..4716954a 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -27,7 +27,7 @@ use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::k2v::{item_table::*, poll::*, rpc::*};
+use crate::k2v::{item_table::*, rpc::*, sub::*};
/// An entire Garage full of data
pub struct Garage {
@@ -305,8 +305,10 @@ impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
+
info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new());
+
info!("Initialize K2V item table...");
let item_table = Table::new(
K2VItemTable {
@@ -317,7 +319,9 @@ impl GarageK2V {
system.clone(),
db,
);
- let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
+
+ info!("Initialize K2V RPC handler...");
+ let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions);
Self {
item_table,
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 9a692870..b1ec8035 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -1,3 +1,13 @@
+//! Implements a CausalContext, which is a set of timestamps for each
+//! node -- a vector clock --, indicating that the versions with
+//! timestamps <= these numbers have been seen and can be
+//! overwritten by a subsequent write.
+//!
+//! The textual representation of a CausalContext, which we call a
+//! "causality token", is used in the API and must be sent along with
+//! each write or delete operation to indicate the previously seen
+//! versions that we want to overwrite or delete.
+
use std::collections::BTreeMap;
use std::convert::TryInto;
@@ -5,28 +15,44 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
+
/// Node IDs used in K2V are u64 integers that are the abbreviation
/// of full Garage node IDs which are 256-bit UUIDs.
pub type K2VNodeId = u64;
+pub type VectorClock = BTreeMap<K2VNodeId, u64>;
+
pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
let mut tmp = [0u8; 8];
tmp.copy_from_slice(&node_id.as_slice()[..8]);
u64::from_be_bytes(tmp)
}
-#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
+pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool {
+ a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0))
+}
+
+pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock {
+ let mut ret = a.clone();
+ for (n, ts) in b.iter() {
+ let ent = ret.entry(*n).or_insert(0);
+ *ent = std::cmp::max(*ts, *ent);
+ }
+ ret
+}
+
+#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)]
pub struct CausalContext {
- pub vector_clock: BTreeMap<K2VNodeId, u64>,
+ pub vector_clock: VectorClock,
}
impl CausalContext {
/// Empty causality context
- pub fn new_empty() -> Self {
- Self {
- vector_clock: BTreeMap::new(),
- }
+ pub fn new() -> Self {
+ Self::default()
}
+
/// Make binary representation and encode in base64
pub fn serialize(&self) -> String {
let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
@@ -43,12 +69,13 @@ impl CausalContext {
base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
}
- /// Parse from base64-encoded binary representation
- pub fn parse(s: &str) -> Result<Self, String> {
- let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD)
- .map_err(|e| format!("bad causality token base64: {}", e))?;
+
+ /// Parse from base64-encoded binary representation.
+ /// Returns None on error.
+ pub fn parse(s: &str) -> Option<Self> {
+ let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD).ok()?;
if bytes.len() % 16 != 8 || bytes.len() < 8 {
- return Err("bad causality token length".into());
+ return None;
}
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
@@ -65,16 +92,19 @@ impl CausalContext {
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
if check != checksum {
- return Err("bad causality token checksum".into());
+ return None;
}
- Ok(ret)
+ Some(ret)
}
+
+ pub fn parse_helper(s: &str) -> Result<Self, HelperError> {
+ Self::parse(s).ok_or_bad_request("Invalid causality token")
+ }
+
/// Check if this causal context contains newer items than another one
pub fn is_newer_than(&self, other: &Self) -> bool {
- self.vector_clock
- .iter()
- .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0))
+ vclock_gt(&self.vector_clock, &other.vector_clock)
}
}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index ce3e4129..bc2b1aef 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -11,7 +11,7 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
@@ -73,7 +73,8 @@ impl K2VItem {
this_node: Uuid,
context: &Option<CausalContext>,
new_value: DvvsValue,
- ) {
+ node_ts: u64,
+ ) -> u64 {
if let Some(context) = context {
for (node, t_discard) in context.vector_clock.iter() {
if let Some(e) = self.items.get_mut(node) {
@@ -98,12 +99,14 @@ impl K2VItem {
values: vec![],
});
let t_prev = e.max_time();
- e.values.push((t_prev + 1, new_value));
+ let t_new = std::cmp::max(t_prev + 1, node_ts + 1);
+ e.values.push((t_new, new_value));
+ t_new
}
/// Extract the causality context of a K2V Item
pub fn causal_context(&self) -> CausalContext {
- let mut cc = CausalContext::new_empty();
+ let mut cc = CausalContext::new();
for (node, ent) in self.items.iter() {
cc.vector_clock.insert(*node, ent.max_time());
}
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index f6a96151..acc1fcdc 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,6 +1,8 @@
pub mod causality;
+pub mod seen;
pub mod item_table;
-pub mod poll;
pub mod rpc;
+
+pub mod sub;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs
deleted file mode 100644
index 93105207..00000000
--- a/src/model/k2v/poll.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use std::collections::HashMap;
-use std::sync::Mutex;
-
-use serde::{Deserialize, Serialize};
-use tokio::sync::broadcast;
-
-use crate::k2v::item_table::*;
-
-#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct PollKey {
- pub partition: K2VItemPartition,
- pub sort_key: String,
-}
-
-#[derive(Default)]
-pub struct SubscriptionManager {
- subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
-}
-
-impl SubscriptionManager {
- pub fn new() -> Self {
- Self::default()
- }
-
- pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(key) {
- s.subscribe()
- } else {
- let (tx, rx) = broadcast::channel(8);
- subs.insert(key.clone(), tx);
- rx
- }
- }
-
- pub fn notify(&self, item: &K2VItem) {
- let key = PollKey {
- partition: item.partition.clone(),
- sort_key: item.sort_key.clone(),
- };
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(&key) {
- if s.send(item.clone()).is_err() {
- // no more subscribers, remove channel from here
- // (we will re-create it later if we need to subscribe again)
- subs.remove(&key);
- }
- }
- }
-}
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index f64a7984..117103b6 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -5,9 +5,10 @@
//! node does not process the entry directly, as this would
//! mean the vector clock gets much larger than needed).
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::Duration;
+use std::collections::{BTreeMap, HashMap};
+use std::convert::TryInto;
+use std::sync::{Arc, Mutex, MutexGuard};
+use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
@@ -15,9 +16,12 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
+use garage_db as db;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::now_msec;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -25,9 +29,15 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table};
+use crate::helper::error::Error as HelperError;
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
-use crate::k2v::poll::*;
+use crate::k2v::seen::*;
+use crate::k2v::sub::*;
+
+const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
+
+const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
@@ -40,7 +50,13 @@ enum K2VRpc {
causal_context: CausalContext,
timeout_msec: u64,
},
+ PollRange {
+ range: PollRange,
+ seen_str: Option<String>,
+ timeout_msec: u64,
+ },
PollItemResponse(Option<K2VItem>),
+ PollRangeResponse(Uuid, Vec<K2VItem>),
}
#[derive(Debug, Serialize, Deserialize)]
@@ -59,6 +75,12 @@ impl Rpc for K2VRpc {
pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+
+ // Using a mutex on the local_timestamp_tree is not strictly necessary,
+ // but it helps to not try to do several inserts at the same time,
+ // which would create transaction conflicts and force many useless retries.
+ local_timestamp_tree: Mutex<db::Tree>,
+
endpoint: Arc<Endpoint<K2VRpc, Self>>,
subscriptions: Arc<SubscriptionManager>,
}
@@ -66,14 +88,19 @@ pub struct K2VRpcHandler {
impl K2VRpcHandler {
pub fn new(
system: Arc<System>,
+ db: &db::Db,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
subscriptions: Arc<SubscriptionManager>,
) -> Arc<Self> {
+ let local_timestamp_tree = db
+ .open_tree("k2v_local_timestamp")
+ .expect("Unable to open DB tree for k2v local timestamp");
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
let rpc_handler = Arc::new(Self {
system,
item_table,
+ local_timestamp_tree: Mutex::new(local_timestamp_tree),
endpoint,
subscriptions,
});
@@ -181,7 +208,7 @@ impl K2VRpcHandler {
Ok(())
}
- pub async fn poll(
+ pub async fn poll_item(
&self,
bucket_id: Uuid,
partition_key: String,
@@ -230,9 +257,7 @@ impl K2VRpcHandler {
resp = Some(x);
}
}
- K2VRpc::PollItemResponse(None) => {
- return Ok(None);
- }
+ K2VRpc::PollItemResponse(None) => (),
v => return Err(Error::unexpected_rpc_message(v)),
}
}
@@ -240,10 +265,117 @@ impl K2VRpcHandler {
Ok(resp)
}
+ pub async fn poll_range(
+ &self,
+ range: PollRange,
+ seen_str: Option<String>,
+ timeout_msec: u64,
+ ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
+ let has_seen_marker = seen_str.is_some();
+
+ // Parse seen marker, we will use it below. This is also the first check
+ // that it is valid, which returns a bad request error if not.
+ let mut seen = seen_str
+ .as_deref()
+ .map(RangeSeenMarker::decode_helper)
+ .transpose()?
+ .unwrap_or_default();
+ seen.restrict(&range);
+
+ // Prepare PollRange RPC to send to the storage nodes responsible for the parititon
+ let nodes = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&range.partition.hash());
+ let quorum = self.item_table.data.replication.read_quorum();
+ let msg = K2VRpc::PollRange {
+ range,
+ seen_str,
+ timeout_msec,
+ };
+
+ // Send the request to all nodes, use FuturesUnordered to get the responses in any order
+ let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+ let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
+ let mut requests = nodes
+ .iter()
+ .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .collect::<FuturesUnordered<_>>();
+
+ // Fetch responses. This procedure stops fetching responses when any of the following
+ // conditions arise:
+ // - we have a response to all requests
+ // - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay
+ // has passed since the quorum was achieved
+ // - a global RPC timeout expired
+ // The extra delay after a quorum was received is usefull if the third response was to
+ // arrive during this short interval: this would allow us to consider all the data seen
+ // by that last node in the response we produce, and would likely help reduce the
+ // size of the seen marker that we will return (because we would have an info of the
+ // kind: all items produced by that node until time ts have been returned, so we can
+ // bump the entry in the global vector clock and possibly remove some item-specific
+ // vector clocks)
+ let mut deadline =
+ Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let mut resps = vec![];
+ let mut errors = vec![];
+ loop {
+ select! {
+ _ = tokio::time::sleep_until(deadline.into()) => {
+ break;
+ }
+ res = requests.next() => match res {
+ None => break,
+ Some(Err(e)) => errors.push(e),
+ Some(Ok(r)) => {
+ resps.push(r);
+ if resps.len() >= quorum {
+ deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY);
+ }
+ }
+ }
+ }
+ }
+ if errors.len() > nodes.len() - quorum {
+ let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
+ return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
+ }
+
+ // Take all returned items into account to produce the response.
+ let mut new_items = BTreeMap::<String, K2VItem>::new();
+ for v in resps {
+ if let K2VRpc::PollRangeResponse(node, items) = v {
+ seen.mark_seen_node_items(node, items.iter());
+ for item in items.into_iter() {
+ match new_items.get_mut(&item.sort_key) {
+ Some(ent) => {
+ ent.merge(&item);
+ }
+ None => {
+ new_items.insert(item.sort_key.clone(), item);
+ }
+ }
+ }
+ } else {
+ return Err(Error::unexpected_rpc_message(v).into());
+ }
+ }
+
+ if new_items.is_empty() && has_seen_marker {
+ Ok(None)
+ } else {
+ Ok(Some((new_items, seen.encode()?)))
+ }
+ }
+
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
- let new = self.local_insert(item)?;
+ let new = {
+ let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
+ self.local_insert(&local_timestamp_tree, item)?
+ };
// Propagate to rest of network
if let Some(updated) = new {
@@ -256,11 +388,14 @@ impl K2VRpcHandler {
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
let mut updated_vec = vec![];
- for item in items {
- let new = self.local_insert(item)?;
+ {
+ let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
+ for item in items {
+ let new = self.local_insert(&local_timestamp_tree, item)?;
- if let Some(updated) = new {
- updated_vec.push(updated);
+ if let Some(updated) = new {
+ updated_vec.push(updated);
+ }
}
}
@@ -272,10 +407,22 @@ impl K2VRpcHandler {
Ok(K2VRpc::Ok)
}
- fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
+ fn local_insert(
+ &self,
+ local_timestamp_tree: &MutexGuard<'_, db::Tree>,
+ item: &InsertedItem,
+ ) -> Result<Option<K2VItem>, Error> {
+ let now = now_msec();
+
self.item_table
.data
- .update_entry_with(&item.partition, &item.sort_key, |ent| {
+ .update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
+ let old_local_timestamp = tx
+ .get(&local_timestamp_tree, TIMESTAMP_KEY)?
+ .and_then(|x| x.try_into().ok())
+ .map(u64::from_be_bytes)
+ .unwrap_or_default();
+
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
item.partition.bucket_id,
@@ -283,13 +430,25 @@ impl K2VRpcHandler {
item.sort_key.clone(),
)
});
- ent.update(self.system.id, &item.causal_context, item.value.clone());
- ent
+ let new_local_timestamp = ent.update(
+ self.system.id,
+ &item.causal_context,
+ item.value.clone(),
+ std::cmp::max(old_local_timestamp, now),
+ );
+
+ tx.insert(
+ &local_timestamp_tree,
+ TIMESTAMP_KEY,
+ u64::to_be_bytes(new_local_timestamp),
+ )?;
+
+ Ok(ent)
})
}
- async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
- let mut chan = self.subscriptions.subscribe(key);
+ async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
+ let mut chan = self.subscriptions.subscribe_item(key);
let mut value = self
.item_table
@@ -311,6 +470,71 @@ impl K2VRpcHandler {
Ok(value)
}
+
+ async fn handle_poll_range(
+ &self,
+ range: &PollRange,
+ seen_str: &Option<String>,
+ ) -> Result<Vec<K2VItem>, Error> {
+ if let Some(seen_str) = seen_str {
+ let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?;
+
+ // Subscribe now to all changes on that partition,
+ // so that new items that are inserted while we are reading the range
+ // will be seen in the loop below
+ let mut chan = self.subscriptions.subscribe_partition(&range.partition);
+
+ // Check for the presence of any new items already stored in the item table
+ let mut new_items = self.poll_range_read_range(range, &seen)?;
+
+ // If we found no new items, wait for a matching item to arrive
+ // on the channel
+ while new_items.is_empty() {
+ let item = chan.recv().await?;
+ if range.matches(&item) && seen.is_new_item(&item) {
+ new_items.push(item);
+ }
+ }
+
+ Ok(new_items)
+ } else {
+ // If no seen marker was specified, we do not poll for anything.
+ // We return immediately with the set of known items (even if
+ // it is empty), which will give the client an inital view of
+ // the dataset and an initial seen marker for further
+ // PollRange calls.
+ self.poll_range_read_range(range, &RangeSeenMarker::default())
+ }
+ }
+
+ fn poll_range_read_range(
+ &self,
+ range: &PollRange,
+ seen: &RangeSeenMarker,
+ ) -> Result<Vec<K2VItem>, Error> {
+ let mut new_items = vec![];
+
+ let partition_hash = range.partition.hash();
+ let first_key = match &range.start {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.item_table.data.tree_key(&range.partition, sk),
+ };
+ for item in self.item_table.data.store.range(first_key..)? {
+ let (key, value) = item?;
+ if &key[..32] != partition_hash.as_slice() {
+ break;
+ }
+ let item = self.item_table.data.decode_entry(&value)?;
+ if !range.matches(&item) {
+ break;
+ }
+ if seen.is_new_item(&item) {
+ new_items.push(item);
+ }
+ }
+
+ Ok(new_items)
+ }
}
#[async_trait]
@@ -326,10 +550,21 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
} => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! {
- ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
+ ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
}
}
+ K2VRpc::PollRange {
+ range,
+ seen_str,
+ timeout_msec,
+ } => {
+ let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
+ select! {
+ ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)),
+ _ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])),
+ }
+ }
m => Err(Error::unexpected_rpc_message(m)),
}
}
diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs
new file mode 100644
index 00000000..314d0f9e
--- /dev/null
+++ b/src/model/k2v/seen.rs
@@ -0,0 +1,104 @@
+//! Implements a RangeSeenMarker, a data type used in the PollRange API
+//! to indicate which items in the range have already been seen
+//! and which have not been seen yet.
+//!
+//! It consists of a vector clock that indicates that for each node,
+//! all items produced by that node with timestamps <= the value in the
+//! vector clock has been seen, as well as a set of causal contexts for
+//! individual items.
+
+use std::collections::BTreeMap;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::Uuid;
+use garage_util::encode::{nonversioned_decode, nonversioned_encode};
+use garage_util::error::Error;
+
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
+use crate::k2v::causality::*;
+use crate::k2v::item_table::*;
+use crate::k2v::sub::*;
+
+#[derive(Debug, Serialize, Deserialize, Default)]
+pub struct RangeSeenMarker {
+ vector_clock: VectorClock,
+ items: BTreeMap<String, VectorClock>,
+}
+
+impl RangeSeenMarker {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn restrict(&mut self, range: &PollRange) {
+ if let Some(start) = &range.start {
+ self.items = self.items.split_off(start);
+ }
+ if let Some(end) = &range.end {
+ self.items.split_off(end);
+ }
+ if let Some(pfx) = &range.prefix {
+ self.items.retain(|k, _v| k.starts_with(pfx));
+ }
+ }
+
+ pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
+ &mut self,
+ node: Uuid,
+ items: I,
+ ) {
+ let node = make_node_id(node);
+ for item in items.into_iter() {
+ let cc = item.causal_context();
+
+ if let Some(ts) = cc.vector_clock.get(&node) {
+ let ent = self.vector_clock.entry(node).or_insert(0);
+ *ent = std::cmp::max(*ent, *ts);
+ }
+
+ if vclock_gt(&cc.vector_clock, &self.vector_clock) {
+ match self.items.get_mut(&item.sort_key) {
+ None => {
+ self.items.insert(item.sort_key.clone(), cc.vector_clock);
+ }
+ Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock),
+ }
+ }
+ }
+ }
+
+ pub fn canonicalize(&mut self) {
+ let self_vc = &self.vector_clock;
+ self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc))
+ }
+
+ pub fn encode(&mut self) -> Result<String, Error> {
+ self.canonicalize();
+
+ let bytes = nonversioned_encode(&self)?;
+ let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
+ Ok(base64::encode(&bytes))
+ }
+
+ /// Decode from msgpack+zstd+b64 representation, returns None on error.
+ pub fn decode(s: &str) -> Option<Self> {
+ let bytes = base64::decode(&s).ok()?;
+ let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
+ nonversioned_decode(&bytes).ok()
+ }
+
+ pub fn decode_helper(s: &str) -> Result<Self, HelperError> {
+ Self::decode(s).ok_or_bad_request("Invalid causality token")
+ }
+
+ pub fn is_new_item(&self, item: &K2VItem) -> bool {
+ let cc = item.causal_context();
+ vclock_gt(&cc.vector_clock, &self.vector_clock)
+ && self
+ .items
+ .get(&item.sort_key)
+ .map(|vc| vclock_gt(&cc.vector_clock, &vc))
+ .unwrap_or(true)
+ }
+}
diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs
new file mode 100644
index 00000000..b1daa271
--- /dev/null
+++ b/src/model/k2v/sub.rs
@@ -0,0 +1,110 @@
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast;
+
+use crate::k2v::item_table::*;
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollKey {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
+}
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollRange {
+ pub partition: K2VItemPartition,
+ pub prefix: Option<String>,
+ pub start: Option<String>,
+ pub end: Option<String>,
+}
+
+#[derive(Default)]
+pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
+
+#[derive(Default)]
+pub struct SubscriptionManagerInner {
+ item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
+ part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>,
+}
+
+impl SubscriptionManager {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+ let mut inner = self.0.lock().unwrap();
+ if let Some(s) = inner.item_subscriptions.get(key) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ inner.item_subscriptions.insert(key.clone(), tx);
+ rx
+ }
+ }
+
+ pub(crate) fn subscribe_partition(
+ &self,
+ part: &K2VItemPartition,
+ ) -> broadcast::Receiver<K2VItem> {
+ let mut inner = self.0.lock().unwrap();
+ if let Some(s) = inner.part_subscriptions.get(part) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ inner.part_subscriptions.insert(part.clone(), tx);
+ rx
+ }
+ }
+
+ pub(crate) fn notify(&self, item: &K2VItem) {
+ let mut inner = self.0.lock().unwrap();
+
+ // 1. Notify single item subscribers,
+ // removing subscriptions with no more listeners if any
+ let key = PollKey {
+ partition: item.partition.clone(),
+ sort_key: item.sort_key.clone(),
+ };
+ if let Some(s) = inner.item_subscriptions.get(&key) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ inner.item_subscriptions.remove(&key);
+ }
+ }
+
+ // 2. Notify partition subscribers,
+ // removing subscriptions with no more listeners if any
+ if let Some(s) = inner.part_subscriptions.get(&item.partition) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ inner.part_subscriptions.remove(&item.partition);
+ }
+ }
+ }
+}
+
+impl PollRange {
+ pub fn matches(&self, item: &K2VItem) -> bool {
+ item.partition == self.partition
+ && self
+ .prefix
+ .as_ref()
+ .map(|x| item.sort_key.starts_with(x))
+ .unwrap_or(true)
+ && self
+ .start
+ .as_ref()
+ .map(|x| item.sort_key >= *x)
+ .unwrap_or(true)
+ && self
+ .end
+ .as_ref()
+ .map(|x| item.sort_key < *x)
+ .unwrap_or(true)
+ }
+}