diff options
-rw-r--r-- | src/db/metric_proxy.rs | 4 | ||||
-rw-r--r-- | src/util/background/worker.rs | 24 |
2 files changed, 26 insertions, 2 deletions
diff --git a/src/db/metric_proxy.rs b/src/db/metric_proxy.rs index d246e7fe..e5221a7f 100644 --- a/src/db/metric_proxy.rs +++ b/src/db/metric_proxy.rs @@ -9,7 +9,7 @@ use crate::{ }; use opentelemetry::{ global, - metrics::{Counter, Unit, ValueRecorder}, + metrics::{Unit, ValueRecorder}, KeyValue, }; @@ -21,7 +21,7 @@ pub struct MetricDbProxy { impl MetricDbProxy { pub fn init(db: LmdbDb) -> Db { - let meter = global::meter("garage/web"); + let meter = global::meter("garage/db"); let s = Self { db, op: meter diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index 8165e2cb..22f747b6 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,11 +1,13 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use async_trait::async_trait; use futures::future::*; use futures::stream::FuturesUnordered; use futures::StreamExt; +use opentelemetry::{global, metrics::ValueRecorder, KeyValue}; use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::{mpsc, watch}; @@ -62,6 +64,7 @@ pub(crate) struct WorkerProcessor { stop_signal: watch::Receiver<bool>, worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, + metrics: ValueRecorder<f64>, } impl WorkerProcessor { @@ -70,10 +73,15 @@ impl WorkerProcessor { stop_signal: watch::Receiver<bool>, worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, ) -> Self { + let meter = global::meter("garage/util"); Self { stop_signal, worker_chan, worker_info, + metrics: meter + .f64_value_recorder("util.worker_step") + .with_description("Duration and amount of worker steps executed") + .init(), } } @@ -103,6 +111,7 @@ impl WorkerProcessor { errors: 0, consecutive_errors: 0, last_error: None, + metrics: self.metrics.clone(), }; workers.push(async move { worker.step().await; @@ -183,10 +192,13 @@ struct WorkerHandler { errors: usize, consecutive_errors: usize, last_error: Option<(String, u64)>, + metrics: ValueRecorder<f64>, } impl WorkerHandler { async fn step(&mut self) { + let request_start = Instant::now(); + //@FIXME we also want to track errors in metrics but I don't know how yet. match self.state { WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await { Ok(s) => { @@ -229,5 +241,17 @@ impl WorkerHandler { } WorkerState::Done => unreachable!(), } + + // metrics + let metric_tags = [ + KeyValue::new("state", self.state.to_string()), + KeyValue::new("name", self.worker.name()), + KeyValue::new("id", format!("{}", self.task_id)), + ]; + + let delay_secs = Instant::now() + .saturating_duration_since(request_start) + .as_secs_f64(); + self.metrics.record(delay_secs, &metric_tags); } } |