diff options
author | Alex Auvolat <alex@adnab.me> | 2022-02-22 13:53:59 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-03-14 10:53:35 +0100 |
commit | 818daa5c786813fdf50fecb6022e29b18e509b62 (patch) | |
tree | 5f4df9917e7939d7722daaafcbcebe4961ad1184 /src/table/table.rs | |
parent | f0d0cd9a20979f59db246e6a545ddc1b7bbb20b3 (diff) | |
download | garage-818daa5c786813fdf50fecb6022e29b18e509b62.tar.gz garage-818daa5c786813fdf50fecb6022e29b18e509b62.zip |
Refactor how durations are measured
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 62 |
1 files changed, 34 insertions, 28 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 3ac3bc5b..9ba243c0 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::Duration; use async_trait::async_trait; use futures::stream::*; @@ -9,6 +9,7 @@ use serde_bytes::ByteBuf; use garage_util::data::*; use garage_util::error::Error; +use garage_util::metrics::RecordDuration; use garage_rpc::system::System; use garage_rpc::*; @@ -81,8 +82,6 @@ where } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { - let request_start = SystemTime::now(); - let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); @@ -100,19 +99,22 @@ where .with_quorum(self.data.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) + .bound_record_duration(&self.data.metrics.put_request_duration) .await?; self.data.metrics.put_request_counter.add(1); - self.data - .metrics - .put_request_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(()) } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { - let request_start = SystemTime::now(); + self.insert_many_internal(entries) + .bound_record_duration(&self.data.metrics.put_request_duration) + .await?; + self.data.metrics.put_request_counter.add(1); + Ok(()) + } + async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> { let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); for entry in entries.iter() { @@ -150,12 +152,6 @@ where if errors.len() > self.data.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { - self.data.metrics.put_request_counter.add(1); - self.data - .metrics - .put_request_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); - Ok(()) } } @@ -165,11 +161,20 @@ where partition_key: &F::P, sort_key: &F::S, ) -> Result<Option<F::E>, Error> { - let request_start = SystemTime::now(); + let res = self.get_internal(partition_key, sort_key) + .bound_record_duration(&self.data.metrics.get_request_duration) + .await?; + self.data.metrics.get_request_counter.add(1); + Ok(res) + } + async fn get_internal( + self: &Arc<Self>, + partition_key: &F::P, + sort_key: &F::S, + ) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); - //eprintln!("get who: {:?}", who); let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self @@ -217,11 +222,6 @@ where } } - self.data.metrics.get_request_counter.add(1); - self.data - .metrics - .get_request_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret) } @@ -232,8 +232,20 @@ where filter: Option<F::Filter>, limit: usize, ) -> Result<Vec<F::E>, Error> { - let request_start = SystemTime::now(); + let res = self.get_range_internal(partition_key, begin_sort_key, filter, limit) + .bound_record_duration(&self.data.metrics.get_request_duration) + .await?; + self.data.metrics.get_request_counter.add(1); + Ok(res) + } + async fn get_range_internal( + self: &Arc<Self>, + partition_key: &F::P, + begin_sort_key: Option<F::S>, + filter: Option<F::Filter>, + limit: usize, + ) -> Result<Vec<F::E>, Error> { let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); @@ -291,12 +303,6 @@ where .take(limit) .map(|(_k, v)| v.take().unwrap()) .collect::<Vec<_>>(); - - self.data.metrics.get_request_counter.add(1); - self.data - .metrics - .get_request_duration - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); Ok(ret_vec) } |