aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-23 11:46:57 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-23 11:46:57 +0100
commit9b41f4ff2016b099ee706747828f5828f8c96b1c (patch)
tree3c9e06d8d0ca7162e97e765bbe8091a5eaf676fd /src
parent93552b9275a6a83653353e27c0ee0c64c7a23d59 (diff)
downloadgarage-9b41f4ff2016b099ee706747828f5828f8c96b1c.tar.gz
garage-9b41f4ff2016b099ee706747828f5828f8c96b1c.zip
[refactor-block] move read_stream_to_end to garage_net
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs22
-rw-r--r--src/net/bytes_buf.rs13
-rw-r--r--src/net/stream.rs11
3 files changed, 28 insertions, 18 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 96ea2c96..54290888 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -8,7 +8,6 @@ use bytes::Bytes;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
-use futures_util::stream::StreamExt;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, Mutex, MutexGuard};
@@ -18,7 +17,7 @@ use opentelemetry::{
Context,
};
-use garage_net::stream::{stream_asyncread, ByteStream};
+use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
use garage_db as db;
@@ -247,7 +246,8 @@ impl BlockManager {
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
read_stream_to_end(stream)
.await
- .map(|data| DataBlock::from_parts(header, data))
+ .err_context("error in block data stream")
+ .map(|data| DataBlock::from_parts(header, data.into_bytes()))
})
.await
}
@@ -477,7 +477,7 @@ impl BlockManager {
stream: Option<ByteStream>,
) -> Result<(), Error> {
let stream = stream.ok_or_message("missing stream")?;
- let bytes = read_stream_to_end(stream).await?;
+ let bytes = read_stream_to_end(stream).await?.into_bytes();
let data = DataBlock::from_parts(header, bytes);
self.write_block(&hash, &data).await
}
@@ -823,20 +823,6 @@ impl BlockManagerLocked {
}
}
-async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
- let mut parts: Vec<Bytes> = vec![];
- while let Some(part) = stream.next().await {
- parts.push(part.ok_or_message("error in stream")?);
- }
-
- Ok(parts
- .iter()
- .map(|x| &x[..])
- .collect::<Vec<_>>()
- .concat()
- .into())
-}
-
struct DeleteOnDrop(Option<PathBuf>);
impl DeleteOnDrop {
diff --git a/src/net/bytes_buf.rs b/src/net/bytes_buf.rs
index 3929a860..1d928ffb 100644
--- a/src/net/bytes_buf.rs
+++ b/src/net/bytes_buf.rs
@@ -3,6 +3,8 @@ use std::collections::VecDeque;
use bytes::BytesMut;
+use crate::stream::ByteStream;
+
pub use bytes::Bytes;
/// A circular buffer of bytes, internally represented as a list of Bytes
@@ -119,6 +121,17 @@ impl BytesBuf {
pub fn into_slices(self) -> VecDeque<Bytes> {
self.buf
}
+
+ /// Return the entire buffer concatenated into a single big Bytes
+ pub fn into_bytes(mut self) -> Bytes {
+ self.take_all()
+ }
+
+ /// Return the content as a stream of individual chunks
+ pub fn into_stream(self) -> ByteStream {
+ use futures::stream::StreamExt;
+ Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x)))
+ }
}
impl Default for BytesBuf {
diff --git a/src/net/stream.rs b/src/net/stream.rs
index 88c3fed4..3ac6896d 100644
--- a/src/net/stream.rs
+++ b/src/net/stream.rs
@@ -200,3 +200,14 @@ pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> Byte
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
tokio_util::io::StreamReader::new(stream)
}
+
+/// Reads all of the content of a `ByteStream` into a BytesBuf
+/// that contains everything
+pub async fn read_stream_to_end(mut stream: ByteStream) -> Result<BytesBuf, std::io::Error> {
+ let mut buf = BytesBuf::new();
+ while let Some(part) = stream.next().await {
+ buf.extend(part?);
+ }
+
+ Ok(buf)
+}