aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-18 20:10:46 +0100
committerAlex Auvolat <alex@adnab.me>2022-02-18 20:10:46 +0100
commitfb6b4dc9a95f4268775e674323a2b84c7b4ae7f0 (patch)
tree9ea5762b36d81c87bf0abdad1113cebf3d75a4c4 /src/client.rs
parentab0f7785ae73e2f5aaf912fc3c0f2cd724967546 (diff)
downloadnetapp-fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0.tar.gz
netapp-fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0.zip
Correct implementation of distributed tracing
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs39
1 files changed, 26 insertions, 13 deletions
diff --git a/src/client.rs b/src/client.rs
index 7ef772d..e08b30b 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -11,6 +11,14 @@ use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::compat::*;
+#[cfg(feature = "telemetry")]
+use opentelemetry::{
+ trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
+ Context,
+};
+#[cfg(feature = "telemetry")]
+use opentelemetry_contrib::trace::propagator::binary::*;
+
use futures::io::AsyncReadExt;
use async_trait::async_trait;
@@ -127,18 +135,14 @@ impl ClientConn {
cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] {
- use opentelemetry::{trace::{TraceId, TraceContextExt}, Context};
- let trace_id_int = Context::current()
- .span()
- .span_context()
- .trace_id();
- let trace_id = if trace_id_int == TraceId::INVALID {
- None
- } else {
- Some(trace_id_int.to_bytes().to_vec())
- };
+ let tracer = opentelemetry::global::tracer("netapp");
+ let span = tracer.span_builder(format!("RPC >> {}", path))
+ .with_kind(SpanKind::Server)
+ .start(&tracer);
+ let propagator = BinaryPropagator::new();
+ let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec());
} else {
- let trace_id: Option<Vec<u8>> = None;
+ let telemetry_id: Option<Vec<u8>> = None;
}
};
@@ -148,7 +152,7 @@ impl ClientConn {
bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]);
bytes.extend_from_slice(path.as_bytes());
- if let Some(by) = trace_id {
+ if let Some(by) = telemetry_id {
bytes.push(by.len() as u8);
bytes.extend(by);
} else {
@@ -172,7 +176,16 @@ impl ClientConn {
trace!("request: query_send {}, {} bytes", id, bytes.len());
query_send.send((id, prio, bytes))?;
- let resp = resp_recv.await?;
+ cfg_if::cfg_if! {
+ if #[cfg(feature = "telemetry")] {
+ let resp = resp_recv
+ .with_context(Context::current_with_span(span))
+ .await?;
+ } else {
+ let resp = resp_recv.await?;
+ }
+ }
+
if resp.is_empty() {
return Err(Error::Message(
"Response is 0 bytes, either a collision or a protocol error".into(),