diff options
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 32 |
1 files changed, 31 insertions, 1 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index 1df2b01d..08069ad0 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use futures::select; use futures_util::future::*; use futures_util::stream::*; +use opentelemetry::KeyValue; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -312,6 +313,16 @@ where ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); + for to in nodes.iter() { + self.data.metrics.sync_items_sent.add( + values.len() as u64, + &[ + KeyValue::new("table_name", F::TABLE_NAME), + KeyValue::new("to", format!("{:?}", to)), + ], + ); + } + self.system .rpc .try_call_many( @@ -500,6 +511,14 @@ where .map(|x| Arc::new(ByteBuf::from(x))) .collect::<Vec<_>>(); + self.data.metrics.sync_items_sent.add( + values.len() as u64, + &[ + KeyValue::new("table_name", F::TABLE_NAME), + KeyValue::new("to", format!("{:?}", who)), + ], + ); + let rpc_resp = self .system .rpc @@ -527,7 +546,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> { + async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; @@ -539,6 +558,17 @@ where Ok(SyncRpc::Node(k.clone(), node)) } SyncRpc::Items(items) => { + self.data.metrics.sync_items_received.add( + items.len() as u64, + &[ + KeyValue::new("table_name", F::TABLE_NAME), + KeyValue::new( + "from", + format!("{:?}", Uuid::try_from(from.as_ref()).unwrap()), + ), + ], + ); + self.data.update_many(items)?; Ok(SyncRpc::Ok) } |