From 2cab84b1fe423a41b356211e592a614c95ec4e0c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 16 Feb 2022 14:23:04 +0100 Subject: Add many metrics in table/ and rpc/ --- src/model/block.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 4 deletions(-) (limited to 'src/model/block.rs') diff --git a/src/model/block.rs b/src/model/block.rs index 1173c7b3..9e939c24 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -1,12 +1,13 @@ use std::convert::TryInto; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures::future::*; use futures::select; +use opentelemetry::KeyValue; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -23,8 +24,8 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; +use crate::block_metrics::*; use crate::block_ref_table::*; - use crate::garage::Garage; /// Size under which data will be stored inlined in database instead of as files @@ -154,6 +155,8 @@ pub struct BlockManager { system: Arc, endpoint: Arc>, pub(crate) garage: ArcSwapOption, + + metrics: BlockManagerMetrics, } // This custom struct contains functions that must only be ran @@ -182,6 +185,8 @@ impl BlockManager { let manager_locked = BlockManagerLocked(); + let metrics = BlockManagerMetrics::new(resync_queue.clone()); + let block_manager = Arc::new(Self { replication, data_dir, @@ -192,6 +197,7 @@ impl BlockManager { system, endpoint, garage: ArcSwapOption::from(None), + metrics, }); block_manager.endpoint.set_handler(block_manager.clone()); @@ -380,15 +386,28 @@ impl BlockManager { /// Write a block to disk async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result { - self.mutation_lock + let request_start = SystemTime::now(); + let write_size = data.inner_buffer().len() as u64; + + let res = self + .mutation_lock .lock() .await .write_block(hash, data, self) - .await + .await?; + + self.metrics.bytes_written.add(write_size); + self.metrics + .block_write_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); + + Ok(res) } /// Read block from disk, verifying it's integrity async fn read_block(&self, hash: &Hash) -> Result { + let request_start = SystemTime::now(); + let mut path = self.block_path(hash); let compressed = match self.is_block_compressed(hash).await { Ok(c) => c, @@ -414,6 +433,8 @@ impl BlockManager { }; if data.verify(*hash).is_err() { + self.metrics.corruption_counter.add(1); + self.mutation_lock .lock() .await @@ -423,6 +444,13 @@ impl BlockManager { return Err(Error::CorruptData(*hash)); } + self.metrics + .bytes_read + .add(data.inner_buffer().len() as u64); + self.metrics + .block_read_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); + Ok(BlockRpc::PutBlock { hash: *hash, data }) } @@ -521,9 +549,18 @@ impl BlockManager { let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); if now >= time_msec { + let start_time = SystemTime::now(); + let hash = Hash::try_from(&hash_bytes[..]).unwrap(); let res = self.resync_block(&hash).await; + + self.metrics.resync_counter.add(1); + self.metrics + .resync_duration + .record(start_time.elapsed().map_or(0.0, |d| d.as_secs_f64())); + if let Err(e) = &res { + self.metrics.resync_error_counter.add(1); warn!("Error when resyncing {:?}: {}", hash, e); self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?; } @@ -607,6 +644,12 @@ impl BlockManager { need_nodes.len() ); + for node in need_nodes.iter() { + self.metrics + .resync_send_counter + .add(1, &[KeyValue::new("to", format!("{:?}", node))]); + } + let put_block_message = self.read_block(hash).await?; self.system .rpc @@ -644,6 +687,9 @@ impl BlockManager { ); let block_data = self.rpc_get_raw_block(hash).await?; + + self.metrics.resync_recv_counter.add(1); + self.write_block(hash, &block_data).await?; } @@ -819,6 +865,7 @@ impl BlockManagerLocked { path.set_extension("zst"); } fs::remove_file(path).await?; + mgr.metrics.delete_counter.add(1); } Ok(()) } -- cgit v1.2.3