diff options
author | Alex Auvolat <alex@adnab.me> | 2022-02-18 20:10:46 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-02-18 20:10:46 +0100 |
commit | fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0 (patch) | |
tree | 9ea5762b36d81c87bf0abdad1113cebf3d75a4c4 /src/server.rs | |
parent | ab0f7785ae73e2f5aaf912fc3c0f2cd724967546 (diff) | |
download | netapp-fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0.tar.gz netapp-fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0.zip |
Correct implementation of distributed tracing
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/src/server.rs b/src/server.rs index 937d65a..8b60e17 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,6 +6,13 @@ use bytes::Bytes; use log::{debug, trace}; #[cfg(feature = "telemetry")] +use opentelemetry::{ + trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer}, + Context, KeyValue, +}; +#[cfg(feature = "telemetry")] +use opentelemetry_contrib::trace::propagator::binary::*; +#[cfg(feature = "telemetry")] use rand::{thread_rng, Rng}; use tokio::net::TcpStream; @@ -122,9 +129,9 @@ impl ServerConn { let path = &bytes[2..2 + path_length]; let path = String::from_utf8(path.to_vec())?; - let trace_id_len = bytes[2 + path_length] as usize; + let telemetry_id_len = bytes[2 + path_length] as usize; - let data = &bytes[3 + path_length + trace_id_len..]; + let data = &bytes[3 + path_length + telemetry_id_len..]; let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); @@ -134,28 +141,26 @@ impl ServerConn { if let Some(handler) = handler_opt { cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { - use opentelemetry::{ - KeyValue, - trace::{FutureExt, TraceContextExt, Tracer}, - Context, trace::TraceId - }; - let trace_id = if trace_id_len == 16 { - let mut by = [0u8; 16]; - by.copy_from_slice(&bytes[3+path_length..19+path_length]); - TraceId::from_bytes(by) + let tracer = opentelemetry::global::tracer("netapp"); + + let mut span = if telemetry_id_len > 0 { + let by = bytes[3+path_length..3+path_length+telemetry_id_len].to_vec(); + let propagator = BinaryPropagator::new(); + let context = propagator.from_bytes(by); + let context = Context::new().with_remote_span_context(context); + tracer.span_builder(format!(">> RPC {}", path)) + .with_kind(SpanKind::Server) + .start_with_context(&tracer, &context) } else { let mut rng = thread_rng(); - TraceId::from_bytes(rng.gen()) + let trace_id = TraceId::from_bytes(rng.gen()); + tracer + .span_builder(format!(">> RPC {}", path)) + .with_kind(SpanKind::Server) + .with_trace_id(trace_id) + .start(&tracer) }; - - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder(format!("RPC handler {}", path)) - .with_trace_id(trace_id) - .with_attributes(vec![ - KeyValue::new("path", path), - ]) - .start(&tracer); + span.set_attribute(KeyValue::new("path", path.to_string())); handler.handle(data, self.peer_id) .with_context(Context::current_with_span(span)) |