aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-18 19:01:23 +0100
committerAlex Auvolat <alex@adnab.me>2022-02-18 19:01:59 +0100
commitab0f7785ae73e2f5aaf912fc3c0f2cd724967546 (patch)
treeaa5b6d95dd7ccb4560e02352a8c980d077edfed1
parentdc0b5c0305284a1f33417e184c1e4d0ef6e7c032 (diff)
downloadnetapp-ab0f7785ae73e2f5aaf912fc3c0f2cd724967546.tar.gz
netapp-ab0f7785ae73e2f5aaf912fc3c0f2cd724967546.zip
Add telemetry
-rw-r--r--Cargo.lock176
-rw-r--r--Cargo.toml6
-rw-r--r--src/client.rs31
-rw-r--r--src/server.rs41
4 files changed, 248 insertions, 6 deletions
diff --git a/Cargo.lock b/Cargo.lock
index c5bf7ba..36facc8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -63,6 +63,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
+name = "bumpalo"
+version = "3.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899"
+
+[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -126,6 +132,26 @@ dependencies = [
]
[[package]]
+name = "crossbeam-channel"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6"
+dependencies = [
+ "cfg-if",
+ "lazy_static",
+]
+
+[[package]]
name = "env_logger"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -303,6 +329,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
+name = "js-sys"
+version = "0.3.56"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04"
+dependencies = [
+ "wasm-bindgen",
+]
+
+[[package]]
name = "kuska-handshake"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -398,11 +433,12 @@ dependencies = [
[[package]]
name = "netapp"
-version = "0.3.1"
+version = "0.3.2"
dependencies = [
"arc-swap",
"async-trait",
"bytes 0.6.0",
+ "cfg-if",
"chrono",
"env_logger",
"err-derive",
@@ -412,7 +448,8 @@ dependencies = [
"kuska-sodiumoxide",
"log",
"lru",
- "rand",
+ "opentelemetry",
+ "rand 0.5.6",
"rmp-serde",
"serde",
"structopt",
@@ -466,6 +503,51 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
+name = "opentelemetry"
+version = "0.17.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
+dependencies = [
+ "async-trait",
+ "crossbeam-channel",
+ "futures-channel",
+ "futures-executor",
+ "futures-util",
+ "js-sys",
+ "lazy_static",
+ "percent-encoding",
+ "pin-project",
+ "rand 0.8.5",
+ "thiserror",
+]
+
+[[package]]
+name = "percent-encoding"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
+
+[[package]]
+name = "pin-project"
+version = "1.0.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.0.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "pin-project-lite"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -484,6 +566,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb"
[[package]]
+name = "ppv-lite86"
+version = "0.2.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
+
+[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -551,6 +639,27 @@ dependencies = [
]
[[package]]
+name = "rand"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
+dependencies = [
+ "libc",
+ "rand_chacha",
+ "rand_core 0.6.3",
+]
+
+[[package]]
+name = "rand_chacha"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
+dependencies = [
+ "ppv-lite86",
+ "rand_core 0.6.3",
+]
+
+[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -566,6 +675,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
+name = "rand_core"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
+dependencies = [
+ "getrandom",
+]
+
+[[package]]
name = "regex"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -846,6 +964,60 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
+name = "wasm-bindgen"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06"
+dependencies = [
+ "cfg-if",
+ "wasm-bindgen-macro",
+]
+
+[[package]]
+name = "wasm-bindgen-backend"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca"
+dependencies = [
+ "bumpalo",
+ "lazy_static",
+ "log",
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-macro"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01"
+dependencies = [
+ "quote",
+ "wasm-bindgen-macro-support",
+]
+
+[[package]]
+name = "wasm-bindgen-macro-support"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-backend",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-shared"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2"
+
+[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 3621309..4b6db48 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "netapp"
-version = "0.3.1"
+version = "0.3.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license-file = "LICENSE"
@@ -17,6 +17,7 @@ name = "netapp"
[features]
default = []
basalt = ["lru", "rand"]
+telemetry = ["opentelemetry", "rand"]
[dependencies]
futures = "0.3.17"
@@ -36,10 +37,13 @@ async-trait = "0.1.7"
err-derive = "0.2.3"
bytes = "0.6.0"
lru = { version = "0.6", optional = true }
+cfg-if = "1.0"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] }
+opentelemetry = { version = "0.17", optional = true }
+
[dev-dependencies]
env_logger = "0.8"
structopt = { version = "0.3", default-features = false }
diff --git a/src/client.rs b/src/client.rs
index ca1bcf9..7ef772d 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -125,10 +125,39 @@ impl ClientConn {
.next_query_number
.fetch_add(1, atomic::Ordering::Relaxed);
- let mut bytes = vec![prio, path.as_bytes().len() as u8];
+ cfg_if::cfg_if! {
+ if #[cfg(feature = "telemetry")] {
+ use opentelemetry::{trace::{TraceId, TraceContextExt}, Context};
+ let trace_id_int = Context::current()
+ .span()
+ .span_context()
+ .trace_id();
+ let trace_id = if trace_id_int == TraceId::INVALID {
+ None
+ } else {
+ Some(trace_id_int.to_bytes().to_vec())
+ };
+ } else {
+ let trace_id: Option<Vec<u8>> = None;
+ }
+ };
+
+ // Encode request
+ let mut bytes = vec![];
+
+ bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]);
bytes.extend_from_slice(path.as_bytes());
+
+ if let Some(by) = trace_id {
+ bytes.push(by.len() as u8);
+ bytes.extend(by);
+ } else {
+ bytes.push(0);
+ }
+
bytes.extend_from_slice(&rmp_to_vec_all_named(rq)?[..]);
+ // Send request through
let (resp_send, resp_recv) = oneshot::channel();
let old = self.inflight.lock().unwrap().insert(id, resp_send);
if let Some(old_ch) = old {
diff --git a/src/server.rs b/src/server.rs
index f23b810..937d65a 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -5,6 +5,9 @@ use arc_swap::ArcSwapOption;
use bytes::Bytes;
use log::{debug, trace};
+#[cfg(feature = "telemetry")]
+use rand::{thread_rng, Rng};
+
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::{mpsc, watch};
@@ -118,7 +121,10 @@ impl ServerConn {
let path = &bytes[2..2 + path_length];
let path = String::from_utf8(path.to_vec())?;
- let data = &bytes[2 + path_length..];
+
+ let trace_id_len = bytes[2 + path_length] as usize;
+
+ let data = &bytes[3 + path_length + trace_id_len..];
let handler_opt = {
let endpoints = self.netapp.endpoints.read().unwrap();
@@ -126,7 +132,38 @@ impl ServerConn {
};
if let Some(handler) = handler_opt {
- handler.handle(data, self.peer_id).await
+ cfg_if::cfg_if! {
+ if #[cfg(feature = "telemetry")] {
+ use opentelemetry::{
+ KeyValue,
+ trace::{FutureExt, TraceContextExt, Tracer},
+ Context, trace::TraceId
+ };
+ let trace_id = if trace_id_len == 16 {
+ let mut by = [0u8; 16];
+ by.copy_from_slice(&bytes[3+path_length..19+path_length]);
+ TraceId::from_bytes(by)
+ } else {
+ let mut rng = thread_rng();
+ TraceId::from_bytes(rng.gen())
+ };
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let span = tracer
+ .span_builder(format!("RPC handler {}", path))
+ .with_trace_id(trace_id)
+ .with_attributes(vec![
+ KeyValue::new("path", path),
+ ])
+ .start(&tracer);
+
+ handler.handle(data, self.peer_id)
+ .with_context(Context::current_with_span(span))
+ .await
+ } else {
+ handler.handle(data, self.peer_id).await
+ }
+ }
} else {
Err(Error::NoHandler)
}