diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/data.rs | 6 | ||||
-rw-r--r-- | src/table/metrics.rs | 75 | ||||
-rw-r--r-- | src/table/sync.rs | 32 | ||||
-rw-r--r-- | src/table/table.rs | 34 |
4 files changed, 139 insertions, 8 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index a5209c26..684afdcd 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -54,7 +54,7 @@ where .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); - let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone()); + let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); Arc::new(Self { system, @@ -171,6 +171,8 @@ where })?; if let Some((old_entry, new_entry, new_bytes_hash)) = changed { + self.metrics.internal_update_counter.add(1); + let is_tombstone = new_entry.is_tombstone(); self.instance.updated(old_entry, Some(new_entry)); self.merkle_todo_notify.notify_one(); @@ -205,6 +207,8 @@ where })?; if removed { + self.metrics.internal_delete_counter.add(1); + let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); self.merkle_todo_notify.notify_one(); diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 38e93904..548bf0d6 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -2,15 +2,27 @@ use opentelemetry::{global, metrics::*, KeyValue}; /// TableMetrics reference all counter used for metrics pub struct TableMetrics { - merkle_updater_todo_queue_length: ValueObserver<u64>, + pub(crate) _merkle_todo_len: ValueObserver<u64>, + pub(crate) _gc_todo_len: ValueObserver<u64>, + + pub(crate) get_request_counter: BoundCounter<u64>, + pub(crate) get_request_duration: BoundValueRecorder<f64>, + pub(crate) put_request_counter: BoundCounter<u64>, + pub(crate) put_request_duration: BoundValueRecorder<f64>, + + pub(crate) internal_update_counter: BoundCounter<u64>, + pub(crate) internal_delete_counter: BoundCounter<u64>, + + pub(crate) sync_items_sent: Counter<u64>, + pub(crate) sync_items_received: Counter<u64>, } impl TableMetrics { - pub fn new(table_name: &'static str, merkle_todo: sled::Tree) -> Self { + pub fn new(table_name: &'static str, merkle_todo: sled::Tree, gc_todo: sled::Tree) -> Self { let meter = global::meter(table_name); TableMetrics { - merkle_updater_todo_queue_length: meter + _merkle_todo_len: meter .u64_value_observer( - format!("merkle_updater_todo_queue_length"), + "table.merkle_updater_todo_queue_length", move |observer| { observer.observe( merkle_todo.len() as u64, @@ -18,7 +30,60 @@ impl TableMetrics { ) }, ) - .with_description("Bucket merkle updater TODO queue length") + .with_description("Merkle tree updater TODO queue length") + .init(), + _gc_todo_len: meter + .u64_value_observer( + "table.gc_todo_queue_length", + move |observer| { + observer.observe( + gc_todo.len() as u64, + &[KeyValue::new("table_name", table_name)], + ) + }, + ) + .with_description("Table garbage collector TODO queue length") + .init(), + + get_request_counter: meter + .u64_counter("table.get_request_counter") + .with_description("Number of get/get_range requests internally made on this table") + .init() + .bind(&[KeyValue::new("table_name", table_name)]), + get_request_duration: meter + .f64_value_recorder("table.get_request_duration") + .with_description("Duration of get/get_range requests internally made on this table, in seconds") + .init() + .bind(&[KeyValue::new("table_name", table_name)]), + put_request_counter: meter + .u64_counter("table.put_request_counter") + .with_description("Number of insert/insert_many requests internally made on this table") + .init() + .bind(&[KeyValue::new("table_name", table_name)]), + put_request_duration: meter + .f64_value_recorder("table.put_request_duration") + .with_description("Duration of insert/insert_many requests internally made on this table, in seconds") + .init() + .bind(&[KeyValue::new("table_name", table_name)]), + + internal_update_counter: meter + .u64_counter("table.internal_update_counter") + .with_description("Number of value updates where the value actually changes (includes creation of new key and update of existing key)") + .init() + .bind(&[KeyValue::new("table_name", table_name)]), + internal_delete_counter: meter + .u64_counter("table.internal_delete_counter") + .with_description("Number of value deletions in the tree (due to GC or repartitioning)") + .init() + .bind(&[KeyValue::new("table_name", table_name)]), + + sync_items_sent: meter + .u64_counter("table.sync_items_sent") + .with_description("Number of data items sent to other nodes during resync procedures") + .init(), + sync_items_received: meter + .u64_counter("table.sync_items_received") + .with_description("Number of data items received from other nodes during resync procedures") .init(), } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 1df2b01d..08069ad0 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use futures::select; use futures_util::future::*; use futures_util::stream::*; +use opentelemetry::KeyValue; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -312,6 +313,16 @@ where ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); + for to in nodes.iter() { + self.data.metrics.sync_items_sent.add( + values.len() as u64, + &[ + KeyValue::new("table_name", F::TABLE_NAME), + KeyValue::new("to", format!("{:?}", to)), + ], + ); + } + self.system .rpc .try_call_many( @@ -500,6 +511,14 @@ where .map(|x| Arc::new(ByteBuf::from(x))) .collect::<Vec<_>>(); + self.data.metrics.sync_items_sent.add( + values.len() as u64, + &[ + KeyValue::new("table_name", F::TABLE_NAME), + KeyValue::new("to", format!("{:?}", who)), + ], + ); + let rpc_resp = self .system .rpc @@ -527,7 +546,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> { + async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; @@ -539,6 +558,17 @@ where Ok(SyncRpc::Node(k.clone(), node)) } SyncRpc::Items(items) => { + self.data.metrics.sync_items_received.add( + items.len() as u64, + &[ + KeyValue::new("table_name", F::TABLE_NAME), + KeyValue::new( + "from", + format!("{:?}", Uuid::try_from(from.as_ref()).unwrap()), + ), + ], + ); + self.data.update_many(items)?; Ok(SyncRpc::Ok) } diff --git a/src/table/table.rs b/src/table/table.rs index 01789c11..3ac3bc5b 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use async_trait::async_trait; use futures::stream::*; @@ -81,6 +81,8 @@ where } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { + let request_start = SystemTime::now(); + let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); @@ -99,10 +101,18 @@ where .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; + + self.data.metrics.put_request_counter.add(1); + self.data + .metrics + .put_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(()) } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + let request_start = SystemTime::now(); + let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); for entry in entries.iter() { @@ -140,6 +150,12 @@ where if errors.len() > self.data.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { + self.data.metrics.put_request_counter.add(1); + self.data + .metrics + .put_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); + Ok(()) } } @@ -149,6 +165,8 @@ where partition_key: &F::P, sort_key: &F::S, ) -> Result<Option<F::E>, Error> { + let request_start = SystemTime::now(); + let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); @@ -198,6 +216,12 @@ where .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } } + + self.data.metrics.get_request_counter.add(1); + self.data + .metrics + .get_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret) } @@ -208,6 +232,8 @@ where filter: Option<F::Filter>, limit: usize, ) -> Result<Vec<F::E>, Error> { + let request_start = SystemTime::now(); + let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); @@ -265,6 +291,12 @@ where .take(limit) .map(|(_k, v)| v.take().unwrap()) .collect::<Vec<_>>(); + + self.data.metrics.get_request_counter.add(1); + self.data + .metrics + .get_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret_vec) } |