aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml1
-rw-r--r--src/model/block.rs55
-rw-r--r--src/model/block_metrics.rs93
-rw-r--r--src/model/lib.rs1
4 files changed, 146 insertions, 4 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 14e49557..10a4c838 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -36,6 +36,7 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0"
diff --git a/src/model/block.rs b/src/model/block.rs
index 1173c7b3..9e939c24 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -1,12 +1,13 @@
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures::future::*;
use futures::select;
+use opentelemetry::KeyValue;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -23,8 +24,8 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
+use crate::block_metrics::*;
use crate::block_ref_table::*;
-
use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files
@@ -154,6 +155,8 @@ pub struct BlockManager {
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
pub(crate) garage: ArcSwapOption<Garage>,
+
+ metrics: BlockManagerMetrics,
}
// This custom struct contains functions that must only be ran
@@ -182,6 +185,8 @@ impl BlockManager {
let manager_locked = BlockManagerLocked();
+ let metrics = BlockManagerMetrics::new(resync_queue.clone());
+
let block_manager = Arc::new(Self {
replication,
data_dir,
@@ -192,6 +197,7 @@ impl BlockManager {
system,
endpoint,
garage: ArcSwapOption::from(None),
+ metrics,
});
block_manager.endpoint.set_handler(block_manager.clone());
@@ -380,15 +386,28 @@ impl BlockManager {
/// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> {
- self.mutation_lock
+ let request_start = SystemTime::now();
+ let write_size = data.inner_buffer().len() as u64;
+
+ let res = self
+ .mutation_lock
.lock()
.await
.write_block(hash, data, self)
- .await
+ .await?;
+
+ self.metrics.bytes_written.add(write_size);
+ self.metrics
+ .block_write_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+
+ Ok(res)
}
/// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
+ let request_start = SystemTime::now();
+
let mut path = self.block_path(hash);
let compressed = match self.is_block_compressed(hash).await {
Ok(c) => c,
@@ -414,6 +433,8 @@ impl BlockManager {
};
if data.verify(*hash).is_err() {
+ self.metrics.corruption_counter.add(1);
+
self.mutation_lock
.lock()
.await
@@ -423,6 +444,13 @@ impl BlockManager {
return Err(Error::CorruptData(*hash));
}
+ self.metrics
+ .bytes_read
+ .add(data.inner_buffer().len() as u64);
+ self.metrics
+ .block_read_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+
Ok(BlockRpc::PutBlock { hash: *hash, data })
}
@@ -521,9 +549,18 @@ impl BlockManager {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
if now >= time_msec {
+ let start_time = SystemTime::now();
+
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
let res = self.resync_block(&hash).await;
+
+ self.metrics.resync_counter.add(1);
+ self.metrics
+ .resync_duration
+ .record(start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+
if let Err(e) = &res {
+ self.metrics.resync_error_counter.add(1);
warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
}
@@ -607,6 +644,12 @@ impl BlockManager {
need_nodes.len()
);
+ for node in need_nodes.iter() {
+ self.metrics
+ .resync_send_counter
+ .add(1, &[KeyValue::new("to", format!("{:?}", node))]);
+ }
+
let put_block_message = self.read_block(hash).await?;
self.system
.rpc
@@ -644,6 +687,9 @@ impl BlockManager {
);
let block_data = self.rpc_get_raw_block(hash).await?;
+
+ self.metrics.resync_recv_counter.add(1);
+
self.write_block(hash, &block_data).await?;
}
@@ -819,6 +865,7 @@ impl BlockManagerLocked {
path.set_extension("zst");
}
fs::remove_file(path).await?;
+ mgr.metrics.delete_counter.add(1);
}
Ok(())
}
diff --git a/src/model/block_metrics.rs b/src/model/block_metrics.rs
new file mode 100644
index 00000000..7ef9a117
--- /dev/null
+++ b/src/model/block_metrics.rs
@@ -0,0 +1,93 @@
+use opentelemetry::{global, metrics::*};
+
+/// TableMetrics reference all counter used for metrics
+pub struct BlockManagerMetrics {
+ pub(crate) _resync_queue_len: ValueObserver<u64>,
+
+ pub(crate) resync_counter: BoundCounter<u64>,
+ pub(crate) resync_error_counter: BoundCounter<u64>,
+ pub(crate) resync_duration: BoundValueRecorder<f64>,
+ pub(crate) resync_send_counter: Counter<u64>,
+ pub(crate) resync_recv_counter: BoundCounter<u64>,
+
+ pub(crate) bytes_read: BoundCounter<u64>,
+ pub(crate) block_read_duration: BoundValueRecorder<f64>,
+ pub(crate) bytes_written: BoundCounter<u64>,
+ pub(crate) block_write_duration: BoundValueRecorder<f64>,
+ pub(crate) delete_counter: BoundCounter<u64>,
+
+ pub(crate) corruption_counter: BoundCounter<u64>,
+}
+
+impl BlockManagerMetrics {
+ pub fn new(resync_queue: sled::Tree) -> Self {
+ let meter = global::meter("garage_model/block");
+ Self {
+ _resync_queue_len: meter
+ .u64_value_observer("block.resync_queue_length", move |observer| {
+ observer.observe(resync_queue.len() as u64, &[])
+ })
+ .with_description(
+ "Number of block hashes queued for local check and possible resync",
+ )
+ .init(),
+
+ resync_counter: meter
+ .u64_counter("block.resync_counter")
+ .with_description("Number of calls to resync_block")
+ .init()
+ .bind(&[]),
+ resync_error_counter: meter
+ .u64_counter("block.resync_error_counter")
+ .with_description("Number of calls to resync_block that returned an error")
+ .init()
+ .bind(&[]),
+ resync_duration: meter
+ .f64_value_recorder("block.resync_duration")
+ .with_description("Duration of resync_block operations")
+ .init()
+ .bind(&[]),
+ resync_send_counter: meter
+ .u64_counter("block.resync_send_counter")
+ .with_description("Number of blocks sent to another node in resync operations")
+ .init(),
+ resync_recv_counter: meter
+ .u64_counter("block.resync_recv_counter")
+ .with_description("Number of blocks received from other nodes in resync operations")
+ .init()
+ .bind(&[]),
+
+ bytes_read: meter
+ .u64_counter("block.bytes_read")
+ .with_description("Number of bytes read from disk")
+ .init()
+ .bind(&[]),
+ block_read_duration: meter
+ .f64_value_recorder("block.read_duration")
+ .with_description("Duration of block read operations")
+ .init()
+ .bind(&[]),
+ bytes_written: meter
+ .u64_counter("block.bytes_written")
+ .with_description("Number of bytes written to disk")
+ .init()
+ .bind(&[]),
+ block_write_duration: meter
+ .f64_value_recorder("block.write_duration")
+ .with_description("Duration of block write operations")
+ .init()
+ .bind(&[]),
+ delete_counter: meter
+ .u64_counter("block.delete_counter")
+ .with_description("Number of blocks deleted")
+ .init()
+ .bind(&[]),
+
+ corruption_counter: meter
+ .u64_counter("block.corruption_counter")
+ .with_description("Data corruptions detected on block reads")
+ .init()
+ .bind(&[]),
+ }
+ }
+}
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 9deaae9d..c8677603 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -11,6 +11,7 @@ pub mod object_table;
pub mod version_table;
pub mod block;
+mod block_metrics;
pub mod garage;
pub mod helper;