diff options
author | trinity-1686a <trinity@deuxfleurs.fr> | 2022-06-05 15:33:43 +0200 |
---|---|---|
committer | trinity-1686a <trinity@deuxfleurs.fr> | 2022-06-05 15:33:43 +0200 |
commit | 368ba908794901bc793c6a087c02241be046bdf2 (patch) | |
tree | 389910f1e1476c9531a01d2e53060e1056cca266 /src/server.rs | |
parent | 648e015e3a73b96973343e0a1f861c9ea41cc24d (diff) | |
download | netapp-368ba908794901bc793c6a087c02241be046bdf2.tar.gz netapp-368ba908794901bc793c6a087c02241be046bdf2.zip |
initial work on associated stream
still require testing, and fixing a few kinks:
- sending packets > 16k truncate them
- send one more packet than it could at eos
- probably update documentation
/!\ contains breaking changes
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 38 |
1 files changed, 26 insertions, 12 deletions
diff --git a/src/server.rs b/src/server.rs index 5465307..6cd4056 100644 --- a/src/server.rs +++ b/src/server.rs @@ -55,7 +55,7 @@ pub(crate) struct ServerConn { netapp: Arc<NetApp>, - resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>>, + resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Data)>>, } impl ServerConn { @@ -123,7 +123,11 @@ impl ServerConn { Ok(()) } - async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> { + async fn recv_handler_aux( + self: &Arc<Self>, + bytes: &[u8], + stream: AssociatedStream, + ) -> Result<(Vec<u8>, Option<AssociatedStream>), Error> { let msg = QueryMessage::decode(bytes)?; let path = String::from_utf8(msg.path.to_vec())?; @@ -156,11 +160,11 @@ impl ServerConn { span.set_attribute(KeyValue::new("path", path.to_string())); span.set_attribute(KeyValue::new("len_query", msg.body.len() as i64)); - handler.handle(msg.body, self.peer_id) + handler.handle(msg.body, stream, self.peer_id) .with_context(Context::current_with_span(span)) .await } else { - handler.handle(msg.body, self.peer_id).await + handler.handle(msg.body, stream, self.peer_id).await } } } else { @@ -173,7 +177,7 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { - fn recv_handler(self: &Arc<Self>, id: RequestID, bytes: Vec<u8>) { + fn recv_handler(self: &Arc<Self>, id: RequestID, bytes: Vec<u8>, stream: AssociatedStream) { let resp_send = self.resp_send.load_full().unwrap(); let self2 = self.clone(); @@ -182,26 +186,36 @@ impl RecvLoop for ServerConn { let bytes: Bytes = bytes.into(); let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 }; - let resp = self2.recv_handler_aux(&bytes[..]).await; + let resp = self2.recv_handler_aux(&bytes[..], stream).await; - let resp_bytes = match resp { - Ok(rb) => { + let (resp_bytes, resp_stream) = match resp { + Ok((rb, rs)) => { let mut resp_bytes = vec![0u8]; resp_bytes.extend(rb); - resp_bytes + (resp_bytes, rs) } Err(e) => { let mut resp_bytes = vec![e.code()]; resp_bytes.extend(e.to_string().into_bytes()); - resp_bytes + (resp_bytes, None) } }; trace!("ServerConn sending response to {}: ", id); resp_send - .send((id, prio, resp_bytes)) - .log_err("ServerConn recv_handler send resp"); + .send((id, prio, Data::Full(resp_bytes))) + .log_err("ServerConn recv_handler send resp bytes"); + + if let Some(resp_stream) = resp_stream { + resp_send + .send((id + 1, prio, Data::Streaming(resp_stream))) + .log_err("ServerConn recv_handler send resp stream"); + } else { + resp_send + .send((id + 1, prio, Data::Full(Vec::new()))) + .log_err("ServerConn recv_handler send resp stream"); + } }); } } |