aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/k2v/item.rs2
-rw-r--r--src/model/garage.rs2
-rw-r--r--src/model/k2v/item_table.rs2
-rw-r--r--src/model/k2v/mod.rs3
-rw-r--r--src/model/k2v/poll.rs50
-rw-r--r--src/model/k2v/rpc.rs10
-rw-r--r--src/model/k2v/sub.rs107
7 files changed, 117 insertions, 59 deletions
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index f85138c7..9b78bc07 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -211,7 +211,7 @@ pub async fn handle_poll_item(
let item = garage
.k2v
.rpc
- .poll(
+ .poll_item(
bucket_id,
partition_key,
sort_key,
diff --git a/src/model/garage.rs b/src/model/garage.rs
index ac1846ce..ce479465 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -27,7 +27,7 @@ use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::k2v::{item_table::*, poll::*, rpc::*};
+use crate::k2v::{item_table::*, rpc::*, sub::*};
/// An entire Garage full of data
pub struct Garage {
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index ce3e4129..a22df68a 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -11,7 +11,7 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index f6a96151..83ad2512 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -2,5 +2,6 @@ pub mod causality;
pub mod item_table;
-pub mod poll;
pub mod rpc;
+
+pub(crate) mod sub;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs
deleted file mode 100644
index 93105207..00000000
--- a/src/model/k2v/poll.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use std::collections::HashMap;
-use std::sync::Mutex;
-
-use serde::{Deserialize, Serialize};
-use tokio::sync::broadcast;
-
-use crate::k2v::item_table::*;
-
-#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct PollKey {
- pub partition: K2VItemPartition,
- pub sort_key: String,
-}
-
-#[derive(Default)]
-pub struct SubscriptionManager {
- subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
-}
-
-impl SubscriptionManager {
- pub fn new() -> Self {
- Self::default()
- }
-
- pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(key) {
- s.subscribe()
- } else {
- let (tx, rx) = broadcast::channel(8);
- subs.insert(key.clone(), tx);
- rx
- }
- }
-
- pub fn notify(&self, item: &K2VItem) {
- let key = PollKey {
- partition: item.partition.clone(),
- sort_key: item.sort_key.clone(),
- };
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(&key) {
- if s.send(item.clone()).is_err() {
- // no more subscribers, remove channel from here
- // (we will re-create it later if we need to subscribe again)
- subs.remove(&key);
- }
- }
- }
-}
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index f64a7984..8860676b 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -27,7 +27,7 @@ use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
@@ -181,7 +181,7 @@ impl K2VRpcHandler {
Ok(())
}
- pub async fn poll(
+ pub async fn poll_item(
&self,
bucket_id: Uuid,
partition_key: String,
@@ -288,8 +288,8 @@ impl K2VRpcHandler {
})
}
- async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
- let mut chan = self.subscriptions.subscribe(key);
+ async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
+ let mut chan = self.subscriptions.subscribe_item(key);
let mut value = self
.item_table
@@ -326,7 +326,7 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
} => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! {
- ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
+ ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
}
}
diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs
new file mode 100644
index 00000000..c4273dba
--- /dev/null
+++ b/src/model/k2v/sub.rs
@@ -0,0 +1,107 @@
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast;
+
+use crate::k2v::item_table::*;
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollKey {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
+}
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollRange {
+ pub partition: K2VItemPartition,
+ pub prefix: Option<String>,
+ pub start: Option<String>,
+ pub end: Option<String>,
+}
+
+#[derive(Default)]
+pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
+
+#[derive(Default)]
+pub struct SubscriptionManagerInner {
+ item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
+ range_subscriptions: HashMap<PollRange, broadcast::Sender<K2VItem>>,
+}
+
+impl SubscriptionManager {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+ let mut inner = self.0.lock().unwrap();
+ if let Some(s) = inner.item_subscriptions.get(key) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ inner.item_subscriptions.insert(key.clone(), tx);
+ rx
+ }
+ }
+
+ pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VItem> {
+ let mut inner = self.0.lock().unwrap();
+ if let Some(s) = inner.range_subscriptions.get(key) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ inner.range_subscriptions.insert(key.clone(), tx);
+ rx
+ }
+ }
+
+ pub fn notify(&self, item: &K2VItem) {
+ let mut inner = self.0.lock().unwrap();
+
+ // 1. Notify single item subscribers,
+ // removing subscriptions with no more listeners if any
+ let key = PollKey {
+ partition: item.partition.clone(),
+ sort_key: item.sort_key.clone(),
+ };
+ if let Some(s) = inner.item_subscriptions.get(&key) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ inner.item_subscriptions.remove(&key);
+ }
+ }
+
+ // 2. Notify range subscribers,
+ // removing subscriptions with no more listeners if any
+ inner.range_subscriptions.retain(|sub, chan| {
+ if sub.matches(&item) {
+ chan.send(item.clone()).is_ok()
+ } else {
+ chan.receiver_count() != 0
+ }
+ });
+ }
+}
+
+impl PollRange {
+ fn matches(&self, item: &K2VItem) -> bool {
+ item.partition == self.partition
+ && self
+ .prefix
+ .as_ref()
+ .map(|x| item.sort_key.starts_with(x))
+ .unwrap_or(true)
+ && self
+ .start
+ .as_ref()
+ .map(|x| item.sort_key >= *x)
+ .unwrap_or(true)
+ && self
+ .end
+ .as_ref()
+ .map(|x| item.sort_key < *x)
+ .unwrap_or(true)
+ }
+}