From ab0f7785ae73e2f5aaf912fc3c0f2cd724967546 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 18 Feb 2022 19:01:23 +0100 Subject: Add telemetry --- src/client.rs | 31 ++++++++++++++++++++++++++++++- src/server.rs | 41 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 69 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/client.rs b/src/client.rs index ca1bcf9..7ef772d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -125,10 +125,39 @@ impl ClientConn { .next_query_number .fetch_add(1, atomic::Ordering::Relaxed); - let mut bytes = vec![prio, path.as_bytes().len() as u8]; + cfg_if::cfg_if! { + if #[cfg(feature = "telemetry")] { + use opentelemetry::{trace::{TraceId, TraceContextExt}, Context}; + let trace_id_int = Context::current() + .span() + .span_context() + .trace_id(); + let trace_id = if trace_id_int == TraceId::INVALID { + None + } else { + Some(trace_id_int.to_bytes().to_vec()) + }; + } else { + let trace_id: Option> = None; + } + }; + + // Encode request + let mut bytes = vec![]; + + bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]); bytes.extend_from_slice(path.as_bytes()); + + if let Some(by) = trace_id { + bytes.push(by.len() as u8); + bytes.extend(by); + } else { + bytes.push(0); + } + bytes.extend_from_slice(&rmp_to_vec_all_named(rq)?[..]); + // Send request through let (resp_send, resp_recv) = oneshot::channel(); let old = self.inflight.lock().unwrap().insert(id, resp_send); if let Some(old_ch) = old { diff --git a/src/server.rs b/src/server.rs index f23b810..937d65a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,9 @@ use arc_swap::ArcSwapOption; use bytes::Bytes; use log::{debug, trace}; +#[cfg(feature = "telemetry")] +use rand::{thread_rng, Rng}; + use tokio::net::TcpStream; use tokio::select; use tokio::sync::{mpsc, watch}; @@ -118,7 +121,10 @@ impl ServerConn { let path = &bytes[2..2 + path_length]; let path = String::from_utf8(path.to_vec())?; - let data = &bytes[2 + path_length..]; + + let trace_id_len = bytes[2 + path_length] as usize; + + let data = &bytes[3 + path_length + trace_id_len..]; let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); @@ -126,7 +132,38 @@ impl ServerConn { }; if let Some(handler) = handler_opt { - handler.handle(data, self.peer_id).await + cfg_if::cfg_if! { + if #[cfg(feature = "telemetry")] { + use opentelemetry::{ + KeyValue, + trace::{FutureExt, TraceContextExt, Tracer}, + Context, trace::TraceId + }; + let trace_id = if trace_id_len == 16 { + let mut by = [0u8; 16]; + by.copy_from_slice(&bytes[3+path_length..19+path_length]); + TraceId::from_bytes(by) + } else { + let mut rng = thread_rng(); + TraceId::from_bytes(rng.gen()) + }; + + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer + .span_builder(format!("RPC handler {}", path)) + .with_trace_id(trace_id) + .with_attributes(vec![ + KeyValue::new("path", path), + ]) + .start(&tracer); + + handler.handle(data, self.peer_id) + .with_context(Context::current_with_span(span)) + .await + } else { + handler.handle(data, self.peer_id).await + } + } } else { Err(Error::NoHandler) } -- cgit v1.2.3