aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/block/manager.rs29
1 files changed, 16 insertions, 13 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index b9cd09e7..ec694fc8 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -11,7 +11,7 @@ use futures::Stream;
use futures_util::stream::StreamExt;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, Mutex, MutexGuard};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -261,7 +261,7 @@ impl BlockManager {
> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
match header {
- DataBlockHeader::Plain => Ok(Box::pin(stream)),
+ DataBlockHeader::Plain => Ok(stream),
DataBlockHeader::Compressed => {
// Too many things, I hate it.
let reader = stream_asyncread(stream);
@@ -389,11 +389,7 @@ impl BlockManager {
let write_size = data.inner_buffer().len() as u64;
- self.mutation_lock[hash.as_slice()[0] as usize]
- .lock()
- .with_context(Context::current_with_span(
- tracer.start("Acquire mutation_lock"),
- ))
+ self.lock_mutate(hash)
.await
.write_block(hash, data, self)
.bound_record_duration(&self.metrics.block_write_duration)
@@ -470,8 +466,7 @@ impl BlockManager {
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
- self.mutation_lock[hash.as_slice()[0] as usize]
- .lock()
+ self.lock_mutate(hash)
.await
.move_block_to_corrupted(hash, self)
.await?;
@@ -484,8 +479,7 @@ impl BlockManager {
/// Check if this node has a block and whether it needs it
pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
- self.mutation_lock[hash.as_slice()[0] as usize]
- .lock()
+ self.lock_mutate(hash)
.await
.check_block_status(hash, self)
.await
@@ -499,8 +493,7 @@ impl BlockManager {
/// Delete block if it is not needed anymore
pub(crate) async fn delete_if_unneeded(&self, hash: &Hash) -> Result<(), Error> {
- self.mutation_lock[hash.as_slice()[0] as usize]
- .lock()
+ self.lock_mutate(hash)
.await
.delete_if_unneeded(hash, self)
.await
@@ -532,6 +525,16 @@ impl BlockManager {
path.set_extension("");
fs::metadata(&path).await.map(|_| false).map_err(Into::into)
}
+
+ async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
+ let tracer = opentelemetry::global::tracer("garage");
+ self.mutation_lock[hash.as_slice()[0] as usize]
+ .lock()
+ .with_context(Context::current_with_span(
+ tracer.start("Acquire mutation_lock"),
+ ))
+ .await
+ }
}
#[async_trait]