aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-16 16:34:01 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-16 16:34:01 +0100
commit22f38808e744ea5b30ad771fcb344a29579b56d4 (patch)
treee29f7c6d34d07c6fc4eb5d551ddb6e987be30fec
parent707442f5de416fdbed4681a33b739f0a787b7834 (diff)
downloadgarage-22f38808e744ea5b30ad771fcb344a29579b56d4.tar.gz
garage-22f38808e744ea5b30ad771fcb344a29579b56d4.zip
rpc_helper: don't use tokio::spawn for individual requests
-rw-r--r--src/rpc/rpc_helper.rs18
1 files changed, 7 insertions, 11 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index e269ddaa..7e9fabd7 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -299,9 +299,7 @@ impl RpcHelper {
if let Some((req_to, fut)) = requests.next() {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
- resp_stream.push(tokio::spawn(
- fut.with_context(Context::current_with_span(span)),
- ));
+ resp_stream.push(fut.with_context(Context::current_with_span(span)));
} else {
break;
}
@@ -313,7 +311,7 @@ impl RpcHelper {
}
// Wait for one request to terminate
- match resp_stream.next().await.unwrap().unwrap() {
+ match resp_stream.next().await.unwrap() {
Ok(msg) => {
successes.push(msg);
}
@@ -448,7 +446,7 @@ impl RpcHelper {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", to));
let fut = async move { (to, self2.call(&endpoint2, to, msg, strategy).await) };
- tokio::spawn(fut.with_context(Context::current_with_span(span)))
+ fut.with_context(Context::current_with_span(span))
});
let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
@@ -457,9 +455,7 @@ impl RpcHelper {
let mut set_counters = vec![(0, 0); to_sets.len()];
- while !resp_stream.is_empty() {
- let (node, resp) = resp_stream.next().await.unwrap().unwrap();
-
+ while let Some((node, resp)) = resp_stream.next().await {
match resp {
Ok(msg) => {
for set in peers.get(&node).unwrap().iter() {
@@ -475,12 +471,12 @@ impl RpcHelper {
}
}
- if set_counters.iter().all(|x| x.0 >= quorum) {
+ if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
// Success
// Continue all other requets in background
tokio::spawn(async move {
- resp_stream.collect::<Vec<Result<_, _>>>().await;
+ resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
});
return Ok(successes);
@@ -489,7 +485,7 @@ impl RpcHelper {
if set_counters
.iter()
.enumerate()
- .any(|(i, x)| x.1 + quorum > to_sets[i].len())
+ .any(|(i, (_, err_cnt))| err_cnt + quorum > to_sets[i].len())
{
// Too many errors in this set, we know we won't get a quorum
break;