aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-09-12 16:38:43 +0200
committerAlex <alex@adnab.me>2022-09-12 16:38:43 +0200
commit309d7aef3f05657e2b969ab72442b2f2c350da03 (patch)
tree448704ae3f07a10f86fcb5d40347ad70cdd81498 /src/block
parent03c40a0b24dd5bd2a51d3cd3df0ca1a42fb2d328 (diff)
parentf91fab8582728f176f446a4a2e039d22f752167b (diff)
downloadgarage-309d7aef3f05657e2b969ab72442b2f2c350da03.tar.gz
garage-309d7aef3f05657e2b969ab72442b2f2c350da03.zip
Merge pull request 'performance improvements' (#342) from lx-perf-improvements into main
Performance improvements included in this PR: - [x] Use `Bytes` at a few places where appropriate, instead of `Vec<u8>`, to reduce the number of copies - [x] StreamChunker now accumulates incoming slices in a `Vec<Bytes>` instead of a `VecDeque<u8>`. Replaces calls to `.extend()` and `.drain()` that were quite costly by a simple `concat()` on a vec of slices which is much more optimized - [x] Hashing (b2, sha256, md5) is now done on a Tokio thread dedicated to cpu-intensive tasks, using `spawn_blocking` - [x] Block manager now uses 256 independant locks instead of one big lock for writing, reduces contention when writing several/many objects in parallel - [x] Better LMDB defaults: we now put flags `NoSync` and `NoMetaSync` to avoid `fsync` at each transaction (extremely slow). Also increased number of LMDB readers to accomodate more intensive workloads Other changes included in this PR: - [x] Update to hashing and MAC crates: md5 and sha2 from 0.9 to 0.10, hmac from 0.10 to 0.12 - [x] switch to `tracing_subscriber` for logs, which allows to have timing of each event Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/342
Diffstat (limited to 'src/block')
-rw-r--r--src/block/block.rs17
-rw-r--r--src/block/manager.rs34
2 files changed, 34 insertions, 17 deletions
diff --git a/src/block/block.rs b/src/block/block.rs
index 4d3fbcb8..f17bd2c0 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -1,3 +1,4 @@
+use bytes::Bytes;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
@@ -61,13 +62,17 @@ impl DataBlock {
}
}
- pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
- if let Some(level) = level {
- if let Ok(data) = zstd_encode(&data[..], level) {
- return DataBlock::Compressed(data);
+ pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
+ tokio::task::spawn_blocking(move || {
+ if let Some(level) = level {
+ if let Ok(data) = zstd_encode(&data[..], level) {
+ return DataBlock::Compressed(data);
+ }
}
- }
- DataBlock::Plain(data)
+ DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here
+ })
+ .await
+ .unwrap()
}
}
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 9240db25..b5199b62 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,12 +3,18 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
+use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, Mutex};
+use opentelemetry::{
+ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
+ Context,
+};
+
use garage_db as db;
use garage_util::data::*;
@@ -68,7 +74,7 @@ pub struct BlockManager {
compression_level: Option<i32>,
- mutation_lock: Mutex<BlockManagerLocked>,
+ mutation_lock: [Mutex<BlockManagerLocked>; 256],
pub(crate) rc: BlockRc,
pub resync: BlockResyncManager,
@@ -105,8 +111,6 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
- let manager_locked = BlockManagerLocked();
-
let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone());
let (scrub_tx, scrub_rx) = mpsc::channel(1);
@@ -115,7 +119,7 @@ impl BlockManager {
replication,
data_dir,
compression_level,
- mutation_lock: Mutex::new(manager_locked),
+ mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
rc,
resync,
system,
@@ -174,14 +178,15 @@ impl BlockManager {
}
/// Send block to nodes that should have it
- pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+ pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
- let data = DataBlock::from_buffer(data, self.compression_level);
+ let data = DataBlock::from_buffer(data, self.compression_level).await;
self.system
.rpc
.try_call_many(
&self.endpoint,
&who[..],
+ // TODO: remove to_vec() here
BlockRpc::PutBlock { hash, data },
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.replication.write_quorum())
@@ -260,14 +265,21 @@ impl BlockManager {
hash: &Hash,
data: &DataBlock,
) -> Result<BlockRpc, Error> {
+ let tracer = opentelemetry::global::tracer("garage");
+
let write_size = data.inner_buffer().len() as u64;
- let res = self
- .mutation_lock
+ let res = self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
+ .with_context(Context::current_with_span(
+ tracer.start("Acquire mutation_lock"),
+ ))
.await
.write_block(hash, data, self)
.bound_record_duration(&self.metrics.block_write_duration)
+ .with_context(Context::current_with_span(
+ tracer.start("BlockManagerLocked::write_block"),
+ ))
.await?;
self.metrics.bytes_written.add(write_size);
@@ -317,7 +329,7 @@ impl BlockManager {
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
- self.mutation_lock
+ self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.await
.move_block_to_corrupted(hash, self)
@@ -331,7 +343,7 @@ impl BlockManager {
/// Check if this node has a block and whether it needs it
pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
- self.mutation_lock
+ self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.await
.check_block_status(hash, self)
@@ -346,7 +358,7 @@ impl BlockManager {
/// Delete block if it is not needed anymore
pub(crate) async fn delete_if_unneeded(&self, hash: &Hash) -> Result<(), Error> {
- self.mutation_lock
+ self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.await
.delete_if_unneeded(hash, self)