From 1b2e1296eb99630e969e585ede0424072adc2d0c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Jul 2022 17:18:47 +0200 Subject: Compute hashes on dedicated threads --- src/block/block.rs | 17 +++++++++++------ src/block/manager.rs | 6 ++++-- 2 files changed, 15 insertions(+), 8 deletions(-) (limited to 'src/block') diff --git a/src/block/block.rs b/src/block/block.rs index 4d3fbcb8..f17bd2c0 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use serde::{Deserialize, Serialize}; use zstd::stream::{decode_all as zstd_decode, Encoder}; @@ -61,13 +62,17 @@ impl DataBlock { } } - pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock { - if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); + pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock { + tokio::task::spawn_blocking(move || { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } } - } - DataBlock::Plain(data) + DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here + }) + .await + .unwrap() } } diff --git a/src/block/manager.rs b/src/block/manager.rs index 017ba9da..890c247d 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -5,6 +5,7 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use async_trait::async_trait; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use futures::future::*; @@ -211,14 +212,15 @@ impl BlockManager { } /// Send block to nodes that should have it - pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { + pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - let data = DataBlock::from_buffer(data, self.compression_level); + let data = DataBlock::from_buffer(data, self.compression_level).await; self.system .rpc .try_call_many( &self.endpoint, &who[..], + // 
TODO: remove to_vec() here BlockRpc::PutBlock { hash, data }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) -- cgit v1.2.3 From 2f111e6b3d772b10c8ed6279ce0c82d22852afd1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Jul 2022 18:40:57 +0200 Subject: Performance improvements: - reduce contention on mutation_lock by having 256 of them - better lmdb defaults --- src/block/manager.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index 890c247d..be53ec6e 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -93,7 +93,7 @@ pub struct BlockManager { compression_level: Option<i32>, background_tranquility: u32, - mutation_lock: Mutex<BlockManagerLocked>, + mutation_lock: [Mutex<BlockManagerLocked>; 256], pub(crate) rc: BlockRc, @@ -150,8 +150,6 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); - let manager_locked = BlockManagerLocked(); - let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone()); let block_manager = Arc::new(Self { @@ -159,7 +157,7 @@ impl BlockManager { data_dir, compression_level, background_tranquility, - mutation_lock: Mutex::new(manager_locked), + mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())), rc, resync_queue, resync_notify: Notify::new(), @@ -313,14 +311,21 @@ impl BlockManager { /// Write a block to disk async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> { + let tracer = opentelemetry::global::tracer("garage"); + let write_size = data.inner_buffer().len() as u64; - let res = self - .mutation_lock + let res = self.mutation_lock[hash.as_slice()[0] as usize] .lock() + .with_context(Context::current_with_span( + tracer.start("Acquire mutation_lock"), + )) .await .write_block(hash, data, self) .bound_record_duration(&self.metrics.block_write_duration) + .with_context(Context::current_with_span( + 
tracer.start("BlockManagerLocked::write_block"), + )) .await?; self.metrics.bytes_written.add(write_size); @@ -370,7 +375,7 @@ impl BlockManager { if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); - self.mutation_lock + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .move_block_to_corrupted(hash, self) @@ -384,8 +389,7 @@ impl BlockManager { /// Check if this node should have a block, but don't actually have it async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { - let BlockStatus { exists, needed } = self - .mutation_lock + let BlockStatus { exists, needed } = self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .check_block_status(hash, self) @@ -608,8 +612,7 @@ impl BlockManager { } async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { - let BlockStatus { exists, needed } = self - .mutation_lock + let BlockStatus { exists, needed } = self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .check_block_status(hash, self) @@ -694,7 +697,7 @@ impl BlockManager { who.len() ); - self.mutation_lock + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .delete_if_unneeded(hash, self) -- cgit v1.2.3