aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs27
1 files changed, 15 insertions, 12 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 890c247d..be53ec6e 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -93,7 +93,7 @@ pub struct BlockManager {
compression_level: Option<i32>,
background_tranquility: u32,
- mutation_lock: Mutex<BlockManagerLocked>,
+ mutation_lock: [Mutex<BlockManagerLocked>; 256],
pub(crate) rc: BlockRc,
@@ -150,8 +150,6 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
- let manager_locked = BlockManagerLocked();
-
let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());
let block_manager = Arc::new(Self {
@@ -159,7 +157,7 @@ impl BlockManager {
data_dir,
compression_level,
background_tranquility,
- mutation_lock: Mutex::new(manager_locked),
+ mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
rc,
resync_queue,
resync_notify: Notify::new(),
@@ -313,14 +311,21 @@ impl BlockManager {
/// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> {
+ let tracer = opentelemetry::global::tracer("garage");
+
let write_size = data.inner_buffer().len() as u64;
- let res = self
- .mutation_lock
+ let res = self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
+ .with_context(Context::current_with_span(
+ tracer.start("Acquire mutation_lock"),
+ ))
.await
.write_block(hash, data, self)
.bound_record_duration(&self.metrics.block_write_duration)
+ .with_context(Context::current_with_span(
+ tracer.start("BlockManagerLocked::write_block"),
+ ))
.await?;
self.metrics.bytes_written.add(write_size);
@@ -370,7 +375,7 @@ impl BlockManager {
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
- self.mutation_lock
+ self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.await
.move_block_to_corrupted(hash, self)
@@ -384,8 +389,7 @@ impl BlockManager {
/// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
- let BlockStatus { exists, needed } = self
- .mutation_lock
+ let BlockStatus { exists, needed } = self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.await
.check_block_status(hash, self)
@@ -608,8 +612,7 @@ impl BlockManager {
}
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
- let BlockStatus { exists, needed } = self
- .mutation_lock
+ let BlockStatus { exists, needed } = self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.await
.check_block_status(hash, self)
@@ -694,7 +697,7 @@ impl BlockManager {
who.len()
);
- self.mutation_lock
+ self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.await
.delete_if_unneeded(hash, self)