From 818daa5c786813fdf50fecb6022e29b18e509b62 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 22 Feb 2022 13:53:59 +0100 Subject: Refactor how durations are measured --- src/admin/Cargo.toml | 1 - src/admin/metrics.rs | 10 ++++----- src/garage/server.rs | 2 +- src/model/block.rs | 33 ++++++++++++--------------- src/rpc/Cargo.toml | 1 + src/rpc/rpc_helper.rs | 17 +++++--------- src/table/table.rs | 62 ++++++++++++++++++++++++++++----------------------- src/util/Cargo.toml | 2 ++ src/util/lib.rs | 1 + src/util/metrics.rs | 35 +++++++++++++++++++++++++++++ 10 files changed, 97 insertions(+), 67 deletions(-) create mode 100644 src/util/metrics.rs (limited to 'src') diff --git a/src/admin/Cargo.toml b/src/admin/Cargo.toml index b6bf2b3b..01c16c57 100644 --- a/src/admin/Cargo.toml +++ b/src/admin/Cargo.toml @@ -13,7 +13,6 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_model = { version = "0.6.0", path = "../model" } garage_util = { version = "0.6.0", path = "../util" } hex = "0.4" diff --git a/src/admin/metrics.rs b/src/admin/metrics.rs index e12373ab..64721af2 100644 --- a/src/admin/metrics.rs +++ b/src/admin/metrics.rs @@ -1,6 +1,7 @@ use std::convert::Infallible; use std::sync::Arc; use std::time::SystemTime; +use std::net::SocketAddr; use futures::future::*; use hyper::{ @@ -19,7 +20,6 @@ use opentelemetry_prometheus::PrometheusExporter; use prometheus::{Encoder, TextEncoder}; -use garage_model::garage::Garage; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -111,7 +111,7 @@ impl AdminServer { /// run execute the admin server on the designated HTTP port and listen for requests pub async fn run( self, - garage: Arc, + bind_addr: SocketAddr, shutdown_signal: impl Future, ) -> Result<(), GarageError> { let admin_server = Arc::new(self); @@ -142,11 +142,9 @@ impl AdminServer { } }); - let addr = &garage.config.admin_api.bind_addr; - - let server = Server::bind(addr).serve(make_svc); + let server = Server::bind(&bind_addr).serve(make_svc); let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("Admin server listening on http://{}", addr); + info!("Admin server listening on http://{}", bind_addr); graceful.await?; Ok(()) diff --git a/src/garage/server.rs b/src/garage/server.rs index 6ef36273..739dedbe 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -70,7 +70,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Configure and run admin web server..."); let admin_server = - tokio::spawn(admin_server_init.run(garage.clone(), wait_from(watch_cancel.clone()))); + tokio::spawn(admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone()))); // Stuff runs diff --git a/src/model/block.rs b/src/model/block.rs index 3799c6aa..058c71fd 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -1,7 +1,7 @@ use std::convert::TryInto; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::{Duration}; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -23,6 +23,7 @@ use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use garage_util::metrics::RecordDuration; use garage_rpc::system::System; use garage_rpc::*; @@ -391,7 +392,6 @@ impl BlockManager { /// Write a block to disk async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result { - let request_start = SystemTime::now(); let write_size = data.inner_buffer().len() as u64; let res = self @@ -399,20 +399,26 @@ impl BlockManager { .lock() .await .write_block(hash, data, self) + .bound_record_duration(&self.metrics.block_write_duration) .await?; self.metrics.bytes_written.add(write_size); - self.metrics - .block_write_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(res) } /// Read block from disk, verifying it's integrity async fn read_block(&self, hash: &Hash) -> Result { - let request_start = SystemTime::now(); + let data = self.read_block_internal(hash) + .bound_record_duration(&self.metrics.block_read_duration) + .await?; + + self.metrics.bytes_read.add(data.inner_buffer().len() as u64); + + Ok(BlockRpc::PutBlock { hash: *hash, data }) + } + async fn read_block_internal(&self, hash: &Hash) -> Result { let mut path = self.block_path(hash); let compressed = match self.is_block_compressed(hash).await { Ok(c) => c, @@ -449,14 +455,7 @@ impl BlockManager { return Err(Error::CorruptData(*hash)); } - self.metrics - .bytes_read - .add(data.inner_buffer().len() as u64); - self.metrics - .block_read_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); - - Ok(BlockRpc::PutBlock { hash: *hash, data }) + Ok(data) } /// Check if this node should have a block, but don't actually have it @@ -554,8 +553,6 @@ impl BlockManager { let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); if now >= time_msec { - let start_time = SystemTime::now(); - let hash = Hash::try_from(&hash_bytes[..]).unwrap(); let tracer = opentelemetry::global::tracer("garage"); @@ -574,12 +571,10 @@ impl BlockManager { let res = self .resync_block(&hash) .with_context(Context::current_with_span(span)) + .bound_record_duration(&self.metrics.resync_duration) .await; self.metrics.resync_counter.add(1); - self.metrics - .resync_duration - .record(start_time.elapsed().map_or(0.0, |d| d.as_secs_f64())); if let Err(e) = &res { self.metrics.resync_error_counter.add(1); diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 3ab9e7da..94d6f682 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -15,6 +15,7 @@ path = "lib.rs" [dependencies] garage_util = { version = "0.6.0", path = "../util" } +garage_admin = { version = "0.6.0", path = "../admin" } arc-swap = "1.0" bytes = "1.0" diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 4b4235f1..f8bef47f 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -1,6 +1,6 @@ //! Contain structs related to making RPCs use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::{Duration}; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; @@ -23,6 +23,7 @@ pub use netapp::{NetApp, NodeID}; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use garage_util::metrics::RecordDuration; use crate::metrics::RpcMetrics; use crate::ring::Ring; @@ -133,7 +134,6 @@ impl RpcHelper { M: Rpc>, H: EndpointHandler, { - let queueing_start_time = SystemTime::now(); let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())]; let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; @@ -141,19 +141,14 @@ impl RpcHelper { .0 .request_buffer_semaphore .acquire_many(msg_size) + .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags) .await?; - self.0.metrics.rpc_queueing_time.record( - queueing_start_time - .elapsed() - .map_or(0.0, |d| d.as_secs_f64()), - &metric_tags, - ); self.0.metrics.rpc_counter.add(1, &metric_tags); - let rpc_start_time = SystemTime::now(); let node_id = to.into(); - let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority); + let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority) + .record_duration(&self.0.metrics.rpc_duration, &metric_tags); select! { res = rpc_call => { @@ -164,8 +159,6 @@ impl RpcHelper { } let res = res?; - self.0.metrics.rpc_duration - .record(rpc_start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()), &metric_tags); if res.is_err() { self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags); } diff --git a/src/table/table.rs b/src/table/table.rs index 3ac3bc5b..9ba243c0 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::Duration; use async_trait::async_trait; use futures::stream::*; @@ -9,6 +9,7 @@ use serde_bytes::ByteBuf; use garage_util::data::*; use garage_util::error::Error; +use garage_util::metrics::RecordDuration; use garage_rpc::system::System; use garage_rpc::*; @@ -81,8 +82,6 @@ where } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { - let request_start = SystemTime::now(); - let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); @@ -100,19 +99,22 @@ where .with_quorum(self.data.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) + .bound_record_duration(&self.data.metrics.put_request_duration) .await?; self.data.metrics.put_request_counter.add(1); - self.data - .metrics - .put_request_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(()) } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { - let request_start = SystemTime::now(); + self.insert_many_internal(entries) + .bound_record_duration(&self.data.metrics.put_request_duration) + .await?; + self.data.metrics.put_request_counter.add(1); + Ok(()) + } + async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> { let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); for entry in entries.iter() { @@ -150,12 +152,6 @@ where if errors.len() > self.data.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { - self.data.metrics.put_request_counter.add(1); - self.data - .metrics - .put_request_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); - Ok(()) } } @@ -165,11 +161,20 @@ where partition_key: &F::P, sort_key: &F::S, ) -> Result, Error> { - let request_start = SystemTime::now(); + let res = self.get_internal(partition_key, sort_key) + .bound_record_duration(&self.data.metrics.get_request_duration) + .await?; + self.data.metrics.get_request_counter.add(1); + Ok(res) + } + async fn get_internal( + self: &Arc, + partition_key: &F::P, + sort_key: &F::S, + ) -> Result, Error> { let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); - //eprintln!("get who: {:?}", who); let rpc = TableRpc::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self @@ -217,11 +222,6 @@ where } } - self.data.metrics.get_request_counter.add(1); - self.data - .metrics - .get_request_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret) } @@ -232,8 +232,20 @@ where filter: Option, limit: usize, ) -> Result, Error> { - let request_start = SystemTime::now(); + let res = self.get_range_internal(partition_key, begin_sort_key, filter, limit) + .bound_record_duration(&self.data.metrics.get_request_duration) + .await?; + self.data.metrics.get_request_counter.add(1); + Ok(res) + } + async fn get_range_internal( + self: &Arc, + partition_key: &F::P, + begin_sort_key: Option, + filter: Option, + limit: usize, + ) -> Result, Error> { let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); @@ -291,12 +303,6 @@ where .take(limit) .map(|(_k, v)| v.take().unwrap()) .collect::>(); - - self.data.metrics.get_request_counter.add(1); - self.data - .metrics - .get_request_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret_vec) } diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 76b73f4b..e1d516de 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -42,3 +42,5 @@ hyper = "0.14" kube = { version = "0.62", features = ["runtime", "derive"] } k8s-openapi = { version = "0.13", features = ["v1_22"] } + +opentelemetry = "0.17" diff --git a/src/util/lib.rs b/src/util/lib.rs index 6856f656..7ed00034 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -8,6 +8,7 @@ pub mod config; pub mod crdt; pub mod data; pub mod error; +pub mod metrics; pub mod persister; pub mod time; pub mod token_bucket; diff --git a/src/util/metrics.rs b/src/util/metrics.rs new file mode 100644 index 00000000..b3b1fc3c --- /dev/null +++ b/src/util/metrics.rs @@ -0,0 +1,35 @@ +use std::time::SystemTime; + +use futures::{future::BoxFuture, Future, FutureExt}; + +use opentelemetry::{KeyValue, metrics::*}; + +pub trait RecordDuration<'a>: 'a { + type Output; + + fn record_duration(self, r: &'a ValueRecorder, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output>; + fn bound_record_duration(self, r: &'a BoundValueRecorder) -> BoxFuture<'a, Self::Output>; +} + +impl<'a, T, O> RecordDuration<'a> for T +where T: Future + Send + 'a { + type Output = O; + + fn record_duration(self, r: &'a ValueRecorder, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output> { + async move { + let request_start = SystemTime::now(); + let res = self.await; + r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()), attributes); + res + }.boxed() + } + + fn bound_record_duration(self, r: &'a BoundValueRecorder) -> BoxFuture<'a, Self::Output> { + async move { + let request_start = SystemTime::now(); + let res = self.await; + r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); + res + }.boxed() + } +} -- cgit v1.2.3