diff options
Diffstat (limited to 'src/util/async_hash.rs')
-rw-r--r-- | src/util/async_hash.rs | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs index 67776eb9..be0535de 100644 --- a/src/util/async_hash.rs +++ b/src/util/async_hash.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use digest::Digest; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use crate::data::*; @@ -27,25 +27,28 @@ pub async fn async_blake2sum(data: Bytes) -> Hash { // ---- pub struct AsyncHasher<D: Digest> { - sendblk: mpsc::UnboundedSender<Bytes>, + sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>, task: JoinHandle<digest::Output<D>>, } impl<D: Digest> AsyncHasher<D> { pub fn new() -> Self { - let (sendblk, mut recvblk) = mpsc::unbounded_channel::<Bytes>(); + let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>(); let task = tokio::task::spawn_blocking(move || { let mut digest = D::new(); - while let Some(blk) = recvblk.blocking_recv() { + while let Some((blk, ch)) = recvblk.blocking_recv() { digest.update(&blk[..]); + let _ = ch.send(()); } digest.finalize() }); Self { sendblk, task } } - pub fn update(&self, b: Bytes) { - self.sendblk.send(b).unwrap() + pub async fn update(&self, b: Bytes) { + let (tx, rx) = oneshot::channel(); + self.sendblk.send((b, tx)).unwrap(); + let _ = rx.await; } pub async fn finalize(self) -> digest::Output<D> { |