aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
authortrinity-1686a <trinity@deuxfleurs.fr>2022-06-05 15:33:43 +0200
committertrinity-1686a <trinity@deuxfleurs.fr>2022-06-05 15:33:43 +0200
commit368ba908794901bc793c6a087c02241be046bdf2 (patch)
tree389910f1e1476c9531a01d2e53060e1056cca266 /src/server.rs
parent648e015e3a73b96973343e0a1f861c9ea41cc24d (diff)
downloadnetapp-368ba908794901bc793c6a087c02241be046bdf2.tar.gz
netapp-368ba908794901bc793c6a087c02241be046bdf2.zip
initial work on associated stream
still require testing, and fixing a few kinks: - sending packets > 16k truncate them - send one more packet than it could at eos - probably update documentation /!\ contains breaking changes
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs38
1 files changed, 26 insertions, 12 deletions
diff --git a/src/server.rs b/src/server.rs
index 5465307..6cd4056 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -55,7 +55,7 @@ pub(crate) struct ServerConn {
netapp: Arc<NetApp>,
- resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>>,
+ resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Data)>>,
}
impl ServerConn {
@@ -123,7 +123,11 @@ impl ServerConn {
Ok(())
}
- async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> {
+ async fn recv_handler_aux(
+ self: &Arc<Self>,
+ bytes: &[u8],
+ stream: AssociatedStream,
+ ) -> Result<(Vec<u8>, Option<AssociatedStream>), Error> {
let msg = QueryMessage::decode(bytes)?;
let path = String::from_utf8(msg.path.to_vec())?;
@@ -156,11 +160,11 @@ impl ServerConn {
span.set_attribute(KeyValue::new("path", path.to_string()));
span.set_attribute(KeyValue::new("len_query", msg.body.len() as i64));
- handler.handle(msg.body, self.peer_id)
+ handler.handle(msg.body, stream, self.peer_id)
.with_context(Context::current_with_span(span))
.await
} else {
- handler.handle(msg.body, self.peer_id).await
+ handler.handle(msg.body, stream, self.peer_id).await
}
}
} else {
@@ -173,7 +177,7 @@ impl SendLoop for ServerConn {}
#[async_trait]
impl RecvLoop for ServerConn {
- fn recv_handler(self: &Arc<Self>, id: RequestID, bytes: Vec<u8>) {
+ fn recv_handler(self: &Arc<Self>, id: RequestID, bytes: Vec<u8>, stream: AssociatedStream) {
let resp_send = self.resp_send.load_full().unwrap();
let self2 = self.clone();
@@ -182,26 +186,36 @@ impl RecvLoop for ServerConn {
let bytes: Bytes = bytes.into();
let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 };
- let resp = self2.recv_handler_aux(&bytes[..]).await;
+ let resp = self2.recv_handler_aux(&bytes[..], stream).await;
- let resp_bytes = match resp {
- Ok(rb) => {
+ let (resp_bytes, resp_stream) = match resp {
+ Ok((rb, rs)) => {
let mut resp_bytes = vec![0u8];
resp_bytes.extend(rb);
- resp_bytes
+ (resp_bytes, rs)
}
Err(e) => {
let mut resp_bytes = vec![e.code()];
resp_bytes.extend(e.to_string().into_bytes());
- resp_bytes
+ (resp_bytes, None)
}
};
trace!("ServerConn sending response to {}: ", id);
resp_send
- .send((id, prio, resp_bytes))
- .log_err("ServerConn recv_handler send resp");
+ .send((id, prio, Data::Full(resp_bytes)))
+ .log_err("ServerConn recv_handler send resp bytes");
+
+ if let Some(resp_stream) = resp_stream {
+ resp_send
+ .send((id + 1, prio, Data::Streaming(resp_stream)))
+ .log_err("ServerConn recv_handler send resp stream");
+ } else {
+ resp_send
+ .send((id + 1, prio, Data::Full(Vec::new())))
+ .log_err("ServerConn recv_handler send resp stream");
+ }
});
}
}