diff options
author | Alex Auvolat <alex@adnab.me> | 2022-02-16 14:23:04 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-03-14 10:51:50 +0100 |
commit | 2cab84b1fe423a41b356211e592a614c95ec4e0c (patch) | |
tree | c7dc3227feccbd6f4a8aba0bf8025201f3acc229 /src/table/table.rs | |
parent | 1e2cf26373ef1812a3152a0057774f6381e66914 (diff) | |
download | garage-2cab84b1fe423a41b356211e592a614c95ec4e0c.tar.gz garage-2cab84b1fe423a41b356211e592a614c95ec4e0c.zip |
Add many metrics in table/ and rpc/
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 34 |
1 files changed, 33 insertions, 1 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 01789c11..3ac3bc5b 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use async_trait::async_trait; use futures::stream::*; @@ -81,6 +81,8 @@ where } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { + let request_start = SystemTime::now(); + let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); @@ -99,10 +101,18 @@ where .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; + + self.data.metrics.put_request_counter.add(1); + self.data + .metrics + .put_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(()) } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + let request_start = SystemTime::now(); + let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); for entry in entries.iter() { @@ -140,6 +150,12 @@ where if errors.len() > self.data.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { + self.data.metrics.put_request_counter.add(1); + self.data + .metrics + .put_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); + Ok(()) } } @@ -149,6 +165,8 @@ where partition_key: &F::P, sort_key: &F::S, ) -> Result<Option<F::E>, Error> { + let request_start = SystemTime::now(); + let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); @@ -198,6 +216,12 @@ where .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } } + + self.data.metrics.get_request_counter.add(1); + self.data + .metrics + .get_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret) } @@ -208,6 +232,8 @@ where filter: Option<F::Filter>, limit: usize, ) -> Result<Vec<F::E>, Error> { + let request_start = SystemTime::now(); + let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); @@ -265,6 +291,12 @@ where .take(limit) .map(|(_k, v)| v.take().unwrap()) .collect::<Vec<_>>(); + + self.data.metrics.get_request_counter.add(1); + self.data + .metrics + .get_request_duration + .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret_vec) } |