diff options
Diffstat (limited to 'src/proto.rs')
-rw-r--r-- | src/proto.rs | 24 |
1 files changed, 19 insertions, 5 deletions
diff --git a/src/proto.rs b/src/proto.rs index 18e7c44..2db3f83 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -96,6 +96,14 @@ impl SendQueue { } } +/// The SendLoop trait, which is implemented both by the client and the server +/// connection objects (ServerConna and ClientConn) adds a method `.send_loop()` +/// that takes a channel of messages to send and an asynchronous writer, +/// and sends messages from the channel to the async writer, putting them in a queue +/// before being sent and doing the round-robin sending strategy. +/// +/// The `.send_loop()` exits when the sending end of the channel is closed, +/// or if there is an error at any time writing to the async writer. #[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop<W>( @@ -128,9 +136,9 @@ pub(crate) trait SendLoop: Sync { write.write_all(&header_id[..]).await?; if item.data.len() - item.cursor > MAX_CHUNK_LENGTH as usize { - let header_size = + let size_header = ChunkLength::to_be_bytes(MAX_CHUNK_LENGTH | CHUNK_HAS_CONTINUATION); - write.write_all(&header_size[..]).await?; + write.write_all(&size_header[..]).await?; let new_cursor = item.cursor + MAX_CHUNK_LENGTH as usize; write.write_all(&item.data[item.cursor..new_cursor]).await?; @@ -140,8 +148,8 @@ pub(crate) trait SendLoop: Sync { } else { let send_len = (item.data.len() - item.cursor) as ChunkLength; - let header_size = ChunkLength::to_be_bytes(send_len); - write.write_all(&header_size[..]).await?; + let size_header = ChunkLength::to_be_bytes(send_len); + write.write_all(&size_header[..]).await?; write.write_all(&item.data[item.cursor..]).await?; } @@ -166,9 +174,15 @@ pub(crate) trait SendLoop: Sync { } } +/// The RecvLoop trait, which is implemented both by the client and the server +/// connection objects (ServerConn and ClientConn) adds a method `.recv_loop()` +/// and a prototype of a handler for received messages `.recv_handler()` that +/// must be filled by implementors. `.recv_loop()` receives messages in a loop +/// according to the protocol defined above: chunks of message in progress of being +/// received are stored in a buffer, and when the last chunk of a message is received, +/// the full message is passed to the receive handler. #[async_trait] pub(crate) trait RecvLoop: Sync + 'static { - // Returns true if we should stop receiving after this fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>); async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error> |