aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 15:20:00 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:25:02 +0200
commit8e7e680afe39f48fe15f365c9ef3fee57596e119 (patch)
treecc465dcd31776c1b8a865b22b2de923fca26efdb /src/block/manager.rs
parent16f6a1a65d4b973ea13cd00bbfdd7e225041e447 (diff)
downloadgarage-8e7e680afe39f48fe15f365c9ef3fee57596e119.tar.gz
garage-8e7e680afe39f48fe15f365c9ef3fee57596e119.zip
First adaptation to WIP netapp with streaming body
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs19
1 files changed, 9 insertions, 10 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index be53ec6e..408de148 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -8,7 +8,6 @@ use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
-use futures::future::*;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::select;
@@ -637,24 +636,24 @@ impl BlockManager {
}
who.retain(|id| *id != self.system.id);
- let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash));
- let who_needs_fut = who.iter().map(|to| {
- self.system.rpc.call_arc(
+ let who_needs_resps = self
+ .system
+ .rpc
+ .call_many(
&self.endpoint,
- *to,
- msg.clone(),
+ &who,
+ BlockRpc::NeedBlockQuery(*hash),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
)
- });
- let who_needs_resps = join_all(who_needs_fut).await;
+ .await?;
let mut need_nodes = vec![];
- for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
+ for (node, needed) in who_needs_resps.into_iter() {
match needed.err_context("NeedBlockQuery RPC")? {
BlockRpc::NeedBlockReply(needed) => {
if needed {
- need_nodes.push(*node);
+ need_nodes.push(node);
}
}
m => {