aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/table.rs39
1 files changed, 37 insertions, 2 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 69cac41a..7f87a449 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -7,6 +7,11 @@ use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use opentelemetry::{
+ trace::{FutureExt, TraceContextExt, Tracer},
+ Context,
+};
+
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -82,6 +87,20 @@ where
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
+ let tracer = opentelemetry::global::tracer("garage_table");
+ let span = tracer.start(format!("{} insert", F::TABLE_NAME));
+
+ self.insert_internal(e)
+ .bound_record_duration(&self.data.metrics.put_request_duration)
+ .with_context(Context::current_with_span(span))
+ .await?;
+
+ self.data.metrics.put_request_counter.add(1);
+
+ Ok(())
+ }
+
+ async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
@@ -99,18 +118,22 @@ where
.with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
- .bound_record_duration(&self.data.metrics.put_request_duration)
.await?;
- self.data.metrics.put_request_counter.add(1);
Ok(())
}
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
+ let tracer = opentelemetry::global::tracer("garage_table");
+ let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len()));
+
self.insert_many_internal(entries)
.bound_record_duration(&self.data.metrics.put_request_duration)
+ .with_context(Context::current_with_span(span))
.await?;
+
self.data.metrics.put_request_counter.add(1);
+
Ok(())
}
@@ -161,11 +184,17 @@ where
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
+ let tracer = opentelemetry::global::tracer("garage_table");
+ let span = tracer.start(format!("{} get", F::TABLE_NAME));
+
let res = self
.get_internal(partition_key, sort_key)
.bound_record_duration(&self.data.metrics.get_request_duration)
+ .with_context(Context::current_with_span(span))
.await?;
+
self.data.metrics.get_request_counter.add(1);
+
Ok(res)
}
@@ -233,11 +262,17 @@ where
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
+ let tracer = opentelemetry::global::tracer("garage_table");
+ let span = tracer.start(format!("{} get_range", F::TABLE_NAME));
+
let res = self
.get_range_internal(partition_key, begin_sort_key, filter, limit)
.bound_record_duration(&self.data.metrics.get_request_duration)
+ .with_context(Context::current_with_span(span))
.await?;
+
self.data.metrics.get_request_counter.add(1);
+
Ok(res)
}