From 1b2e1296eb99630e969e585ede0424072adc2d0c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Jul 2022 17:18:47 +0200 Subject: Compute hashes on dedicated threads --- src/util/async_hash.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 src/util/async_hash.rs (limited to 'src/util/async_hash.rs') diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs new file mode 100644 index 00000000..67776eb9 --- /dev/null +++ b/src/util/async_hash.rs @@ -0,0 +1,55 @@ +use bytes::Bytes; +use digest::Digest; + +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +use crate::data::*; + +/// Compute the sha256 of a slice, +/// spawning on a tokio thread for CPU-intensive processing +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_sha256sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || sha256sum(&data)) + .await + .unwrap() +} + +/// Compute the blake2sum of a slice, +/// spawning on a tokio thread for CPU-intensive processing. +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_blake2sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || blake2sum(&data)) + .await + .unwrap() +} + +// ---- + +pub struct AsyncHasher { + sendblk: mpsc::UnboundedSender, + task: JoinHandle>, +} + +impl AsyncHasher { + pub fn new() -> Self { + let (sendblk, mut recvblk) = mpsc::unbounded_channel::(); + let task = tokio::task::spawn_blocking(move || { + let mut digest = D::new(); + while let Some(blk) = recvblk.blocking_recv() { + digest.update(&blk[..]); + } + digest.finalize() + }); + Self { sendblk, task } + } + + pub fn update(&self, b: Bytes) { + self.sendblk.send(b).unwrap() + } + + pub async fn finalize(self) -> digest::Output { + drop(self.sendblk); + self.task.await.unwrap() + } +} -- cgit v1.2.3 From 2f111e6b3d772b10c8ed6279ce0c82d22852afd1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Jul 2022 18:40:57 +0200 Subject: Performance improvements: - reduce contention on mutation_lock by having 256 of them - better lmdb defaults --- src/util/async_hash.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'src/util/async_hash.rs') diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs index 67776eb9..be0535de 100644 --- a/src/util/async_hash.rs +++ b/src/util/async_hash.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use digest::Digest; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use crate::data::*; @@ -27,25 +27,28 @@ pub async fn async_blake2sum(data: Bytes) -> Hash { // ---- pub struct AsyncHasher { - sendblk: mpsc::UnboundedSender, + sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>, task: JoinHandle>, } impl AsyncHasher { pub fn new() -> Self { - let (sendblk, mut recvblk) = mpsc::unbounded_channel::(); + let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>(); let task = tokio::task::spawn_blocking(move || { let mut digest = D::new(); - while let Some(blk) = recvblk.blocking_recv() { + while let Some((blk, ch)) = recvblk.blocking_recv() { digest.update(&blk[..]); + let _ = ch.send(()); } digest.finalize() }); Self { sendblk, task } } - pub fn update(&self, b: Bytes) { - self.sendblk.send(b).unwrap() + pub async fn update(&self, b: Bytes) { + let (tx, rx) = oneshot::channel(); + self.sendblk.send((b, tx)).unwrap(); + let _ = rx.await; } pub async fn finalize(self) -> digest::Output { -- cgit v1.2.3 From 0176da3ad2aae9d18cb04feb452e0243cfb940fc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 18:37:20 +0200 Subject: Make clippy happy --- src/util/async_hash.rs | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src/util/async_hash.rs') diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs index be0535de..fa8ee7ff 100644 --- a/src/util/async_hash.rs +++ b/src/util/async_hash.rs @@ -56,3 +56,9 @@ impl AsyncHasher { self.task.await.unwrap() } } + +impl Default for AsyncHasher { + fn default() -> Self { + Self::new() + } +} -- cgit v1.2.3 From f91fab8582728f176f446a4a2e039d22f752167b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 12 Sep 2022 16:23:43 +0200 Subject: Simplify+improve async hasher by using bounded channel --- src/util/async_hash.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) (limited to 'src/util/async_hash.rs') diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs index fa8ee7ff..5631ea6b 100644 --- a/src/util/async_hash.rs +++ b/src/util/async_hash.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use digest::Digest; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio::task::JoinHandle; use crate::data::*; @@ -27,18 +27,17 @@ pub async fn async_blake2sum(data: Bytes) -> Hash { // ---- pub struct AsyncHasher { - sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>, + sendblk: mpsc::Sender, task: JoinHandle>, } impl AsyncHasher { pub fn new() -> Self { - let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>(); + let (sendblk, mut recvblk) = mpsc::channel::(1); let task = tokio::task::spawn_blocking(move || { let mut digest = D::new(); - while let Some((blk, ch)) = recvblk.blocking_recv() { + while let Some(blk) = recvblk.blocking_recv() { digest.update(&blk[..]); - let _ = ch.send(()); } digest.finalize() }); @@ -46,9 +45,7 @@ impl AsyncHasher { } pub async fn update(&self, b: Bytes) { - let (tx, rx) = oneshot::channel(); - self.sendblk.send((b, tx)).unwrap(); - let _ = rx.await; + self.sendblk.send(b).await.unwrap(); } pub async fn finalize(self) -> digest::Output { -- cgit v1.2.3