diff options
author | Alex <alex@adnab.me> | 2023-01-26 16:19:04 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-01-26 16:19:04 +0000 |
commit | 246f7468cd18c8ef4f3c0c4c209853cd2500cc76 (patch) | |
tree | 6e5f9ddd2159ba5396cc441a82b8240c4306323f /src/model/k2v/poll.rs | |
parent | 611792ddcf86f0a728e22abaa6e172d3679d5ca6 (diff) | |
parent | 1dff62564fdda392a97986dca55232f30a1f4234 (diff) | |
download | garage-246f7468cd18c8ef4f3c0c4c209853cd2500cc76.tar.gz garage-246f7468cd18c8ef4f3c0c4c209853cd2500cc76.zip |
Merge pull request 'K2V PollRange, version 2' (#471) from k2v-watch-range-2 into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/471
Diffstat (limited to 'src/model/k2v/poll.rs')
-rw-r--r-- | src/model/k2v/poll.rs | 50 |
1 files changed, 0 insertions, 50 deletions
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs deleted file mode 100644 index 93105207..00000000 --- a/src/model/k2v/poll.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::collections::HashMap; -use std::sync::Mutex; - -use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast; - -use crate::k2v::item_table::*; - -#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PollKey { - pub partition: K2VItemPartition, - pub sort_key: String, -} - -#[derive(Default)] -pub struct SubscriptionManager { - subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>, -} - -impl SubscriptionManager { - pub fn new() -> Self { - Self::default() - } - - pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(key) { - s.subscribe() - } else { - let (tx, rx) = broadcast::channel(8); - subs.insert(key.clone(), tx); - rx - } - } - - pub fn notify(&self, item: &K2VItem) { - let key = PollKey { - partition: item.partition.clone(), - sort_key: item.sort_key.clone(), - }; - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(&key) { - if s.send(item.clone()).is_err() { - // no more subscribers, remove channel from here - // (we will re-create it later if we need to subscribe again) - subs.remove(&key); - } - } - } -} |