diff options
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 41 |
1 files changed, 39 insertions, 2 deletions
diff --git a/src/server.rs b/src/server.rs index f23b810..937d65a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,9 @@ use arc_swap::ArcSwapOption; use bytes::Bytes; use log::{debug, trace}; +#[cfg(feature = "telemetry")] +use rand::{thread_rng, Rng}; + use tokio::net::TcpStream; use tokio::select; use tokio::sync::{mpsc, watch}; @@ -118,7 +121,10 @@ impl ServerConn { let path = &bytes[2..2 + path_length]; let path = String::from_utf8(path.to_vec())?; - let data = &bytes[2 + path_length..]; + + let trace_id_len = bytes[2 + path_length] as usize; + + let data = &bytes[3 + path_length + trace_id_len..]; let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); @@ -126,7 +132,38 @@ impl ServerConn { }; if let Some(handler) = handler_opt { - handler.handle(data, self.peer_id).await + cfg_if::cfg_if! { + if #[cfg(feature = "telemetry")] { + use opentelemetry::{ + KeyValue, + trace::{FutureExt, TraceContextExt, Tracer}, + Context, trace::TraceId + }; + let trace_id = if trace_id_len == 16 { + let mut by = [0u8; 16]; + by.copy_from_slice(&bytes[3+path_length..19+path_length]); + TraceId::from_bytes(by) + } else { + let mut rng = thread_rng(); + TraceId::from_bytes(rng.gen()) + }; + + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer + .span_builder(format!("RPC handler {}", path)) + .with_trace_id(trace_id) + .with_attributes(vec![ + KeyValue::new("path", path), + ]) + .start(&tracer); + + handler.handle(data, self.peer_id) + .with_context(Context::current_with_span(span)) + .await + } else { + handler.handle(data, self.peer_id).await + } + } } else { Err(Error::NoHandler) } |