aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-09-07 12:41:36 +0200
committerAlex Auvolat <alex@adnab.me>2023-09-07 12:41:36 +0200
commit99ed18350f9572ebb1968107d3708d53682ee805 (patch)
tree026098257ef03fc3ee990c890633f4b7bb442644 /src/block
parentf38a31b3304726aa7c890ba1a9f7a3e67b11bc60 (diff)
downloadgarage-99ed18350f9572ebb1968107d3708d53682ee805.tar.gz
garage-99ed18350f9572ebb1968107d3708d53682ee805.zip
block manager: refactor and fix monitoring/statistics
Diffstat (limited to 'src/block')
-rw-r--r--src/block/manager.rs47
1 files changed, 20 insertions, 27 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index eb498be0..0081f46c 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -491,8 +491,6 @@ impl BlockManager {
pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage");
- let write_size = data.inner_buffer().len() as u64;
-
self.lock_mutate(hash)
.await
.write_block(hash, data, self)
@@ -502,8 +500,6 @@ impl BlockManager {
))
.await?;
- self.metrics.bytes_written.add(write_size);
-
Ok(())
}
@@ -530,31 +526,26 @@ impl BlockManager {
/// Read block from disk, verifying it's integrity
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
- let data = self
- .read_block_internal(hash)
- .bound_record_duration(&self.metrics.block_read_duration)
- .await?;
-
- self.metrics
- .bytes_read
- .add(data.inner_buffer().len() as u64);
-
- Ok(data)
- }
-
- async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
- match self.find_block(hash).await {
- Some(p) => self.read_block_from(hash, &p).await,
- None => {
- // Not found but maybe we should have had it ??
- self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
- return Err(Error::Message(format!(
- "block {:?} not found on node",
- hash
- )));
+ let tracer = opentelemetry::global::tracer("garage");
+ async {
+ match self.find_block(hash).await {
+ Some(p) => self.read_block_from(hash, &p).await,
+ None => {
+ // Not found but maybe we should have had it ??
+ self.resync
+ .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ return Err(Error::Message(format!(
+ "block {:?} not found on node",
+ hash
+ )));
+ }
}
}
+ .bound_record_duration(&self.metrics.block_read_duration)
+ .with_context(Context::current_with_span(
+ tracer.start("BlockManager::read_block"),
+ ))
+ .await
}
pub(crate) async fn read_block_from(
@@ -570,6 +561,7 @@ impl BlockManager {
let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
+ self.metrics.bytes_read.add(data.len() as u64);
drop(f);
let data = if compressed {
@@ -731,6 +723,7 @@ impl BlockManagerLocked {
let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?;
+ mgr.metrics.bytes_written.add(data.len() as u64);
if mgr.data_fsync {
f.sync_all().await?;