aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-12 16:23:43 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-12 16:23:43 +0200
commitf91fab8582728f176f446a4a2e039d22f752167b (patch)
tree448704ae3f07a10f86fcb5d40347ad70cdd81498
parentd9d199a6c9c0ae2a6ee2b04103c78ef1eb311956 (diff)
downloadgarage-lx-perf-improvements.tar.gz
garage-lx-perf-improvements.zip
Simplify+improve async hasher by using bounded channellx-perf-improvements
-rw-r--r--src/util/async_hash.rs13
1 files changed, 5 insertions, 8 deletions
diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs
index fa8ee7ff..5631ea6b 100644
--- a/src/util/async_hash.rs
+++ b/src/util/async_hash.rs
@@ -1,7 +1,7 @@
use bytes::Bytes;
use digest::Digest;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::data::*;
@@ -27,18 +27,17 @@ pub async fn async_blake2sum(data: Bytes) -> Hash {
// ----
pub struct AsyncHasher<D: Digest> {
- sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>,
+ sendblk: mpsc::Sender<Bytes>,
task: JoinHandle<digest::Output<D>>,
}
impl<D: Digest> AsyncHasher<D> {
pub fn new() -> Self {
- let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>();
+ let (sendblk, mut recvblk) = mpsc::channel::<Bytes>(1);
let task = tokio::task::spawn_blocking(move || {
let mut digest = D::new();
- while let Some((blk, ch)) = recvblk.blocking_recv() {
+ while let Some(blk) = recvblk.blocking_recv() {
digest.update(&blk[..]);
- let _ = ch.send(());
}
digest.finalize()
});
@@ -46,9 +45,7 @@ impl<D: Digest> AsyncHasher<D> {
}
pub async fn update(&self, b: Bytes) {
- let (tx, rx) = oneshot::channel();
- self.sendblk.send((b, tx)).unwrap();
- let _ = rx.await;
+ self.sendblk.send(b).await.unwrap();
}
pub async fn finalize(self) -> digest::Output<D> {