diff options
author | Alex Auvolat <alex@adnab.me> | 2022-02-18 20:10:46 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-02-18 20:10:46 +0100 |
commit | fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0 (patch) | |
tree | 9ea5762b36d81c87bf0abdad1113cebf3d75a4c4 /src/client.rs | |
parent | ab0f7785ae73e2f5aaf912fc3c0f2cd724967546 (diff) | |
download | netapp-fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0.tar.gz netapp-fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0.zip |
Correct implementation of distributed tracing
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 39 |
1 files changed, 26 insertions, 13 deletions
diff --git a/src/client.rs b/src/client.rs index 7ef772d..e08b30b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,6 +11,14 @@ use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::compat::*; +#[cfg(feature = "telemetry")] +use opentelemetry::{ + trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer}, + Context, +}; +#[cfg(feature = "telemetry")] +use opentelemetry_contrib::trace::propagator::binary::*; + use futures::io::AsyncReadExt; use async_trait::async_trait; @@ -127,18 +135,14 @@ impl ClientConn { cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { - use opentelemetry::{trace::{TraceId, TraceContextExt}, Context}; - let trace_id_int = Context::current() - .span() - .span_context() - .trace_id(); - let trace_id = if trace_id_int == TraceId::INVALID { - None - } else { - Some(trace_id_int.to_bytes().to_vec()) - }; + let tracer = opentelemetry::global::tracer("netapp"); + let span = tracer.span_builder(format!("RPC >> {}", path)) + .with_kind(SpanKind::Server) + .start(&tracer); + let propagator = BinaryPropagator::new(); + let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec()); } else { - let trace_id: Option<Vec<u8>> = None; + let telemetry_id: Option<Vec<u8>> = None; } }; @@ -148,7 +152,7 @@ impl ClientConn { bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]); bytes.extend_from_slice(path.as_bytes()); - if let Some(by) = trace_id { + if let Some(by) = telemetry_id { bytes.push(by.len() as u8); bytes.extend(by); } else { @@ -172,7 +176,16 @@ impl ClientConn { trace!("request: query_send {}, {} bytes", id, bytes.len()); query_send.send((id, prio, bytes))?; - let resp = resp_recv.await?; + cfg_if::cfg_if! { + if #[cfg(feature = "telemetry")] { + let resp = resp_recv + .with_context(Context::current_with_span(span)) + .await?; + } else { + let resp = resp_recv.await?; + } + } + if resp.is_empty() { return Err(Error::Message( "Response is 0 bytes, either a collision or a protocol error".into(), |