diff options
author | Alex Auvolat <alex@adnab.me> | 2022-02-18 20:39:55 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-03-14 10:52:30 +0100 |
commit | bb04d94fa92740eebeee513030faf136bfd26da7 (patch) | |
tree | 5e353982171819f00f03211855baef078156964b /src | |
parent | 8c2fb0c066af7f68fdcfcdec96fa030af059bf63 (diff) | |
download | garage-bb04d94fa92740eebeee513030faf136bfd26da7.tar.gz garage-bb04d94fa92740eebeee513030faf136bfd26da7.zip |
Update to Netapp 0.4 which supports distributed tracing
Diffstat (limited to 'src')
-rw-r--r-- | src/api/api_server.rs | 10 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 5 | ||||
-rw-r--r-- | src/garage/main.rs | 4 | ||||
-rw-r--r-- | src/model/Cargo.toml | 3 | ||||
-rw-r--r-- | src/model/block.rs | 24 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 4 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 11 | ||||
-rw-r--r-- | src/rpc/system.rs | 16 | ||||
-rw-r--r-- | src/util/Cargo.toml | 3 |
9 files changed, 51 insertions, 29 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 15b00dde..00d582d1 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -9,7 +9,7 @@ use hyper::{Body, Method, Request, Response, Server}; use opentelemetry::{ trace::{FutureExt, TraceContextExt, Tracer}, - Context, + Context, KeyValue, }; use garage_util::data::*; @@ -50,15 +50,19 @@ pub async fn run_api_server( let garage = garage.clone(); let tracer = opentelemetry::global::tracer("garage"); - let uuid = gen_uuid(); + let trace_id = gen_uuid(); let span = tracer .span_builder("S3 API call (unknown)") .with_trace_id( opentelemetry::trace::TraceId::from_hex(&hex::encode( - &uuid.as_slice()[..16], + &trace_id.as_slice()[..16], )) .unwrap(), ) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().path().to_string()), + ]) .start(&tracer); handler(garage, req, client_addr).with_context(Context::current_with_span(span)) diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 351fa62f..ff0666a6 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -50,8 +50,9 @@ futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.0" +#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" } +#netapp = { version = "0.4", path = "../../../netapp" } +netapp = "0.4" [dev-dependencies] aws-sdk-s3 = "0.6" diff --git a/src/garage/main.rs b/src/garage/main.rs index 7de7740f..08ec912b 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -55,7 +55,7 @@ struct Opt { #[tokio::main] async fn main() { if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "garage=info") + std::env::set_var("RUST_LOG", "netapp=info,garage=info") } pretty_env_logger::init(); sodiumoxide::init().expect("Unable to init sodiumoxide"); @@ -106,7 +106,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { // Generate a temporary keypair for our RPC client let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair(); - let netapp = NetApp::new(network_key, sk); + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk); // Find and parse the address of the target host let (id, addr) = if let Some(h) = opt.rpc_host { diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 4465af1c..8083445e 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -39,4 +39,5 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.0" +#netapp = { version = "0.4", path = "../../../netapp" } +netapp = "0.4" diff --git a/src/model/block.rs b/src/model/block.rs index ddda5e57..3799c6aa 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -14,7 +14,10 @@ use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; -use opentelemetry::KeyValue; +use opentelemetry::{ + trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, + Context, KeyValue, +}; use garage_util::data::*; use garage_util::error::*; @@ -554,7 +557,24 @@ impl BlockManager { let start_time = SystemTime::now(); let hash = Hash::try_from(&hash_bytes[..]).unwrap(); - let res = self.resync_block(&hash).await; + + let tracer = opentelemetry::global::tracer("garage"); + let trace_id = gen_uuid(); + let span = tracer + .span_builder("Resync block") + .with_trace_id( + opentelemetry::trace::TraceId::from_hex(&hex::encode( + &trace_id.as_slice()[..16], + )) + .unwrap(), + ) + .with_attributes(vec![KeyValue::new("block", format!("{:?}", hash))]) + .start(&tracer); + + let res = self + .resync_block(&hash) + .with_context(Context::current_with_span(span)) + .await; self.metrics.resync_counter.add(1); self.metrics diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 00f609c9..3ab9e7da 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -46,6 +46,8 @@ tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.1" +#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } +netapp = "0.4" + hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 97716b18..4b4235f1 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -152,14 +152,8 @@ impl RpcHelper { self.0.metrics.rpc_counter.add(1, &metric_tags); let rpc_start_time = SystemTime::now(); - let tracer = opentelemetry::global::tracer("garage"); - let mut span = tracer.start(format!("RPC {}", endpoint.path())); - span.set_attribute(KeyValue::new("to", format!("{:?}", to))); - let node_id = to.into(); - let rpc_call = endpoint - .call(&node_id, &msg, strat.rs_priority) - .with_context(Context::current_with_span(span)); + let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority); select! { res = rpc_call => { @@ -246,7 +240,7 @@ impl RpcHelper { let quorum = strategy.rs_quorum.unwrap_or(to.len()); let tracer = opentelemetry::global::tracer("garage"); - let mut span = tracer.start(format!("RPC {} to {:?}", endpoint.path(), to)); + let mut span = tracer.start(format!("RPC {} to {}", endpoint.path(), to.len())); span.set_attribute(KeyValue::new("to", format!("{:?}", to))); span.set_attribute(KeyValue::new("quorum", quorum as i64)); @@ -329,7 +323,6 @@ impl RpcHelper { // reach quorum, start some new requests. while successes.len() + resp_stream.len() < quorum { if let Some((_, _, _, req_to, fut)) = requests.next() { - let tracer = opentelemetry::global::tracer("garage"); let span = tracer.start(format!("RPC to {:?}", req_to)); resp_stream.push(tokio::spawn( fut.with_context(Context::current_with_span(span)), diff --git a/src/rpc/system.rs b/src/rpc/system.rs index c8fc0ad5..2123a37f 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -38,6 +38,9 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); +/// Version tag used for version check upon Netapp connection +pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650006; // garage 0x0006 + /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; @@ -194,7 +197,10 @@ impl System { ) -> Arc<Self> { let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); - info!("Node public key: {}", hex::encode(&node_key.public_key())); + info!( + "Node ID of this node: {}", + hex::encode(&node_key.public_key()[..8]) + ); let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout"); let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); @@ -222,13 +228,7 @@ impl System { let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); - if let Some(addr) = config.rpc_public_addr { - println!("{}@{}", hex::encode(&node_key.public_key()), addr); - } else { - println!("{}", hex::encode(&node_key.public_key())); - } - - let netapp = NetApp::new(network_key, node_key); + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new( netapp.clone(), config.bootstrap_peers.clone(), diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 3cc2031d..76b73f4b 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -34,7 +34,8 @@ futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.0" +#netapp = { version = "0.4", path = "../../../netapp" } +netapp = "0.4" http = "0.2" hyper = "0.14" |