aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-16 14:23:04 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-14 10:51:50 +0100
commit2cab84b1fe423a41b356211e592a614c95ec4e0c (patch)
treec7dc3227feccbd6f4a8aba0bf8025201f3acc229 /src/table/table.rs
parent1e2cf26373ef1812a3152a0057774f6381e66914 (diff)
downloadgarage-2cab84b1fe423a41b356211e592a614c95ec4e0c.tar.gz
garage-2cab84b1fe423a41b356211e592a614c95ec4e0c.zip
Add many metrics in table/ and rpc/
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs34
1 files changed, 33 insertions, 1 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 01789c11..3ac3bc5b 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use async_trait::async_trait;
use futures::stream::*;
@@ -81,6 +81,8 @@ where
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
+ let request_start = SystemTime::now();
+
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
@@ -99,10 +101,18 @@ where
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
+
+ self.data.metrics.put_request_counter.add(1);
+ self.data
+ .metrics
+ .put_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(())
}
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
+ let request_start = SystemTime::now();
+
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
for entry in entries.iter() {
@@ -140,6 +150,12 @@ where
if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
+ self.data.metrics.put_request_counter.add(1);
+ self.data
+ .metrics
+ .put_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+
Ok(())
}
}
@@ -149,6 +165,8 @@ where
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
+ let request_start = SystemTime::now();
+
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
@@ -198,6 +216,12 @@ where
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
}
+
+ self.data.metrics.get_request_counter.add(1);
+ self.data
+ .metrics
+ .get_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret)
}
@@ -208,6 +232,8 @@ where
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
+ let request_start = SystemTime::now();
+
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
@@ -265,6 +291,12 @@ where
.take(limit)
.map(|(_k, v)| v.take().unwrap())
.collect::<Vec<_>>();
+
+ self.data.metrics.get_request_counter.add(1);
+ self.data
+ .metrics
+ .get_request_duration
+ .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret_vec)
}