author    Alex <alex@adnab.me>  2022-09-12 16:38:43 +0200
committer Alex <alex@adnab.me>  2022-09-12 16:38:43 +0200
commit    309d7aef3f05657e2b969ab72442b2f2c350da03 (patch)
tree      448704ae3f07a10f86fcb5d40347ad70cdd81498 /src/api/s3/put.rs
parent    03c40a0b24dd5bd2a51d3cd3df0ca1a42fb2d328 (diff)
parent    f91fab8582728f176f446a4a2e039d22f752167b (diff)
Merge pull request 'performance improvements' (#342) from lx-perf-improvements into main
Performance improvements included in this PR:

- [x] Use `Bytes` in a few places where appropriate, instead of `Vec<u8>`, to reduce the number of copies
- [x] StreamChunker now accumulates incoming slices in a `Vec<Bytes>` instead of a `VecDeque<u8>`. This replaces calls to `.extend()` and `.drain()`, which were quite costly, with a single `concat()` on a vec of slices, which is much better optimized (a standalone sketch of this strategy follows the diffstat below)
- [x] Hashing (blake2, sha256, md5) is now done on a Tokio thread dedicated to CPU-intensive tasks, using `spawn_blocking` (see the sketch right after this list)
- [x] The block manager now uses 256 independent locks instead of one big lock for writing, which reduces contention when writing several/many objects in parallel (see the sketch at the end of this page)
- [x] Better LMDB defaults: we now set the `NoSync` and `NoMetaSync` flags to avoid an `fsync` at each transaction (extremely slow). Also increased the number of LMDB readers to accommodate more intensive workloads

Other changes included in this PR:

- [x] Updated the hashing and MAC crates: md5 and sha2 from 0.9 to 0.10, hmac from 0.10 to 0.12
- [x] Switched to `tracing_subscriber` for logs, which gives the timing of each event

Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/342
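The `AsyncHasher` and `async_blake2sum` helpers that appear in the diff below come from the new `garage_util::async_hash` module. Here is a minimal sketch of the underlying `spawn_blocking` pattern; it is an illustration only, not the actual `garage_util` code, and the helper name `md5_blocking` is hypothetical:

```rust
use bytes::Bytes;
use md5::{Digest, Md5};

// Sketch: run a whole-buffer hash on Tokio's blocking-thread pool so
// CPU-bound work never stalls the async reactor. Moving `Bytes` into
// the closure only bumps a refcount; the buffer itself is not copied.
async fn md5_blocking(data: Bytes) -> Vec<u8> {
    tokio::task::spawn_blocking(move || {
        let mut hasher = Md5::new();
        hasher.update(&data[..]);
        hasher.finalize().to_vec()
    })
    .await
    .expect("hashing task panicked")
}
```

Judging from the diff, `AsyncHasher` generalizes this one-shot pattern to incremental hashing: each `update().await` feeds a chunk to hash state kept alive on the blocking pool, and `finalize().await` returns the digest.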
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r--  src/api/s3/put.rs | 87
1 file changed, 65 insertions(+), 22 deletions(-)
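As context for the chunker rewrite in the diff below, a minimal standalone sketch of the accumulation strategy (the helper name `assemble_block` is hypothetical, not code from this patch):

```rust
use bytes::Bytes;

// Keep incoming network frames as `Bytes` and join them once per
// block: `concat()` performs a single allocation plus a few memcpys,
// whereas `VecDeque<u8>::extend`/`drain` paid a per-byte cost.
fn assemble_block(frames: &[Bytes]) -> Vec<u8> {
    frames.iter().map(|b| &b[..]).collect::<Vec<&[u8]>>().concat()
}
```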
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 9ef37421..dc0530df 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -8,7 +8,13 @@ use hyper::{Request, Response};
 use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
 use sha2::Sha256;
 
+use opentelemetry::{
+	trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
+	Context,
+};
+
 use garage_table::*;
+use garage_util::async_hash::*;
 use garage_util::data::*;
 use garage_util::error::Error as GarageError;
 use garage_util::time::*;
@@ -130,7 +136,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 	garage.version_table.insert(&version).await?;
 
 	// Transfer data and verify checksum
-	let first_block_hash = blake2sum(&first_block[..]);
+	let first_block = Bytes::from(first_block);
+	let first_block_hash = async_blake2sum(first_block.clone()).await;
 
 	let tx_result = (|| async {
 		let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
@@ -273,14 +280,23 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 	garage: &Garage,
 	version: &Version,
 	part_number: u64,
-	first_block: Vec<u8>,
+	first_block: Bytes,
 	first_block_hash: Hash,
 	chunker: &mut StreamChunker<S>,
 ) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
-	let mut md5hasher = Md5::new();
-	let mut sha256hasher = Sha256::new();
-	md5hasher.update(&first_block[..]);
-	sha256hasher.update(&first_block[..]);
+	let tracer = opentelemetry::global::tracer("garage");
+
+	let md5hasher = AsyncHasher::<Md5>::new();
+	let sha256hasher = AsyncHasher::<Sha256>::new();
+
+	futures::future::join(
+		md5hasher.update(first_block.clone()),
+		sha256hasher.update(first_block.clone()),
+	)
+	.with_context(Context::current_with_span(
+		tracer.start("Hash first block (md5, sha256)"),
+	))
+	.await;
 
 	let mut next_offset = first_block.len();
 	let mut put_curr_version_block = put_block_meta(
@@ -302,9 +318,16 @@
 			chunker.next(),
 		)?;
 		if let Some(block) = next_block {
-			md5hasher.update(&block[..]);
-			sha256hasher.update(&block[..]);
-			let block_hash = blake2sum(&block[..]);
+			let block = Bytes::from(block);
+			let (_, _, block_hash) = futures::future::join3(
+				md5hasher.update(block.clone()),
+				sha256hasher.update(block.clone()),
+				async_blake2sum(block.clone()),
+			)
+			.with_context(Context::current_with_span(
+				tracer.start("Hash block (md5, sha256, blake2)"),
+			))
+			.await;
 			let block_len = block.len();
 			put_curr_version_block = put_block_meta(
 				garage,
@@ -322,9 +345,9 @@
 	}
 
 	let total_size = next_offset as u64;
-	let data_md5sum = md5hasher.finalize();
-	let data_sha256sum = sha256hasher.finalize();
+	let data_md5sum = md5hasher.finalize().await;
+	let data_sha256sum = sha256hasher.finalize().await;
 	let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
 
 	Ok((total_size, data_md5sum, data_sha256sum))
@@ -364,7 +387,8 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
 	stream: S,
 	read_all: bool,
 	block_size: usize,
-	buf: VecDeque<u8>,
+	buf: VecDeque<Bytes>,
+	buf_len: usize,
 }
 
 impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
@@ -373,29 +397,45 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
 			stream,
 			read_all: false,
 			block_size,
-			buf: VecDeque::with_capacity(2 * block_size),
+			buf: VecDeque::with_capacity(8),
+			buf_len: 0,
 		}
 	}
 
 	async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
-		while !self.read_all && self.buf.len() < self.block_size {
+		while !self.read_all && self.buf_len < self.block_size {
 			if let Some(block) = self.stream.next().await {
 				let bytes = block?;
 				trace!("Body next: {} bytes", bytes.len());
-				self.buf.extend(bytes);
+				self.buf_len += bytes.len();
+				self.buf.push_back(bytes);
 			} else {
 				self.read_all = true;
 			}
 		}
 
-		if self.buf.is_empty() {
+		if self.buf_len == 0 {
 			Ok(None)
-		} else if self.buf.len() <= self.block_size {
-			let block = self.buf.drain(..).collect::<Vec<u8>>();
-			Ok(Some(block))
 		} else {
-			let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
-			Ok(Some(block))
+			let mut slices = Vec::with_capacity(self.buf.len());
+			let mut taken = 0;
+			while self.buf_len > 0 && taken < self.block_size {
+				let front = self.buf.pop_front().unwrap();
+				if taken + front.len() <= self.block_size {
+					taken += front.len();
+					self.buf_len -= front.len();
+					slices.push(front);
+				} else {
+					let front_take = self.block_size - taken;
+					slices.push(front.slice(..front_take));
+					self.buf.push_front(front.slice(front_take..));
+					self.buf_len -= front_take;
+					break;
+				}
+			}
+			Ok(Some(
+				slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(),
+			))
 		}
 	}
 }
@@ -504,7 +544,10 @@ pub async fn handle_put_part(
 	// Copy block to store
 	let version = Version::new(version_uuid, bucket_id, key, false);
-	let first_block_hash = blake2sum(&first_block[..]);
+
+	let first_block = Bytes::from(first_block);
+	let first_block_hash = async_blake2sum(first_block.clone()).await;
+
 	let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
 		&garage,
 		&version,
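The 256-lock change mentioned in the PR description lives in the block manager and therefore does not appear in this file's diff. A minimal sketch of the lock-striping idea, with hypothetical names (`StripedLocks`, `lock_for`):

```rust
use tokio::sync::{Mutex, MutexGuard};

// Stripe writes across 256 locks instead of one global lock: a
// block's hash picks the stripe, so writes to distinct blocks almost
// never contend, while two writers of the same block still serialize.
struct StripedLocks {
    stripes: Vec<Mutex<()>>,
}

impl StripedLocks {
    fn new() -> Self {
        Self {
            stripes: (0..256).map(|_| Mutex::new(())).collect(),
        }
    }

    // Pick the stripe from the first byte of the 32-byte block hash.
    async fn lock_for(&self, hash: &[u8; 32]) -> MutexGuard<'_, ()> {
        self.stripes[hash[0] as usize].lock().await
    }
}
```

A writer would take `let _guard = locks.lock_for(&hash).await;` around the file I/O for that block, holding one stripe rather than the whole store.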