diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-16 16:34:01 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-16 16:34:01 +0100 |
commit | 22f38808e744ea5b30ad771fcb344a29579b56d4 (patch) | |
tree | e29f7c6d34d07c6fc4eb5d551ddb6e987be30fec | |
parent | 707442f5de416fdbed4681a33b739f0a787b7834 (diff) | |
download | garage-22f38808e744ea5b30ad771fcb344a29579b56d4.tar.gz garage-22f38808e744ea5b30ad771fcb344a29579b56d4.zip |
rpc_helper: don't use tokio::spawn for individual requests
-rw-r--r-- | src/rpc/rpc_helper.rs | 18 |
1 files changed, 7 insertions, 11 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index e269ddaa..7e9fabd7 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -299,9 +299,7 @@ impl RpcHelper { if let Some((req_to, fut)) = requests.next() { let tracer = opentelemetry::global::tracer("garage"); let span = tracer.start(format!("RPC to {:?}", req_to)); - resp_stream.push(tokio::spawn( - fut.with_context(Context::current_with_span(span)), - )); + resp_stream.push(fut.with_context(Context::current_with_span(span))); } else { break; } @@ -313,7 +311,7 @@ impl RpcHelper { } // Wait for one request to terminate - match resp_stream.next().await.unwrap().unwrap() { + match resp_stream.next().await.unwrap() { Ok(msg) => { successes.push(msg); } @@ -448,7 +446,7 @@ impl RpcHelper { let tracer = opentelemetry::global::tracer("garage"); let span = tracer.start(format!("RPC to {:?}", to)); let fut = async move { (to, self2.call(&endpoint2, to, msg, strategy).await) }; - tokio::spawn(fut.with_context(Context::current_with_span(span))) + fut.with_context(Context::current_with_span(span)) }); let mut resp_stream = requests.collect::<FuturesUnordered<_>>(); @@ -457,9 +455,7 @@ impl RpcHelper { let mut set_counters = vec![(0, 0); to_sets.len()]; - while !resp_stream.is_empty() { - let (node, resp) = resp_stream.next().await.unwrap().unwrap(); - + while let Some((node, resp)) = resp_stream.next().await { match resp { Ok(msg) => { for set in peers.get(&node).unwrap().iter() { @@ -475,12 +471,12 @@ impl RpcHelper { } } - if set_counters.iter().all(|x| x.0 >= quorum) { + if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) { // Success // Continue all other requets in background tokio::spawn(async move { - resp_stream.collect::<Vec<Result<_, _>>>().await; + resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await; }); return Ok(successes); @@ -489,7 +485,7 @@ impl RpcHelper { if set_counters .iter() .enumerate() - .any(|(i, x)| x.1 + quorum > to_sets[i].len()) + .any(|(i, (_, err_cnt))| err_cnt + quorum > to_sets[i].len()) { // Too many errors in this set, we know we won't get a quorum break; |