aboutsummaryrefslogtreecommitdiff
path: root/src/proto.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-12 17:59:46 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-12 17:59:46 +0200
commitf87dbe73dc12f2d6eb13850a3bc4b012aadd3c9b (patch)
tree5407c8eab331d066e66f5193d51f6fd66bedb9bb /src/proto.rs
parent040231d554b74e981644e606c096ced6fc36a2ad (diff)
downloadnetapp-f87dbe73dc12f2d6eb13850a3bc4b012aadd3c9b.tar.gz
netapp-f87dbe73dc12f2d6eb13850a3bc4b012aadd3c9b.zip
WIP v0.3.0 with changed API
Diffstat (limited to 'src/proto.rs')
-rw-r--r--src/proto.rs36
1 files changed, 22 insertions, 14 deletions
diff --git a/src/proto.rs b/src/proto.rs
index ef3b31c..5b71ba5 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -38,9 +38,10 @@ pub const PRIO_PRIMARY: RequestPriority = 0x00;
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
pub const PRIO_SECONDARY: RequestPriority = 0x01;
-const MAX_CHUNK_SIZE: usize = 0x4000;
-
-pub(crate) type RequestID = u16;
+pub(crate) type RequestID = u32;
+type ChunkLength = u16;
+const MAX_CHUNK_LENGTH: ChunkLength = 0x4000;
+const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
struct SendQueueItem {
id: RequestID,
@@ -85,6 +86,12 @@ impl SendQueue {
}
}
+// Messages are sent by chunks
+// Chunk format:
+// - u32 BE: request id (same for request and response)
+// - u16 BE: chunk length
+// - [u8; chunk_length] chunk data
+
#[async_trait]
pub(crate) trait SendLoop: Sync {
async fn send_loop<W>(
@@ -117,22 +124,23 @@ pub(crate) trait SendLoop: Sync {
item.data.len(),
item.cursor
);
- let header_id = u16::to_be_bytes(item.id);
+ let header_id = RequestID::to_be_bytes(item.id);
write.write_all(&header_id[..]).await?;
- if item.data.len() - item.cursor > MAX_CHUNK_SIZE {
- let header_size = u16::to_be_bytes(MAX_CHUNK_SIZE as u16 | 0x8000);
+ if item.data.len() - item.cursor > MAX_CHUNK_LENGTH as usize {
+ let header_size =
+ ChunkLength::to_be_bytes(MAX_CHUNK_LENGTH | CHUNK_HAS_CONTINUATION);
write.write_all(&header_size[..]).await?;
- let new_cursor = item.cursor + MAX_CHUNK_SIZE as usize;
+ let new_cursor = item.cursor + MAX_CHUNK_LENGTH as usize;
write.write_all(&item.data[item.cursor..new_cursor]).await?;
item.cursor = new_cursor;
sending.push(item);
} else {
- let send_len = (item.data.len() - item.cursor) as u16;
+ let send_len = (item.data.len() - item.cursor) as ChunkLength;
- let header_size = u16::to_be_bytes(send_len);
+ let header_size = ChunkLength::to_be_bytes(send_len);
write.write_all(&header_size[..]).await?;
write.write_all(&item.data[item.cursor..]).await?;
@@ -172,18 +180,18 @@ pub(crate) trait RecvLoop: Sync + 'static {
let mut receiving = HashMap::new();
loop {
trace!("recv_loop: reading packet");
- let mut header_id = [0u8; 2];
+ let mut header_id = [0u8; RequestID::BITS as usize / 8];
read.read_exact(&mut header_id[..]).await?;
let id = RequestID::from_be_bytes(header_id);
trace!("recv_loop: got header id: {:04x}", id);
- let mut header_size = [0u8; 2];
+ let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
read.read_exact(&mut header_size[..]).await?;
- let size = RequestID::from_be_bytes(header_size);
+ let size = ChunkLength::from_be_bytes(header_size);
trace!("recv_loop: got header size: {:04x}", size);
- let has_cont = (size & 0x8000) != 0;
- let size = size & !0x8000;
+ let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
+ let size = size & !CHUNK_HAS_CONTINUATION;
let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?;