From f91fab8582728f176f446a4a2e039d22f752167b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 12 Sep 2022 16:23:43 +0200 Subject: Simplify+improve async hasher by using bounded channel --- src/util/async_hash.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) (limited to 'src/util') diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs index fa8ee7ff..5631ea6b 100644 --- a/src/util/async_hash.rs +++ b/src/util/async_hash.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use digest::Digest; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio::task::JoinHandle; use crate::data::*; @@ -27,18 +27,17 @@ pub async fn async_blake2sum(data: Bytes) -> Hash { // ---- pub struct AsyncHasher { - sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>, + sendblk: mpsc::Sender, task: JoinHandle>, } impl AsyncHasher { pub fn new() -> Self { - let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>(); + let (sendblk, mut recvblk) = mpsc::channel::(1); let task = tokio::task::spawn_blocking(move || { let mut digest = D::new(); - while let Some((blk, ch)) = recvblk.blocking_recv() { + while let Some(blk) = recvblk.blocking_recv() { digest.update(&blk[..]); - let _ = ch.send(()); } digest.finalize() }); @@ -46,9 +45,7 @@ impl AsyncHasher { } pub async fn update(&self, b: Bytes) { - let (tx, rx) = oneshot::channel(); - self.sendblk.send((b, tx)).unwrap(); - let _ = rx.await; + self.sendblk.send(b).await.unwrap(); } pub async fn finalize(self) -> digest::Output { -- cgit v1.2.3