diff options
-rw-r--r-- | src/admin/metrics.rs | 10 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 16 | ||||
-rw-r--r-- | src/table/table.rs | 39 |
3 files changed, 54 insertions, 11 deletions
diff --git a/src/admin/metrics.rs b/src/admin/metrics.rs index 02549fe9..cbc737d3 100644 --- a/src/admin/metrics.rs +++ b/src/admin/metrics.rs @@ -20,8 +20,8 @@ use opentelemetry_prometheus::PrometheusExporter; use prometheus::{Encoder, TextEncoder}; -use garage_util::data::*; use garage_util::error::Error as GarageError; +use garage_util::metrics::*; // serve_req on metric endpoint async fn serve_req( @@ -125,15 +125,9 @@ impl AdminServer { async move { Ok::<_, Infallible>(service_fn(move |req| { let tracer = opentelemetry::global::tracer("garage"); - let uuid = gen_uuid(); let span = tracer .span_builder("admin/request") - .with_trace_id( - opentelemetry::trace::TraceId::from_hex(&hex::encode( - &uuid.as_slice()[..16], - )) - .unwrap(), - ) + .with_trace_id(gen_trace_id()) .start(&tracer); serve_req(req, admin_server.clone()) diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 4114724f..89225511 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -234,9 +234,23 @@ impl RpcHelper { let quorum = strategy.rs_quorum.unwrap_or(to.len()); let tracer = opentelemetry::global::tracer("garage"); - let mut span = tracer.start(format!("RPC {} to {}", endpoint.path(), to.len())); + let span_name = if strategy.rs_interrupt_after_quorum { + format!("RPC {} to {} of {}", endpoint.path(), quorum, to.len()) + } else { + format!( + "RPC {} to {} (quorum {})", + endpoint.path(), + to.len(), + quorum + ) + }; + let mut span = tracer.start(span_name); span.set_attribute(KeyValue::new("to", format!("{:?}", to))); span.set_attribute(KeyValue::new("quorum", quorum as i64)); + span.set_attribute(KeyValue::new( + "interrupt_after_quorum", + strategy.rs_interrupt_after_quorum.to_string(), + )); self.try_call_many_internal(endpoint, to, msg, strategy, quorum) .with_context(Context::current_with_span(span)) diff --git a/src/table/table.rs b/src/table/table.rs index 69cac41a..7f87a449 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -7,6 +7,11 @@ use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use opentelemetry::{ + trace::{FutureExt, TraceContextExt, Tracer}, + Context, +}; + use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -82,6 +87,20 @@ where } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { + let tracer = opentelemetry::global::tracer("garage_table"); + let span = tracer.start(format!("{} insert", F::TABLE_NAME)); + + self.insert_internal(e) + .bound_record_duration(&self.data.metrics.put_request_duration) + .with_context(Context::current_with_span(span)) + .await?; + + self.data.metrics.put_request_counter.add(1); + + Ok(()) + } + + async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); @@ -99,18 +118,22 @@ where .with_quorum(self.data.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) - .bound_record_duration(&self.data.metrics.put_request_duration) .await?; - self.data.metrics.put_request_counter.add(1); Ok(()) } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + let tracer = opentelemetry::global::tracer("garage_table"); + let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len())); + self.insert_many_internal(entries) .bound_record_duration(&self.data.metrics.put_request_duration) + .with_context(Context::current_with_span(span)) .await?; + self.data.metrics.put_request_counter.add(1); + Ok(()) } @@ -161,11 +184,17 @@ where partition_key: &F::P, sort_key: &F::S, ) -> Result<Option<F::E>, Error> { + let tracer = opentelemetry::global::tracer("garage_table"); + let span = tracer.start(format!("{} get", F::TABLE_NAME)); + let res = self .get_internal(partition_key, sort_key) .bound_record_duration(&self.data.metrics.get_request_duration) + .with_context(Context::current_with_span(span)) .await?; + self.data.metrics.get_request_counter.add(1); + Ok(res) } @@ -233,11 +262,17 @@ where filter: Option<F::Filter>, limit: usize, ) -> Result<Vec<F::E>, Error> { + let tracer = opentelemetry::global::tracer("garage_table"); + let span = tracer.start(format!("{} get_range", F::TABLE_NAME)); + let res = self .get_range_internal(partition_key, begin_sort_key, filter, limit) .bound_record_duration(&self.data.metrics.get_request_duration) + .with_context(Context::current_with_span(span)) .await?; + self.data.metrics.get_request_counter.add(1); + Ok(res) } |