diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 15:20:00 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-29 12:25:02 +0200 |
commit | 8e7e680afe39f48fe15f365c9ef3fee57596e119 (patch) | |
tree | cc465dcd31776c1b8a865b22b2de923fca26efdb /src/block | |
parent | 16f6a1a65d4b973ea13cd00bbfdd7e225041e447 (diff) | |
download | garage-8e7e680afe39f48fe15f365c9ef3fee57596e119.tar.gz garage-8e7e680afe39f48fe15f365c9ef3fee57596e119.zip |
First adaptation to WIP netapp with streaming body
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/manager.rs | 19 |
1 files changed, 9 insertions, 10 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index be53ec6e..408de148 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -8,7 +8,6 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use futures::future::*; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::select; @@ -637,24 +636,24 @@ impl BlockManager { } who.retain(|id| *id != self.system.id); - let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash)); - let who_needs_fut = who.iter().map(|to| { - self.system.rpc.call_arc( + let who_needs_resps = self + .system + .rpc + .call_many( &self.endpoint, - *to, - msg.clone(), + &who, + BlockRpc::NeedBlockQuery(*hash), RequestStrategy::with_priority(PRIO_BACKGROUND) .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), ) - }); - let who_needs_resps = join_all(who_needs_fut).await; + .await?; let mut need_nodes = vec![]; - for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { + for (node, needed) in who_needs_resps.into_iter() { match needed.err_context("NeedBlockQuery RPC")? { BlockRpc::NeedBlockReply(needed) => { if needed { - need_nodes.push(*node); + need_nodes.push(node); } } m => { |