diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 34 |
1 files changed, 33 insertions, 1 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 01789c11..3ac3bc5b 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use async_trait::async_trait; use futures::stream::*; @@ -81,6 +81,8 @@ where } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { + let request_start = SystemTime::now(); + let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); @@ -99,10 +101,18 @@ where .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; + + self.data.metrics.put_request_counter.add(1); + self.data + .metrics + .put_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(()) } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + let request_start = SystemTime::now(); + let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); for entry in entries.iter() { @@ -140,6 +150,12 @@ where if errors.len() > self.data.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { + self.data.metrics.put_request_counter.add(1); + self.data + .metrics + .put_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); + Ok(()) } } @@ -149,6 +165,8 @@ where partition_key: &F::P, sort_key: &F::S, ) -> Result<Option<F::E>, Error> { + let request_start = SystemTime::now(); + let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); @@ -198,6 +216,12 @@ where .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } } + + self.data.metrics.get_request_counter.add(1); + self.data + .metrics + .get_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret) } @@ -208,6 +232,8 @@ where filter: Option<F::Filter>, limit: usize, ) -> Result<Vec<F::E>, Error> { + let request_start = SystemTime::now(); + let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); @@ -265,6 +291,12 @@ where .take(limit) .map(|(_k, v)| v.take().unwrap()) .collect::<Vec<_>>(); + + self.data.metrics.get_request_counter.add(1); + self.data + .metrics + .get_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret_vec) } |