diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-01 16:10:38 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-01 16:11:42 +0200 |
commit | b82ad70dd5d5e7ce9102f63fec37396dbda8de08 (patch) | |
tree | 5f651636bf9b2c8012f61c8a9565008239a7a47c /src/stream.rs | |
parent | b931d0d1cfb39d5feae1d4e0a7a49cdebd45761b (diff) | |
download | netapp-b82ad70dd5d5e7ce9102f63fec37396dbda8de08.tar.gz netapp-b82ad70dd5d5e7ce9102f63fec37396dbda8de08.zip |
Correctly defuse cancellation on simple requests
Diffstat (limited to 'src/stream.rs')
-rw-r--r-- | src/stream.rs | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/src/stream.rs b/src/stream.rs index efa0ebc..05ee051 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -95,6 +95,26 @@ impl ByteStreamReader { fn try_get(&mut self, read_len: usize) -> Option<Bytes> { self.buf.take_exact(read_len) } + + fn add_stream_next(&mut self, packet: Option<Packet>) { + match packet { + Some(Ok(slice)) => { + self.buf.extend(slice); + } + Some(Err(e)) => { + self.err = Some(e); + self.eos = true; + } + None => { + self.eos = true; + } + } + } + + pub async fn fill_buffer(&mut self) { + let packet = self.stream.next().await; + self.add_stream_next(packet); + } } pub enum ReadExactError { @@ -132,18 +152,8 @@ impl<'a> Future for ByteStreamReadExact<'a> { } } - match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) { - Some(Ok(slice)) => { - this.reader.buf.extend(slice); - } - Some(Err(e)) => { - this.reader.err = Some(e); - this.reader.eos = true; - } - None => { - this.reader.eos = true; - } - } + let next_packet = futures::ready!(this.reader.stream.as_mut().poll_next(cx)); + this.reader.add_stream_next(next_packet); } } } |