diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 1 | ||||
-rw-r--r-- | src/model/block.rs | 55 | ||||
-rw-r--r-- | src/model/block_metrics.rs | 93 | ||||
-rw-r--r-- | src/model/lib.rs | 1 |
4 files changed, 146 insertions, 4 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 14e49557..10a4c838 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -36,6 +36,7 @@ serde_bytes = "0.11" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } netapp = "0.3.0" diff --git a/src/model/block.rs b/src/model/block.rs index 1173c7b3..9e939c24 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -1,12 +1,13 @@ use std::convert::TryInto; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures::future::*; use futures::select; +use opentelemetry::KeyValue; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -23,8 +24,8 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; +use crate::block_metrics::*; use crate::block_ref_table::*; - use crate::garage::Garage; /// Size under which data will be stored inlined in database instead of as files @@ -154,6 +155,8 @@ pub struct BlockManager { system: Arc<System>, endpoint: Arc<Endpoint<BlockRpc, Self>>, pub(crate) garage: ArcSwapOption<Garage>, + + metrics: BlockManagerMetrics, } // This custom struct contains functions that must only be ran @@ -182,6 +185,8 @@ impl BlockManager { let manager_locked = BlockManagerLocked(); + let metrics = BlockManagerMetrics::new(resync_queue.clone()); + let block_manager = Arc::new(Self { replication, data_dir, @@ -192,6 +197,7 @@ impl BlockManager { system, endpoint, garage: ArcSwapOption::from(None), + metrics, }); block_manager.endpoint.set_handler(block_manager.clone()); @@ -380,15 +386,28 @@ impl BlockManager { /// Write a block to disk async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> { - self.mutation_lock + let request_start = SystemTime::now(); + let write_size = data.inner_buffer().len() as u64; + + let res = self + .mutation_lock .lock() .await .write_block(hash, data, self) - .await + .await?; + + self.metrics.bytes_written.add(write_size); + self.metrics + .block_write_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); + + Ok(res) } /// Read block from disk, verifying it's integrity async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { + let request_start = SystemTime::now(); + let mut path = self.block_path(hash); let compressed = match self.is_block_compressed(hash).await { Ok(c) => c, @@ -414,6 +433,8 @@ impl BlockManager { }; if data.verify(*hash).is_err() { + self.metrics.corruption_counter.add(1); + self.mutation_lock .lock() .await @@ -423,6 +444,13 @@ impl BlockManager { return Err(Error::CorruptData(*hash)); } + self.metrics + .bytes_read + .add(data.inner_buffer().len() as u64); + self.metrics + .block_read_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); + Ok(BlockRpc::PutBlock { hash: *hash, data }) } @@ -521,9 +549,18 @@ impl BlockManager { let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); if now >= time_msec { + let start_time = SystemTime::now(); + let hash = Hash::try_from(&hash_bytes[..]).unwrap(); let res = self.resync_block(&hash).await; + + self.metrics.resync_counter.add(1); + self.metrics + .resync_duration + .record(start_time.elapsed().map_or(0.0, |d| d.as_secs_f64())); + if let Err(e) = &res { + self.metrics.resync_error_counter.add(1); warn!("Error when resyncing {:?}: {}", hash, e); self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?; } @@ -607,6 +644,12 @@ impl BlockManager { need_nodes.len() ); + for node in need_nodes.iter() { + self.metrics + .resync_send_counter + .add(1, &[KeyValue::new("to", format!("{:?}", node))]); + } + let put_block_message = self.read_block(hash).await?; self.system .rpc @@ -644,6 +687,9 @@ impl BlockManager { ); let block_data = self.rpc_get_raw_block(hash).await?; + + self.metrics.resync_recv_counter.add(1); + self.write_block(hash, &block_data).await?; } @@ -819,6 +865,7 @@ impl BlockManagerLocked { path.set_extension("zst"); } fs::remove_file(path).await?; + mgr.metrics.delete_counter.add(1); } Ok(()) } diff --git a/src/model/block_metrics.rs b/src/model/block_metrics.rs new file mode 100644 index 00000000..7ef9a117 --- /dev/null +++ b/src/model/block_metrics.rs @@ -0,0 +1,93 @@ +use opentelemetry::{global, metrics::*}; + +/// TableMetrics reference all counter used for metrics +pub struct BlockManagerMetrics { + pub(crate) _resync_queue_len: ValueObserver<u64>, + + pub(crate) resync_counter: BoundCounter<u64>, + pub(crate) resync_error_counter: BoundCounter<u64>, + pub(crate) resync_duration: BoundValueRecorder<f64>, + pub(crate) resync_send_counter: Counter<u64>, + pub(crate) resync_recv_counter: BoundCounter<u64>, + + pub(crate) bytes_read: BoundCounter<u64>, + pub(crate) block_read_duration: BoundValueRecorder<f64>, + pub(crate) bytes_written: BoundCounter<u64>, + pub(crate) block_write_duration: BoundValueRecorder<f64>, + pub(crate) delete_counter: BoundCounter<u64>, + + pub(crate) corruption_counter: BoundCounter<u64>, +} + +impl BlockManagerMetrics { + pub fn new(resync_queue: sled::Tree) -> Self { + let meter = global::meter("garage_model/block"); + Self { + _resync_queue_len: meter + .u64_value_observer("block.resync_queue_length", move |observer| { + observer.observe(resync_queue.len() as u64, &[]) + }) + .with_description( + "Number of block hashes queued for local check and possible resync", + ) + .init(), + + resync_counter: meter + .u64_counter("block.resync_counter") + .with_description("Number of calls to resync_block") + .init() + .bind(&[]), + resync_error_counter: meter + .u64_counter("block.resync_error_counter") + .with_description("Number of calls to resync_block that returned an error") + .init() + .bind(&[]), + resync_duration: meter + .f64_value_recorder("block.resync_duration") + .with_description("Duration of resync_block operations") + .init() + .bind(&[]), + resync_send_counter: meter + .u64_counter("block.resync_send_counter") + .with_description("Number of blocks sent to another node in resync operations") + .init(), + resync_recv_counter: meter + .u64_counter("block.resync_recv_counter") + .with_description("Number of blocks received from other nodes in resync operations") + .init() + .bind(&[]), + + bytes_read: meter + .u64_counter("block.bytes_read") + .with_description("Number of bytes read from disk") + .init() + .bind(&[]), + block_read_duration: meter + .f64_value_recorder("block.read_duration") + .with_description("Duration of block read operations") + .init() + .bind(&[]), + bytes_written: meter + .u64_counter("block.bytes_written") + .with_description("Number of bytes written to disk") + .init() + .bind(&[]), + block_write_duration: meter + .f64_value_recorder("block.write_duration") + .with_description("Duration of block write operations") + .init() + .bind(&[]), + delete_counter: meter + .u64_counter("block.delete_counter") + .with_description("Number of blocks deleted") + .init() + .bind(&[]), + + corruption_counter: meter + .u64_counter("block.corruption_counter") + .with_description("Data corruptions detected on block reads") + .init() + .bind(&[]), + } + } +} diff --git a/src/model/lib.rs b/src/model/lib.rs index 9deaae9d..c8677603 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -11,6 +11,7 @@ pub mod object_table; pub mod version_table; pub mod block; +mod block_metrics; pub mod garage; pub mod helper; |