aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-06 19:31:42 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-06 19:31:42 +0200
commitc2cc08852bcbd94bad5c15c39e7145c0496d7241 (patch)
treeea3ba2a9f97788cc5ff8ef2f01a3ace0c947977c
parent13b5f28c7e8dec12b1db61735931b3830a3c893f (diff)
downloadgarage-c2cc08852bcbd94bad5c15c39e7145c0496d7241.tar.gz
garage-c2cc08852bcbd94bad5c15c39e7145c0496d7241.zip
Reenable node ordering
-rw-r--r--src/block/manager.rs10
1 files changed, 4 insertions, 6 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index a9def3b0..66a454b0 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -9,7 +9,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
-use futures::{Stream, TryStreamExt};
+use futures::Stream;
use futures_util::stream::StreamExt;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
@@ -191,7 +191,7 @@ impl BlockManager {
order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash);
- //let who = self.system.rpc.request_order(&who);
+ let who = self.system.rpc.request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -238,7 +238,7 @@ impl BlockManager {
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
let who = self.replication.read_nodes(hash);
- //let who = self.system.rpc.request_order(&who);
+ let who = self.system.rpc.request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -296,9 +296,7 @@ impl BlockManager {
> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
match header {
- DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| {
- std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error")
- }))),
+ DataBlockHeader::Plain => Ok(Box::pin(stream)),
DataBlockHeader::Compressed => {
// Too many things, I hate it.
let reader = stream_asyncread(stream);