diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 4 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 11 | ||||
-rw-r--r-- | src/rpc/system.rs | 16 |
3 files changed, 13 insertions, 18 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 00f609c9..3ab9e7da 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -46,6 +46,8 @@ tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.1" +#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } +netapp = "0.4" + hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 97716b18..4b4235f1 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -152,14 +152,8 @@ impl RpcHelper { self.0.metrics.rpc_counter.add(1, &metric_tags); let rpc_start_time = SystemTime::now(); - let tracer = opentelemetry::global::tracer("garage"); - let mut span = tracer.start(format!("RPC {}", endpoint.path())); - span.set_attribute(KeyValue::new("to", format!("{:?}", to))); - let node_id = to.into(); - let rpc_call = endpoint - .call(&node_id, &msg, strat.rs_priority) - .with_context(Context::current_with_span(span)); + let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority); select! { res = rpc_call => { @@ -246,7 +240,7 @@ impl RpcHelper { let quorum = strategy.rs_quorum.unwrap_or(to.len()); let tracer = opentelemetry::global::tracer("garage"); - let mut span = tracer.start(format!("RPC {} to {:?}", endpoint.path(), to)); + let mut span = tracer.start(format!("RPC {} to {}", endpoint.path(), to.len())); span.set_attribute(KeyValue::new("to", format!("{:?}", to))); span.set_attribute(KeyValue::new("quorum", quorum as i64)); @@ -329,7 +323,6 @@ impl RpcHelper { // reach quorum, start some new requests. while successes.len() + resp_stream.len() < quorum { if let Some((_, _, _, req_to, fut)) = requests.next() { - let tracer = opentelemetry::global::tracer("garage"); let span = tracer.start(format!("RPC to {:?}", req_to)); resp_stream.push(tokio::spawn( fut.with_context(Context::current_with_span(span)), diff --git a/src/rpc/system.rs b/src/rpc/system.rs index c8fc0ad5..2123a37f 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -38,6 +38,9 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); +/// Version tag used for version check upon Netapp connection +pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650006; // garage 0x0006 + /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; @@ -194,7 +197,10 @@ impl System { ) -> Arc<Self> { let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); - info!("Node public key: {}", hex::encode(&node_key.public_key())); + info!( + "Node ID of this node: {}", + hex::encode(&node_key.public_key()[..8]) + ); let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout"); let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); @@ -222,13 +228,7 @@ impl System { let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); - if let Some(addr) = config.rpc_public_addr { - println!("{}@{}", hex::encode(&node_key.public_key()), addr); - } else { - println!("{}", hex::encode(&node_key.public_key())); - } - - let netapp = NetApp::new(network_key, node_key); + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new( netapp.clone(), config.bootstrap_peers.clone(), |