From fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 18 Feb 2022 20:10:46 +0100 Subject: Correct implementation of distributed tracing --- src/client.rs | 39 ++++++++++++++++++++++++++------------- src/server.rs | 47 ++++++++++++++++++++++++++--------------------- 2 files changed, 52 insertions(+), 34 deletions(-) (limited to 'src') diff --git a/src/client.rs b/src/client.rs index 7ef772d..e08b30b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,6 +11,14 @@ use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::compat::*; +#[cfg(feature = "telemetry")] +use opentelemetry::{ + trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer}, + Context, +}; +#[cfg(feature = "telemetry")] +use opentelemetry_contrib::trace::propagator::binary::*; + use futures::io::AsyncReadExt; use async_trait::async_trait; @@ -127,18 +135,14 @@ impl ClientConn { cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { - use opentelemetry::{trace::{TraceId, TraceContextExt}, Context}; - let trace_id_int = Context::current() - .span() - .span_context() - .trace_id(); - let trace_id = if trace_id_int == TraceId::INVALID { - None - } else { - Some(trace_id_int.to_bytes().to_vec()) - }; + let tracer = opentelemetry::global::tracer("netapp"); + let span = tracer.span_builder(format!("RPC >> {}", path)) + .with_kind(SpanKind::Server) + .start(&tracer); + let propagator = BinaryPropagator::new(); + let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec()); } else { - let trace_id: Option> = None; + let telemetry_id: Option> = None; } }; @@ -148,7 +152,7 @@ impl ClientConn { bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]); bytes.extend_from_slice(path.as_bytes()); - if let Some(by) = trace_id { + if let Some(by) = telemetry_id { bytes.push(by.len() as u8); bytes.extend(by); } else { @@ -172,7 +176,16 @@ impl ClientConn { trace!("request: query_send {}, {} bytes", id, bytes.len()); query_send.send((id, prio, bytes))?; - let resp = resp_recv.await?; + cfg_if::cfg_if! { + if #[cfg(feature = "telemetry")] { + let resp = resp_recv + .with_context(Context::current_with_span(span)) + .await?; + } else { + let resp = resp_recv.await?; + } + } + if resp.is_empty() { return Err(Error::Message( "Response is 0 bytes, either a collision or a protocol error".into(), diff --git a/src/server.rs b/src/server.rs index 937d65a..8b60e17 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,13 @@ use arc_swap::ArcSwapOption; use bytes::Bytes; use log::{debug, trace}; +#[cfg(feature = "telemetry")] +use opentelemetry::{ + trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer}, + Context, KeyValue, +}; +#[cfg(feature = "telemetry")] +use opentelemetry_contrib::trace::propagator::binary::*; #[cfg(feature = "telemetry")] use rand::{thread_rng, Rng}; @@ -122,9 +129,9 @@ impl ServerConn { let path = &bytes[2..2 + path_length]; let path = String::from_utf8(path.to_vec())?; - let trace_id_len = bytes[2 + path_length] as usize; + let telemetry_id_len = bytes[2 + path_length] as usize; - let data = &bytes[3 + path_length + trace_id_len..]; + let data = &bytes[3 + path_length + telemetry_id_len..]; let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); @@ -134,28 +141,26 @@ impl ServerConn { if let Some(handler) = handler_opt { cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { - use opentelemetry::{ - KeyValue, - trace::{FutureExt, TraceContextExt, Tracer}, - Context, trace::TraceId - }; - let trace_id = if trace_id_len == 16 { - let mut by = [0u8; 16]; - by.copy_from_slice(&bytes[3+path_length..19+path_length]); - TraceId::from_bytes(by) + let tracer = opentelemetry::global::tracer("netapp"); + + let mut span = if telemetry_id_len > 0 { + let by = bytes[3+path_length..3+path_length+telemetry_id_len].to_vec(); + let propagator = BinaryPropagator::new(); + let context = propagator.from_bytes(by); + let context = Context::new().with_remote_span_context(context); + tracer.span_builder(format!(">> RPC {}", path)) + .with_kind(SpanKind::Server) + .start_with_context(&tracer, &context) } else { let mut rng = thread_rng(); - TraceId::from_bytes(rng.gen()) + let trace_id = TraceId::from_bytes(rng.gen()); + tracer + .span_builder(format!(">> RPC {}", path)) + .with_kind(SpanKind::Server) + .with_trace_id(trace_id) + .start(&tracer) }; - - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder(format!("RPC handler {}", path)) - .with_trace_id(trace_id) - .with_attributes(vec![ - KeyValue::new("path", path), - ]) - .start(&tracer); + span.set_attribute(KeyValue::new("path", path.to_string())); handler.handle(data, self.peer_id) .with_context(Context::current_with_span(span)) -- cgit v1.2.3