From 1b2e1296eb99630e969e585ede0424072adc2d0c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Jul 2022 17:18:47 +0200 Subject: Compute hashes on dedicated threads --- src/util/Cargo.toml | 4 +++- src/util/async_hash.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/util/lib.rs | 1 + 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 src/util/async_hash.rs (limited to 'src/util') diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 57c70ffb..7d79f21a 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -18,12 +18,14 @@ garage_db = { version = "0.8.0", path = "../db" } async-trait = "0.1" blake2 = "0.9" +bytes = "1.0" +digest = "0.10" err-derive = "0.3" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" tracing = "0.1.30" rand = "0.8" -sha2 = "0.9" +sha2 = "0.10" chrono = "0.4" rmp-serde = "0.15" diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs new file mode 100644 index 00000000..67776eb9 --- /dev/null +++ b/src/util/async_hash.rs @@ -0,0 +1,55 @@ +use bytes::Bytes; +use digest::Digest; + +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +use crate::data::*; + +/// Compute the sha256 of a slice, +/// spawning on a tokio thread for CPU-intensive processing +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_sha256sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || sha256sum(&data)) + .await + .unwrap() +} + +/// Compute the blake2sum of a slice, +/// spawning on a tokio thread for CPU-intensive processing. +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_blake2sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || blake2sum(&data)) + .await + .unwrap() +} + +// ---- + +pub struct AsyncHasher { + sendblk: mpsc::UnboundedSender, + task: JoinHandle>, +} + +impl AsyncHasher { + pub fn new() -> Self { + let (sendblk, mut recvblk) = mpsc::unbounded_channel::(); + let task = tokio::task::spawn_blocking(move || { + let mut digest = D::new(); + while let Some(blk) = recvblk.blocking_recv() { + digest.update(&blk[..]); + } + digest.finalize() + }); + Self { sendblk, task } + } + + pub fn update(&self, b: Bytes) { + self.sendblk.send(b).unwrap() + } + + pub async fn finalize(self) -> digest::Output { + drop(self.sendblk); + self.task.await.unwrap() + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index fce151af..7152f92a 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -3,6 +3,7 @@ #[macro_use] extern crate tracing; +pub mod async_hash; pub mod background; pub mod config; pub mod crdt; -- cgit v1.2.3