aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs55
1 files changed, 51 insertions, 4 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 1173c7b3..9e939c24 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -1,12 +1,13 @@
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures::future::*;
use futures::select;
+use opentelemetry::KeyValue;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -23,8 +24,8 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
+use crate::block_metrics::*;
use crate::block_ref_table::*;
-
use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files
@@ -154,6 +155,8 @@ pub struct BlockManager {
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
pub(crate) garage: ArcSwapOption<Garage>,
+
+ metrics: BlockManagerMetrics,
}
// This custom struct contains functions that must only be run
@@ -182,6 +185,8 @@ impl BlockManager {
let manager_locked = BlockManagerLocked();
+ let metrics = BlockManagerMetrics::new(resync_queue.clone());
+
let block_manager = Arc::new(Self {
replication,
data_dir,
@@ -192,6 +197,7 @@ impl BlockManager {
system,
endpoint,
garage: ArcSwapOption::from(None),
+ metrics,
});
block_manager.endpoint.set_handler(block_manager.clone());
@@ -380,15 +386,28 @@ impl BlockManager {
/// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> {
- self.mutation_lock
+ let request_start = SystemTime::now();
+ let write_size = data.inner_buffer().len() as u64;
+
+ let res = self
+ .mutation_lock
.lock()
.await
.write_block(hash, data, self)
- .await
+ .await?;
+
+ self.metrics.bytes_written.add(write_size);
+ self.metrics
+ .block_write_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+
+ Ok(res)
}
/// Read block from disk, verifying its integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
+ let request_start = SystemTime::now();
+
let mut path = self.block_path(hash);
let compressed = match self.is_block_compressed(hash).await {
Ok(c) => c,
@@ -414,6 +433,8 @@ impl BlockManager {
};
if data.verify(*hash).is_err() {
+ self.metrics.corruption_counter.add(1);
+
self.mutation_lock
.lock()
.await
@@ -423,6 +444,13 @@ impl BlockManager {
return Err(Error::CorruptData(*hash));
}
+ self.metrics
+ .bytes_read
+ .add(data.inner_buffer().len() as u64);
+ self.metrics
+ .block_read_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+
Ok(BlockRpc::PutBlock { hash: *hash, data })
}
@@ -521,9 +549,18 @@ impl BlockManager {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
if now >= time_msec {
+ let start_time = SystemTime::now();
+
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
let res = self.resync_block(&hash).await;
+
+ self.metrics.resync_counter.add(1);
+ self.metrics
+ .resync_duration
+ .record(start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+
if let Err(e) = &res {
+ self.metrics.resync_error_counter.add(1);
warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
}
@@ -607,6 +644,12 @@ impl BlockManager {
need_nodes.len()
);
+ for node in need_nodes.iter() {
+ self.metrics
+ .resync_send_counter
+ .add(1, &[KeyValue::new("to", format!("{:?}", node))]);
+ }
+
let put_block_message = self.read_block(hash).await?;
self.system
.rpc
@@ -644,6 +687,9 @@ impl BlockManager {
);
let block_data = self.rpc_get_raw_block(hash).await?;
+
+ self.metrics.resync_recv_counter.add(1);
+
self.write_block(hash, &block_data).await?;
}
@@ -819,6 +865,7 @@ impl BlockManagerLocked {
path.set_extension("zst");
}
fs::remove_file(path).await?;
+ mgr.metrics.delete_counter.add(1);
}
Ok(())
}