diff options
author | Alex <alex@adnab.me> | 2022-09-13 12:56:53 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-09-13 12:56:53 +0200 |
commit | 8ac109e3a84bd34550d66baf65fe59b86b63bca2 (patch) | |
tree | a49a199a1049d18afaa60f47f46e04cb798aa4b2 /src/proto.rs | |
parent | a82700c5a27612002e6ee029ae77915b8114182f (diff) | |
parent | 298e956a199711b65ce3820931ca943108b78225 (diff) | |
download | netapp-8ac109e3a84bd34550d66baf65fe59b86b63bca2.tar.gz netapp-8ac109e3a84bd34550d66baf65fe59b86b63bca2.zip |
Merge pull request 'add streaming body to requests and responses' (#3) from stream-body into mainv0.5.0
Reviewed-on: https://git.deuxfleurs.fr/lx/netapp/pulls/3
Diffstat (limited to 'src/proto.rs')
-rw-r--r-- | src/proto.rs | 358 |
1 files changed, 0 insertions, 358 deletions
diff --git a/src/proto.rs b/src/proto.rs deleted file mode 100644 index 8f7e70f..0000000 --- a/src/proto.rs +++ /dev/null @@ -1,358 +0,0 @@ -use std::collections::{HashMap, VecDeque}; -use std::fmt::Write; -use std::sync::Arc; - -use log::trace; - -use futures::{AsyncReadExt, AsyncWriteExt}; -use kuska_handshake::async_std::BoxStreamWrite; - -use tokio::sync::mpsc; - -use async_trait::async_trait; - -use crate::error::*; - -/// Priority of a request (click to read more about priorities). -/// -/// This priority value is used to priorize messages -/// in the send queue of the client, and their responses in the send queue of the -/// server. Lower values mean higher priority. -/// -/// This mechanism is usefull for messages bigger than the maximum chunk size -/// (set at `0x4000` bytes), such as large file transfers. -/// In such case, all of the messages in the send queue with the highest priority -/// will take turns to send individual chunks, in a round-robin fashion. -/// Once all highest priority messages are sent successfully, the messages with -/// the next highest priority will begin being sent in the same way. -/// -/// The same priority value is given to a request and to its associated response. -pub type RequestPriority = u8; - -/// Priority class: high -pub const PRIO_HIGH: RequestPriority = 0x20; -/// Priority class: normal -pub const PRIO_NORMAL: RequestPriority = 0x40; -/// Priority class: background -pub const PRIO_BACKGROUND: RequestPriority = 0x80; -/// Priority: primary among given class -pub const PRIO_PRIMARY: RequestPriority = 0x00; -/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`) -pub const PRIO_SECONDARY: RequestPriority = 0x01; - -// Messages are sent by chunks -// Chunk format: -// - u32 BE: request id (same for request and response) -// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag -// when this is not the last chunk of the message -// - [u8; chunk_length] chunk data - -pub(crate) type RequestID = u32; -type ChunkLength = u16; -const MAX_CHUNK_LENGTH: ChunkLength = 0x4000; -const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; - -struct SendQueueItem { - id: RequestID, - prio: RequestPriority, - data: Vec<u8>, - cursor: usize, -} - -struct SendQueue { - items: VecDeque<(u8, VecDeque<SendQueueItem>)>, -} - -impl SendQueue { - fn new() -> Self { - Self { - items: VecDeque::with_capacity(64), - } - } - fn push(&mut self, item: SendQueueItem) { - let prio = item.prio; - let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) { - Ok(i) => i, - Err(i) => { - self.items.insert(i, (prio, VecDeque::new())); - i - } - }; - self.items[pos_prio].1.push_back(item); - } - fn pop(&mut self) -> Option<SendQueueItem> { - match self.items.pop_front() { - None => None, - Some((prio, mut items_at_prio)) => { - let ret = items_at_prio.pop_front(); - if !items_at_prio.is_empty() { - self.items.push_front((prio, items_at_prio)); - } - ret.or_else(|| self.pop()) - } - } - } - fn is_empty(&self) -> bool { - self.items.iter().all(|(_k, v)| v.is_empty()) - } - fn dump(&self) -> String { - let mut ret = String::new(); - for (prio, q) in self.items.iter() { - for item in q.iter() { - write!( - &mut ret, - " [{} {} ({})]", - prio, - item.data.len() - item.cursor, - item.id - ) - .unwrap(); - } - } - ret - } -} - -/// The SendLoop trait, which is implemented both by the client and the server -/// connection objects (ServerConna and ClientConn) adds a method `.send_loop()` -/// that takes a channel of messages to send and an asynchronous writer, -/// and sends messages from the channel to the async writer, putting them in a queue -/// before being sent and doing the round-robin sending strategy. -/// -/// The `.send_loop()` exits when the sending end of the channel is closed, -/// or if there is an error at any time writing to the async writer. -#[async_trait] -pub(crate) trait SendLoop: Sync { - async fn send_loop<W>( - self: Arc<Self>, - mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>, - mut write: BoxStreamWrite<W>, - debug_name: String, - ) -> Result<(), Error> - where - W: AsyncWriteExt + Unpin + Send + Sync, - { - let mut sending = SendQueue::new(); - let mut should_exit = false; - while !should_exit || !sending.is_empty() { - trace!("send_loop({}): queue = {}", debug_name, sending.dump()); - if let Ok((id, prio, data)) = msg_recv.try_recv() { - trace!( - "send_loop({}): new message to send, id = {}, prio = {}, {} bytes", - debug_name, - id, - prio, - data.len() - ); - sending.push(SendQueueItem { - id, - prio, - data, - cursor: 0, - }); - } else if let Some(mut item) = sending.pop() { - trace!( - "send_loop({}): sending bytes for {} ({} bytes, {} already sent)", - debug_name, - item.id, - item.data.len(), - item.cursor - ); - let header_id = RequestID::to_be_bytes(item.id); - write.write_all(&header_id[..]).await?; - - if item.data.len() - item.cursor > MAX_CHUNK_LENGTH as usize { - let size_header = - ChunkLength::to_be_bytes(MAX_CHUNK_LENGTH | CHUNK_HAS_CONTINUATION); - write.write_all(&size_header[..]).await?; - - let new_cursor = item.cursor + MAX_CHUNK_LENGTH as usize; - write.write_all(&item.data[item.cursor..new_cursor]).await?; - item.cursor = new_cursor; - - sending.push(item); - } else { - let send_len = (item.data.len() - item.cursor) as ChunkLength; - - let size_header = ChunkLength::to_be_bytes(send_len); - write.write_all(&size_header[..]).await?; - - write.write_all(&item.data[item.cursor..]).await?; - } - write.flush().await?; - } else { - let sth = msg_recv.recv().await; - if let Some((id, prio, data)) = sth { - trace!( - "send_loop({}): new message to send, id = {}, prio = {}, {} bytes", - debug_name, - id, - prio, - data.len() - ); - sending.push(SendQueueItem { - id, - prio, - data, - cursor: 0, - }); - } else { - should_exit = true; - } - } - } - - let _ = write.goodbye().await; - Ok(()) - } -} - -/// The RecvLoop trait, which is implemented both by the client and the server -/// connection objects (ServerConn and ClientConn) adds a method `.recv_loop()` -/// and a prototype of a handler for received messages `.recv_handler()` that -/// must be filled by implementors. `.recv_loop()` receives messages in a loop -/// according to the protocol defined above: chunks of message in progress of being -/// received are stored in a buffer, and when the last chunk of a message is received, -/// the full message is passed to the receive handler. -#[async_trait] -pub(crate) trait RecvLoop: Sync + 'static { - fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>); - - async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error> - where - R: AsyncReadExt + Unpin + Send + Sync, - { - let mut receiving = HashMap::new(); - loop { - let mut header_id = [0u8; RequestID::BITS as usize / 8]; - match read.read_exact(&mut header_id[..]).await { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, - Err(e) => return Err(e.into()), - }; - let id = RequestID::from_be_bytes(header_id); - - let mut header_size = [0u8; ChunkLength::BITS as usize / 8]; - read.read_exact(&mut header_size[..]).await?; - let size = ChunkLength::from_be_bytes(header_size); - trace!( - "recv_loop({}): got header id = {}, size = 0x{:04x} ({} bytes)", - debug_name, - id, - size, - size & !CHUNK_HAS_CONTINUATION - ); - - let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; - let size = size & !CHUNK_HAS_CONTINUATION; - - let mut next_slice = vec![0; size as usize]; - read.read_exact(&mut next_slice[..]).await?; - trace!("recv_loop({}): read {} bytes", debug_name, next_slice.len()); - - let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default(); - msg_bytes.extend_from_slice(&next_slice[..]); - - if has_cont { - receiving.insert(id, msg_bytes); - } else { - self.recv_handler(id, msg_bytes); - } - } - Ok(()) - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_priority_queue() { - let i1 = SendQueueItem { - id: 1, - prio: PRIO_NORMAL, - data: vec![], - cursor: 0, - }; - let i2 = SendQueueItem { - id: 2, - prio: PRIO_HIGH, - data: vec![], - cursor: 0, - }; - let i2bis = SendQueueItem { - id: 20, - prio: PRIO_HIGH, - data: vec![], - cursor: 0, - }; - let i3 = SendQueueItem { - id: 3, - prio: PRIO_HIGH | PRIO_SECONDARY, - data: vec![], - cursor: 0, - }; - let i4 = SendQueueItem { - id: 4, - prio: PRIO_BACKGROUND | PRIO_SECONDARY, - data: vec![], - cursor: 0, - }; - let i5 = SendQueueItem { - id: 5, - prio: PRIO_BACKGROUND | PRIO_PRIMARY, - data: vec![], - cursor: 0, - }; - - let mut q = SendQueue::new(); - - q.push(i1); // 1 - let a = q.pop().unwrap(); // empty -> 1 - assert_eq!(a.id, 1); - assert!(q.pop().is_none()); - - q.push(a); // 1 - q.push(i2); // 2 1 - q.push(i2bis); // [2 20] 1 - let a = q.pop().unwrap(); // 20 1 -> 2 - assert_eq!(a.id, 2); - let b = q.pop().unwrap(); // 1 -> 20 - assert_eq!(b.id, 20); - let c = q.pop().unwrap(); // empty -> 1 - assert_eq!(c.id, 1); - assert!(q.pop().is_none()); - - q.push(a); // 2 - q.push(b); // [2 20] - q.push(c); // [2 20] 1 - q.push(i3); // [2 20] 3 1 - q.push(i4); // [2 20] 3 1 4 - q.push(i5); // [2 20] 3 1 5 4 - - let a = q.pop().unwrap(); // 20 3 1 5 4 -> 2 - assert_eq!(a.id, 2); - q.push(a); // [20 2] 3 1 5 4 - - let a = q.pop().unwrap(); // 2 3 1 5 4 -> 20 - assert_eq!(a.id, 20); - let b = q.pop().unwrap(); // 3 1 5 4 -> 2 - assert_eq!(b.id, 2); - q.push(b); // 2 3 1 5 4 - let b = q.pop().unwrap(); // 3 1 5 4 -> 2 - assert_eq!(b.id, 2); - let c = q.pop().unwrap(); // 1 5 4 -> 3 - assert_eq!(c.id, 3); - q.push(b); // 2 1 5 4 - let b = q.pop().unwrap(); // 1 5 4 -> 2 - assert_eq!(b.id, 2); - let e = q.pop().unwrap(); // 5 4 -> 1 - assert_eq!(e.id, 1); - let f = q.pop().unwrap(); // 4 -> 5 - assert_eq!(f.id, 5); - let g = q.pop().unwrap(); // empty -> 4 - assert_eq!(g.id, 4); - assert!(q.pop().is_none()); - } -} |