aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/message.rs8
-rw-r--r--src/recv.rs3
-rw-r--r--src/send.rs5
-rw-r--r--src/util.rs3
4 files changed, 11 insertions, 8 deletions
diff --git a/src/message.rs b/src/message.rs
index dbcc857..6d50254 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -192,8 +192,8 @@ impl Framing {
// required because otherwise the borrow-checker complains
let Framing { direct, stream } = self;
- let res = stream::once(async move { Ok(u32::to_be_bytes(len).to_vec()) })
- .chain(stream::once(async move { Ok(direct) }));
+ let res = stream::once(async move { Ok(u32::to_be_bytes(len).to_vec().into()) })
+ .chain(stream::once(async move { Ok(direct.into()) }));
if let Some(stream) = stream {
Box::pin(res.chain(stream))
@@ -217,7 +217,7 @@ impl Framing {
let mut len = [0; 4];
len.copy_from_slice(&packet[..4]);
let len = u32::from_be_bytes(len);
- packet.drain(..4);
+ packet = packet.slice(4..);
let mut buffer = Vec::new();
let len = len as usize;
@@ -226,7 +226,7 @@ impl Framing {
buffer.extend_from_slice(&packet[..max_cp]);
if buffer.len() == len {
- packet.drain(..max_cp);
+ packet = packet.slice(max_cp..);
break;
}
packet = stream
diff --git a/src/recv.rs b/src/recv.rs
index f5221e6..abe7b9a 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
+use bytes::Bytes;
use log::trace;
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
@@ -85,7 +86,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?;
trace!("recv_loop: read {} bytes", next_slice.len());
- Ok(next_slice)
+ Ok(Bytes::from(next_slice))
};
let mut sender = if let Some(send) = streams.remove(&(id)) {
diff --git a/src/send.rs b/src/send.rs
index 0179eb2..660e85c 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
+use bytes::Bytes;
use async_trait::async_trait;
use log::trace;
@@ -49,7 +50,7 @@ impl From<ByteStream> for DataReader {
fn from(data: ByteStream) -> DataReader {
DataReader {
reader: data,
- packet: Ok(Vec::new()),
+ packet: Ok(Bytes::new()),
pos: 0,
buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize),
eos: false,
@@ -130,7 +131,7 @@ impl Stream for DataReader {
Ok(v) => v,
Err(e) => {
let e = *e;
- *this.packet = Ok(Vec::new());
+ *this.packet = Ok(Bytes::new());
*this.eos = true;
return Poll::Ready(Some(DataFrame::Error(e)));
}
diff --git a/src/util.rs b/src/util.rs
index 6fbafe6..e81a89c 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -4,6 +4,7 @@ use std::pin::Pin;
use log::info;
use serde::Serialize;
+use bytes::Bytes;
use futures::Stream;
use tokio::sync::watch;
@@ -27,7 +28,7 @@ pub type NetworkKey = sodiumoxide::crypto::auth::Key;
/// meaning, it's up to your application to define their semantic.
pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send>>;
-pub type Packet = Result<Vec<u8>, u8>;
+pub type Packet = Result<Bytes, u8>;
/// Utility function: encodes any serializable value in MessagePack binary format
/// using the RMP library.