diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/garage.rs | 8 | ||||
-rw-r--r-- | src/model/k2v/causality.rs | 62 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 11 | ||||
-rw-r--r-- | src/model/k2v/mod.rs | 4 | ||||
-rw-r--r-- | src/model/k2v/poll.rs | 50 | ||||
-rw-r--r-- | src/model/k2v/rpc.rs | 275 | ||||
-rw-r--r-- | src/model/k2v/seen.rs | 105 | ||||
-rw-r--r-- | src/model/k2v/sub.rs | 110 |
8 files changed, 531 insertions, 94 deletions
diff --git a/src/model/garage.rs b/src/model/garage.rs index ffa54dc5..4716954a 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -27,7 +27,7 @@ use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::k2v::{item_table::*, poll::*, rpc::*}; +use crate::k2v::{item_table::*, rpc::*, sub::*}; /// An entire Garage full of data pub struct Garage { @@ -305,8 +305,10 @@ impl GarageK2V { fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + info!("Initialize K2V subscription manager..."); let subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); let item_table = Table::new( K2VItemTable { @@ -317,7 +319,9 @@ impl GarageK2V { system.clone(), db, ); - let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions); + + info!("Initialize K2V RPC handler..."); + let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions); Self { item_table, diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 62488d53..c80ebd39 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -1,3 +1,12 @@ +//! Implements a CausalContext, which is a set of timestamps for each +//! node -- a vector clock --, indicating that the versions with +//! timestamps <= these numbers have been seen and can be +//! overwritten by a subsequent write. +//! +//! The textual representation of a CausalContext, which we call a +//! "causality token", is used in the API and must be sent along with +//! each write or delete operation to indicate the previously seen +//! versions that we want to overwrite or delete. use base64::prelude::*; use std::collections::BTreeMap; @@ -7,28 +16,44 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; +use crate::helper::error::{Error as HelperError, OkOrBadRequest}; + /// Node IDs used in K2V are u64 integers that are the abbreviation /// of full Garage node IDs which are 256-bit UUIDs. pub type K2VNodeId = u64; +pub type VectorClock = BTreeMap<K2VNodeId, u64>; + pub fn make_node_id(node_id: Uuid) -> K2VNodeId { let mut tmp = [0u8; 8]; tmp.copy_from_slice(&node_id.as_slice()[..8]); u64::from_be_bytes(tmp) } -#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] +pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool { + a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0)) +} + +pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock { + let mut ret = a.clone(); + for (n, ts) in b.iter() { + let ent = ret.entry(*n).or_insert(0); + *ent = std::cmp::max(*ts, *ent); + } + ret +} + +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)] pub struct CausalContext { - pub vector_clock: BTreeMap<K2VNodeId, u64>, + pub vector_clock: VectorClock, } impl CausalContext { /// Empty causality context - pub fn new_empty() -> Self { - Self { - vector_clock: BTreeMap::new(), - } + pub fn new() -> Self { + Self::default() } + /// Make binary representation and encode in base64 pub fn serialize(&self) -> String { let mut ints = Vec::with_capacity(2 * self.vector_clock.len()); @@ -45,13 +70,13 @@ impl CausalContext { BASE64_URL_SAFE_NO_PAD.encode(bytes) } - /// Parse from base64-encoded binary representation - pub fn parse(s: &str) -> Result<Self, String> { - let bytes = BASE64_URL_SAFE_NO_PAD - .decode(s) - .map_err(|e| format!("bad causality token base64: {}", e))?; + + /// Parse from base64-encoded binary representation. + /// Returns None on error. + pub fn parse(s: &str) -> Option<Self> { + let bytes = BASE64_URL_SAFE_NO_PAD.decode(s).ok()?; if bytes.len() % 16 != 8 || bytes.len() < 8 { - return Err("bad causality token length".into()); + return None; } let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap()); @@ -68,16 +93,19 @@ impl CausalContext { let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t); if check != checksum { - return Err("bad causality token checksum".into()); + return None; } - Ok(ret) + Some(ret) } + + pub fn parse_helper(s: &str) -> Result<Self, HelperError> { + Self::parse(s).ok_or_bad_request("Invalid causality token") + } + /// Check if this causal context contains newer items than another one pub fn is_newer_than(&self, other: &Self) -> bool { - self.vector_clock - .iter() - .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0)) + vclock_gt(&self.vector_clock, &other.vector_clock) } } diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 9955a9cd..28646f37 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -11,7 +11,7 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; pub const ENTRIES: &str = "entries"; pub const CONFLICTS: &str = "conflicts"; @@ -73,7 +73,8 @@ impl K2VItem { this_node: Uuid, context: &Option<CausalContext>, new_value: DvvsValue, - ) { + node_ts: u64, + ) -> u64 { if let Some(context) = context { for (node, t_discard) in context.vector_clock.iter() { if let Some(e) = self.items.get_mut(node) { @@ -98,12 +99,14 @@ impl K2VItem { values: vec![], }); let t_prev = e.max_time(); - e.values.push((t_prev + 1, new_value)); + let t_new = std::cmp::max(t_prev + 1, node_ts + 1); + e.values.push((t_new, new_value)); + t_new } /// Extract the causality context of a K2V Item pub fn causal_context(&self) -> CausalContext { - let mut cc = CausalContext::new_empty(); + let mut cc = CausalContext::new(); for (node, ent) in self.items.iter() { cc.vector_clock.insert(*node, ent.max_time()); } diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index f6a96151..acc1fcdc 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,6 +1,8 @@ pub mod causality; +pub mod seen; pub mod item_table; -pub mod poll; pub mod rpc; + +pub mod sub; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs deleted file mode 100644 index 93105207..00000000 --- a/src/model/k2v/poll.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::collections::HashMap; -use std::sync::Mutex; - -use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast; - -use crate::k2v::item_table::*; - -#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PollKey { - pub partition: K2VItemPartition, - pub sort_key: String, -} - -#[derive(Default)] -pub struct SubscriptionManager { - subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>, -} - -impl SubscriptionManager { - pub fn new() -> Self { - Self::default() - } - - pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(key) { - s.subscribe() - } else { - let (tx, rx) = broadcast::channel(8); - subs.insert(key.clone(), tx); - rx - } - } - - pub fn notify(&self, item: &K2VItem) { - let key = PollKey { - partition: item.partition.clone(), - sort_key: item.sort_key.clone(), - }; - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(&key) { - if s.send(item.clone()).is_err() { - // no more subscribers, remove channel from here - // (we will re-create it later if we need to subscribe again) - subs.remove(&key); - } - } - } -} diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f64a7984..117103b6 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -5,9 +5,10 @@ //! node does not process the entry directly, as this would //! mean the vector clock gets much larger than needed). -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; +use std::collections::{BTreeMap, HashMap}; +use std::convert::TryInto; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::stream::FuturesUnordered; @@ -15,9 +16,12 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::select; +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::now_msec; use garage_rpc::system::System; use garage_rpc::*; @@ -25,9 +29,15 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::{PartitionKey, Table}; +use crate::helper::error::Error as HelperError; use crate::k2v::causality::*; use crate::k2v::item_table::*; -use crate::k2v::poll::*; +use crate::k2v::seen::*; +use crate::k2v::sub::*; + +const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200); + +const TIMESTAMP_KEY: &'static [u8] = b"timestamp"; /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] @@ -40,7 +50,13 @@ enum K2VRpc { causal_context: CausalContext, timeout_msec: u64, }, + PollRange { + range: PollRange, + seen_str: Option<String>, + timeout_msec: u64, + }, PollItemResponse(Option<K2VItem>), + PollRangeResponse(Uuid, Vec<K2VItem>), } #[derive(Debug, Serialize, Deserialize)] @@ -59,6 +75,12 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc<System>, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + + // Using a mutex on the local_timestamp_tree is not strictly necessary, + // but it helps to not try to do several inserts at the same time, + // which would create transaction conflicts and force many useless retries. + local_timestamp_tree: Mutex<db::Tree>, + endpoint: Arc<Endpoint<K2VRpc, Self>>, subscriptions: Arc<SubscriptionManager>, } @@ -66,14 +88,19 @@ pub struct K2VRpcHandler { impl K2VRpcHandler { pub fn new( system: Arc<System>, + db: &db::Db, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, subscriptions: Arc<SubscriptionManager>, ) -> Arc<Self> { + let local_timestamp_tree = db + .open_tree("k2v_local_timestamp") + .expect("Unable to open DB tree for k2v local timestamp"); let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); let rpc_handler = Arc::new(Self { system, item_table, + local_timestamp_tree: Mutex::new(local_timestamp_tree), endpoint, subscriptions, }); @@ -181,7 +208,7 @@ impl K2VRpcHandler { Ok(()) } - pub async fn poll( + pub async fn poll_item( &self, bucket_id: Uuid, partition_key: String, @@ -230,9 +257,7 @@ impl K2VRpcHandler { resp = Some(x); } } - K2VRpc::PollItemResponse(None) => { - return Ok(None); - } + K2VRpc::PollItemResponse(None) => (), v => return Err(Error::unexpected_rpc_message(v)), } } @@ -240,10 +265,117 @@ impl K2VRpcHandler { Ok(resp) } + pub async fn poll_range( + &self, + range: PollRange, + seen_str: Option<String>, + timeout_msec: u64, + ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> { + let has_seen_marker = seen_str.is_some(); + + // Parse seen marker, we will use it below. This is also the first check + // that it is valid, which returns a bad request error if not. + let mut seen = seen_str + .as_deref() + .map(RangeSeenMarker::decode_helper) + .transpose()? + .unwrap_or_default(); + seen.restrict(&range); + + // Prepare PollRange RPC to send to the storage nodes responsible for the parititon + let nodes = self + .item_table + .data + .replication + .write_nodes(&range.partition.hash()); + let quorum = self.item_table.data.replication.read_quorum(); + let msg = K2VRpc::PollRange { + range, + seen_str, + timeout_msec, + }; + + // Send the request to all nodes, use FuturesUnordered to get the responses in any order + let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); + let mut requests = nodes + .iter() + .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) + .collect::<FuturesUnordered<_>>(); + + // Fetch responses. This procedure stops fetching responses when any of the following + // conditions arise: + // - we have a response to all requests + // - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay + // has passed since the quorum was achieved + // - a global RPC timeout expired + // The extra delay after a quorum was received is usefull if the third response was to + // arrive during this short interval: this would allow us to consider all the data seen + // by that last node in the response we produce, and would likely help reduce the + // size of the seen marker that we will return (because we would have an info of the + // kind: all items produced by that node until time ts have been returned, so we can + // bump the entry in the global vector clock and possibly remove some item-specific + // vector clocks) + let mut deadline = + Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let mut resps = vec![]; + let mut errors = vec![]; + loop { + select! { + _ = tokio::time::sleep_until(deadline.into()) => { + break; + } + res = requests.next() => match res { + None => break, + Some(Err(e)) => errors.push(e), + Some(Ok(r)) => { + resps.push(r); + if resps.len() >= quorum { + deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY); + } + } + } + } + } + if errors.len() > nodes.len() - quorum { + let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>(); + return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into()); + } + + // Take all returned items into account to produce the response. + let mut new_items = BTreeMap::<String, K2VItem>::new(); + for v in resps { + if let K2VRpc::PollRangeResponse(node, items) = v { + seen.mark_seen_node_items(node, items.iter()); + for item in items.into_iter() { + match new_items.get_mut(&item.sort_key) { + Some(ent) => { + ent.merge(&item); + } + None => { + new_items.insert(item.sort_key.clone(), item); + } + } + } + } else { + return Err(Error::unexpected_rpc_message(v).into()); + } + } + + if new_items.is_empty() && has_seen_marker { + Ok(None) + } else { + Ok(Some((new_items, seen.encode()?))) + } + } + // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> { - let new = self.local_insert(item)?; + let new = { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + self.local_insert(&local_timestamp_tree, item)? + }; // Propagate to rest of network if let Some(updated) = new { @@ -256,11 +388,14 @@ impl K2VRpcHandler { async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> { let mut updated_vec = vec![]; - for item in items { - let new = self.local_insert(item)?; + { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + for item in items { + let new = self.local_insert(&local_timestamp_tree, item)?; - if let Some(updated) = new { - updated_vec.push(updated); + if let Some(updated) = new { + updated_vec.push(updated); + } } } @@ -272,10 +407,22 @@ impl K2VRpcHandler { Ok(K2VRpc::Ok) } - fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> { + fn local_insert( + &self, + local_timestamp_tree: &MutexGuard<'_, db::Tree>, + item: &InsertedItem, + ) -> Result<Option<K2VItem>, Error> { + let now = now_msec(); + self.item_table .data - .update_entry_with(&item.partition, &item.sort_key, |ent| { + .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let old_local_timestamp = tx + .get(&local_timestamp_tree, TIMESTAMP_KEY)? + .and_then(|x| x.try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, @@ -283,13 +430,25 @@ impl K2VRpcHandler { item.sort_key.clone(), ) }); - ent.update(self.system.id, &item.causal_context, item.value.clone()); - ent + let new_local_timestamp = ent.update( + self.system.id, + &item.causal_context, + item.value.clone(), + std::cmp::max(old_local_timestamp, now), + ); + + tx.insert( + &local_timestamp_tree, + TIMESTAMP_KEY, + u64::to_be_bytes(new_local_timestamp), + )?; + + Ok(ent) }) } - async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { - let mut chan = self.subscriptions.subscribe(key); + async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { + let mut chan = self.subscriptions.subscribe_item(key); let mut value = self .item_table @@ -311,6 +470,71 @@ impl K2VRpcHandler { Ok(value) } + + async fn handle_poll_range( + &self, + range: &PollRange, + seen_str: &Option<String>, + ) -> Result<Vec<K2VItem>, Error> { + if let Some(seen_str) = seen_str { + let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?; + + // Subscribe now to all changes on that partition, + // so that new items that are inserted while we are reading the range + // will be seen in the loop below + let mut chan = self.subscriptions.subscribe_partition(&range.partition); + + // Check for the presence of any new items already stored in the item table + let mut new_items = self.poll_range_read_range(range, &seen)?; + + // If we found no new items, wait for a matching item to arrive + // on the channel + while new_items.is_empty() { + let item = chan.recv().await?; + if range.matches(&item) && seen.is_new_item(&item) { + new_items.push(item); + } + } + + Ok(new_items) + } else { + // If no seen marker was specified, we do not poll for anything. + // We return immediately with the set of known items (even if + // it is empty), which will give the client an inital view of + // the dataset and an initial seen marker for further + // PollRange calls. + self.poll_range_read_range(range, &RangeSeenMarker::default()) + } + } + + fn poll_range_read_range( + &self, + range: &PollRange, + seen: &RangeSeenMarker, + ) -> Result<Vec<K2VItem>, Error> { + let mut new_items = vec![]; + + let partition_hash = range.partition.hash(); + let first_key = match &range.start { + None => partition_hash.to_vec(), + Some(sk) => self.item_table.data.tree_key(&range.partition, sk), + }; + for item in self.item_table.data.store.range(first_key..)? { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let item = self.item_table.data.decode_entry(&value)?; + if !range.matches(&item) { + break; + } + if seen.is_new_item(&item) { + new_items.push(item); + } + } + + Ok(new_items) + } } #[async_trait] @@ -326,10 +550,21 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler { } => { let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); select! { - ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } + K2VRpc::PollRange { + range, + seen_str, + timeout_msec, + } => { + let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); + select! { + ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)), + _ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])), + } + } m => Err(Error::unexpected_rpc_message(m)), } } diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs new file mode 100644 index 00000000..51098710 --- /dev/null +++ b/src/model/k2v/seen.rs @@ -0,0 +1,105 @@ +//! Implements a RangeSeenMarker, a data type used in the PollRange API +//! to indicate which items in the range have already been seen +//! and which have not been seen yet. +//! +//! It consists of a vector clock that indicates that for each node, +//! all items produced by that node with timestamps <= the value in the +//! vector clock has been seen, as well as a set of causal contexts for +//! individual items. + +use std::collections::BTreeMap; + +use base64::prelude::*; +use serde::{Deserialize, Serialize}; + +use garage_util::data::Uuid; +use garage_util::encode::{nonversioned_decode, nonversioned_encode}; +use garage_util::error::Error; + +use crate::helper::error::{Error as HelperError, OkOrBadRequest}; +use crate::k2v::causality::*; +use crate::k2v::item_table::*; +use crate::k2v::sub::*; + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct RangeSeenMarker { + vector_clock: VectorClock, + items: BTreeMap<String, VectorClock>, +} + +impl RangeSeenMarker { + pub fn new() -> Self { + Self::default() + } + + pub fn restrict(&mut self, range: &PollRange) { + if let Some(start) = &range.start { + self.items = self.items.split_off(start); + } + if let Some(end) = &range.end { + self.items.split_off(end); + } + if let Some(pfx) = &range.prefix { + self.items.retain(|k, _v| k.starts_with(pfx)); + } + } + + pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>( + &mut self, + node: Uuid, + items: I, + ) { + let node = make_node_id(node); + for item in items.into_iter() { + let cc = item.causal_context(); + + if let Some(ts) = cc.vector_clock.get(&node) { + let ent = self.vector_clock.entry(node).or_insert(0); + *ent = std::cmp::max(*ent, *ts); + } + + if vclock_gt(&cc.vector_clock, &self.vector_clock) { + match self.items.get_mut(&item.sort_key) { + None => { + self.items.insert(item.sort_key.clone(), cc.vector_clock); + } + Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock), + } + } + } + } + + pub fn canonicalize(&mut self) { + let self_vc = &self.vector_clock; + self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc)) + } + + pub fn encode(&mut self) -> Result<String, Error> { + self.canonicalize(); + + let bytes = nonversioned_encode(&self)?; + let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?; + Ok(BASE64_STANDARD.encode(&bytes)) + } + + /// Decode from msgpack+zstd+b64 representation, returns None on error. + pub fn decode(s: &str) -> Option<Self> { + let bytes = BASE64_STANDARD.decode(&s).ok()?; + let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?; + nonversioned_decode(&bytes).ok() + } + + pub fn decode_helper(s: &str) -> Result<Self, HelperError> { + Self::decode(s).ok_or_bad_request("Invalid causality token") + } + + pub fn is_new_item(&self, item: &K2VItem) -> bool { + let cc = item.causal_context(); + vclock_gt(&cc.vector_clock, &self.vector_clock) + && self + .items + .get(&item.sort_key) + .map(|vc| vclock_gt(&cc.vector_clock, &vc)) + .unwrap_or(true) + } +} diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs new file mode 100644 index 00000000..b1daa271 --- /dev/null +++ b/src/model/k2v/sub.rs @@ -0,0 +1,110 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +use crate::k2v::item_table::*; + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollKey { + pub partition: K2VItemPartition, + pub sort_key: String, +} + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollRange { + pub partition: K2VItemPartition, + pub prefix: Option<String>, + pub start: Option<String>, + pub end: Option<String>, +} + +#[derive(Default)] +pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>); + +#[derive(Default)] +pub struct SubscriptionManagerInner { + item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>, + part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self::default() + } + + pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { + let mut inner = self.0.lock().unwrap(); + if let Some(s) = inner.item_subscriptions.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + inner.item_subscriptions.insert(key.clone(), tx); + rx + } + } + + pub(crate) fn subscribe_partition( + &self, + part: &K2VItemPartition, + ) -> broadcast::Receiver<K2VItem> { + let mut inner = self.0.lock().unwrap(); + if let Some(s) = inner.part_subscriptions.get(part) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + inner.part_subscriptions.insert(part.clone(), tx); + rx + } + } + + pub(crate) fn notify(&self, item: &K2VItem) { + let mut inner = self.0.lock().unwrap(); + + // 1. Notify single item subscribers, + // removing subscriptions with no more listeners if any + let key = PollKey { + partition: item.partition.clone(), + sort_key: item.sort_key.clone(), + }; + if let Some(s) = inner.item_subscriptions.get(&key) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.item_subscriptions.remove(&key); + } + } + + // 2. Notify partition subscribers, + // removing subscriptions with no more listeners if any + if let Some(s) = inner.part_subscriptions.get(&item.partition) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.part_subscriptions.remove(&item.partition); + } + } + } +} + +impl PollRange { + pub fn matches(&self, item: &K2VItem) -> bool { + item.partition == self.partition + && self + .prefix + .as_ref() + .map(|x| item.sort_key.starts_with(x)) + .unwrap_or(true) + && self + .start + .as_ref() + .map(|x| item.sort_key >= *x) + .unwrap_or(true) + && self + .end + .as_ref() + .map(|x| item.sort_key < *x) + .unwrap_or(true) + } +} |