| author    | Alex <alex@adnab.me> | 2022-09-12 16:38:43 +0200 |
|-----------|----------------------|---------------------------|
| committer | Alex <alex@adnab.me> | 2022-09-12 16:38:43 +0200 |
| commit    | 309d7aef3f05657e2b969ab72442b2f2c350da03 (patch) | |
| tree      | 448704ae3f07a10f86fcb5d40347ad70cdd81498 /src/block | |
| parent    | 03c40a0b24dd5bd2a51d3cd3df0ca1a42fb2d328 (diff) | |
| parent    | f91fab8582728f176f446a4a2e039d22f752167b (diff) | |
Merge pull request 'performance improvements' (#342) from lx-perf-improvements into main
Performance improvements included in this PR:
- [x] Use `Bytes` at a few places where appropriate, instead of `Vec<u8>`, to reduce the number of copies
- [x] StreamChunker now accumulates incoming slices in a `Vec<Bytes>` instead of a `VecDeque<u8>`. This replaces the costly `.extend()` and `.drain()` calls with a single `concat()` over a vec of slices, which is much better optimized
- [x] Hashing (b2, sha256, md5) is now done on a Tokio thread dedicated to CPU-intensive tasks, using `spawn_blocking` (the same pattern is sketched after the diff below)
- [x] Block manager now uses 256 independent locks instead of one big lock for writing, which reduces contention when writing several/many objects in parallel (see the sketch after this list)
- [x] Better LMDB defaults: we now set the `NoSync` and `NoMetaSync` flags to avoid an `fsync` on each transaction (extremely slow). Also increased the number of LMDB readers to accommodate more intensive workloads
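
As a rough illustration of the 256-lock change, here is a minimal sketch (not the actual Garage types: `LockShards` and `shard_for` are invented names). The first byte of a block's hash selects one of 256 mutexes, so concurrent writes to different blocks almost never contend on the same lock:

```rust
use tokio::sync::Mutex;

/// 256 independent locks; the block hash picks the shard.
struct LockShards {
    shards: [Mutex<()>; 256],
}

impl LockShards {
    fn new() -> Self {
        // `[(); 256].map(...)` builds the array without requiring `Mutex: Copy`.
        Self {
            shards: [(); 256].map(|_| Mutex::new(())),
        }
    }

    /// Pick the shard from the first byte of the 32-byte block hash.
    fn shard_for(&self, hash: &[u8; 32]) -> &Mutex<()> {
        &self.shards[hash[0] as usize]
    }
}

#[tokio::main]
async fn main() {
    let locks = LockShards::new();
    let hash = [0x42u8; 32];
    // Only writers whose block hash starts with the same byte wait here.
    let _guard = locks.shard_for(&hash).lock().await;
    // ... perform the block write while holding the shard lock ...
}
```

Because block hashes are uniformly distributed, each shard receives roughly an equal share of the write traffic.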
Other changes included in this PR:
- [x] Update the hashing and MAC crates: md5 and sha2 from 0.9 to 0.10, hmac from 0.10 to 0.12
- [x] Switch to `tracing_subscriber` for logs, which attaches timing information to each event (see the sketch after this list)
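
For the logging switch, a minimal initialization sketch (generic `tracing_subscriber` usage assuming the crate's `env-filter` feature, not the exact Garage setup) shows how the fmt subscriber timestamps every event:

```rust
use tracing::{debug, info};
use tracing_subscriber::EnvFilter;

fn main() {
    // The fmt subscriber prints each event with a timestamp, so the timing
    // of individual operations can be read directly from the logs.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env()) // e.g. RUST_LOG=garage=debug
        .init();

    info!("block manager started");
    debug!(block = "309d7aef", "writing block");
}
```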
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/342
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/block.rs   | 17 |
-rw-r--r-- | src/block/manager.rs | 34 |
2 files changed, 34 insertions, 17 deletions
diff --git a/src/block/block.rs b/src/block/block.rs
index 4d3fbcb8..f17bd2c0 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -1,3 +1,4 @@
+use bytes::Bytes;
 use serde::{Deserialize, Serialize};
 use zstd::stream::{decode_all as zstd_decode, Encoder};
 
@@ -61,13 +62,17 @@ impl DataBlock {
         }
     }
 
-    pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
-        if let Some(level) = level {
-            if let Ok(data) = zstd_encode(&data[..], level) {
-                return DataBlock::Compressed(data);
+    pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
+        tokio::task::spawn_blocking(move || {
+            if let Some(level) = level {
+                if let Ok(data) = zstd_encode(&data[..], level) {
+                    return DataBlock::Compressed(data);
+                }
             }
-        }
-        DataBlock::Plain(data)
+            DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here
+        })
+        .await
+        .unwrap()
     }
 }
 
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 9240db25..b5199b62 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,12 +3,18 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use serde::{Deserialize, Serialize};
 
 use tokio::fs;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::sync::{mpsc, Mutex};
 
+use opentelemetry::{
+    trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
+    Context,
+};
+
 use garage_db as db;
 
 use garage_util::data::*;
@@ -68,7 +74,7 @@ pub struct BlockManager {
 
     compression_level: Option<i32>,
 
-    mutation_lock: Mutex<BlockManagerLocked>,
+    mutation_lock: [Mutex<BlockManagerLocked>; 256],
 
     pub(crate) rc: BlockRc,
     pub resync: BlockResyncManager,
@@ -105,8 +111,6 @@ impl BlockManager {
             .netapp
             .endpoint("garage_block/manager.rs/Rpc".to_string());
 
-        let manager_locked = BlockManagerLocked();
-
         let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone());
 
         let (scrub_tx, scrub_rx) = mpsc::channel(1);
@@ -115,7 +119,7 @@ impl BlockManager {
             replication,
             data_dir,
             compression_level,
-            mutation_lock: Mutex::new(manager_locked),
+            mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
             rc,
             resync,
             system,
@@ -174,14 +178,15 @@ impl BlockManager {
     }
 
     /// Send block to nodes that should have it
-    pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+    pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
         let who = self.replication.write_nodes(&hash);
-        let data = DataBlock::from_buffer(data, self.compression_level);
+        let data = DataBlock::from_buffer(data, self.compression_level).await;
         self.system
             .rpc
             .try_call_many(
                 &self.endpoint,
                 &who[..],
+                // TODO: remove to_vec() here
                 BlockRpc::PutBlock { hash, data },
                 RequestStrategy::with_priority(PRIO_NORMAL)
                     .with_quorum(self.replication.write_quorum())
@@ -260,14 +265,21 @@
         hash: &Hash,
         data: &DataBlock,
     ) -> Result<BlockRpc, Error> {
+        let tracer = opentelemetry::global::tracer("garage");
+
         let write_size = data.inner_buffer().len() as u64;
 
-        let res = self
-            .mutation_lock
+        let res = self.mutation_lock[hash.as_slice()[0] as usize]
             .lock()
+            .with_context(Context::current_with_span(
+                tracer.start("Acquire mutation_lock"),
+            ))
             .await
             .write_block(hash, data, self)
             .bound_record_duration(&self.metrics.block_write_duration)
+            .with_context(Context::current_with_span(
+                tracer.start("BlockManagerLocked::write_block"),
+            ))
             .await?;
 
         self.metrics.bytes_written.add(write_size);
@@ -317,7 +329,7 @@ impl BlockManager {
         if data.verify(*hash).is_err() {
             self.metrics.corruption_counter.add(1);
 
-            self.mutation_lock
+            self.mutation_lock[hash.as_slice()[0] as usize]
                 .lock()
                 .await
                 .move_block_to_corrupted(hash, self)
@@ -331,7 +343,7 @@ impl BlockManager {
 
     /// Check if this node has a block and whether it needs it
    pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
-        self.mutation_lock
+        self.mutation_lock[hash.as_slice()[0] as usize]
            .lock()
            .await
            .check_block_status(hash, self)
@@ -346,7 +358,7 @@ impl BlockManager {
 
     /// Delete block if it is not needed anymore
     pub(crate) async fn delete_if_unneeded(&self, hash: &Hash) -> Result<(), Error> {
-        self.mutation_lock
+        self.mutation_lock[hash.as_slice()[0] as usize]
             .lock()
             .await
             .delete_if_unneeded(hash, self)
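
The `from_buffer` change above moves zstd compression onto Tokio's blocking thread pool; the hashing change described in the PR uses the same `spawn_blocking` pattern. Below is a hedged sketch of that pattern with the sha2 0.10 API (`async_sha256` is a hypothetical helper, not the Garage code):

```rust
use bytes::Bytes;
use sha2::{Digest, Sha256};

/// Hash a block on the blocking thread pool so CPU-intensive work
/// does not stall the async executor threads.
async fn async_sha256(data: Bytes) -> Vec<u8> {
    // `Bytes` is cheap to move into the closure (no copy of the payload).
    tokio::task::spawn_blocking(move || {
        let mut hasher = Sha256::new();
        hasher.update(&data);
        hasher.finalize().to_vec()
    })
    .await
    .expect("hashing task panicked")
}

#[tokio::main]
async fn main() {
    let block = Bytes::from_static(b"hello garage");
    let digest = async_sha256(block).await;
    for byte in &digest {
        print!("{:02x}", byte);
    }
    println!();
}
```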