aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-18 19:01:23 +0100
committerAlex Auvolat <alex@adnab.me>2022-02-18 19:01:59 +0100
commitab0f7785ae73e2f5aaf912fc3c0f2cd724967546 (patch)
treeaa5b6d95dd7ccb4560e02352a8c980d077edfed1 /src/server.rs
parentdc0b5c0305284a1f33417e184c1e4d0ef6e7c032 (diff)
downloadnetapp-ab0f7785ae73e2f5aaf912fc3c0f2cd724967546.tar.gz
netapp-ab0f7785ae73e2f5aaf912fc3c0f2cd724967546.zip
Add telemetry
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs41
1 files changed, 39 insertions, 2 deletions
diff --git a/src/server.rs b/src/server.rs
index f23b810..937d65a 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -5,6 +5,9 @@ use arc_swap::ArcSwapOption;
use bytes::Bytes;
use log::{debug, trace};
+#[cfg(feature = "telemetry")]
+use rand::{thread_rng, Rng};
+
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::{mpsc, watch};
@@ -118,7 +121,10 @@ impl ServerConn {
let path = &bytes[2..2 + path_length];
let path = String::from_utf8(path.to_vec())?;
- let data = &bytes[2 + path_length..];
+
+ let trace_id_len = bytes[2 + path_length] as usize;
+
+ let data = &bytes[3 + path_length + trace_id_len..];
let handler_opt = {
let endpoints = self.netapp.endpoints.read().unwrap();
@@ -126,7 +132,38 @@ impl ServerConn {
};
if let Some(handler) = handler_opt {
- handler.handle(data, self.peer_id).await
+ cfg_if::cfg_if! {
+ if #[cfg(feature = "telemetry")] {
+ use opentelemetry::{
+ KeyValue,
+ trace::{FutureExt, TraceContextExt, Tracer},
+ Context, trace::TraceId
+ };
+ let trace_id = if trace_id_len == 16 {
+ let mut by = [0u8; 16];
+ by.copy_from_slice(&bytes[3+path_length..19+path_length]);
+ TraceId::from_bytes(by)
+ } else {
+ let mut rng = thread_rng();
+ TraceId::from_bytes(rng.gen())
+ };
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let span = tracer
+ .span_builder(format!("RPC handler {}", path))
+ .with_trace_id(trace_id)
+ .with_attributes(vec![
+ KeyValue::new("path", path),
+ ])
+ .start(&tracer);
+
+ handler.handle(data, self.peer_id)
+ .with_context(Context::current_with_span(span))
+ .await
+ } else {
+ handler.handle(data, self.peer_id).await
+ }
+ }
} else {
Err(Error::NoHandler)
}