aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs32
1 files changed, 31 insertions, 1 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 1df2b01d..08069ad0 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -6,6 +6,7 @@ use async_trait::async_trait;
use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
+use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -312,6 +313,16 @@ where
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
+ for to in nodes.iter() {
+ self.data.metrics.sync_items_sent.add(
+ values.len() as u64,
+ &[
+ KeyValue::new("table_name", F::TABLE_NAME),
+ KeyValue::new("to", format!("{:?}", to)),
+ ],
+ );
+ }
+
self.system
.rpc
.try_call_many(
@@ -500,6 +511,14 @@ where
.map(|x| Arc::new(ByteBuf::from(x)))
.collect::<Vec<_>>();
+ self.data.metrics.sync_items_sent.add(
+ values.len() as u64,
+ &[
+ KeyValue::new("table_name", F::TABLE_NAME),
+ KeyValue::new("to", format!("{:?}", who)),
+ ],
+ );
+
let rpc_resp = self
.system
.rpc
@@ -527,7 +546,7 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> {
+ async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
@@ -539,6 +558,17 @@ where
Ok(SyncRpc::Node(k.clone(), node))
}
SyncRpc::Items(items) => {
+ self.data.metrics.sync_items_received.add(
+ items.len() as u64,
+ &[
+ KeyValue::new("table_name", F::TABLE_NAME),
+ KeyValue::new(
+ "from",
+ format!("{:?}", Uuid::try_from(from.as_ref()).unwrap()),
+ ),
+ ],
+ );
+
self.data.update_many(items)?;
Ok(SyncRpc::Ok)
}