diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-06 19:31:42 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-06 19:31:42 +0200 |
commit | c2cc08852bcbd94bad5c15c39e7145c0496d7241 (patch) | |
tree | ea3ba2a9f97788cc5ff8ef2f01a3ace0c947977c | |
parent | 13b5f28c7e8dec12b1db61735931b3830a3c893f (diff) | |
download | garage-c2cc08852bcbd94bad5c15c39e7145c0496d7241.tar.gz garage-c2cc08852bcbd94bad5c15c39e7145c0496d7241.zip |
Reenable node ordering
-rw-r--r-- | src/block/manager.rs | 10 |
1 files changed, 4 insertions, 6 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index a9def3b0..66a454b0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use futures::{Stream, TryStreamExt}; +use futures::Stream; use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; @@ -191,7 +191,7 @@ impl BlockManager { order_tag: Option<OrderTag>, ) -> Result<(DataBlockHeader, ByteStream), Error> { let who = self.replication.read_nodes(hash); - //let who = self.system.rpc.request_order(&who); + let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -238,7 +238,7 @@ impl BlockManager { order_tag: Option<OrderTag>, ) -> Result<DataBlock, Error> { let who = self.replication.read_nodes(hash); - //let who = self.system.rpc.request_order(&who); + let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -296,9 +296,7 @@ impl BlockManager { > { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { - DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { - std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") - }))), + DataBlockHeader::Plain => Ok(Box::pin(stream)), DataBlockHeader::Compressed => { // Too many things, I hate it. let reader = stream_asyncread(stream); |