aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/model/k2v/rpc.rs77
-rw-r--r--src/rpc/rpc_helper.rs5
2 files changed, 61 insertions, 21 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index f5d8ffc2..117103b6 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -8,7 +8,7 @@
use std::collections::{BTreeMap, HashMap};
use std::convert::TryInto;
use std::sync::{Arc, Mutex, MutexGuard};
-use std::time::Duration;
+use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
@@ -35,6 +35,8 @@ use crate::k2v::item_table::*;
use crate::k2v::seen::*;
use crate::k2v::sub::*;
+const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
+
const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
/// RPC messages for K2V
@@ -271,6 +273,8 @@ impl K2VRpcHandler {
) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
let has_seen_marker = seen_str.is_some();
+ // Parse seen marker, we will use it below. This is also the first check
+ // that it is valid, which returns a bad request error if not.
let mut seen = seen_str
.as_deref()
.map(RangeSeenMarker::decode_helper)
@@ -278,30 +282,67 @@ impl K2VRpcHandler {
.unwrap_or_default();
seen.restrict(&range);
+ // Prepare PollRange RPC to send to the storage nodes responsible for the parititon
let nodes = self
.item_table
.data
.replication
.write_nodes(&range.partition.hash());
-
- let rpc = self.system.rpc.try_call_many(
- &self.endpoint,
- &nodes[..],
- K2VRpc::PollRange {
- range,
- seen_str,
- timeout_msec,
- },
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.item_table.data.replication.read_quorum())
- .without_timeout(),
- );
- let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
- let resps = select! {
- r = rpc => r?,
- _ = tokio::time::sleep(timeout_duration) => return Ok(None),
+ let quorum = self.item_table.data.replication.read_quorum();
+ let msg = K2VRpc::PollRange {
+ range,
+ seen_str,
+ timeout_msec,
};
+ // Send the request to all nodes, use FuturesUnordered to get the responses in any order
+ let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+ let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
+ let mut requests = nodes
+ .iter()
+ .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .collect::<FuturesUnordered<_>>();
+
+ // Fetch responses. This procedure stops fetching responses when any of the following
+ // conditions arise:
+ // - we have a response to all requests
+ // - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay
+ // has passed since the quorum was achieved
+ // - a global RPC timeout expired
+ // The extra delay after a quorum was received is usefull if the third response was to
+ // arrive during this short interval: this would allow us to consider all the data seen
+ // by that last node in the response we produce, and would likely help reduce the
+ // size of the seen marker that we will return (because we would have an info of the
+ // kind: all items produced by that node until time ts have been returned, so we can
+ // bump the entry in the global vector clock and possibly remove some item-specific
+ // vector clocks)
+ let mut deadline =
+ Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let mut resps = vec![];
+ let mut errors = vec![];
+ loop {
+ select! {
+ _ = tokio::time::sleep_until(deadline.into()) => {
+ break;
+ }
+ res = requests.next() => match res {
+ None => break,
+ Some(Err(e)) => errors.push(e),
+ Some(Ok(r)) => {
+ resps.push(r);
+ if resps.len() >= quorum {
+ deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY);
+ }
+ }
+ }
+ }
+ }
+ if errors.len() > nodes.len() - quorum {
+ let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
+ return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
+ }
+
+ // Take all returned items into account to produce the response.
let mut new_items = BTreeMap::<String, K2VItem>::new();
for v in resps {
if let K2VRpc::PollRangeResponse(node, items) = v {
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 1ec250c3..e59c372a 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -15,10 +15,9 @@ use opentelemetry::{
};
pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
-use netapp::message::IntoReq;
pub use netapp::message::{
- Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
- PRIO_SECONDARY,
+ IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
+ PRIO_NORMAL, PRIO_SECONDARY,
};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{self, NetApp, NodeID};