diff options
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/block.rs | 17 | ||||
-rw-r--r-- | src/block/manager.rs | 34 |
2 files changed, 34 insertions, 17 deletions
diff --git a/src/block/block.rs b/src/block/block.rs index 4d3fbcb8..f17bd2c0 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use serde::{Deserialize, Serialize}; use zstd::stream::{decode_all as zstd_decode, Encoder}; @@ -61,13 +62,17 @@ impl DataBlock { } } - pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock { - if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); + pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock { + tokio::task::spawn_blocking(move || { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } } - } - DataBlock::Plain(data) + DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here + }) + .await + .unwrap() } } diff --git a/src/block/manager.rs b/src/block/manager.rs index 9240db25..b5199b62 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,12 +3,18 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{mpsc, Mutex}; +use opentelemetry::{ + trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, + Context, +}; + use garage_db as db; use garage_util::data::*; @@ -68,7 +74,7 @@ pub struct BlockManager { compression_level: Option<i32>, - mutation_lock: Mutex<BlockManagerLocked>, + mutation_lock: [Mutex<BlockManagerLocked>; 256], pub(crate) rc: BlockRc, pub resync: BlockResyncManager, @@ -105,8 +111,6 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); - let manager_locked = BlockManagerLocked(); - let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone()); let (scrub_tx, scrub_rx) = mpsc::channel(1); @@ -115,7 +119,7 @@ impl BlockManager { replication, data_dir, compression_level, - mutation_lock: Mutex::new(manager_locked), + mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())), rc, resync, system, @@ -174,14 +178,15 @@ impl BlockManager { } /// Send block to nodes that should have it - pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { + pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - let data = DataBlock::from_buffer(data, self.compression_level); + let data = DataBlock::from_buffer(data, self.compression_level).await; self.system .rpc .try_call_many( &self.endpoint, &who[..], + // TODO: remove to_vec() here BlockRpc::PutBlock { hash, data }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) @@ -260,14 +265,21 @@ impl BlockManager { hash: &Hash, data: &DataBlock, ) -> Result<BlockRpc, Error> { + let tracer = opentelemetry::global::tracer("garage"); + let write_size = data.inner_buffer().len() as u64; - let res = self - .mutation_lock + let res = self.mutation_lock[hash.as_slice()[0] as usize] .lock() + .with_context(Context::current_with_span( + tracer.start("Acquire mutation_lock"), + )) .await .write_block(hash, data, self) .bound_record_duration(&self.metrics.block_write_duration) + .with_context(Context::current_with_span( + tracer.start("BlockManagerLocked::write_block"), + )) .await?; self.metrics.bytes_written.add(write_size); @@ -317,7 +329,7 @@ impl BlockManager { if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); - self.mutation_lock + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .move_block_to_corrupted(hash, self) @@ -331,7 +343,7 @@ impl BlockManager { /// Check if this node has a block and whether it needs it pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> { - self.mutation_lock + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .check_block_status(hash, self) @@ -346,7 +358,7 @@ impl BlockManager { /// Delete block if it is not needed anymore pub(crate) async fn delete_if_unneeded(&self, hash: &Hash) -> Result<(), Error> { - self.mutation_lock + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .delete_if_unneeded(hash, self) |