diff options
Diffstat (limited to 'src/model/k2v')
-rw-r--r-- | src/model/k2v/rpc.rs | 57 |
1 files changed, 39 insertions, 18 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 04ab3ab9..04801ebf 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -268,6 +268,8 @@ impl K2VRpcHandler { seen_str: Option<String>, timeout_msec: u64, ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> { + let has_seen_marker = seen_str.is_some(); + let mut seen = seen_str .as_deref() .map(RangeSeenMarker::decode) @@ -318,7 +320,7 @@ impl K2VRpcHandler { } } - if new_items.is_empty() { + if new_items.is_empty() && has_seen_marker { Ok(None) } else { Ok(Some((new_items, seen.encode()?))) @@ -432,16 +434,44 @@ impl K2VRpcHandler { range: &PollRange, seen_str: &Option<String>, ) -> Result<Vec<K2VItem>, Error> { - let seen = seen_str - .as_deref() - .map(RangeSeenMarker::decode) - .transpose()? - .unwrap_or_default(); - let mut new_items = vec![]; + if let Some(seen_str) = seen_str { + let seen = RangeSeenMarker::decode(seen_str)?; + + // Subscribe now to all changes on that partition, + // so that new items that are inserted while we are reading the range + // will be seen in the loop below + let mut chan = self.subscriptions.subscribe_partition(&range.partition); + + // Check for the presence of any new items already stored in the item table + let mut new_items = self.poll_range_read_range(range, &seen)?; + + // If we found no new items, wait for a matching item to arrive + // on the channel + while new_items.is_empty() { + let item = chan.recv().await?; + if range.matches(&item) && seen.is_new_item(&item) { + new_items.push(item); + } + } + + Ok(new_items) + } else { + // If no seen marker was specified, we do not poll for anything. + // We return immediately with the set of known items (even if + // it is empty), which will give the client an inital view of + // the dataset and an initial seen marker for further + // PollRange calls. + self.poll_range_read_range(range, &RangeSeenMarker::default()) + } + } - let mut chan = self.subscriptions.subscribe_partition(&range.partition); + fn poll_range_read_range( + &self, + range: &PollRange, + seen: &RangeSeenMarker, + ) -> Result<Vec<K2VItem>, Error> { + let mut new_items = vec![]; - // Read current state of the specified range to check new items let partition_hash = range.partition.hash(); let first_key = match &range.start { None => partition_hash.to_vec(), @@ -461,15 +491,6 @@ impl K2VRpcHandler { } } - // If we found no new items, wait for a matching item to arrive - // on the channel - while new_items.is_empty() { - let item = chan.recv().await?; - if range.matches(&item) && seen.is_new_item(&item) { - new_items.push(item); - } - } - Ok(new_items) } } |