aboutsummaryrefslogtreecommitdiff
path: root/src/proto.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-09-13 12:56:53 +0200
committerAlex <alex@adnab.me>2022-09-13 12:56:53 +0200
commit8ac109e3a84bd34550d66baf65fe59b86b63bca2 (patch)
treea49a199a1049d18afaa60f47f46e04cb798aa4b2 /src/proto.rs
parenta82700c5a27612002e6ee029ae77915b8114182f (diff)
parent298e956a199711b65ce3820931ca943108b78225 (diff)
downloadnetapp-8ac109e3a84bd34550d66baf65fe59b86b63bca2.tar.gz
netapp-8ac109e3a84bd34550d66baf65fe59b86b63bca2.zip
Merge pull request 'add streaming body to requests and responses' (#3) from stream-body into mainv0.5.0
Reviewed-on: https://git.deuxfleurs.fr/lx/netapp/pulls/3
Diffstat (limited to 'src/proto.rs')
-rw-r--r--src/proto.rs358
1 files changed, 0 insertions, 358 deletions
diff --git a/src/proto.rs b/src/proto.rs
deleted file mode 100644
index 8f7e70f..0000000
--- a/src/proto.rs
+++ /dev/null
@@ -1,358 +0,0 @@
-use std::collections::{HashMap, VecDeque};
-use std::fmt::Write;
-use std::sync::Arc;
-
-use log::trace;
-
-use futures::{AsyncReadExt, AsyncWriteExt};
-use kuska_handshake::async_std::BoxStreamWrite;
-
-use tokio::sync::mpsc;
-
-use async_trait::async_trait;
-
-use crate::error::*;
-
-/// Priority of a request (click to read more about priorities).
-///
-/// This priority value is used to priorize messages
-/// in the send queue of the client, and their responses in the send queue of the
-/// server. Lower values mean higher priority.
-///
-/// This mechanism is usefull for messages bigger than the maximum chunk size
-/// (set at `0x4000` bytes), such as large file transfers.
-/// In such case, all of the messages in the send queue with the highest priority
-/// will take turns to send individual chunks, in a round-robin fashion.
-/// Once all highest priority messages are sent successfully, the messages with
-/// the next highest priority will begin being sent in the same way.
-///
-/// The same priority value is given to a request and to its associated response.
-pub type RequestPriority = u8;
-
-/// Priority class: high
-pub const PRIO_HIGH: RequestPriority = 0x20;
-/// Priority class: normal
-pub const PRIO_NORMAL: RequestPriority = 0x40;
-/// Priority class: background
-pub const PRIO_BACKGROUND: RequestPriority = 0x80;
-/// Priority: primary among given class
-pub const PRIO_PRIMARY: RequestPriority = 0x00;
-/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
-pub const PRIO_SECONDARY: RequestPriority = 0x01;
-
-// Messages are sent by chunks
-// Chunk format:
-// - u32 BE: request id (same for request and response)
-// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag
-// when this is not the last chunk of the message
-// - [u8; chunk_length] chunk data
-
-pub(crate) type RequestID = u32;
-type ChunkLength = u16;
-const MAX_CHUNK_LENGTH: ChunkLength = 0x4000;
-const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
-
-struct SendQueueItem {
- id: RequestID,
- prio: RequestPriority,
- data: Vec<u8>,
- cursor: usize,
-}
-
-struct SendQueue {
- items: VecDeque<(u8, VecDeque<SendQueueItem>)>,
-}
-
-impl SendQueue {
- fn new() -> Self {
- Self {
- items: VecDeque::with_capacity(64),
- }
- }
- fn push(&mut self, item: SendQueueItem) {
- let prio = item.prio;
- let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) {
- Ok(i) => i,
- Err(i) => {
- self.items.insert(i, (prio, VecDeque::new()));
- i
- }
- };
- self.items[pos_prio].1.push_back(item);
- }
- fn pop(&mut self) -> Option<SendQueueItem> {
- match self.items.pop_front() {
- None => None,
- Some((prio, mut items_at_prio)) => {
- let ret = items_at_prio.pop_front();
- if !items_at_prio.is_empty() {
- self.items.push_front((prio, items_at_prio));
- }
- ret.or_else(|| self.pop())
- }
- }
- }
- fn is_empty(&self) -> bool {
- self.items.iter().all(|(_k, v)| v.is_empty())
- }
- fn dump(&self) -> String {
- let mut ret = String::new();
- for (prio, q) in self.items.iter() {
- for item in q.iter() {
- write!(
- &mut ret,
- " [{} {} ({})]",
- prio,
- item.data.len() - item.cursor,
- item.id
- )
- .unwrap();
- }
- }
- ret
- }
-}
-
-/// The SendLoop trait, which is implemented both by the client and the server
-/// connection objects (ServerConna and ClientConn) adds a method `.send_loop()`
-/// that takes a channel of messages to send and an asynchronous writer,
-/// and sends messages from the channel to the async writer, putting them in a queue
-/// before being sent and doing the round-robin sending strategy.
-///
-/// The `.send_loop()` exits when the sending end of the channel is closed,
-/// or if there is an error at any time writing to the async writer.
-#[async_trait]
-pub(crate) trait SendLoop: Sync {
- async fn send_loop<W>(
- self: Arc<Self>,
- mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
- mut write: BoxStreamWrite<W>,
- debug_name: String,
- ) -> Result<(), Error>
- where
- W: AsyncWriteExt + Unpin + Send + Sync,
- {
- let mut sending = SendQueue::new();
- let mut should_exit = false;
- while !should_exit || !sending.is_empty() {
- trace!("send_loop({}): queue = {}", debug_name, sending.dump());
- if let Ok((id, prio, data)) = msg_recv.try_recv() {
- trace!(
- "send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
- debug_name,
- id,
- prio,
- data.len()
- );
- sending.push(SendQueueItem {
- id,
- prio,
- data,
- cursor: 0,
- });
- } else if let Some(mut item) = sending.pop() {
- trace!(
- "send_loop({}): sending bytes for {} ({} bytes, {} already sent)",
- debug_name,
- item.id,
- item.data.len(),
- item.cursor
- );
- let header_id = RequestID::to_be_bytes(item.id);
- write.write_all(&header_id[..]).await?;
-
- if item.data.len() - item.cursor > MAX_CHUNK_LENGTH as usize {
- let size_header =
- ChunkLength::to_be_bytes(MAX_CHUNK_LENGTH | CHUNK_HAS_CONTINUATION);
- write.write_all(&size_header[..]).await?;
-
- let new_cursor = item.cursor + MAX_CHUNK_LENGTH as usize;
- write.write_all(&item.data[item.cursor..new_cursor]).await?;
- item.cursor = new_cursor;
-
- sending.push(item);
- } else {
- let send_len = (item.data.len() - item.cursor) as ChunkLength;
-
- let size_header = ChunkLength::to_be_bytes(send_len);
- write.write_all(&size_header[..]).await?;
-
- write.write_all(&item.data[item.cursor..]).await?;
- }
- write.flush().await?;
- } else {
- let sth = msg_recv.recv().await;
- if let Some((id, prio, data)) = sth {
- trace!(
- "send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
- debug_name,
- id,
- prio,
- data.len()
- );
- sending.push(SendQueueItem {
- id,
- prio,
- data,
- cursor: 0,
- });
- } else {
- should_exit = true;
- }
- }
- }
-
- let _ = write.goodbye().await;
- Ok(())
- }
-}
-
-/// The RecvLoop trait, which is implemented both by the client and the server
-/// connection objects (ServerConn and ClientConn) adds a method `.recv_loop()`
-/// and a prototype of a handler for received messages `.recv_handler()` that
-/// must be filled by implementors. `.recv_loop()` receives messages in a loop
-/// according to the protocol defined above: chunks of message in progress of being
-/// received are stored in a buffer, and when the last chunk of a message is received,
-/// the full message is passed to the receive handler.
-#[async_trait]
-pub(crate) trait RecvLoop: Sync + 'static {
- fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);
-
- async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
- where
- R: AsyncReadExt + Unpin + Send + Sync,
- {
- let mut receiving = HashMap::new();
- loop {
- let mut header_id = [0u8; RequestID::BITS as usize / 8];
- match read.read_exact(&mut header_id[..]).await {
- Ok(_) => (),
- Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
- Err(e) => return Err(e.into()),
- };
- let id = RequestID::from_be_bytes(header_id);
-
- let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
- read.read_exact(&mut header_size[..]).await?;
- let size = ChunkLength::from_be_bytes(header_size);
- trace!(
- "recv_loop({}): got header id = {}, size = 0x{:04x} ({} bytes)",
- debug_name,
- id,
- size,
- size & !CHUNK_HAS_CONTINUATION
- );
-
- let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
- let size = size & !CHUNK_HAS_CONTINUATION;
-
- let mut next_slice = vec![0; size as usize];
- read.read_exact(&mut next_slice[..]).await?;
- trace!("recv_loop({}): read {} bytes", debug_name, next_slice.len());
-
- let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default();
- msg_bytes.extend_from_slice(&next_slice[..]);
-
- if has_cont {
- receiving.insert(id, msg_bytes);
- } else {
- self.recv_handler(id, msg_bytes);
- }
- }
- Ok(())
- }
-}
-
-#[cfg(test)]
-mod test {
- use super::*;
-
- #[test]
- fn test_priority_queue() {
- let i1 = SendQueueItem {
- id: 1,
- prio: PRIO_NORMAL,
- data: vec![],
- cursor: 0,
- };
- let i2 = SendQueueItem {
- id: 2,
- prio: PRIO_HIGH,
- data: vec![],
- cursor: 0,
- };
- let i2bis = SendQueueItem {
- id: 20,
- prio: PRIO_HIGH,
- data: vec![],
- cursor: 0,
- };
- let i3 = SendQueueItem {
- id: 3,
- prio: PRIO_HIGH | PRIO_SECONDARY,
- data: vec![],
- cursor: 0,
- };
- let i4 = SendQueueItem {
- id: 4,
- prio: PRIO_BACKGROUND | PRIO_SECONDARY,
- data: vec![],
- cursor: 0,
- };
- let i5 = SendQueueItem {
- id: 5,
- prio: PRIO_BACKGROUND | PRIO_PRIMARY,
- data: vec![],
- cursor: 0,
- };
-
- let mut q = SendQueue::new();
-
- q.push(i1); // 1
- let a = q.pop().unwrap(); // empty -> 1
- assert_eq!(a.id, 1);
- assert!(q.pop().is_none());
-
- q.push(a); // 1
- q.push(i2); // 2 1
- q.push(i2bis); // [2 20] 1
- let a = q.pop().unwrap(); // 20 1 -> 2
- assert_eq!(a.id, 2);
- let b = q.pop().unwrap(); // 1 -> 20
- assert_eq!(b.id, 20);
- let c = q.pop().unwrap(); // empty -> 1
- assert_eq!(c.id, 1);
- assert!(q.pop().is_none());
-
- q.push(a); // 2
- q.push(b); // [2 20]
- q.push(c); // [2 20] 1
- q.push(i3); // [2 20] 3 1
- q.push(i4); // [2 20] 3 1 4
- q.push(i5); // [2 20] 3 1 5 4
-
- let a = q.pop().unwrap(); // 20 3 1 5 4 -> 2
- assert_eq!(a.id, 2);
- q.push(a); // [20 2] 3 1 5 4
-
- let a = q.pop().unwrap(); // 2 3 1 5 4 -> 20
- assert_eq!(a.id, 20);
- let b = q.pop().unwrap(); // 3 1 5 4 -> 2
- assert_eq!(b.id, 2);
- q.push(b); // 2 3 1 5 4
- let b = q.pop().unwrap(); // 3 1 5 4 -> 2
- assert_eq!(b.id, 2);
- let c = q.pop().unwrap(); // 1 5 4 -> 3
- assert_eq!(c.id, 3);
- q.push(b); // 2 1 5 4
- let b = q.pop().unwrap(); // 1 5 4 -> 2
- assert_eq!(b.id, 2);
- let e = q.pop().unwrap(); // 5 4 -> 1
- assert_eq!(e.id, 1);
- let f = q.pop().unwrap(); // 4 -> 5
- assert_eq!(f.id, 5);
- let g = q.pop().unwrap(); // empty -> 4
- assert_eq!(g.id, 4);
- assert!(q.pop().is_none());
- }
-}