aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs34
1 files changed, 33 insertions, 1 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 01789c11..3ac3bc5b 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use async_trait::async_trait;
use futures::stream::*;
@@ -81,6 +81,8 @@ where
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
+ let request_start = SystemTime::now();
+
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
@@ -99,10 +101,18 @@ where
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
+
+ self.data.metrics.put_request_counter.add(1);
+ self.data
+ .metrics
+ .put_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(())
}
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
+ let request_start = SystemTime::now();
+
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
for entry in entries.iter() {
@@ -140,6 +150,12 @@ where
if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
+ self.data.metrics.put_request_counter.add(1);
+ self.data
+ .metrics
+ .put_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+
Ok(())
}
}
@@ -149,6 +165,8 @@ where
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
+ let request_start = SystemTime::now();
+
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
@@ -198,6 +216,12 @@ where
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
}
+
+ self.data.metrics.get_request_counter.add(1);
+ self.data
+ .metrics
+ .get_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret)
}
@@ -208,6 +232,8 @@ where
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
+ let request_start = SystemTime::now();
+
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
@@ -265,6 +291,12 @@ where
.take(limit)
.map(|(_k, v)| v.take().unwrap())
.collect::<Vec<_>>();
+
+ self.data.metrics.get_request_counter.add(1);
+ self.data
+ .metrics
+ .get_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret_vec)
}