aboutsummaryrefslogtreecommitdiff
path: root/src/net
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-23 16:50:34 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-23 16:50:34 +0100
commit916c67ccf4c9d31c14088f2d775e15c64750458f (patch)
tree63226f78b128f33b63d3b223283047372bcc31f2 /src/net
parent81cebdd12415381f67747e96591e83b1a4a8cc0b (diff)
parent61758ce0f91b930542bd2ee3c72735000cc12e75 (diff)
downloadgarage-916c67ccf4c9d31c14088f2d775e15c64750458f.tar.gz
garage-916c67ccf4c9d31c14088f2d775e15c64750458f.zip
Merge branch 'main' into next-0.10
Diffstat (limited to 'src/net')
-rw-r--r--src/net/bytes_buf.rs13
-rw-r--r--src/net/stream.rs11
2 files changed, 24 insertions, 0 deletions
diff --git a/src/net/bytes_buf.rs b/src/net/bytes_buf.rs
index 3929a860..1d928ffb 100644
--- a/src/net/bytes_buf.rs
+++ b/src/net/bytes_buf.rs
@@ -3,6 +3,8 @@ use std::collections::VecDeque;
use bytes::BytesMut;
+use crate::stream::ByteStream;
+
pub use bytes::Bytes;
/// A circular buffer of bytes, internally represented as a list of Bytes
@@ -119,6 +121,17 @@ impl BytesBuf {
pub fn into_slices(self) -> VecDeque<Bytes> {
self.buf
}
+
+ /// Return the entire buffer concatenated into a single big Bytes
+ pub fn into_bytes(mut self) -> Bytes {
+ self.take_all()
+ }
+
+ /// Return the content as a stream of individual chunks
+ pub fn into_stream(self) -> ByteStream {
+ use futures::stream::StreamExt;
+ Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x)))
+ }
}
impl Default for BytesBuf {
diff --git a/src/net/stream.rs b/src/net/stream.rs
index 88c3fed4..3ac6896d 100644
--- a/src/net/stream.rs
+++ b/src/net/stream.rs
@@ -200,3 +200,14 @@ pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> Byte
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
tokio_util::io::StreamReader::new(stream)
}
+
+/// Reads all of the content of a `ByteStream` into a BytesBuf
+/// that contains everything
+pub async fn read_stream_to_end(mut stream: ByteStream) -> Result<BytesBuf, std::io::Error> {
+ let mut buf = BytesBuf::new();
+ while let Some(part) = stream.next().await {
+ buf.extend(part?);
+ }
+
+ Ok(buf)
+}