diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/Cargo.toml | 8 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 5 | ||||
-rw-r--r-- | src/api/s3/put.rs | 32 | ||||
-rw-r--r-- | src/api/signature/mod.rs | 14 | ||||
-rw-r--r-- | src/block/block.rs | 17 | ||||
-rw-r--r-- | src/block/manager.rs | 6 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 2 | ||||
-rw-r--r-- | src/util/Cargo.toml | 4 | ||||
-rw-r--r-- | src/util/async_hash.rs | 55 | ||||
-rw-r--r-- | src/util/lib.rs | 1 |
10 files changed, 110 insertions, 34 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index db77cf38..901cb959 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -24,15 +24,15 @@ async-trait = "0.1.7" base64 = "0.13" bytes = "1.0" chrono = "0.4" -crypto-mac = "0.10" +crypto-common = "0.1" err-derive = "0.3" hex = "0.4" -hmac = "0.10" +hmac = "0.12" idna = "0.2" tracing = "0.1.30" -md-5 = "0.9" +md-5 = "0.10" nom = "7.1" -sha2 = "0.9" +sha2 = "0.10" futures = "0.3" futures-util = "0.3" diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 0fc16993..4415a037 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -365,7 +365,10 @@ pub async fn handle_upload_part_copy( // we need to insert that data as a new block. async move { if must_upload { - garage2.block_manager.rpc_put_block(final_hash, data).await + garage2 + .block_manager + .rpc_put_block(final_hash, data.into()) + .await } else { Ok(()) } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 9ef37421..fbfa6f0d 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -9,6 +9,7 @@ use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; use garage_table::*; +use garage_util::async_hash::*; use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -130,7 +131,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block_hash = blake2sum(&first_block[..]); + let first_block = Bytes::from(first_block); + let first_block_hash = async_blake2sum(first_block.clone()).await; let tx_result = (|| async { let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( @@ -273,14 +275,16 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, part_number: u64, - first_block: Vec<u8>, + first_block: Bytes, first_block_hash: Hash, chunker: &mut StreamChunker<S>, ) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { - let mut md5hasher = Md5::new(); - let mut sha256hasher = Sha256::new(); - md5hasher.update(&first_block[..]); - sha256hasher.update(&first_block[..]); + let first_block = Bytes::from(first_block); + + let md5hasher = AsyncHasher::<Md5>::new(); + let sha256hasher = AsyncHasher::<Sha256>::new(); + md5hasher.update(first_block.clone()); + sha256hasher.update(first_block.clone()); let mut next_offset = first_block.len(); let mut put_curr_version_block = put_block_meta( @@ -302,9 +306,10 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( chunker.next(), )?; if let Some(block) = next_block { - md5hasher.update(&block[..]); - sha256hasher.update(&block[..]); - let block_hash = blake2sum(&block[..]); + let block = Bytes::from(block); + md5hasher.update(block.clone()); + sha256hasher.update(block.clone()); + let block_hash = async_blake2sum(block.clone()).await; let block_len = block.len(); put_curr_version_block = put_block_meta( garage, @@ -322,9 +327,9 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( } let total_size = next_offset as u64; - let data_md5sum = md5hasher.finalize(); + let data_md5sum = md5hasher.finalize().await; - let data_sha256sum = sha256hasher.finalize(); + let data_sha256sum = sha256hasher.finalize().await; let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); Ok((total_size, data_md5sum, data_sha256sum)) @@ -504,7 +509,10 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket_id, key, false); - let first_block_hash = blake2sum(&first_block[..]); + + let first_block = Bytes::from(first_block); + let first_block_hash = async_blake2sum(first_block.clone()).await; + let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( &garage, &version, diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs index dd5b590c..4b8b990f 100644 --- a/src/api/signature/mod.rs +++ b/src/api/signature/mod.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Utc}; -use hmac::{Hmac, Mac, NewMac}; +use hmac::{Hmac, Mac}; use sha2::Sha256; use garage_util::data::{sha256sum, Hash}; @@ -29,17 +29,17 @@ pub fn signing_hmac( secret_key: &str, region: &str, service: &str, -) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> { +) -> Result<HmacSha256, crypto_common::InvalidLength> { let secret = String::from("AWS4") + secret_key; - let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; + let mut date_hmac = HmacSha256::new_from_slice(secret.as_bytes())?; date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); - let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; + let mut region_hmac = HmacSha256::new_from_slice(&date_hmac.finalize().into_bytes())?; region_hmac.update(region.as_bytes()); - let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; + let mut service_hmac = HmacSha256::new_from_slice(®ion_hmac.finalize().into_bytes())?; service_hmac.update(service.as_bytes()); - let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; + let mut signing_hmac = HmacSha256::new_from_slice(&service_hmac.finalize().into_bytes())?; signing_hmac.update(b"aws4_request"); - let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; + let hmac = HmacSha256::new_from_slice(&signing_hmac.finalize().into_bytes())?; Ok(hmac) } diff --git a/src/block/block.rs b/src/block/block.rs index 4d3fbcb8..f17bd2c0 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use serde::{Deserialize, Serialize}; use zstd::stream::{decode_all as zstd_decode, Encoder}; @@ -61,13 +62,17 @@ impl DataBlock { } } - pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock { - if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); + pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock { + tokio::task::spawn_blocking(move || { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } } - } - DataBlock::Plain(data) + DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here + }) + .await + .unwrap() } } diff --git a/src/block/manager.rs b/src/block/manager.rs index 017ba9da..890c247d 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -5,6 +5,7 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use async_trait::async_trait; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use futures::future::*; @@ -211,14 +212,15 @@ impl BlockManager { } /// Send block to nodes that should have it - pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { + pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - let data = DataBlock::from_buffer(data, self.compression_level); + let data = DataBlock::from_buffer(data, self.compression_level).await; self.system .rpc .try_call_many( &self.endpoint, &who[..], + // TODO: remove to_vec() here BlockRpc::PutBlock { hash, data }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 8948e750..80802a16 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -65,7 +65,7 @@ chrono = "0.4" http = "0.2" hmac = "0.10" hyper = { version = "0.14", features = ["client", "http1", "runtime"] } -sha2 = "0.9" +sha2 = "0.10" static_init = "1.0" assert-json-diff = "2.0" diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 57c70ffb..7d79f21a 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -18,12 +18,14 @@ garage_db = { version = "0.8.0", path = "../db" } async-trait = "0.1" blake2 = "0.9" +bytes = "1.0" +digest = "0.10" err-derive = "0.3" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" tracing = "0.1.30" rand = "0.8" -sha2 = "0.9" +sha2 = "0.10" chrono = "0.4" rmp-serde = "0.15" diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs new file mode 100644 index 00000000..67776eb9 --- /dev/null +++ b/src/util/async_hash.rs @@ -0,0 +1,55 @@ +use bytes::Bytes; +use digest::Digest; + +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +use crate::data::*; + +/// Compute the sha256 of a slice, +/// spawning on a tokio thread for CPU-intensive processing +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_sha256sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || sha256sum(&data)) + .await + .unwrap() +} + +/// Compute the blake2sum of a slice, +/// spawning on a tokio thread for CPU-intensive processing. +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_blake2sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || blake2sum(&data)) + .await + .unwrap() +} + +// ---- + +pub struct AsyncHasher<D: Digest> { + sendblk: mpsc::UnboundedSender<Bytes>, + task: JoinHandle<digest::Output<D>>, +} + +impl<D: Digest> AsyncHasher<D> { + pub fn new() -> Self { + let (sendblk, mut recvblk) = mpsc::unbounded_channel::<Bytes>(); + let task = tokio::task::spawn_blocking(move || { + let mut digest = D::new(); + while let Some(blk) = recvblk.blocking_recv() { + digest.update(&blk[..]); + } + digest.finalize() + }); + Self { sendblk, task } + } + + pub fn update(&self, b: Bytes) { + self.sendblk.send(b).unwrap() + } + + pub async fn finalize(self) -> digest::Output<D> { + drop(self.sendblk); + self.task.await.unwrap() + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index fce151af..7152f92a 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -3,6 +3,7 @@ #[macro_use] extern crate tracing; +pub mod async_hash; pub mod background; pub mod config; pub mod crdt; |