aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-18 18:40:57 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:24:48 +0200
commit2f111e6b3d772b10c8ed6279ce0c82d22852afd1 (patch)
treec9f4673aa34c12bda389b523753968a9bb92e0ee /src/util
parent1b2e1296eb99630e969e585ede0424072adc2d0c (diff)
downloadgarage-2f111e6b3d772b10c8ed6279ce0c82d22852afd1.tar.gz
garage-2f111e6b3d772b10c8ed6279ce0c82d22852afd1.zip
Performance improvements:
- reduce contention on mutation_lock by having 256 of them - better lmdb defaults
Diffstat (limited to 'src/util')
-rw-r--r--src/util/async_hash.rs15
1 files changed, 9 insertions, 6 deletions
diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs
index 67776eb9..be0535de 100644
--- a/src/util/async_hash.rs
+++ b/src/util/async_hash.rs
@@ -1,7 +1,7 @@
use bytes::Bytes;
use digest::Digest;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use crate::data::*;
@@ -27,25 +27,28 @@ pub async fn async_blake2sum(data: Bytes) -> Hash {
// ----
pub struct AsyncHasher<D: Digest> {
- sendblk: mpsc::UnboundedSender<Bytes>,
+ sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>,
task: JoinHandle<digest::Output<D>>,
}
impl<D: Digest> AsyncHasher<D> {
pub fn new() -> Self {
- let (sendblk, mut recvblk) = mpsc::unbounded_channel::<Bytes>();
+ let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>();
let task = tokio::task::spawn_blocking(move || {
let mut digest = D::new();
- while let Some(blk) = recvblk.blocking_recv() {
+ while let Some((blk, ch)) = recvblk.blocking_recv() {
digest.update(&blk[..]);
+ let _ = ch.send(());
}
digest.finalize()
});
Self { sendblk, task }
}
- pub fn update(&self, b: Bytes) {
- self.sendblk.send(b).unwrap()
+ pub async fn update(&self, b: Bytes) {
+ let (tx, rx) = oneshot::channel();
+ self.sendblk.send((b, tx)).unwrap();
+ let _ = rx.await;
}
pub async fn finalize(self) -> digest::Output<D> {