diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-23 16:50:34 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-23 16:50:34 +0100 |
commit | 916c67ccf4c9d31c14088f2d775e15c64750458f (patch) | |
tree | 63226f78b128f33b63d3b223283047372bcc31f2 /src/net | |
parent | 81cebdd12415381f67747e96591e83b1a4a8cc0b (diff) | |
parent | 61758ce0f91b930542bd2ee3c72735000cc12e75 (diff) | |
download | garage-916c67ccf4c9d31c14088f2d775e15c64750458f.tar.gz garage-916c67ccf4c9d31c14088f2d775e15c64750458f.zip |
Merge branch 'main' into next-0.10
Diffstat (limited to 'src/net')
-rw-r--r-- | src/net/bytes_buf.rs | 13 | ||||
-rw-r--r-- | src/net/stream.rs | 11 |
2 files changed, 24 insertions, 0 deletions
diff --git a/src/net/bytes_buf.rs b/src/net/bytes_buf.rs index 3929a860..1d928ffb 100644 --- a/src/net/bytes_buf.rs +++ b/src/net/bytes_buf.rs @@ -3,6 +3,8 @@ use std::collections::VecDeque; use bytes::BytesMut; +use crate::stream::ByteStream; + pub use bytes::Bytes; /// A circular buffer of bytes, internally represented as a list of Bytes @@ -119,6 +121,17 @@ impl BytesBuf { pub fn into_slices(self) -> VecDeque<Bytes> { self.buf } + + /// Return the entire buffer concatenated into a single big Bytes + pub fn into_bytes(mut self) -> Bytes { + self.take_all() + } + + /// Return the content as a stream of individual chunks + pub fn into_stream(self) -> ByteStream { + use futures::stream::StreamExt; + Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x))) + } } impl Default for BytesBuf { diff --git a/src/net/stream.rs b/src/net/stream.rs index 88c3fed4..3ac6896d 100644 --- a/src/net/stream.rs +++ b/src/net/stream.rs @@ -200,3 +200,14 @@ pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> Byte pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { tokio_util::io::StreamReader::new(stream) } + +/// Reads all of the content of a `ByteStream` into a BytesBuf +/// that contains everything +pub async fn read_stream_to_end(mut stream: ByteStream) -> Result<BytesBuf, std::io::Error> { + let mut buf = BytesBuf::new(); + while let Some(part) = stream.next().await { + buf.extend(part?); + } + + Ok(buf) +} |