aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-16 14:23:04 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-14 10:51:50 +0100
commit2cab84b1fe423a41b356211e592a614c95ec4e0c (patch)
treec7dc3227feccbd6f4a8aba0bf8025201f3acc229 /src/table/sync.rs
parent1e2cf26373ef1812a3152a0057774f6381e66914 (diff)
downloadgarage-2cab84b1fe423a41b356211e592a614c95ec4e0c.tar.gz
garage-2cab84b1fe423a41b356211e592a614c95ec4e0c.zip
Add many metrics in table/ and rpc/
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs32
1 files changed, 31 insertions, 1 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 1df2b01d..08069ad0 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -6,6 +6,7 @@ use async_trait::async_trait;
use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
+use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -312,6 +313,16 @@ where
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
+ for to in nodes.iter() {
+ self.data.metrics.sync_items_sent.add(
+ values.len() as u64,
+ &[
+ KeyValue::new("table_name", F::TABLE_NAME),
+ KeyValue::new("to", format!("{:?}", to)),
+ ],
+ );
+ }
+
self.system
.rpc
.try_call_many(
@@ -500,6 +511,14 @@ where
.map(|x| Arc::new(ByteBuf::from(x)))
.collect::<Vec<_>>();
+ self.data.metrics.sync_items_sent.add(
+ values.len() as u64,
+ &[
+ KeyValue::new("table_name", F::TABLE_NAME),
+ KeyValue::new("to", format!("{:?}", who)),
+ ],
+ );
+
let rpc_resp = self
.system
.rpc
@@ -527,7 +546,7 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> {
+ async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
@@ -539,6 +558,17 @@ where
Ok(SyncRpc::Node(k.clone(), node))
}
SyncRpc::Items(items) => {
+ self.data.metrics.sync_items_received.add(
+ items.len() as u64,
+ &[
+ KeyValue::new("table_name", F::TABLE_NAME),
+ KeyValue::new(
+ "from",
+ format!("{:?}", Uuid::try_from(from.as_ref()).unwrap()),
+ ),
+ ],
+ );
+
self.data.update_many(items)?;
Ok(SyncRpc::Ok)
}