aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/k2v')
-rw-r--r--src/model/k2v/rpc.rs133
-rw-r--r--src/model/k2v/seen.rs13
-rw-r--r--src/model/k2v/sub.rs31
3 files changed, 159 insertions, 18 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index e5497215..8b070885 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -5,7 +5,7 @@
//! node does not process the entry directly, as this would
//! mean the vector clock gets much larger than needed).
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -31,6 +31,7 @@ use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
+use crate::k2v::seen::*;
use crate::k2v::sub::*;
const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
@@ -46,7 +47,13 @@ enum K2VRpc {
causal_context: CausalContext,
timeout_msec: u64,
},
+ PollRange {
+ range: PollRange,
+ seen_str: Option<String>,
+ timeout_msec: u64,
+ },
PollItemResponse(Option<K2VItem>),
+ PollRangeResponse(Uuid, Vec<K2VItem>),
}
#[derive(Debug, Serialize, Deserialize)]
@@ -242,9 +249,7 @@ impl K2VRpcHandler {
resp = Some(x);
}
}
- K2VRpc::PollItemResponse(None) => {
- return Ok(None);
- }
+ K2VRpc::PollItemResponse(None) => (),
v => return Err(Error::unexpected_rpc_message(v)),
}
}
@@ -252,6 +257,69 @@ impl K2VRpcHandler {
Ok(resp)
}
+ pub async fn poll_range(
+ &self,
+ range: PollRange,
+ seen_str: Option<String>,
+ timeout_msec: u64,
+ ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> {
+ let mut seen = seen_str
+ .as_deref()
+ .map(RangeSeenMarker::decode)
+ .transpose()?
+ .unwrap_or_default();
+ seen.restrict(&range);
+
+ let nodes = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&range.partition.hash());
+
+ let rpc = self.system.rpc.try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::PollRange {
+ range,
+ seen_str,
+ timeout_msec,
+ },
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.item_table.data.replication.read_quorum())
+ .without_timeout(),
+ );
+ let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let resps = select! {
+ r = rpc => r?,
+ _ = tokio::time::sleep(timeout_duration) => return Ok(None),
+ };
+
+ let mut new_items = BTreeMap::<String, K2VItem>::new();
+ for v in resps {
+ if let K2VRpc::PollRangeResponse(node, items) = v {
+ seen.mark_seen_node_items(node, items.iter());
+ for item in items.into_iter() {
+ match new_items.get_mut(&item.sort_key) {
+ Some(ent) => {
+ ent.merge(&item);
+ }
+ None => {
+ new_items.insert(item.sort_key.clone(), item);
+ }
+ }
+ }
+ } else {
+ return Err(Error::unexpected_rpc_message(v));
+ }
+ }
+
+ if new_items.is_empty() {
+ Ok(None)
+ } else {
+ Ok(Some((new_items, seen.encode()?)))
+ }
+ }
+
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
@@ -348,6 +416,52 @@ impl K2VRpcHandler {
Ok(value)
}
+
+ async fn handle_poll_range(
+ &self,
+ range: &PollRange,
+ seen_str: &Option<String>,
+ ) -> Result<Vec<K2VItem>, Error> {
+ let seen = seen_str
+ .as_deref()
+ .map(RangeSeenMarker::decode)
+ .transpose()?
+ .unwrap_or_default();
+ let mut new_items = vec![];
+
+ let mut chan = self.subscriptions.subscribe_partition(&range.partition);
+
+ // Read current state of the specified range to check new items
+ let partition_hash = range.partition.hash();
+ let first_key = match &range.start {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.item_table.data.tree_key(&range.partition, sk),
+ };
+ for item in self.item_table.data.store.range(first_key..)? {
+ let (key, value) = item?;
+ if &key[..32] != partition_hash.as_slice() {
+ break;
+ }
+ let item = self.item_table.data.decode_entry(&value)?;
+ if !range.matches(&item) {
+ break;
+ }
+ if seen.is_new_item(&item) {
+ new_items.push(item);
+ }
+ }
+
+ // If we found no new items, wait for a matching item to arrive
+ // on the channel
+ while new_items.is_empty() {
+ let item = chan.recv().await?;
+ if range.matches(&item) && seen.is_new_item(&item) {
+ new_items.push(item);
+ }
+ }
+
+ Ok(new_items)
+ }
}
#[async_trait]
@@ -367,6 +481,17 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
}
}
+ K2VRpc::PollRange {
+ range,
+ seen_str,
+ timeout_msec,
+ } => {
+ let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
+ select! {
+ ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)),
+ _ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])),
+ }
+ }
m => Err(Error::unexpected_rpc_message(m)),
}
}
diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs
index b8f4ff27..d2cd54c7 100644
--- a/src/model/k2v/seen.rs
+++ b/src/model/k2v/seen.rs
@@ -17,6 +17,7 @@ use garage_util::error::{Error, OkOrMessage};
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
+use crate::k2v::sub::*;
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct RangeSeenMarker {
@@ -29,6 +30,18 @@ impl RangeSeenMarker {
Self::default()
}
+ pub fn restrict(&mut self, range: &PollRange) {
+ if let Some(start) = &range.start {
+ self.items = self.items.split_off(start);
+ }
+ if let Some(end) = &range.end {
+ self.items.split_off(end);
+ }
+ if let Some(pfx) = &range.prefix {
+ self.items.retain(|k, _v| k.starts_with(pfx));
+ }
+ }
+
pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
&mut self,
node: Uuid,
diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs
index c4273dba..b1daa271 100644
--- a/src/model/k2v/sub.rs
+++ b/src/model/k2v/sub.rs
@@ -26,7 +26,7 @@ pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
#[derive(Default)]
pub struct SubscriptionManagerInner {
item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
- range_subscriptions: HashMap<PollRange, broadcast::Sender<K2VItem>>,
+ part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>,
}
impl SubscriptionManager {
@@ -34,7 +34,7 @@ impl SubscriptionManager {
Self::default()
}
- pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+ pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
let mut inner = self.0.lock().unwrap();
if let Some(s) = inner.item_subscriptions.get(key) {
s.subscribe()
@@ -45,18 +45,21 @@ impl SubscriptionManager {
}
}
- pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VItem> {
+ pub(crate) fn subscribe_partition(
+ &self,
+ part: &K2VItemPartition,
+ ) -> broadcast::Receiver<K2VItem> {
let mut inner = self.0.lock().unwrap();
- if let Some(s) = inner.range_subscriptions.get(key) {
+ if let Some(s) = inner.part_subscriptions.get(part) {
s.subscribe()
} else {
let (tx, rx) = broadcast::channel(8);
- inner.range_subscriptions.insert(key.clone(), tx);
+ inner.part_subscriptions.insert(part.clone(), tx);
rx
}
}
- pub fn notify(&self, item: &K2VItem) {
+ pub(crate) fn notify(&self, item: &K2VItem) {
let mut inner = self.0.lock().unwrap();
// 1. Notify single item subscribers,
@@ -73,20 +76,20 @@ impl SubscriptionManager {
}
}
- // 2. Notify range subscribers,
+ // 2. Notify partition subscribers,
// removing subscriptions with no more listeners if any
- inner.range_subscriptions.retain(|sub, chan| {
- if sub.matches(&item) {
- chan.send(item.clone()).is_ok()
- } else {
- chan.receiver_count() != 0
+ if let Some(s) = inner.part_subscriptions.get(&item.partition) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ inner.part_subscriptions.remove(&item.partition);
}
- });
+ }
}
}
impl PollRange {
- fn matches(&self, item: &K2VItem) -> bool {
+ pub fn matches(&self, item: &K2VItem) -> bool {
item.partition == self.partition
&& self
.prefix