aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-18 20:39:55 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-14 10:52:30 +0100
commitbb04d94fa92740eebeee513030faf136bfd26da7 (patch)
tree5e353982171819f00f03211855baef078156964b /src
parent8c2fb0c066af7f68fdcfcdec96fa030af059bf63 (diff)
downloadgarage-bb04d94fa92740eebeee513030faf136bfd26da7.tar.gz
garage-bb04d94fa92740eebeee513030faf136bfd26da7.zip
Update to Netapp 0.4 which supports distributed tracing
Diffstat (limited to 'src')
-rw-r--r--src/api/api_server.rs10
-rw-r--r--src/garage/Cargo.toml5
-rw-r--r--src/garage/main.rs4
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/block.rs24
-rw-r--r--src/rpc/Cargo.toml4
-rw-r--r--src/rpc/rpc_helper.rs11
-rw-r--r--src/rpc/system.rs16
-rw-r--r--src/util/Cargo.toml3
9 files changed, 51 insertions, 29 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index 15b00dde..00d582d1 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -9,7 +9,7 @@ use hyper::{Body, Method, Request, Response, Server};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
- Context,
+ Context, KeyValue,
};
use garage_util::data::*;
@@ -50,15 +50,19 @@ pub async fn run_api_server(
let garage = garage.clone();
let tracer = opentelemetry::global::tracer("garage");
- let uuid = gen_uuid();
+ let trace_id = gen_uuid();
let span = tracer
.span_builder("S3 API call (unknown)")
.with_trace_id(
opentelemetry::trace::TraceId::from_hex(&hex::encode(
- &uuid.as_slice()[..16],
+ &trace_id.as_slice()[..16],
))
.unwrap(),
)
+ .with_attributes(vec![
+ KeyValue::new("method", format!("{}", req.method())),
+ KeyValue::new("uri", req.uri().path().to_string()),
+ ])
.start(&tracer);
handler(garage, req, client_addr).with_context(Context::current_with_span(span))
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 351fa62f..ff0666a6 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -50,8 +50,9 @@ futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-netapp = "0.3.0"
+#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" }
+#netapp = { version = "0.4", path = "../../../netapp" }
+netapp = "0.4"
[dev-dependencies]
aws-sdk-s3 = "0.6"
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 7de7740f..08ec912b 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -55,7 +55,7 @@ struct Opt {
#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
- std::env::set_var("RUST_LOG", "garage=info")
+ std::env::set_var("RUST_LOG", "netapp=info,garage=info")
}
pretty_env_logger::init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
@@ -106,7 +106,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Generate a temporary keypair for our RPC client
let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
- let netapp = NetApp::new(network_key, sk);
+ let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk);
// Find and parse the address of the target host
let (id, addr) = if let Some(h) = opt.rpc_host {
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 4465af1c..8083445e 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -39,4 +39,5 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-netapp = "0.3.0"
+#netapp = { version = "0.4", path = "../../../netapp" }
+netapp = "0.4"
diff --git a/src/model/block.rs b/src/model/block.rs
index ddda5e57..3799c6aa 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -14,7 +14,10 @@ use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
-use opentelemetry::KeyValue;
+use opentelemetry::{
+ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
+ Context, KeyValue,
+};
use garage_util::data::*;
use garage_util::error::*;
@@ -554,7 +557,24 @@ impl BlockManager {
let start_time = SystemTime::now();
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
- let res = self.resync_block(&hash).await;
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let trace_id = gen_uuid();
+ let span = tracer
+ .span_builder("Resync block")
+ .with_trace_id(
+ opentelemetry::trace::TraceId::from_hex(&hex::encode(
+ &trace_id.as_slice()[..16],
+ ))
+ .unwrap(),
+ )
+ .with_attributes(vec![KeyValue::new("block", format!("{:?}", hash))])
+ .start(&tracer);
+
+ let res = self
+ .resync_block(&hash)
+ .with_context(Context::current_with_span(span))
+ .await;
self.metrics.resync_counter.add(1);
self.metrics
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 00f609c9..3ab9e7da 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -46,6 +46,8 @@ tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-netapp = "0.3.1"
+#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] }
+netapp = "0.4"
+
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 97716b18..4b4235f1 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -152,14 +152,8 @@ impl RpcHelper {
self.0.metrics.rpc_counter.add(1, &metric_tags);
let rpc_start_time = SystemTime::now();
- let tracer = opentelemetry::global::tracer("garage");
- let mut span = tracer.start(format!("RPC {}", endpoint.path()));
- span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
-
let node_id = to.into();
- let rpc_call = endpoint
- .call(&node_id, &msg, strat.rs_priority)
- .with_context(Context::current_with_span(span));
+ let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority);
select! {
res = rpc_call => {
@@ -246,7 +240,7 @@ impl RpcHelper {
let quorum = strategy.rs_quorum.unwrap_or(to.len());
let tracer = opentelemetry::global::tracer("garage");
- let mut span = tracer.start(format!("RPC {} to {:?}", endpoint.path(), to));
+ let mut span = tracer.start(format!("RPC {} to {}", endpoint.path(), to.len()));
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
span.set_attribute(KeyValue::new("quorum", quorum as i64));
@@ -329,7 +323,6 @@ impl RpcHelper {
// reach quorum, start some new requests.
while successes.len() + resp_stream.len() < quorum {
if let Some((_, _, _, req_to, fut)) = requests.next() {
- let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn(
fut.with_context(Context::current_with_span(span)),
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index c8fc0ad5..2123a37f 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -38,6 +38,9 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
+/// Version tag used for version check upon Netapp connection
+pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650006; // garage 0x0006
+
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
@@ -194,7 +197,10 @@ impl System {
) -> Arc<Self> {
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
- info!("Node public key: {}", hex::encode(&node_key.public_key()));
+ info!(
+ "Node ID of this node: {}",
+ hex::encode(&node_key.public_key()[..8])
+ );
let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
@@ -222,13 +228,7 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
- if let Some(addr) = config.rpc_public_addr {
- println!("{}@{}", hex::encode(&node_key.public_key()), addr);
- } else {
- println!("{}", hex::encode(&node_key.public_key()));
- }
-
- let netapp = NetApp::new(network_key, node_key);
+ let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(
netapp.clone(),
config.bootstrap_peers.clone(),
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 3cc2031d..76b73f4b 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -34,7 +34,8 @@ futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-netapp = "0.3.0"
+#netapp = { version = "0.4", path = "../../../netapp" }
+netapp = "0.4"
http = "0.2"
hyper = "0.14"