diff options
Diffstat (limited to 'src/model/k2v/sub.rs')
-rw-r--r-- | src/model/k2v/sub.rs | 31 |
1 files changed, 17 insertions, 14 deletions
diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs index c4273dba..b1daa271 100644 --- a/src/model/k2v/sub.rs +++ b/src/model/k2v/sub.rs @@ -26,7 +26,7 @@ pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>); #[derive(Default)] pub struct SubscriptionManagerInner { item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>, - range_subscriptions: HashMap<PollRange, broadcast::Sender<K2VItem>>, + part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>, } impl SubscriptionManager { @@ -34,7 +34,7 @@ impl SubscriptionManager { Self::default() } - pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { + pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { let mut inner = self.0.lock().unwrap(); if let Some(s) = inner.item_subscriptions.get(key) { s.subscribe() @@ -45,18 +45,21 @@ impl SubscriptionManager { } } - pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VItem> { + pub(crate) fn subscribe_partition( + &self, + part: &K2VItemPartition, + ) -> broadcast::Receiver<K2VItem> { let mut inner = self.0.lock().unwrap(); - if let Some(s) = inner.range_subscriptions.get(key) { + if let Some(s) = inner.part_subscriptions.get(part) { s.subscribe() } else { let (tx, rx) = broadcast::channel(8); - inner.range_subscriptions.insert(key.clone(), tx); + inner.part_subscriptions.insert(part.clone(), tx); rx } } - pub fn notify(&self, item: &K2VItem) { + pub(crate) fn notify(&self, item: &K2VItem) { let mut inner = self.0.lock().unwrap(); // 1. Notify single item subscribers, @@ -73,20 +76,20 @@ impl SubscriptionManager { } } - // 2. Notify range subscribers, + // 2. Notify partition subscribers, // removing subscriptions with no more listeners if any - inner.range_subscriptions.retain(|sub, chan| { - if sub.matches(&item) { - chan.send(item.clone()).is_ok() - } else { - chan.receiver_count() != 0 + if let Some(s) = inner.part_subscriptions.get(&item.partition) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.part_subscriptions.remove(&item.partition); } - }); + } } } impl PollRange { - fn matches(&self, item: &K2VItem) -> bool { + pub fn matches(&self, item: &K2VItem) -> bool { item.partition == self.partition && self .prefix |