Diffstat (limited to 'src/api/s3')
-rw-r--r--  src/api/s3/copy.rs   5
-rw-r--r--  src/api/s3/put.rs   32
2 files changed, 24 insertions, 13 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 0fc16993..4415a037 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -365,7 +365,10 @@ pub async fn handle_upload_part_copy(
// we need to insert that data as a new block.
async move {
if must_upload {
- garage2.block_manager.rpc_put_block(final_hash, data).await
+ garage2
+ .block_manager
+ .rpc_put_block(final_hash, data.into())
+ .await
} else {
Ok(())
}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 9ef37421..fbfa6f0d 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -9,6 +9,7 @@ use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
use garage_table::*;
+use garage_util::async_hash::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
@@ -130,7 +131,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
- let first_block_hash = blake2sum(&first_block[..]);
+ let first_block = Bytes::from(first_block);
+ let first_block_hash = async_blake2sum(first_block.clone()).await;
let tx_result = (|| async {
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
@@ -273,14 +275,16 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
part_number: u64,
- first_block: Vec<u8>,
+ first_block: Bytes,
first_block_hash: Hash,
chunker: &mut StreamChunker<S>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
- let mut md5hasher = Md5::new();
- let mut sha256hasher = Sha256::new();
- md5hasher.update(&first_block[..]);
- sha256hasher.update(&first_block[..]);
+ let first_block = Bytes::from(first_block);
+
+ let md5hasher = AsyncHasher::<Md5>::new();
+ let sha256hasher = AsyncHasher::<Sha256>::new();
+ md5hasher.update(first_block.clone());
+ sha256hasher.update(first_block.clone());
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
@@ -302,9 +306,10 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
chunker.next(),
)?;
if let Some(block) = next_block {
- md5hasher.update(&block[..]);
- sha256hasher.update(&block[..]);
- let block_hash = blake2sum(&block[..]);
+ let block = Bytes::from(block);
+ md5hasher.update(block.clone());
+ sha256hasher.update(block.clone());
+ let block_hash = async_blake2sum(block.clone()).await;
let block_len = block.len();
put_curr_version_block = put_block_meta(
garage,
@@ -322,9 +327,9 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
}
let total_size = next_offset as u64;
- let data_md5sum = md5hasher.finalize();
- let data_sha256sum = sha256hasher.finalize();
+ let data_md5sum = md5hasher.finalize().await;
+ let data_sha256sum = sha256hasher.finalize().await;
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
@@ -504,7 +509,10 @@ pub async fn handle_put_part(
// Copy block to store
let version = Version::new(version_uuid, bucket_id, key, false);
- let first_block_hash = blake2sum(&first_block[..]);
+
+ let first_block = Bytes::from(first_block);
+ let first_block_hash = async_blake2sum(first_block.clone()).await;
+
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
&garage,
&version,
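
Note on the pattern this diff adopts: block hashing moves off the async executor. Blocks are wrapped in Bytes (whose clones are cheap, reference-counted handles rather than data copies), fed to hashers that run on a separate thread, and the digests are awaited once the stream is exhausted. The sketch below is a hypothetical stand-in for that idea, not the actual garage_util::async_hash code; the AsyncMd5 name, the std channel, and the spawn_blocking-based design are assumptions made only to show why update() takes Bytes and finalize() is awaited.

// Hypothetical sketch of an async hashing helper (NOT the real
// garage_util::async_hash implementation; names and design are assumptions).
use std::sync::mpsc;

use bytes::Bytes;
use md5::{Digest, Md5};
use tokio::task::{self, JoinHandle};

struct AsyncMd5 {
    tx: Option<mpsc::Sender<Bytes>>,
    handle: JoinHandle<md5::digest::Output<Md5>>,
}

impl AsyncMd5 {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<Bytes>();
        // Hashing runs on a blocking thread so large blocks never stall the
        // async executor; the async side only sends cheap Bytes handles.
        let handle = task::spawn_blocking(move || {
            let mut hasher = Md5::new();
            while let Ok(chunk) = rx.recv() {
                hasher.update(&chunk[..]);
            }
            hasher.finalize()
        });
        AsyncMd5 { tx: Some(tx), handle }
    }

    fn update(&self, data: Bytes) {
        // Sending on an unbounded std channel never blocks; errors are
        // ignored here to keep the sketch short.
        let _ = self.tx.as_ref().expect("already finalized").send(data);
    }

    async fn finalize(mut self) -> md5::digest::Output<Md5> {
        // Dropping the sender closes the channel, letting the blocking
        // hashing task run to completion before we await its result.
        self.tx.take();
        self.handle.await.expect("hashing task panicked")
    }
}

With a helper shaped like this, the call sites read exactly as in the hunks above: update(block.clone()) once per chunk, then finalize().await after the stream is done.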