aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs22
1 files changed, 4 insertions, 18 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 96ea2c96..54290888 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -8,7 +8,6 @@ use bytes::Bytes;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
-use futures_util::stream::StreamExt;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, Mutex, MutexGuard};
@@ -18,7 +17,7 @@ use opentelemetry::{
Context,
};
-use garage_net::stream::{stream_asyncread, ByteStream};
+use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
use garage_db as db;
@@ -247,7 +246,8 @@ impl BlockManager {
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
read_stream_to_end(stream)
.await
- .map(|data| DataBlock::from_parts(header, data))
+ .err_context("error in block data stream")
+ .map(|data| DataBlock::from_parts(header, data.into_bytes()))
})
.await
}
@@ -477,7 +477,7 @@ impl BlockManager {
stream: Option<ByteStream>,
) -> Result<(), Error> {
let stream = stream.ok_or_message("missing stream")?;
- let bytes = read_stream_to_end(stream).await?;
+ let bytes = read_stream_to_end(stream).await?.into_bytes();
let data = DataBlock::from_parts(header, bytes);
self.write_block(&hash, &data).await
}
@@ -823,20 +823,6 @@ impl BlockManagerLocked {
}
}
-async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
- let mut parts: Vec<Bytes> = vec![];
- while let Some(part) = stream.next().await {
- parts.push(part.ok_or_message("error in stream")?);
- }
-
- Ok(parts
- .iter()
- .map(|x| &x[..])
- .collect::<Vec<_>>()
- .concat()
- .into())
-}
-
struct DeleteOnDrop(Option<PathBuf>);
impl DeleteOnDrop {