diff options
-rw-r--r-- | src/client.rs | 9 | ||||
-rw-r--r-- | src/recv.rs | 19 | ||||
-rw-r--r-- | src/send.rs | 46 | ||||
-rw-r--r-- | src/server.rs | 7 | ||||
-rw-r--r-- | src/stream.rs | 8 |
5 files changed, 63 insertions, 26 deletions
diff --git a/src/client.rs b/src/client.rs index d51236b..2fccdb8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -179,10 +179,9 @@ impl ClientConn { }))); } - trace!( - "request: query_send {} (serialized message: {} bytes)", - id, - req_msg_len + debug!( + "request: query_send {}, path {}, prio {} (serialized message: {} bytes)", + id, path, prio, req_msg_len ); #[cfg(feature = "telemetry")] @@ -201,7 +200,7 @@ impl ClientConn { } let resp_enc = RespEnc::decode(stream).await?; - trace!("request response {}", id); + debug!("client: got response to request {} (path {})", id, path); Resp::from_enc(resp_enc) } } diff --git a/src/recv.rs b/src/recv.rs index e748f18..cba42cb 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -59,7 +59,6 @@ pub(crate) trait RecvLoop: Sync + 'static { { let mut streams: HashMap<RequestID, Sender> = HashMap::new(); loop { - trace!("recv_loop: reading packet"); let mut header_id = [0u8; RequestID::BITS as usize / 8]; match read.read_exact(&mut header_id[..]).await { Ok(_) => (), @@ -67,22 +66,31 @@ pub(crate) trait RecvLoop: Sync + 'static { Err(e) => return Err(e.into()), }; let id = RequestID::from_be_bytes(header_id); - trace!("recv_loop: got header id: {:04x}", id); let mut header_size = [0u8; ChunkLength::BITS as usize / 8]; read.read_exact(&mut header_size[..]).await?; let size = ChunkLength::from_be_bytes(header_size); - trace!("recv_loop: got header size: {:04x}", size); let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let is_error = (size & ERROR_MARKER) != 0; let packet = if is_error { + trace!( + "recv_loop: got id {}, header_size {:04x}, error {}", + id, + size, + size & !ERROR_MARKER + ); Err((size & !ERROR_MARKER) as u8) } else { let size = size & !CHUNK_HAS_CONTINUATION; let mut next_slice = vec![0; size as usize]; read.read_exact(&mut next_slice[..]).await?; - trace!("recv_loop: read {} bytes", next_slice.len()); + trace!( + "recv_loop: got id {}, header_size {:04x}, {} bytes", + id, + size, + next_slice.len() + ); Ok(Bytes::from(next_slice)) }; @@ -90,6 +98,7 @@ pub(crate) trait RecvLoop: Sync + 'static { send } else { let (send, recv) = mpsc::channel(4); + trace!("recv_loop: id {} is new channel", id); self.recv_handler( id, Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)), @@ -102,8 +111,10 @@ pub(crate) trait RecvLoop: Sync + 'static { let _ = sender.send(packet).await; if has_cont { + assert!(!is_error); streams.insert(id, sender); } else { + trace!("recv_loop: close channel id {}", id); sender.end(); } } diff --git a/src/send.rs b/src/send.rs index 46c4383..256fe4c 100644 --- a/src/send.rs +++ b/src/send.rs @@ -74,36 +74,54 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> { type Output = (RequestID, DataFrame); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { - for i in 0..self.queue.items.len() { - let (_prio, items_at_prio) = &mut self.queue.items[i]; - + for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() { let mut ready_item = None; for (j, item) in items_at_prio.iter_mut().enumerate() { let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); match Pin::new(&mut item_reader).poll(ctx) { Poll::Pending => (), Poll::Ready(ready_v) => { - ready_item = Some((j, ready_v, item.data.eos())); + ready_item = Some((j, ready_v)); break; } } } - if let Some((j, bytes_or_err, eos)) = ready_item { + if let Some((j, bytes_or_err)) = ready_item { + let item = items_at_prio.remove(j).unwrap(); + let id = item.id; + let eos = item.data.eos(); + let data_frame = match bytes_or_err { - Ok(bytes) => DataFrame::Data(bytes, !eos), + Ok(bytes) => { + trace!( + "send queue poll next ready: id {} eos {:?} bytes {}", + id, + eos, + bytes.len() + ); + DataFrame::Data(bytes, !eos) + } Err(e) => DataFrame::Error(match e { - ReadExactError::Stream(code) => code, + ReadExactError::Stream(code) => { + trace!( + "send queue poll next ready: id {} eos {:?} ERROR {}", + id, + eos, + code + ); + code + } _ => unreachable!(), }), }; - let item = items_at_prio.remove(j).unwrap(); - let id = item.id; - if !eos { + + if !eos && !matches!(data_frame, DataFrame::Error(_)) { items_at_prio.push_back(item); } else if items_at_prio.is_empty() { self.queue.items.remove(i); } + return Poll::Ready((id, data_frame)); } } @@ -173,6 +191,7 @@ pub(crate) trait SendLoop: Sync { match futures::future::select(recv_fut, send_fut).await { Either::Left((sth, _send_fut)) => { if let Some((id, prio, data)) = sth { + trace!("send_loop: add stream {} to send", id); sending.push(SendQueueItem { id, prio, @@ -183,7 +202,12 @@ pub(crate) trait SendLoop: Sync { }; } Either::Right(((id, data), _recv_fut)) => { - trace!("send_loop: sending bytes for {}", id); + trace!( + "send_loop: id {}, send {} bytes, header_size {}", + id, + data.data().len(), + hex::encode(data.header()) + ); let header_id = RequestID::to_be_bytes(id); write.write_all(&header_id[..]).await?; diff --git a/src/server.rs b/src/server.rs index 4b232af..57062d8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use arc_swap::ArcSwapOption; use async_trait::async_trait; -use log::{debug, trace}; +use log::*; use futures::io::{AsyncReadExt, AsyncWriteExt}; use kuska_handshake::async_std::{handshake_server, BoxStream}; @@ -175,7 +175,8 @@ impl RecvLoop for ServerConn { let self2 = self.clone(); tokio::spawn(async move { - trace!("ServerConn recv_handler {}", id); + debug!("server: recv_handler got {}", id); + let (prio, resp_enc) = match ReqEnc::decode(stream).await { Ok(req_enc) => { let prio = req_enc.prio; @@ -192,7 +193,7 @@ impl RecvLoop for ServerConn { Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)), }; - trace!("ServerConn sending response to {}: ", id); + debug!("server: sending response to {}", id); resp_send .send((id, prio, resp_enc.encode())) diff --git a/src/stream.rs b/src/stream.rs index beb6b9c..5ba2ed4 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -93,7 +93,7 @@ impl ByteStreamReader { } pub fn eos(&self) -> bool { - self.buf.is_empty() && self.eos + self.buf_len == 0 && self.eos } fn try_get(&mut self, read_len: usize) -> Option<Bytes> { @@ -164,8 +164,10 @@ impl<'a> Future for ByteStreamReadExact<'a> { match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) { Some(Ok(slice)) => { - this.reader.buf_len += slice.len(); - this.reader.buf.push_back(slice); + if !slice.is_empty() { + this.reader.buf_len += slice.len(); + this.reader.buf.push_back(slice); + } } Some(Err(e)) => { this.reader.err = Some(e); |