aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-01 16:10:38 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-01 16:11:42 +0200
commitb82ad70dd5d5e7ce9102f63fec37396dbda8de08 (patch)
tree5f651636bf9b2c8012f61c8a9565008239a7a47c
parentb931d0d1cfb39d5feae1d4e0a7a49cdebd45761b (diff)
downloadnetapp-b82ad70dd5d5e7ce9102f63fec37396dbda8de08.tar.gz
netapp-b82ad70dd5d5e7ce9102f63fec37396dbda8de08.zip
Correctly defuse cancellation on simple requests
-rw-r--r--src/message.rs2
-rw-r--r--src/stream.rs34
2 files changed, 24 insertions, 12 deletions
diff --git a/src/message.rs b/src/message.rs
index 1834f28..ec9433a 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -454,6 +454,8 @@ impl RespEnc {
let msg_len = reader.read_u32().await?;
let msg = reader.read_exact(msg_len as usize).await?;
+ reader.fill_buffer().await;
+
Ok(Self {
msg,
stream: Some(reader.into_stream()),
diff --git a/src/stream.rs b/src/stream.rs
index efa0ebc..05ee051 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -95,6 +95,26 @@ impl ByteStreamReader {
fn try_get(&mut self, read_len: usize) -> Option<Bytes> {
self.buf.take_exact(read_len)
}
+
+ fn add_stream_next(&mut self, packet: Option<Packet>) {
+ match packet {
+ Some(Ok(slice)) => {
+ self.buf.extend(slice);
+ }
+ Some(Err(e)) => {
+ self.err = Some(e);
+ self.eos = true;
+ }
+ None => {
+ self.eos = true;
+ }
+ }
+ }
+
+ pub async fn fill_buffer(&mut self) {
+ let packet = self.stream.next().await;
+ self.add_stream_next(packet);
+ }
}
pub enum ReadExactError {
@@ -132,18 +152,8 @@ impl<'a> Future for ByteStreamReadExact<'a> {
}
}
- match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) {
- Some(Ok(slice)) => {
- this.reader.buf.extend(slice);
- }
- Some(Err(e)) => {
- this.reader.err = Some(e);
- this.reader.eos = true;
- }
- None => {
- this.reader.eos = true;
- }
- }
+ let next_packet = futures::ready!(this.reader.stream.as_mut().poll_next(cx));
+ this.reader.add_stream_next(next_packet);
}
}
}