aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3')
-rw-r--r--src/api/s3/copy.rs5
-rw-r--r--src/api/s3/put.rs87
2 files changed, 69 insertions, 23 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 0fc16993..4415a037 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -365,7 +365,10 @@ pub async fn handle_upload_part_copy(
// we need to insert that data as a new block.
async move {
if must_upload {
- garage2.block_manager.rpc_put_block(final_hash, data).await
+ garage2
+ .block_manager
+ .rpc_put_block(final_hash, data.into())
+ .await
} else {
Ok(())
}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 9ef37421..dc0530df 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -8,7 +8,13 @@ use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
+use opentelemetry::{
+ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
+ Context,
+};
+
use garage_table::*;
+use garage_util::async_hash::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
@@ -130,7 +136,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
- let first_block_hash = blake2sum(&first_block[..]);
+ let first_block = Bytes::from(first_block);
+ let first_block_hash = async_blake2sum(first_block.clone()).await;
let tx_result = (|| async {
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
@@ -273,14 +280,23 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
part_number: u64,
- first_block: Vec<u8>,
+ first_block: Bytes,
first_block_hash: Hash,
chunker: &mut StreamChunker<S>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
- let mut md5hasher = Md5::new();
- let mut sha256hasher = Sha256::new();
- md5hasher.update(&first_block[..]);
- sha256hasher.update(&first_block[..]);
+ let tracer = opentelemetry::global::tracer("garage");
+
+ let md5hasher = AsyncHasher::<Md5>::new();
+ let sha256hasher = AsyncHasher::<Sha256>::new();
+
+ futures::future::join(
+ md5hasher.update(first_block.clone()),
+ sha256hasher.update(first_block.clone()),
+ )
+ .with_context(Context::current_with_span(
+ tracer.start("Hash first block (md5, sha256)"),
+ ))
+ .await;
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
@@ -302,9 +318,16 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
chunker.next(),
)?;
if let Some(block) = next_block {
- md5hasher.update(&block[..]);
- sha256hasher.update(&block[..]);
- let block_hash = blake2sum(&block[..]);
+ let block = Bytes::from(block);
+ let (_, _, block_hash) = futures::future::join3(
+ md5hasher.update(block.clone()),
+ sha256hasher.update(block.clone()),
+ async_blake2sum(block.clone()),
+ )
+ .with_context(Context::current_with_span(
+ tracer.start("Hash block (md5, sha256, blake2)"),
+ ))
+ .await;
let block_len = block.len();
put_curr_version_block = put_block_meta(
garage,
@@ -322,9 +345,9 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
}
let total_size = next_offset as u64;
- let data_md5sum = md5hasher.finalize();
+ let data_md5sum = md5hasher.finalize().await;
- let data_sha256sum = sha256hasher.finalize();
+ let data_sha256sum = sha256hasher.finalize().await;
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
@@ -364,7 +387,8 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
stream: S,
read_all: bool,
block_size: usize,
- buf: VecDeque<u8>,
+ buf: VecDeque<Bytes>,
+ buf_len: usize,
}
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
@@ -373,29 +397,45 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
stream,
read_all: false,
block_size,
- buf: VecDeque::with_capacity(2 * block_size),
+ buf: VecDeque::with_capacity(8),
+ buf_len: 0,
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
- while !self.read_all && self.buf.len() < self.block_size {
+ while !self.read_all && self.buf_len < self.block_size {
if let Some(block) = self.stream.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
- self.buf.extend(bytes);
+ self.buf_len += bytes.len();
+ self.buf.push_back(bytes);
} else {
self.read_all = true;
}
}
- if self.buf.is_empty() {
+ if self.buf_len == 0 {
Ok(None)
- } else if self.buf.len() <= self.block_size {
- let block = self.buf.drain(..).collect::<Vec<u8>>();
- Ok(Some(block))
} else {
- let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
- Ok(Some(block))
+ let mut slices = Vec::with_capacity(self.buf.len());
+ let mut taken = 0;
+ while self.buf_len > 0 && taken < self.block_size {
+ let front = self.buf.pop_front().unwrap();
+ if taken + front.len() <= self.block_size {
+ taken += front.len();
+ self.buf_len -= front.len();
+ slices.push(front);
+ } else {
+ let front_take = self.block_size - taken;
+ slices.push(front.slice(..front_take));
+ self.buf.push_front(front.slice(front_take..));
+ self.buf_len -= front_take;
+ break;
+ }
+ }
+ Ok(Some(
+ slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(),
+ ))
}
}
}
@@ -504,7 +544,10 @@ pub async fn handle_put_part(
// Copy block to store
let version = Version::new(version_uuid, bucket_id, key, false);
- let first_block_hash = blake2sum(&first_block[..]);
+
+ let first_block = Bytes::from(first_block);
+ let first_block_hash = async_blake2sum(first_block.clone()).await;
+
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
&garage,
&version,