From 2cab84b1fe423a41b356211e592a614c95ec4e0c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 16 Feb 2022 14:23:04 +0100 Subject: Add many metrics in table/ and rpc/ --- src/table/sync.rs | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index 1df2b01d..08069ad0 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use futures::select; use futures_util::future::*; use futures_util::stream::*; +use opentelemetry::KeyValue; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -312,6 +313,16 @@ where ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::>(); + for to in nodes.iter() { + self.data.metrics.sync_items_sent.add( + values.len() as u64, + &[ + KeyValue::new("table_name", F::TABLE_NAME), + KeyValue::new("to", format!("{:?}", to)), + ], + ); + } + self.system .rpc .try_call_many( @@ -500,6 +511,14 @@ where .map(|x| Arc::new(ByteBuf::from(x))) .collect::>(); + self.data.metrics.sync_items_sent.add( + values.len() as u64, + &[ + KeyValue::new("table_name", F::TABLE_NAME), + KeyValue::new("to", format!("{:?}", who)), + ], + ); + let rpc_resp = self .system .rpc @@ -527,7 +546,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - async fn handle(self: &Arc, message: &SyncRpc, _from: NodeID) -> Result { + async fn handle(self: &Arc, message: &SyncRpc, from: NodeID) -> Result { match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; @@ -539,6 +558,17 @@ where Ok(SyncRpc::Node(k.clone(), node)) } SyncRpc::Items(items) => { + self.data.metrics.sync_items_received.add( + items.len() as u64, + &[ + KeyValue::new("table_name", F::TABLE_NAME), + KeyValue::new( + "from", + format!("{:?}", Uuid::try_from(from.as_ref()).unwrap()), + ), + ], + ); + self.data.update_many(items)?; Ok(SyncRpc::Ok) } -- cgit v1.2.3