aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-08-17 13:16:55 +0200
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-08-17 13:16:55 +0200
commit306a74379a7b5e56699ac6cdd98ae2c6e98efc0d (patch)
tree7f3344f312567ef11bfebacab47bd3e1c2189110 /src
parent14163b5853aea4357f4e17bb341ccd50ba3c89f1 (diff)
downloadgarage-306a74379a7b5e56699ac6cdd98ae2c6e98efc0d.tar.gz
garage-306a74379a7b5e56699ac6cdd98ae2c6e98efc0d.zip
add metrics to workers
Diffstat (limited to 'src')
-rw-r--r--src/db/metric_proxy.rs4
-rw-r--r--src/util/background/worker.rs24
2 files changed, 26 insertions, 2 deletions
diff --git a/src/db/metric_proxy.rs b/src/db/metric_proxy.rs
index d246e7fe..e5221a7f 100644
--- a/src/db/metric_proxy.rs
+++ b/src/db/metric_proxy.rs
@@ -9,7 +9,7 @@ use crate::{
};
use opentelemetry::{
global,
- metrics::{Counter, Unit, ValueRecorder},
+ metrics::{Unit, ValueRecorder},
KeyValue,
};
@@ -21,7 +21,7 @@ pub struct MetricDbProxy {
impl MetricDbProxy {
pub fn init(db: LmdbDb) -> Db {
- let meter = global::meter("garage/web");
+ let meter = global::meter("garage/db");
let s = Self {
db,
op: meter
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index 8165e2cb..22f747b6 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -1,11 +1,13 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
+use std::time::Instant;
use async_trait::async_trait;
use futures::future::*;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
+use opentelemetry::{global, metrics::ValueRecorder, KeyValue};
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
@@ -62,6 +64,7 @@ pub(crate) struct WorkerProcessor {
stop_signal: watch::Receiver<bool>,
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+ metrics: ValueRecorder<f64>,
}
impl WorkerProcessor {
@@ -70,10 +73,15 @@ impl WorkerProcessor {
stop_signal: watch::Receiver<bool>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
) -> Self {
+ let meter = global::meter("garage/util");
Self {
stop_signal,
worker_chan,
worker_info,
+ metrics: meter
+ .f64_value_recorder("util.worker_step")
+ .with_description("Duration and amount of worker steps executed")
+ .init(),
}
}
@@ -103,6 +111,7 @@ impl WorkerProcessor {
errors: 0,
consecutive_errors: 0,
last_error: None,
+ metrics: self.metrics.clone(),
};
workers.push(async move {
worker.step().await;
@@ -183,10 +192,13 @@ struct WorkerHandler {
errors: usize,
consecutive_errors: usize,
last_error: Option<(String, u64)>,
+ metrics: ValueRecorder<f64>,
}
impl WorkerHandler {
async fn step(&mut self) {
+ let request_start = Instant::now();
+ //@FIXME we also want to track errors in metrics but I don't know how yet.
match self.state {
WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
Ok(s) => {
@@ -229,5 +241,17 @@ impl WorkerHandler {
}
WorkerState::Done => unreachable!(),
}
+
+ // metrics
+ let metric_tags = [
+ KeyValue::new("state", self.state.to_string()),
+ KeyValue::new("name", self.worker.name()),
+ KeyValue::new("id", format!("{}", self.task_id)),
+ ];
+
+ let delay_secs = Instant::now()
+ .saturating_duration_since(request_start)
+ .as_secs_f64();
+ self.metrics.record(delay_secs, &metric_tags);
}
}