From a48e2e0cb2bdc75e14dfde199dbca0a779b1316b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 10:30:59 +0100 Subject: K2V: Subscription to ranges of items --- src/model/k2v/sub.rs | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 src/model/k2v/sub.rs (limited to 'src/model/k2v/sub.rs') diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs new file mode 100644 index 00000000..c4273dba --- /dev/null +++ b/src/model/k2v/sub.rs @@ -0,0 +1,107 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +use crate::k2v::item_table::*; + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollKey { + pub partition: K2VItemPartition, + pub sort_key: String, +} + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollRange { + pub partition: K2VItemPartition, + pub prefix: Option, + pub start: Option, + pub end: Option, +} + +#[derive(Default)] +pub struct SubscriptionManager(Mutex); + +#[derive(Default)] +pub struct SubscriptionManagerInner { + item_subscriptions: HashMap>, + range_subscriptions: HashMap>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self::default() + } + + pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { + let mut inner = self.0.lock().unwrap(); + if let Some(s) = inner.item_subscriptions.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + inner.item_subscriptions.insert(key.clone(), tx); + rx + } + } + + pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver { + let mut inner = self.0.lock().unwrap(); + if let Some(s) = inner.range_subscriptions.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + inner.range_subscriptions.insert(key.clone(), tx); + rx + } + } + + pub fn notify(&self, item: &K2VItem) { + let mut inner = self.0.lock().unwrap(); + + // 1. Notify single item subscribers, + // removing subscriptions with no more listeners if any + let key = PollKey { + partition: item.partition.clone(), + sort_key: item.sort_key.clone(), + }; + if let Some(s) = inner.item_subscriptions.get(&key) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.item_subscriptions.remove(&key); + } + } + + // 2. Notify range subscribers, + // removing subscriptions with no more listeners if any + inner.range_subscriptions.retain(|sub, chan| { + if sub.matches(&item) { + chan.send(item.clone()).is_ok() + } else { + chan.receiver_count() != 0 + } + }); + } +} + +impl PollRange { + fn matches(&self, item: &K2VItem) -> bool { + item.partition == self.partition + && self + .prefix + .as_ref() + .map(|x| item.sort_key.starts_with(x)) + .unwrap_or(true) + && self + .start + .as_ref() + .map(|x| item.sort_key >= *x) + .unwrap_or(true) + && self + .end + .as_ref() + .map(|x| item.sort_key < *x) + .unwrap_or(true) + } +} -- cgit v1.2.3 From 43fd6c1526339fb444e5e25021f42d0aee252b27 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 12:54:24 +0100 Subject: PollRange RPC --- src/model/k2v/sub.rs | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) (limited to 'src/model/k2v/sub.rs') diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs index c4273dba..b1daa271 100644 --- a/src/model/k2v/sub.rs +++ b/src/model/k2v/sub.rs @@ -26,7 +26,7 @@ pub struct SubscriptionManager(Mutex); #[derive(Default)] pub struct SubscriptionManagerInner { item_subscriptions: HashMap>, - range_subscriptions: HashMap>, + part_subscriptions: HashMap>, } impl SubscriptionManager { @@ -34,7 +34,7 @@ impl SubscriptionManager { Self::default() } - pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { + pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { let mut inner = self.0.lock().unwrap(); if let Some(s) = inner.item_subscriptions.get(key) { s.subscribe() @@ -45,18 +45,21 @@ impl SubscriptionManager { } } - pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver { + pub(crate) fn subscribe_partition( + &self, + part: &K2VItemPartition, + ) -> broadcast::Receiver { let mut inner = self.0.lock().unwrap(); - if let Some(s) = inner.range_subscriptions.get(key) { + if let Some(s) = inner.part_subscriptions.get(part) { s.subscribe() } else { let (tx, rx) = broadcast::channel(8); - inner.range_subscriptions.insert(key.clone(), tx); + inner.part_subscriptions.insert(part.clone(), tx); rx } } - pub fn notify(&self, item: &K2VItem) { + pub(crate) fn notify(&self, item: &K2VItem) { let mut inner = self.0.lock().unwrap(); // 1. Notify single item subscribers, @@ -73,20 +76,20 @@ impl SubscriptionManager { } } - // 2. Notify range subscribers, + // 2. Notify partition subscribers, // removing subscriptions with no more listeners if any - inner.range_subscriptions.retain(|sub, chan| { - if sub.matches(&item) { - chan.send(item.clone()).is_ok() - } else { - chan.receiver_count() != 0 + if let Some(s) = inner.part_subscriptions.get(&item.partition) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.part_subscriptions.remove(&item.partition); } - }); + } } } impl PollRange { - fn matches(&self, item: &K2VItem) -> bool { + pub fn matches(&self, item: &K2VItem) -> bool { item.partition == self.partition && self .prefix -- cgit v1.2.3