diff options
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/Cargo.toml | 4 | ||||
-rw-r--r-- | src/util/async_hash.rs | 64 | ||||
-rw-r--r-- | src/util/lib.rs | 1 |
3 files changed, 68 insertions, 1 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 163c1b77..d5c194e8 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -19,6 +19,8 @@ garage_db = { version = "0.8.0", path = "../db" } arc-swap = "1.0" async-trait = "0.1" blake2 = "0.9" +bytes = "1.0" +digest = "0.10" err-derive = "0.3" git-version = "0.3.4" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } @@ -26,7 +28,7 @@ hex = "0.4" lazy_static = "1.4" tracing = "0.1.30" rand = "0.8" -sha2 = "0.9" +sha2 = "0.10" chrono = "0.4" rmp-serde = "0.15" diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs new file mode 100644 index 00000000..fa8ee7ff --- /dev/null +++ b/src/util/async_hash.rs @@ -0,0 +1,64 @@ +use bytes::Bytes; +use digest::Digest; + +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; + +use crate::data::*; + +/// Compute the sha256 of a slice, +/// spawning on a tokio thread for CPU-intensive processing +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_sha256sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || sha256sum(&data)) + .await + .unwrap() +} + +/// Compute the blake2sum of a slice, +/// spawning on a tokio thread for CPU-intensive processing. +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_blake2sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || blake2sum(&data)) + .await + .unwrap() +} + +// ---- + +pub struct AsyncHasher<D: Digest> { + sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>, + task: JoinHandle<digest::Output<D>>, +} + +impl<D: Digest> AsyncHasher<D> { + pub fn new() -> Self { + let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>(); + let task = tokio::task::spawn_blocking(move || { + let mut digest = D::new(); + while let Some((blk, ch)) = recvblk.blocking_recv() { + digest.update(&blk[..]); + let _ = ch.send(()); + } + digest.finalize() + }); + Self { sendblk, task } + } + + pub async fn update(&self, b: Bytes) { + let (tx, rx) = oneshot::channel(); + self.sendblk.send((b, tx)).unwrap(); + let _ = rx.await; + } + + pub async fn finalize(self) -> digest::Output<D> { + drop(self.sendblk); + self.task.await.unwrap() + } +} + +impl<D: Digest> Default for AsyncHasher<D> { + fn default() -> Self { + Self::new() + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index 47c85c3a..264cc192 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -3,6 +3,7 @@ #[macro_use] extern crate tracing; +pub mod async_hash; pub mod background; pub mod config; pub mod crdt; |