diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/message.rs | 8 | ||||
-rw-r--r-- | src/recv.rs | 3 | ||||
-rw-r--r-- | src/send.rs | 5 | ||||
-rw-r--r-- | src/util.rs | 3 |
4 files changed, 11 insertions, 8 deletions
diff --git a/src/message.rs b/src/message.rs index dbcc857..6d50254 100644 --- a/src/message.rs +++ b/src/message.rs @@ -192,8 +192,8 @@ impl Framing { // required because otherwise the borrow-checker complains let Framing { direct, stream } = self; - let res = stream::once(async move { Ok(u32::to_be_bytes(len).to_vec()) }) - .chain(stream::once(async move { Ok(direct) })); + let res = stream::once(async move { Ok(u32::to_be_bytes(len).to_vec().into()) }) + .chain(stream::once(async move { Ok(direct.into()) })); if let Some(stream) = stream { Box::pin(res.chain(stream)) @@ -217,7 +217,7 @@ impl Framing { let mut len = [0; 4]; len.copy_from_slice(&packet[..4]); let len = u32::from_be_bytes(len); - packet.drain(..4); + packet = packet.slice(4..); let mut buffer = Vec::new(); let len = len as usize; @@ -226,7 +226,7 @@ impl Framing { buffer.extend_from_slice(&packet[..max_cp]); if buffer.len() == len { - packet.drain(..max_cp); + packet = packet.slice(max_cp..); break; } packet = stream diff --git a/src/recv.rs b/src/recv.rs index f5221e6..abe7b9a 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use log::trace; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; @@ -85,7 +86,7 @@ pub(crate) trait RecvLoop: Sync + 'static { let mut next_slice = vec![0; size as usize]; read.read_exact(&mut next_slice[..]).await?; trace!("recv_loop: read {} bytes", next_slice.len()); - Ok(next_slice) + Ok(Bytes::from(next_slice)) }; let mut sender = if let Some(send) = streams.remove(&(id)) { diff --git a/src/send.rs b/src/send.rs index 0179eb2..660e85c 100644 --- a/src/send.rs +++ b/src/send.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use bytes::Bytes; use async_trait::async_trait; use log::trace; @@ -49,7 +50,7 @@ impl From<ByteStream> for DataReader { fn from(data: ByteStream) -> DataReader { DataReader { reader: data, - packet: Ok(Vec::new()), + packet: Ok(Bytes::new()), pos: 0, buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize), eos: false, @@ -130,7 +131,7 @@ impl Stream for DataReader { Ok(v) => v, Err(e) => { let e = *e; - *this.packet = Ok(Vec::new()); + *this.packet = Ok(Bytes::new()); *this.eos = true; return Poll::Ready(Some(DataFrame::Error(e))); } diff --git a/src/util.rs b/src/util.rs index 6fbafe6..e81a89c 100644 --- a/src/util.rs +++ b/src/util.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use log::info; use serde::Serialize; +use bytes::Bytes; use futures::Stream; use tokio::sync::watch; @@ -27,7 +28,7 @@ pub type NetworkKey = sodiumoxide::crypto::auth::Key; /// meaning, it's up to your application to define their semantic. pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send>>; -pub type Packet = Result<Vec<u8>, u8>; +pub type Packet = Result<Bytes, u8>; /// Utility function: encodes any serializable value in MessagePack binary format /// using the RMP library. |