aboutsummaryrefslogtreecommitdiff
path: root/src/util/async_hash.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-18 17:18:47 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:24:44 +0200
commit1b2e1296eb99630e969e585ede0424072adc2d0c (patch)
treee16b953e1f53e57c6efd11e688e955fa36cbb186 /src/util/async_hash.rs
parenta184f0d0b5f1d84524f7ba7302a4919567acec56 (diff)
downloadgarage-1b2e1296eb99630e969e585ede0424072adc2d0c.tar.gz
garage-1b2e1296eb99630e969e585ede0424072adc2d0c.zip
Compute hashes on dedicated threads
Diffstat (limited to 'src/util/async_hash.rs')
-rw-r--r--src/util/async_hash.rs55
1 files changed, 55 insertions, 0 deletions
diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs
new file mode 100644
index 00000000..67776eb9
--- /dev/null
+++ b/src/util/async_hash.rs
@@ -0,0 +1,55 @@
+use bytes::Bytes;
+use digest::Digest;
+
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+use crate::data::*;
+
+/// Compute the sha256 of a slice,
+/// spawning on a tokio thread for CPU-intensive processing
+/// The argument has to be an owned Bytes, as it is moved out to a new thread.
+pub async fn async_sha256sum(data: Bytes) -> Hash {
+ tokio::task::spawn_blocking(move || sha256sum(&data))
+ .await
+ .unwrap()
+}
+
+/// Compute the blake2sum of a slice,
+/// spawning on a tokio thread for CPU-intensive processing.
+/// The argument has to be an owned Bytes, as it is moved out to a new thread.
+pub async fn async_blake2sum(data: Bytes) -> Hash {
+ tokio::task::spawn_blocking(move || blake2sum(&data))
+ .await
+ .unwrap()
+}
+
+// ----
+
+pub struct AsyncHasher<D: Digest> {
+ sendblk: mpsc::UnboundedSender<Bytes>,
+ task: JoinHandle<digest::Output<D>>,
+}
+
+impl<D: Digest> AsyncHasher<D> {
+ pub fn new() -> Self {
+ let (sendblk, mut recvblk) = mpsc::unbounded_channel::<Bytes>();
+ let task = tokio::task::spawn_blocking(move || {
+ let mut digest = D::new();
+ while let Some(blk) = recvblk.blocking_recv() {
+ digest.update(&blk[..]);
+ }
+ digest.finalize()
+ });
+ Self { sendblk, task }
+ }
+
+ pub fn update(&self, b: Bytes) {
+ self.sendblk.send(b).unwrap()
+ }
+
+ pub async fn finalize(self) -> digest::Output<D> {
+ drop(self.sendblk);
+ self.task.await.unwrap()
+ }
+}