aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v/sub.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/k2v/sub.rs')
-rw-r--r--src/model/k2v/sub.rs31
1 files changed, 17 insertions, 14 deletions
diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs
index c4273dba..b1daa271 100644
--- a/src/model/k2v/sub.rs
+++ b/src/model/k2v/sub.rs
@@ -26,7 +26,7 @@ pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
#[derive(Default)]
pub struct SubscriptionManagerInner {
item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
- range_subscriptions: HashMap<PollRange, broadcast::Sender<K2VItem>>,
+ part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>,
}
impl SubscriptionManager {
@@ -34,7 +34,7 @@ impl SubscriptionManager {
Self::default()
}
- pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+ pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
let mut inner = self.0.lock().unwrap();
if let Some(s) = inner.item_subscriptions.get(key) {
s.subscribe()
@@ -45,18 +45,21 @@ impl SubscriptionManager {
}
}
- pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VItem> {
+ pub(crate) fn subscribe_partition(
+ &self,
+ part: &K2VItemPartition,
+ ) -> broadcast::Receiver<K2VItem> {
let mut inner = self.0.lock().unwrap();
- if let Some(s) = inner.range_subscriptions.get(key) {
+ if let Some(s) = inner.part_subscriptions.get(part) {
s.subscribe()
} else {
let (tx, rx) = broadcast::channel(8);
- inner.range_subscriptions.insert(key.clone(), tx);
+ inner.part_subscriptions.insert(part.clone(), tx);
rx
}
}
- pub fn notify(&self, item: &K2VItem) {
+ pub(crate) fn notify(&self, item: &K2VItem) {
let mut inner = self.0.lock().unwrap();
// 1. Notify single item subscribers,
@@ -73,20 +76,20 @@ impl SubscriptionManager {
}
}
- // 2. Notify range subscribers,
+ // 2. Notify partition subscribers,
// removing subscriptions with no more listeners if any
- inner.range_subscriptions.retain(|sub, chan| {
- if sub.matches(&item) {
- chan.send(item.clone()).is_ok()
- } else {
- chan.receiver_count() != 0
+ if let Some(s) = inner.part_subscriptions.get(&item.partition) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ inner.part_subscriptions.remove(&item.partition);
}
- });
+ }
}
}
impl PollRange {
- fn matches(&self, item: &K2VItem) -> bool {
+ pub fn matches(&self, item: &K2VItem) -> bool {
item.partition == self.partition
&& self
.prefix