aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs6
-rw-r--r--src/table/metrics.rs75
-rw-r--r--src/table/sync.rs32
-rw-r--r--src/table/table.rs34
4 files changed, 139 insertions, 8 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index a5209c26..684afdcd 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -54,7 +54,7 @@ where
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
- let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone());
+ let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
Arc::new(Self {
system,
@@ -171,6 +171,8 @@ where
})?;
if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ self.metrics.internal_update_counter.add(1);
+
let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry));
self.merkle_todo_notify.notify_one();
@@ -205,6 +207,8 @@ where
})?;
if removed {
+ self.metrics.internal_delete_counter.add(1);
+
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);
self.merkle_todo_notify.notify_one();
diff --git a/src/table/metrics.rs b/src/table/metrics.rs
index 38e93904..548bf0d6 100644
--- a/src/table/metrics.rs
+++ b/src/table/metrics.rs
@@ -2,15 +2,27 @@ use opentelemetry::{global, metrics::*, KeyValue};
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
- merkle_updater_todo_queue_length: ValueObserver<u64>,
+ pub(crate) _merkle_todo_len: ValueObserver<u64>,
+ pub(crate) _gc_todo_len: ValueObserver<u64>,
+
+ pub(crate) get_request_counter: BoundCounter<u64>,
+ pub(crate) get_request_duration: BoundValueRecorder<f64>,
+ pub(crate) put_request_counter: BoundCounter<u64>,
+ pub(crate) put_request_duration: BoundValueRecorder<f64>,
+
+ pub(crate) internal_update_counter: BoundCounter<u64>,
+ pub(crate) internal_delete_counter: BoundCounter<u64>,
+
+ pub(crate) sync_items_sent: Counter<u64>,
+ pub(crate) sync_items_received: Counter<u64>,
}
impl TableMetrics {
- pub fn new(table_name: &'static str, merkle_todo: sled::Tree) -> Self {
+ pub fn new(table_name: &'static str, merkle_todo: sled::Tree, gc_todo: sled::Tree) -> Self {
let meter = global::meter(table_name);
TableMetrics {
- merkle_updater_todo_queue_length: meter
+ _merkle_todo_len: meter
.u64_value_observer(
- format!("merkle_updater_todo_queue_length"),
+ "table.merkle_updater_todo_queue_length",
move |observer| {
observer.observe(
merkle_todo.len() as u64,
@@ -18,7 +30,60 @@ impl TableMetrics {
)
},
)
- .with_description("Bucket merkle updater TODO queue length")
+ .with_description("Merkle tree updater TODO queue length")
+ .init(),
+ _gc_todo_len: meter
+ .u64_value_observer(
+ "table.gc_todo_queue_length",
+ move |observer| {
+ observer.observe(
+ gc_todo.len() as u64,
+ &[KeyValue::new("table_name", table_name)],
+ )
+ },
+ )
+ .with_description("Table garbage collector TODO queue length")
+ .init(),
+
+ get_request_counter: meter
+ .u64_counter("table.get_request_counter")
+ .with_description("Number of get/get_range requests internally made on this table")
+ .init()
+ .bind(&[KeyValue::new("table_name", table_name)]),
+ get_request_duration: meter
+ .f64_value_recorder("table.get_request_duration")
+ .with_description("Duration of get/get_range requests internally made on this table, in seconds")
+ .init()
+ .bind(&[KeyValue::new("table_name", table_name)]),
+ put_request_counter: meter
+ .u64_counter("table.put_request_counter")
+ .with_description("Number of insert/insert_many requests internally made on this table")
+ .init()
+ .bind(&[KeyValue::new("table_name", table_name)]),
+ put_request_duration: meter
+ .f64_value_recorder("table.put_request_duration")
+ .with_description("Duration of insert/insert_many requests internally made on this table, in seconds")
+ .init()
+ .bind(&[KeyValue::new("table_name", table_name)]),
+
+ internal_update_counter: meter
+ .u64_counter("table.internal_update_counter")
+ .with_description("Number of value updates where the value actually changes (includes creation of new key and update of existing key)")
+ .init()
+ .bind(&[KeyValue::new("table_name", table_name)]),
+ internal_delete_counter: meter
+ .u64_counter("table.internal_delete_counter")
+ .with_description("Number of value deletions in the tree (due to GC or repartitioning)")
+ .init()
+ .bind(&[KeyValue::new("table_name", table_name)]),
+
+ sync_items_sent: meter
+ .u64_counter("table.sync_items_sent")
+ .with_description("Number of data items sent to other nodes during resync procedures")
+ .init(),
+ sync_items_received: meter
+ .u64_counter("table.sync_items_received")
+ .with_description("Number of data items received from other nodes during resync procedures")
.init(),
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 1df2b01d..08069ad0 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -6,6 +6,7 @@ use async_trait::async_trait;
use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
+use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -312,6 +313,16 @@ where
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
+ for to in nodes.iter() {
+ self.data.metrics.sync_items_sent.add(
+ values.len() as u64,
+ &[
+ KeyValue::new("table_name", F::TABLE_NAME),
+ KeyValue::new("to", format!("{:?}", to)),
+ ],
+ );
+ }
+
self.system
.rpc
.try_call_many(
@@ -500,6 +511,14 @@ where
.map(|x| Arc::new(ByteBuf::from(x)))
.collect::<Vec<_>>();
+ self.data.metrics.sync_items_sent.add(
+ values.len() as u64,
+ &[
+ KeyValue::new("table_name", F::TABLE_NAME),
+ KeyValue::new("to", format!("{:?}", who)),
+ ],
+ );
+
let rpc_resp = self
.system
.rpc
@@ -527,7 +546,7 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> {
+ async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
@@ -539,6 +558,17 @@ where
Ok(SyncRpc::Node(k.clone(), node))
}
SyncRpc::Items(items) => {
+ self.data.metrics.sync_items_received.add(
+ items.len() as u64,
+ &[
+ KeyValue::new("table_name", F::TABLE_NAME),
+ KeyValue::new(
+ "from",
+ format!("{:?}", Uuid::try_from(from.as_ref()).unwrap()),
+ ),
+ ],
+ );
+
self.data.update_many(items)?;
Ok(SyncRpc::Ok)
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 01789c11..3ac3bc5b 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use async_trait::async_trait;
use futures::stream::*;
@@ -81,6 +81,8 @@ where
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
+ let request_start = SystemTime::now();
+
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
@@ -99,10 +101,18 @@ where
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
+
+ self.data.metrics.put_request_counter.add(1);
+ self.data
+ .metrics
+ .put_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(())
}
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
+ let request_start = SystemTime::now();
+
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
for entry in entries.iter() {
@@ -140,6 +150,12 @@ where
if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
+ self.data.metrics.put_request_counter.add(1);
+ self.data
+ .metrics
+ .put_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+
Ok(())
}
}
@@ -149,6 +165,8 @@ where
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
+ let request_start = SystemTime::now();
+
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
@@ -198,6 +216,12 @@ where
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
}
+
+ self.data.metrics.get_request_counter.add(1);
+ self.data
+ .metrics
+ .get_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret)
}
@@ -208,6 +232,8 @@ where
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
+ let request_start = SystemTime::now();
+
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
@@ -265,6 +291,12 @@ where
.take(limit)
.map(|(_k, v)| v.take().unwrap())
.collect::<Vec<_>>();
+
+ self.data.metrics.get_request_counter.add(1);
+ self.data
+ .metrics
+ .get_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret_vec)
}