aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml4
-rw-r--r--src/rpc/rpc_helper.rs11
-rw-r--r--src/rpc/system.rs16
3 files changed, 13 insertions, 18 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 00f609c9..3ab9e7da 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -46,6 +46,8 @@ tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-netapp = "0.3.1"
+#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] }
+netapp = "0.4"
+
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 97716b18..4b4235f1 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -152,14 +152,8 @@ impl RpcHelper {
self.0.metrics.rpc_counter.add(1, &metric_tags);
let rpc_start_time = SystemTime::now();
- let tracer = opentelemetry::global::tracer("garage");
- let mut span = tracer.start(format!("RPC {}", endpoint.path()));
- span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
-
let node_id = to.into();
- let rpc_call = endpoint
- .call(&node_id, &msg, strat.rs_priority)
- .with_context(Context::current_with_span(span));
+ let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority);
select! {
res = rpc_call => {
@@ -246,7 +240,7 @@ impl RpcHelper {
let quorum = strategy.rs_quorum.unwrap_or(to.len());
let tracer = opentelemetry::global::tracer("garage");
- let mut span = tracer.start(format!("RPC {} to {:?}", endpoint.path(), to));
+ let mut span = tracer.start(format!("RPC {} to {}", endpoint.path(), to.len()));
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
span.set_attribute(KeyValue::new("quorum", quorum as i64));
@@ -329,7 +323,6 @@ impl RpcHelper {
// reach quorum, start some new requests.
while successes.len() + resp_stream.len() < quorum {
if let Some((_, _, _, req_to, fut)) = requests.next() {
- let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn(
fut.with_context(Context::current_with_span(span)),
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index c8fc0ad5..2123a37f 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -38,6 +38,9 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
+/// Version tag used for version check upon Netapp connection
+pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650006; // garage 0x0006
+
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
@@ -194,7 +197,10 @@ impl System {
) -> Arc<Self> {
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
- info!("Node public key: {}", hex::encode(&node_key.public_key()));
+ info!(
+ "Node ID of this node: {}",
+ hex::encode(&node_key.public_key()[..8])
+ );
let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
@@ -222,13 +228,7 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
- if let Some(addr) = config.rpc_public_addr {
- println!("{}@{}", hex::encode(&node_key.public_key()), addr);
- } else {
- println!("{}", hex::encode(&node_key.public_key()));
- }
-
- let netapp = NetApp::new(network_key, node_key);
+ let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(
netapp.clone(),
config.bootstrap_peers.clone(),