aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock3
-rw-r--r--src/admin/Cargo.toml1
-rw-r--r--src/admin/metrics.rs10
-rw-r--r--src/garage/server.rs2
-rw-r--r--src/model/block.rs33
-rw-r--r--src/rpc/Cargo.toml1
-rw-r--r--src/rpc/rpc_helper.rs17
-rw-r--r--src/table/table.rs62
-rw-r--r--src/util/Cargo.toml2
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/metrics.rs35
11 files changed, 99 insertions, 68 deletions
diff --git a/Cargo.lock b/Cargo.lock
index e70c1ea7..2f05701f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -841,7 +841,6 @@ version = "0.6.0"
dependencies = [
"futures",
"futures-util",
- "garage_model 0.6.0",
"garage_util 0.6.0",
"hex",
"http",
@@ -979,6 +978,7 @@ dependencies = [
"bytes 1.1.0",
"futures",
"futures-util",
+ "garage_admin",
"garage_util 0.6.0",
"gethostname",
"hex",
@@ -1084,6 +1084,7 @@ dependencies = [
"k8s-openapi",
"kube",
"netapp 0.4.0",
+ "opentelemetry",
"rand",
"rmp-serde 0.15.5",
"serde",
diff --git a/src/admin/Cargo.toml b/src/admin/Cargo.toml
index b6bf2b3b..01c16c57 100644
--- a/src/admin/Cargo.toml
+++ b/src/admin/Cargo.toml
@@ -13,7 +13,6 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_model = { version = "0.6.0", path = "../model" }
garage_util = { version = "0.6.0", path = "../util" }
hex = "0.4"
diff --git a/src/admin/metrics.rs b/src/admin/metrics.rs
index e12373ab..64721af2 100644
--- a/src/admin/metrics.rs
+++ b/src/admin/metrics.rs
@@ -1,6 +1,7 @@
use std::convert::Infallible;
use std::sync::Arc;
use std::time::SystemTime;
+use std::net::SocketAddr;
use futures::future::*;
use hyper::{
@@ -19,7 +20,6 @@ use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
-use garage_model::garage::Garage;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
@@ -111,7 +111,7 @@ impl AdminServer {
/// run execute the admin server on the designated HTTP port and listen for requests
pub async fn run(
self,
- garage: Arc<Garage>,
+ bind_addr: SocketAddr,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
let admin_server = Arc::new(self);
@@ -142,11 +142,9 @@ impl AdminServer {
}
});
- let addr = &garage.config.admin_api.bind_addr;
-
- let server = Server::bind(addr).serve(make_svc);
+ let server = Server::bind(&bind_addr).serve(make_svc);
let graceful = server.with_graceful_shutdown(shutdown_signal);
- info!("Admin server listening on http://{}", addr);
+ info!("Admin server listening on http://{}", bind_addr);
graceful.await?;
Ok(())
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 6ef36273..739dedbe 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -70,7 +70,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Configure and run admin web server...");
let admin_server =
- tokio::spawn(admin_server_init.run(garage.clone(), wait_from(watch_cancel.clone())));
+ tokio::spawn(admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone())));
// Stuff runs
diff --git a/src/model/block.rs b/src/model/block.rs
index 3799c6aa..058c71fd 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -1,7 +1,7 @@
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use std::time::{Duration, SystemTime};
+use std::time::{Duration};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
@@ -23,6 +23,7 @@ use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
+use garage_util::metrics::RecordDuration;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -391,7 +392,6 @@ impl BlockManager {
/// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> {
- let request_start = SystemTime::now();
let write_size = data.inner_buffer().len() as u64;
let res = self
@@ -399,20 +399,26 @@ impl BlockManager {
.lock()
.await
.write_block(hash, data, self)
+ .bound_record_duration(&self.metrics.block_write_duration)
.await?;
self.metrics.bytes_written.add(write_size);
- self.metrics
- .block_write_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(res)
}
/// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
- let request_start = SystemTime::now();
+ let data = self.read_block_internal(hash)
+ .bound_record_duration(&self.metrics.block_read_duration)
+ .await?;
+
+ self.metrics.bytes_read.add(data.inner_buffer().len() as u64);
+
+ Ok(BlockRpc::PutBlock { hash: *hash, data })
+ }
+ async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
let mut path = self.block_path(hash);
let compressed = match self.is_block_compressed(hash).await {
Ok(c) => c,
@@ -449,14 +455,7 @@ impl BlockManager {
return Err(Error::CorruptData(*hash));
}
- self.metrics
- .bytes_read
- .add(data.inner_buffer().len() as u64);
- self.metrics
- .block_read_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
-
- Ok(BlockRpc::PutBlock { hash: *hash, data })
+ Ok(data)
}
/// Check if this node should have a block, but don't actually have it
@@ -554,8 +553,6 @@ impl BlockManager {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
if now >= time_msec {
- let start_time = SystemTime::now();
-
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
let tracer = opentelemetry::global::tracer("garage");
@@ -574,12 +571,10 @@ impl BlockManager {
let res = self
.resync_block(&hash)
.with_context(Context::current_with_span(span))
+ .bound_record_duration(&self.metrics.resync_duration)
.await;
self.metrics.resync_counter.add(1);
- self.metrics
- .resync_duration
- .record(start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()));
if let Err(e) = &res {
self.metrics.resync_error_counter.add(1);
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 3ab9e7da..94d6f682 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -15,6 +15,7 @@ path = "lib.rs"
[dependencies]
garage_util = { version = "0.6.0", path = "../util" }
+garage_admin = { version = "0.6.0", path = "../admin" }
arc-swap = "1.0"
bytes = "1.0"
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 4b4235f1..f8bef47f 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -1,6 +1,6 @@
//! Contain structs related to making RPCs
use std::sync::Arc;
-use std::time::{Duration, SystemTime};
+use std::time::{Duration};
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
@@ -23,6 +23,7 @@ pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring;
@@ -133,7 +134,6 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
- let queueing_start_time = SystemTime::now();
let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())];
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
@@ -141,19 +141,14 @@ impl RpcHelper {
.0
.request_buffer_semaphore
.acquire_many(msg_size)
+ .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
.await?;
- self.0.metrics.rpc_queueing_time.record(
- queueing_start_time
- .elapsed()
- .map_or(0.0, |d| d.as_secs_f64()),
- &metric_tags,
- );
self.0.metrics.rpc_counter.add(1, &metric_tags);
- let rpc_start_time = SystemTime::now();
let node_id = to.into();
- let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority);
+ let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority)
+ .record_duration(&self.0.metrics.rpc_duration, &metric_tags);
select! {
res = rpc_call => {
@@ -164,8 +159,6 @@ impl RpcHelper {
}
let res = res?;
- self.0.metrics.rpc_duration
- .record(rpc_start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()), &metric_tags);
if res.is_err() {
self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags);
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 3ac3bc5b..9ba243c0 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
-use std::time::{Duration, SystemTime};
+use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
@@ -9,6 +9,7 @@ use serde_bytes::ByteBuf;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::metrics::RecordDuration;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -81,8 +82,6 @@ where
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
- let request_start = SystemTime::now();
-
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
@@ -100,19 +99,22 @@ where
.with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
+ .bound_record_duration(&self.data.metrics.put_request_duration)
.await?;
self.data.metrics.put_request_counter.add(1);
- self.data
- .metrics
- .put_request_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(())
}
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
- let request_start = SystemTime::now();
+ self.insert_many_internal(entries)
+ .bound_record_duration(&self.data.metrics.put_request_duration)
+ .await?;
+ self.data.metrics.put_request_counter.add(1);
+ Ok(())
+ }
+ async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> {
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
for entry in entries.iter() {
@@ -150,12 +152,6 @@ where
if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
- self.data.metrics.put_request_counter.add(1);
- self.data
- .metrics
- .put_request_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
-
Ok(())
}
}
@@ -165,11 +161,20 @@ where
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
- let request_start = SystemTime::now();
+ let res = self.get_internal(partition_key, sort_key)
+ .bound_record_duration(&self.data.metrics.get_request_duration)
+ .await?;
+ self.data.metrics.get_request_counter.add(1);
+ Ok(res)
+ }
+ async fn get_internal(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ sort_key: &F::S,
+ ) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
- //eprintln!("get who: {:?}", who);
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
@@ -217,11 +222,6 @@ where
}
}
- self.data.metrics.get_request_counter.add(1);
- self.data
- .metrics
- .get_request_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret)
}
@@ -232,8 +232,20 @@ where
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
- let request_start = SystemTime::now();
+ let res = self.get_range_internal(partition_key, begin_sort_key, filter, limit)
+ .bound_record_duration(&self.data.metrics.get_request_duration)
+ .await?;
+ self.data.metrics.get_request_counter.add(1);
+ Ok(res)
+ }
+ async fn get_range_internal(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ begin_sort_key: Option<F::S>,
+ filter: Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
@@ -291,12 +303,6 @@ where
.take(limit)
.map(|(_k, v)| v.take().unwrap())
.collect::<Vec<_>>();
-
- self.data.metrics.get_request_counter.add(1);
- self.data
- .metrics
- .get_request_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret_vec)
}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 76b73f4b..e1d516de 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -42,3 +42,5 @@ hyper = "0.14"
kube = { version = "0.62", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.13", features = ["v1_22"] }
+
+opentelemetry = "0.17"
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 6856f656..7ed00034 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -8,6 +8,7 @@ pub mod config;
pub mod crdt;
pub mod data;
pub mod error;
+pub mod metrics;
pub mod persister;
pub mod time;
pub mod token_bucket;
diff --git a/src/util/metrics.rs b/src/util/metrics.rs
new file mode 100644
index 00000000..b3b1fc3c
--- /dev/null
+++ b/src/util/metrics.rs
@@ -0,0 +1,35 @@
+use std::time::SystemTime;
+
+use futures::{future::BoxFuture, Future, FutureExt};
+
+use opentelemetry::{KeyValue, metrics::*};
+
+pub trait RecordDuration<'a>: 'a {
+ type Output;
+
+ fn record_duration(self, r: &'a ValueRecorder<f64>, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output>;
+ fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output>;
+}
+
+impl<'a, T, O> RecordDuration<'a> for T
+where T: Future<Output=O> + Send + 'a {
+ type Output = O;
+
+ fn record_duration(self, r: &'a ValueRecorder<f64>, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output> {
+ async move {
+ let request_start = SystemTime::now();
+ let res = self.await;
+ r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()), attributes);
+ res
+ }.boxed()
+ }
+
+ fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output> {
+ async move {
+ let request_start = SystemTime::now();
+ let res = self.await;
+ r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+ res
+ }.boxed()
+ }
+}