aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-18 17:18:47 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:24:44 +0200
commit1b2e1296eb99630e969e585ede0424072adc2d0c (patch)
treee16b953e1f53e57c6efd11e688e955fa36cbb186 /src
parenta184f0d0b5f1d84524f7ba7302a4919567acec56 (diff)
downloadgarage-1b2e1296eb99630e969e585ede0424072adc2d0c.tar.gz
garage-1b2e1296eb99630e969e585ede0424072adc2d0c.zip
Compute hashes on dedicated threads
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml8
-rw-r--r--src/api/s3/copy.rs5
-rw-r--r--src/api/s3/put.rs32
-rw-r--r--src/api/signature/mod.rs14
-rw-r--r--src/block/block.rs17
-rw-r--r--src/block/manager.rs6
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/util/Cargo.toml4
-rw-r--r--src/util/async_hash.rs55
-rw-r--r--src/util/lib.rs1
10 files changed, 110 insertions, 34 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index db77cf38..901cb959 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -24,15 +24,15 @@ async-trait = "0.1.7"
base64 = "0.13"
bytes = "1.0"
chrono = "0.4"
-crypto-mac = "0.10"
+crypto-common = "0.1"
err-derive = "0.3"
hex = "0.4"
-hmac = "0.10"
+hmac = "0.12"
idna = "0.2"
tracing = "0.1.30"
-md-5 = "0.9"
+md-5 = "0.10"
nom = "7.1"
-sha2 = "0.9"
+sha2 = "0.10"
futures = "0.3"
futures-util = "0.3"
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 0fc16993..4415a037 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -365,7 +365,10 @@ pub async fn handle_upload_part_copy(
// we need to insert that data as a new block.
async move {
if must_upload {
- garage2.block_manager.rpc_put_block(final_hash, data).await
+ garage2
+ .block_manager
+ .rpc_put_block(final_hash, data.into())
+ .await
} else {
Ok(())
}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 9ef37421..fbfa6f0d 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -9,6 +9,7 @@ use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
use garage_table::*;
+use garage_util::async_hash::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
@@ -130,7 +131,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
- let first_block_hash = blake2sum(&first_block[..]);
+ let first_block = Bytes::from(first_block);
+ let first_block_hash = async_blake2sum(first_block.clone()).await;
let tx_result = (|| async {
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
@@ -273,14 +275,16 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
part_number: u64,
- first_block: Vec<u8>,
+ first_block: Bytes,
first_block_hash: Hash,
chunker: &mut StreamChunker<S>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
- let mut md5hasher = Md5::new();
- let mut sha256hasher = Sha256::new();
- md5hasher.update(&first_block[..]);
- sha256hasher.update(&first_block[..]);
+ let first_block = Bytes::from(first_block);
+
+ let md5hasher = AsyncHasher::<Md5>::new();
+ let sha256hasher = AsyncHasher::<Sha256>::new();
+ md5hasher.update(first_block.clone());
+ sha256hasher.update(first_block.clone());
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
@@ -302,9 +306,10 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
chunker.next(),
)?;
if let Some(block) = next_block {
- md5hasher.update(&block[..]);
- sha256hasher.update(&block[..]);
- let block_hash = blake2sum(&block[..]);
+ let block = Bytes::from(block);
+ md5hasher.update(block.clone());
+ sha256hasher.update(block.clone());
+ let block_hash = async_blake2sum(block.clone()).await;
let block_len = block.len();
put_curr_version_block = put_block_meta(
garage,
@@ -322,9 +327,9 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
}
let total_size = next_offset as u64;
- let data_md5sum = md5hasher.finalize();
+ let data_md5sum = md5hasher.finalize().await;
- let data_sha256sum = sha256hasher.finalize();
+ let data_sha256sum = sha256hasher.finalize().await;
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
@@ -504,7 +509,10 @@ pub async fn handle_put_part(
// Copy block to store
let version = Version::new(version_uuid, bucket_id, key, false);
- let first_block_hash = blake2sum(&first_block[..]);
+
+ let first_block = Bytes::from(first_block);
+ let first_block_hash = async_blake2sum(first_block.clone()).await;
+
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
&garage,
&version,
diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs
index dd5b590c..4b8b990f 100644
--- a/src/api/signature/mod.rs
+++ b/src/api/signature/mod.rs
@@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
-use hmac::{Hmac, Mac, NewMac};
+use hmac::{Hmac, Mac};
use sha2::Sha256;
use garage_util::data::{sha256sum, Hash};
@@ -29,17 +29,17 @@ pub fn signing_hmac(
secret_key: &str,
region: &str,
service: &str,
-) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
+) -> Result<HmacSha256, crypto_common::InvalidLength> {
let secret = String::from("AWS4") + secret_key;
- let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
+ let mut date_hmac = HmacSha256::new_from_slice(secret.as_bytes())?;
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
- let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
+ let mut region_hmac = HmacSha256::new_from_slice(&date_hmac.finalize().into_bytes())?;
region_hmac.update(region.as_bytes());
- let mut service_hmac = HmacSha256::new_varkey(&region_hmac.finalize().into_bytes())?;
+ let mut service_hmac = HmacSha256::new_from_slice(&region_hmac.finalize().into_bytes())?;
service_hmac.update(service.as_bytes());
- let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
+ let mut signing_hmac = HmacSha256::new_from_slice(&service_hmac.finalize().into_bytes())?;
signing_hmac.update(b"aws4_request");
- let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
+ let hmac = HmacSha256::new_from_slice(&signing_hmac.finalize().into_bytes())?;
Ok(hmac)
}
diff --git a/src/block/block.rs b/src/block/block.rs
index 4d3fbcb8..f17bd2c0 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -1,3 +1,4 @@
+use bytes::Bytes;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
@@ -61,13 +62,17 @@ impl DataBlock {
}
}
- pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
- if let Some(level) = level {
- if let Ok(data) = zstd_encode(&data[..], level) {
- return DataBlock::Compressed(data);
+ pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
+ tokio::task::spawn_blocking(move || {
+ if let Some(level) = level {
+ if let Ok(data) = zstd_encode(&data[..], level) {
+ return DataBlock::Compressed(data);
+ }
}
- }
- DataBlock::Plain(data)
+ DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here
+ })
+ .await
+ .unwrap()
}
}
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 017ba9da..890c247d 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -5,6 +5,7 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
+use bytes::Bytes;
use serde::{Deserialize, Serialize};
use futures::future::*;
@@ -211,14 +212,15 @@ impl BlockManager {
}
/// Send block to nodes that should have it
- pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+ pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
- let data = DataBlock::from_buffer(data, self.compression_level);
+ let data = DataBlock::from_buffer(data, self.compression_level).await;
self.system
.rpc
.try_call_many(
&self.endpoint,
&who[..],
+ // TODO: remove to_vec() here
BlockRpc::PutBlock { hash, data },
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.replication.write_quorum())
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 8948e750..80802a16 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -65,7 +65,7 @@ chrono = "0.4"
http = "0.2"
hmac = "0.10"
hyper = { version = "0.14", features = ["client", "http1", "runtime"] }
-sha2 = "0.9"
+sha2 = "0.10"
static_init = "1.0"
assert-json-diff = "2.0"
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 57c70ffb..7d79f21a 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -18,12 +18,14 @@ garage_db = { version = "0.8.0", path = "../db" }
async-trait = "0.1"
blake2 = "0.9"
+bytes = "1.0"
+digest = "0.10"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
tracing = "0.1.30"
rand = "0.8"
-sha2 = "0.9"
+sha2 = "0.10"
chrono = "0.4"
rmp-serde = "0.15"
diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs
new file mode 100644
index 00000000..67776eb9
--- /dev/null
+++ b/src/util/async_hash.rs
@@ -0,0 +1,55 @@
+use bytes::Bytes;
+use digest::Digest;
+
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+use crate::data::*;
+
+/// Compute the sha256 of a slice,
+/// spawning on a tokio thread for CPU-intensive processing
+/// The argument has to be an owned Bytes, as it is moved out to a new thread.
+pub async fn async_sha256sum(data: Bytes) -> Hash {
+ tokio::task::spawn_blocking(move || sha256sum(&data))
+ .await
+ .unwrap()
+}
+
+/// Compute the blake2sum of a slice,
+/// spawning on a tokio thread for CPU-intensive processing.
+/// The argument has to be an owned Bytes, as it is moved out to a new thread.
+pub async fn async_blake2sum(data: Bytes) -> Hash {
+ tokio::task::spawn_blocking(move || blake2sum(&data))
+ .await
+ .unwrap()
+}
+
+// ----
+
+pub struct AsyncHasher<D: Digest> {
+ sendblk: mpsc::UnboundedSender<Bytes>,
+ task: JoinHandle<digest::Output<D>>,
+}
+
+impl<D: Digest> AsyncHasher<D> {
+ pub fn new() -> Self {
+ let (sendblk, mut recvblk) = mpsc::unbounded_channel::<Bytes>();
+ let task = tokio::task::spawn_blocking(move || {
+ let mut digest = D::new();
+ while let Some(blk) = recvblk.blocking_recv() {
+ digest.update(&blk[..]);
+ }
+ digest.finalize()
+ });
+ Self { sendblk, task }
+ }
+
+ pub fn update(&self, b: Bytes) {
+ self.sendblk.send(b).unwrap()
+ }
+
+ pub async fn finalize(self) -> digest::Output<D> {
+ drop(self.sendblk);
+ self.task.await.unwrap()
+ }
+}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index fce151af..7152f92a 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -3,6 +3,7 @@
#[macro_use]
extern crate tracing;
+pub mod async_hash;
pub mod background;
pub mod config;
pub mod crdt;