diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-11 16:12:07 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-11 16:12:07 +0100 |
commit | 638c5a3ce006eba9d90a6358655ba2091423efd7 (patch) | |
tree | abdb44553c6ea93907521db3fac9c8c788c9778f /src/model/k2v/rpc.rs | |
parent | 399f137fd079be2c59b032e67d9ccd8d9214407e (diff) | |
download | garage-638c5a3ce006eba9d90a6358655ba2091423efd7.tar.gz garage-638c5a3ce006eba9d90a6358655ba2091423efd7.zip |
PollRange: add extra RPC delay after quorum is achieved,
to give a chance to the 3rd node to respond
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r-- | src/model/k2v/rpc.rs | 77 |
1 files changed, 59 insertions, 18 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f5d8ffc2..117103b6 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -8,7 +8,7 @@ use std::collections::{BTreeMap, HashMap}; use std::convert::TryInto; use std::sync::{Arc, Mutex, MutexGuard}; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::stream::FuturesUnordered; @@ -35,6 +35,8 @@ use crate::k2v::item_table::*; use crate::k2v::seen::*; use crate::k2v::sub::*; +const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200); + const TIMESTAMP_KEY: &'static [u8] = b"timestamp"; /// RPC messages for K2V @@ -271,6 +273,8 @@ impl K2VRpcHandler { ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> { let has_seen_marker = seen_str.is_some(); + // Parse seen marker, we will use it below. This is also the first check + // that it is valid, which returns a bad request error if not. let mut seen = seen_str .as_deref() .map(RangeSeenMarker::decode_helper) @@ -278,30 +282,67 @@ impl K2VRpcHandler { .unwrap_or_default(); seen.restrict(&range); + // Prepare PollRange RPC to send to the storage nodes responsible for the partition let nodes = self .item_table .data .replication .write_nodes(&range.partition.hash()); - - let rpc = self.system.rpc.try_call_many( - &self.endpoint, - &nodes[..], - K2VRpc::PollRange { - range, - seen_str, - timeout_msec, - }, - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.item_table.data.replication.read_quorum()) - .without_timeout(), - ); - let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); - let resps = select! 
{ - r = rpc => r?, - _ = tokio::time::sleep(timeout_duration) => return Ok(None), + let quorum = self.item_table.data.replication.read_quorum(); + let msg = K2VRpc::PollRange { + range, + seen_str, + timeout_msec, }; + // Send the request to all nodes, use FuturesUnordered to get the responses in any order + let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); + let mut requests = nodes + .iter() + .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) + .collect::<FuturesUnordered<_>>(); + + // Fetch responses. This procedure stops fetching responses when any of the following + // conditions arise: + // - we have a response to all requests + // - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay + // has passed since the quorum was achieved + // - a global RPC timeout expired + // The extra delay after a quorum was received is useful if the third response was to + // arrive during this short interval: this would allow us to consider all the data seen + // by that last node in the response we produce, and would likely help reduce the + // size of the seen marker that we will return (because we would have an info of the + // kind: all items produced by that node until time ts have been returned, so we can + // bump the entry in the global vector clock and possibly remove some item-specific + // vector clocks) + let mut deadline = + Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let mut resps = vec![]; + let mut errors = vec![]; + loop { + select! 
{ + _ = tokio::time::sleep_until(deadline.into()) => { + break; + } + res = requests.next() => match res { + None => break, + Some(Err(e)) => errors.push(e), + Some(Ok(r)) => { + resps.push(r); + if resps.len() >= quorum { + deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY); + } + } + } + } + } + if errors.len() > nodes.len() - quorum { + let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>(); + return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into()); + } + + // Take all returned items into account to produce the response. let mut new_items = BTreeMap::<String, K2VItem>::new(); for v in resps { if let K2VRpc::PollRangeResponse(node, items) = v { |