aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/model/k2v/rpc.rs57
1 files changed, 39 insertions, 18 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 04ab3ab9..04801ebf 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -268,6 +268,8 @@ impl K2VRpcHandler {
seen_str: Option<String>,
timeout_msec: u64,
) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> {
+ let has_seen_marker = seen_str.is_some();
+
let mut seen = seen_str
.as_deref()
.map(RangeSeenMarker::decode)
@@ -318,7 +320,7 @@ impl K2VRpcHandler {
}
}
- if new_items.is_empty() {
+ if new_items.is_empty() && has_seen_marker {
Ok(None)
} else {
Ok(Some((new_items, seen.encode()?)))
@@ -432,16 +434,44 @@ impl K2VRpcHandler {
range: &PollRange,
seen_str: &Option<String>,
) -> Result<Vec<K2VItem>, Error> {
- let seen = seen_str
- .as_deref()
- .map(RangeSeenMarker::decode)
- .transpose()?
- .unwrap_or_default();
- let mut new_items = vec![];
+ if let Some(seen_str) = seen_str {
+ let seen = RangeSeenMarker::decode(seen_str)?;
+
+ // Subscribe now to all changes on that partition,
+ // so that new items that are inserted while we are reading the range
+ // will be seen in the loop below
+ let mut chan = self.subscriptions.subscribe_partition(&range.partition);
+
+ // Check for the presence of any new items already stored in the item table
+ let mut new_items = self.poll_range_read_range(range, &seen)?;
+
+ // If we found no new items, wait for a matching item to arrive
+ // on the channel
+ while new_items.is_empty() {
+ let item = chan.recv().await?;
+ if range.matches(&item) && seen.is_new_item(&item) {
+ new_items.push(item);
+ }
+ }
+
+ Ok(new_items)
+ } else {
+ // If no seen marker was specified, we do not poll for anything.
+ // We return immediately with the set of known items (even if
+ // it is empty), which will give the client an inital view of
+ // the dataset and an initial seen marker for further
+ // PollRange calls.
+ self.poll_range_read_range(range, &RangeSeenMarker::default())
+ }
+ }
- let mut chan = self.subscriptions.subscribe_partition(&range.partition);
+ fn poll_range_read_range(
+ &self,
+ range: &PollRange,
+ seen: &RangeSeenMarker,
+ ) -> Result<Vec<K2VItem>, Error> {
+ let mut new_items = vec![];
- // Read current state of the specified range to check new items
let partition_hash = range.partition.hash();
let first_key = match &range.start {
None => partition_hash.to_vec(),
@@ -461,15 +491,6 @@ impl K2VRpcHandler {
}
}
- // If we found no new items, wait for a matching item to arrive
- // on the channel
- while new_items.is_empty() {
- let item = chan.recv().await?;
- if range.matches(&item) && seen.is_new_item(&item) {
- new_items.push(item);
- }
- }
-
Ok(new_items)
}
}