aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/table.rs62
1 files changed, 34 insertions, 28 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 3ac3bc5b..9ba243c0 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
-use std::time::{Duration, SystemTime};
+use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
@@ -9,6 +9,7 @@ use serde_bytes::ByteBuf;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::metrics::RecordDuration;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -81,8 +82,6 @@ where
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
- let request_start = SystemTime::now();
-
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
@@ -100,19 +99,22 @@ where
.with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
+ .bound_record_duration(&self.data.metrics.put_request_duration)
.await?;
self.data.metrics.put_request_counter.add(1);
- self.data
- .metrics
- .put_request_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(())
}
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
- let request_start = SystemTime::now();
+ self.insert_many_internal(entries)
+ .bound_record_duration(&self.data.metrics.put_request_duration)
+ .await?;
+ self.data.metrics.put_request_counter.add(1);
+ Ok(())
+ }
+ async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> {
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
for entry in entries.iter() {
@@ -150,12 +152,6 @@ where
if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
- self.data.metrics.put_request_counter.add(1);
- self.data
- .metrics
- .put_request_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
-
Ok(())
}
}
@@ -165,11 +161,20 @@ where
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
- let request_start = SystemTime::now();
+ let res = self.get_internal(partition_key, sort_key)
+ .bound_record_duration(&self.data.metrics.get_request_duration)
+ .await?;
+ self.data.metrics.get_request_counter.add(1);
+ Ok(res)
+ }
+ async fn get_internal(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ sort_key: &F::S,
+ ) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
- //eprintln!("get who: {:?}", who);
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
@@ -217,11 +222,6 @@ where
}
}
- self.data.metrics.get_request_counter.add(1);
- self.data
- .metrics
- .get_request_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret)
}
@@ -232,8 +232,20 @@ where
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
- let request_start = SystemTime::now();
+ let res = self.get_range_internal(partition_key, begin_sort_key, filter, limit)
+ .bound_record_duration(&self.data.metrics.get_request_duration)
+ .await?;
+ self.data.metrics.get_request_counter.add(1);
+ Ok(res)
+ }
+ async fn get_range_internal(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ begin_sort_key: Option<F::S>,
+ filter: Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
@@ -291,12 +303,6 @@ where
.take(limit)
.map(|(_k, v)| v.take().unwrap())
.collect::<Vec<_>>();
-
- self.data.metrics.get_request_counter.add(1);
- self.data
- .metrics
- .get_request_duration
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(ret_vec)
}