diff options
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 278 |
1 file changed, 170 insertions, 108 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index ce291068..12d073b6 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -1,4 +1,5 @@
 //! Contain structs related to making RPCs
+use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
@@ -35,11 +36,11 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
 #[derive(Copy, Clone)]
 pub struct RequestStrategy {
 	/// Min number of response to consider the request successful
-	pub rs_quorum: Option<usize>,
-	/// Should requests be dropped after enough response are received
-	pub rs_interrupt_after_quorum: bool,
+	rs_quorum: Option<usize>,
+	/// Send all requests at once
+	rs_send_all_at_once: Option<bool>,
 	/// Request priority
-	pub rs_priority: RequestPriority,
+	rs_priority: RequestPriority,
 	/// Custom timeout for this request
 	rs_timeout: Timeout,
 }
@@ -56,7 +57,7 @@ impl RequestStrategy {
 	pub fn with_priority(prio: RequestPriority) -> Self {
 		RequestStrategy {
 			rs_quorum: None,
-			rs_interrupt_after_quorum: false,
+			rs_send_all_at_once: None,
 			rs_priority: prio,
 			rs_timeout: Timeout::Default,
 		}
@@ -66,10 +67,9 @@ impl RequestStrategy {
 		self.rs_quorum = Some(quorum);
 		self
 	}
-	/// Set if requests can be dropped after quorum has been reached
-	/// In general true for read requests, and false for write
-	pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
-		self.rs_interrupt_after_quorum = interrupt;
+	/// Set quorum to be reached for request
+	pub fn send_all_at_once(mut self, value: bool) -> Self {
+		self.rs_send_all_at_once = Some(value);
 		self
 	}
 	/// Deactivate timeout for this request
@@ -235,31 +235,19 @@ impl RpcHelper {
 		let quorum = strategy.rs_quorum.unwrap_or(to.len());
 
 		let tracer = opentelemetry::global::tracer("garage");
-		let span_name = if strategy.rs_interrupt_after_quorum {
-			format!("RPC {} to {} of {}", endpoint.path(), quorum, to.len())
-		} else {
-			format!(
-				"RPC {} to {} (quorum {})",
-				endpoint.path(),
-				to.len(),
-				quorum
-			)
-		};
+		let span_name = format!("Read RPC {} to {} of {}", endpoint.path(), quorum, to.len());
+
 		let mut span = tracer.start(span_name);
 		span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
 		span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
 		span.set_attribute(KeyValue::new("quorum", quorum as i64));
-		span.set_attribute(KeyValue::new(
-			"interrupt_after_quorum",
-			strategy.rs_interrupt_after_quorum.to_string(),
-		));
 
-		self.try_call_many_internal(endpoint, to, msg, strategy, quorum)
+		self.try_call_many_inner(endpoint, to, msg, strategy, quorum)
			.with_context(Context::current_with_span(span))
			.await
 	}
 
-	async fn try_call_many_internal<M, N, H, S>(
+	async fn try_call_many_inner<M, N, H, S>(
 		&self,
 		endpoint: &Arc<Endpoint<M, H>>,
 		to: &[Uuid],
@@ -273,12 +261,20 @@ impl RpcHelper {
 		H: StreamingEndpointHandler<M> + 'static,
 		S: Send + 'static,
 	{
-		let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+		// Once quorum is reached, other requests don't matter.
+		// What we do here is only send the required number of requests
+		// to reach a quorum, priorizing nodes with the lowest latency.
+		// When there are errors, we start new requests to compensate.
+
+		// Reorder requests to priorize closeness / low latency
+		let request_order = self.request_order(to);
+		let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false);
 
 		// Build future for each request
 		// They are not started now: they are added below in a FuturesUnordered
 		// object that will take care of polling them (see below)
-		let requests = to.iter().cloned().map(|to| {
+		let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+		let mut requests = request_order.into_iter().map(|to| {
 			let self2 = self.clone();
 			let msg = msg.clone();
 			let endpoint2 = endpoint.clone();
@@ -291,93 +287,40 @@ impl RpcHelper {
 		let mut successes = vec![];
 		let mut errors = vec![];
 
-		if strategy.rs_interrupt_after_quorum {
-			// Case 1: once quorum is reached, other requests don't matter.
-			// What we do here is only send the required number of requests
-			// to reach a quorum, priorizing nodes with the lowest latency.
-			// When there are errors, we start new requests to compensate.
-
-			// Reorder requests to priorize closeness / low latency
-			let request_order = self.request_order(to);
-			let mut ord_requests = vec![(); request_order.len()]
-				.into_iter()
-				.map(|_| None)
-				.collect::<Vec<_>>();
-			for (to, fut) in requests {
-				let i = request_order.iter().position(|x| *x == to).unwrap();
-				ord_requests[i] = Some((to, fut));
+		// resp_stream will contain all of the requests that are currently in flight.
+		// (for the moment none, they will be added in the loop below)
+		let mut resp_stream = FuturesUnordered::new();
+
+		// Do some requests and collect results
+		while successes.len() < quorum {
+			// If the current set of requests that are running is not enough to possibly
+			// reach quorum, start some new requests.
+			while send_all_at_once || successes.len() + resp_stream.len() < quorum {
+				if let Some((req_to, fut)) = requests.next() {
+					let tracer = opentelemetry::global::tracer("garage");
+					let span = tracer.start(format!("RPC to {:?}", req_to));
+					resp_stream.push(tokio::spawn(
+						fut.with_context(Context::current_with_span(span)),
+					));
+				} else {
+					break;
+				}
 			}
-			// Make an iterator to take requests in their sorted order
-			let mut requests = ord_requests.into_iter().map(Option::unwrap);
-
-			// resp_stream will contain all of the requests that are currently in flight.
-			// (for the moment none, they will be added in the loop below)
-			let mut resp_stream = FuturesUnordered::new();
-
-			// Do some requests and collect results
-			'request_loop: while successes.len() < quorum {
-				// If the current set of requests that are running is not enough to possibly
-				// reach quorum, start some new requests.
-				while successes.len() + resp_stream.len() < quorum {
-					if let Some((req_to, fut)) = requests.next() {
-						let tracer = opentelemetry::global::tracer("garage");
-						let span = tracer.start(format!("RPC to {:?}", req_to));
-						resp_stream.push(tokio::spawn(
-							fut.with_context(Context::current_with_span(span)),
-						));
-					} else {
-						// If we have no request to add, we know that we won't ever
-						// reach quorum: bail out now.
-						break 'request_loop;
-					}
-				}
-				assert!(!resp_stream.is_empty()); // because of loop invariants
+
+			if successes.len() + resp_stream.len() < quorum {
+				// We know we won't ever reach quorum
+				break;
+			}
 
-				// Wait for one request to terminate
-				match resp_stream.next().await.unwrap().unwrap() {
-					Ok(msg) => {
-						successes.push(msg);
-					}
-					Err(e) => {
-						errors.push(e);
-					}
+			// Wait for one request to terminate
+			match resp_stream.next().await.unwrap().unwrap() {
+				Ok(msg) => {
+					successes.push(msg);
 				}
-			}
-		} else {
-			// Case 2: all of the requests need to be sent in all cases,
-			// and need to terminate. (this is the case for writes that
-			// must be spread to n nodes)
-			// Just start all the requests in parallel and return as soon
-			// as the quorum is reached.
-			let mut resp_stream = requests
-				.map(|(_, fut)| fut)
-				.collect::<FuturesUnordered<_>>();
-
-			while let Some(resp) = resp_stream.next().await {
-				match resp {
-					Ok(msg) => {
-						successes.push(msg);
-						if successes.len() >= quorum {
-							break;
-						}
-					}
-					Err(e) => {
-						errors.push(e);
-					}
+				Err(e) => {
+					errors.push(e);
 				}
 			}
-
-			if !resp_stream.is_empty() {
-				// Continue remaining requests in background.
-				// Note: these requests can get interrupted on process shutdown,
-				// we must not count on them being executed for certain.
-				// For all background things that have to happen with certainty,
-				// they have to be put in a proper queue that is persisted to disk.
-				tokio::spawn(async move {
-					resp_stream.collect::<Vec<Result<_, _>>>().await;
-				});
-			}
 		}
 
 		if successes.len() >= quorum {
@@ -432,4 +375,123 @@ impl RpcHelper {
 			.map(|(_, _, _, to)| to)
 			.collect::<Vec<_>>()
 	}
+
+	pub async fn try_write_many_sets<M, N, H, S>(
+		&self,
+		endpoint: &Arc<Endpoint<M, H>>,
+		to_sets: &[Vec<Uuid>],
+		msg: N,
+		strategy: RequestStrategy,
+	) -> Result<Vec<S>, Error>
+	where
+		M: Rpc<Response = Result<S, Error>> + 'static,
+		N: IntoReq<M>,
+		H: StreamingEndpointHandler<M> + 'static,
+		S: Send + 'static,
+	{
+		let quorum = strategy
+			.rs_quorum
+			.expect("internal error: missing quorum in try_write_many_sets");
+
+		let tracer = opentelemetry::global::tracer("garage");
+		let span_name = format!(
+			"Write RPC {} (quorum {} in {} sets)",
+			endpoint.path(),
+			quorum,
+			to_sets.len()
+		);
+
+		let mut span = tracer.start(span_name);
+		span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
+		span.set_attribute(KeyValue::new("to", format!("{:?}", to_sets)));
+		span.set_attribute(KeyValue::new("quorum", quorum as i64));
+
+		self.try_write_many_sets_inner(endpoint, to_sets, msg, strategy, quorum)
+			.with_context(Context::current_with_span(span))
+			.await
+	}
+
+	async fn try_write_many_sets_inner<M, N, H, S>(
+		&self,
+		endpoint: &Arc<Endpoint<M, H>>,
+		to_sets: &[Vec<Uuid>],
+		msg: N,
+		strategy: RequestStrategy,
+		quorum: usize,
+	) -> Result<Vec<S>, Error>
+	where
+		M: Rpc<Response = Result<S, Error>> + 'static,
+		N: IntoReq<M>,
+		H: StreamingEndpointHandler<M> + 'static,
+		S: Send + 'static,
+	{
+		let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+
+		let mut peers = HashMap::<Uuid, Vec<usize>>::new();
+		for (i, set) in to_sets.iter().enumerate() {
+			for peer in set.iter() {
+				peers.entry(*peer).or_default().push(i);
+			}
+		}
+
+		let requests = peers.iter().map(|(peer, _)| {
+			let self2 = self.clone();
+			let msg = msg.clone();
+			let endpoint2 = endpoint.clone();
+			let to = *peer;
+			let tracer = opentelemetry::global::tracer("garage");
+			let span = tracer.start(format!("RPC to {:?}", to));
+			let fut = async move { (to, self2.call(&endpoint2, to, msg, strategy).await) };
+			tokio::spawn(fut.with_context(Context::current_with_span(span)))
+		});
+		let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
+
+		let mut successes = vec![];
+		let mut errors = vec![];
+
+		let mut set_counters = vec![(0, 0); to_sets.len()];
+
+		while !resp_stream.is_empty() {
+			let (node, resp) = resp_stream.next().await.unwrap().unwrap();
+
+			match resp {
+				Ok(msg) => {
+					for set in peers.get(&node).unwrap().iter() {
+						set_counters[*set].0 += 1;
+					}
+					successes.push(msg);
+				}
+				Err(e) => {
+					for set in peers.get(&node).unwrap().iter() {
+						set_counters[*set].1 += 1;
+					}
+					errors.push(e);
+				}
+			}
+
+			if set_counters.iter().all(|x| x.0 >= quorum) {
+				// Success
+
+				// Continue all other requests in background
+				tokio::spawn(async move {
+					resp_stream.collect::<Vec<Result<_, _>>>().await;
+				});
+
+				return Ok(successes);
+			}
+
+			if set_counters
+				.iter()
+				.enumerate()
+				.any(|(i, x)| x.1 + quorum > to_sets[i].len())
+			{
+				// Too many errors in this set, we know we won't get a quorum
+				break;
+			}
+		}
+
+		// Failure, could not get quorum
+		let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
+		Err(Error::Quorum(quorum, successes.len(), peers.len(), errors))
+	}
 }
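Note: the short sketch below is not part of the diff above. It only illustrates how the reworked RequestStrategy builder and the new try_write_many_sets entry point could be driven from calling code; the rpc handle, endpoint, message values and node lists are hypothetical placeholders, and PRIO_NORMAL is assumed to be the usual garage_rpc request priority constant.

	// Hypothetical context: `rpc` is an RpcHelper, `endpoint` an Arc<Endpoint<M, H>>,
	// `read_msg`/`write_msg` are request values, `nodes: Vec<Uuid>` and
	// `node_sets: Vec<Vec<Uuid>>` are node lists; the snippet runs inside an
	// async fn returning Result<_, Error>.

	// Read path: only as many requests as are needed to reach the quorum are
	// spawned at first, and more are started if some of them fail; calling
	// .send_all_at_once(true) on the strategy would fire them all immediately.
	let read_strategy = RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(2);
	let replies = rpc
		.try_call_many(&endpoint, &nodes, read_msg, read_strategy)
		.await?;

	// Write path: try_write_many_sets contacts every node of every set and only
	// returns Ok once at least `quorum` successes have been collected in each set.
	let write_strategy = RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(2);
	let acks = rpc
		.try_write_many_sets(&endpoint, &node_sets, write_msg, write_strategy)
		.await?;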