aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-18 20:10:46 +0100
committerAlex Auvolat <alex@adnab.me>2022-02-18 20:10:46 +0100
commitfb6b4dc9a95f4268775e674323a2b84c7b4ae7f0 (patch)
tree9ea5762b36d81c87bf0abdad1113cebf3d75a4c4
parentab0f7785ae73e2f5aaf912fc3c0f2cd724967546 (diff)
downloadnetapp-fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0.tar.gz
netapp-fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0.zip
Correct implementation of distributed tracing
-rw-r--r--Cargo.lock13
-rw-r--r--Cargo.toml5
-rw-r--r--src/client.rs39
-rw-r--r--src/server.rs47
4 files changed, 67 insertions, 37 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 36facc8..3b7f2d0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -433,7 +433,7 @@ dependencies = [
[[package]]
name = "netapp"
-version = "0.3.2"
+version = "0.4.0"
dependencies = [
"arc-swap",
"async-trait",
@@ -449,6 +449,7 @@ dependencies = [
"log",
"lru",
"opentelemetry",
+ "opentelemetry-contrib",
"rand 0.5.6",
"rmp-serde",
"serde",
@@ -522,6 +523,16 @@ dependencies = [
]
[[package]]
+name = "opentelemetry-contrib"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85637add8f60bb4cac673469c14f47a329c6cec7365c72d72cd32f2d104a721a"
+dependencies = [
+ "lazy_static",
+ "opentelemetry",
+]
+
+[[package]]
name = "percent-encoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 4b6db48..8eb32db 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "netapp"
-version = "0.3.2"
+version = "0.4.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license-file = "LICENSE"
@@ -17,7 +17,7 @@ name = "netapp"
[features]
default = []
basalt = ["lru", "rand"]
-telemetry = ["opentelemetry", "rand"]
+telemetry = ["opentelemetry", "opentelemetry-contrib", "rand"]
[dependencies]
futures = "0.3.17"
@@ -43,6 +43,7 @@ sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] }
opentelemetry = { version = "0.17", optional = true }
+opentelemetry-contrib = { version = "0.9", optional = true }
[dev-dependencies]
env_logger = "0.8"
diff --git a/src/client.rs b/src/client.rs
index 7ef772d..e08b30b 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -11,6 +11,14 @@ use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::compat::*;
+#[cfg(feature = "telemetry")]
+use opentelemetry::{
+ trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
+ Context,
+};
+#[cfg(feature = "telemetry")]
+use opentelemetry_contrib::trace::propagator::binary::*;
+
use futures::io::AsyncReadExt;
use async_trait::async_trait;
@@ -127,18 +135,14 @@ impl ClientConn {
cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] {
- use opentelemetry::{trace::{TraceId, TraceContextExt}, Context};
- let trace_id_int = Context::current()
- .span()
- .span_context()
- .trace_id();
- let trace_id = if trace_id_int == TraceId::INVALID {
- None
- } else {
- Some(trace_id_int.to_bytes().to_vec())
- };
+ let tracer = opentelemetry::global::tracer("netapp");
+ let span = tracer.span_builder(format!("RPC >> {}", path))
+ .with_kind(SpanKind::Server)
+ .start(&tracer);
+ let propagator = BinaryPropagator::new();
+ let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec());
} else {
- let trace_id: Option<Vec<u8>> = None;
+ let telemetry_id: Option<Vec<u8>> = None;
}
};
@@ -148,7 +152,7 @@ impl ClientConn {
bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]);
bytes.extend_from_slice(path.as_bytes());
- if let Some(by) = trace_id {
+ if let Some(by) = telemetry_id {
bytes.push(by.len() as u8);
bytes.extend(by);
} else {
@@ -172,7 +176,16 @@ impl ClientConn {
trace!("request: query_send {}, {} bytes", id, bytes.len());
query_send.send((id, prio, bytes))?;
- let resp = resp_recv.await?;
+ cfg_if::cfg_if! {
+ if #[cfg(feature = "telemetry")] {
+ let resp = resp_recv
+ .with_context(Context::current_with_span(span))
+ .await?;
+ } else {
+ let resp = resp_recv.await?;
+ }
+ }
+
if resp.is_empty() {
return Err(Error::Message(
"Response is 0 bytes, either a collision or a protocol error".into(),
diff --git a/src/server.rs b/src/server.rs
index 937d65a..8b60e17 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -6,6 +6,13 @@ use bytes::Bytes;
use log::{debug, trace};
#[cfg(feature = "telemetry")]
+use opentelemetry::{
+ trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer},
+ Context, KeyValue,
+};
+#[cfg(feature = "telemetry")]
+use opentelemetry_contrib::trace::propagator::binary::*;
+#[cfg(feature = "telemetry")]
use rand::{thread_rng, Rng};
use tokio::net::TcpStream;
@@ -122,9 +129,9 @@ impl ServerConn {
let path = &bytes[2..2 + path_length];
let path = String::from_utf8(path.to_vec())?;
- let trace_id_len = bytes[2 + path_length] as usize;
+ let telemetry_id_len = bytes[2 + path_length] as usize;
- let data = &bytes[3 + path_length + trace_id_len..];
+ let data = &bytes[3 + path_length + telemetry_id_len..];
let handler_opt = {
let endpoints = self.netapp.endpoints.read().unwrap();
@@ -134,28 +141,26 @@ impl ServerConn {
if let Some(handler) = handler_opt {
cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] {
- use opentelemetry::{
- KeyValue,
- trace::{FutureExt, TraceContextExt, Tracer},
- Context, trace::TraceId
- };
- let trace_id = if trace_id_len == 16 {
- let mut by = [0u8; 16];
- by.copy_from_slice(&bytes[3+path_length..19+path_length]);
- TraceId::from_bytes(by)
+ let tracer = opentelemetry::global::tracer("netapp");
+
+ let mut span = if telemetry_id_len > 0 {
+ let by = bytes[3+path_length..3+path_length+telemetry_id_len].to_vec();
+ let propagator = BinaryPropagator::new();
+ let context = propagator.from_bytes(by);
+ let context = Context::new().with_remote_span_context(context);
+ tracer.span_builder(format!(">> RPC {}", path))
+ .with_kind(SpanKind::Server)
+ .start_with_context(&tracer, &context)
} else {
let mut rng = thread_rng();
- TraceId::from_bytes(rng.gen())
+ let trace_id = TraceId::from_bytes(rng.gen());
+ tracer
+ .span_builder(format!(">> RPC {}", path))
+ .with_kind(SpanKind::Server)
+ .with_trace_id(trace_id)
+ .start(&tracer)
};
-
- let tracer = opentelemetry::global::tracer("garage");
- let span = tracer
- .span_builder(format!("RPC handler {}", path))
- .with_trace_id(trace_id)
- .with_attributes(vec![
- KeyValue::new("path", path),
- ])
- .start(&tracer);
+ span.set_attribute(KeyValue::new("path", path.to_string()));
handler.handle(data, self.peer_id)
.with_context(Context::current_with_span(span))