aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/Cargo.toml1
-rw-r--r--src/util/background.rs160
-rw-r--r--src/util/background/job_worker.rs48
-rw-r--r--src/util/background/mod.rs117
-rw-r--r--src/util/background/worker.rs261
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/tranquilizer.rs25
7 files changed, 450 insertions, 163 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 5d073436..57c70ffb 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
+async-trait = "0.1"
blake2 = "0.9"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
diff --git a/src/util/background.rs b/src/util/background.rs
deleted file mode 100644
index d35425f5..00000000
--- a/src/util/background.rs
+++ /dev/null
@@ -1,160 +0,0 @@
-//! Job runner for futures and async functions
-use core::future::Future;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::time::Duration;
-
-use futures::future::*;
-use futures::select;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
-use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
-
-use crate::error::Error;
-
-type JobOutput = Result<(), Error>;
-type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
-
-/// Job runner for futures and async functions
-pub struct BackgroundRunner {
- stop_signal: watch::Receiver<bool>,
- queue_in: mpsc::UnboundedSender<(Job, bool)>,
- worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
-}
-
-impl BackgroundRunner {
- /// Create a new BackgroundRunner
- pub fn new(
- n_runners: usize,
- stop_signal: watch::Receiver<bool>,
- ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
- let (worker_in, mut worker_out) = mpsc::unbounded_channel();
-
- let stop_signal_2 = stop_signal.clone();
- let await_all_done = tokio::spawn(async move {
- let mut workers = FuturesUnordered::new();
- let mut shutdown_timer = 0;
- loop {
- let closed = match worker_out.try_recv() {
- Ok(wkr) => {
- workers.push(wkr);
- false
- }
- Err(TryRecvError::Empty) => false,
- Err(TryRecvError::Disconnected) => true,
- };
- select! {
- res = workers.next() => {
- if let Some(Err(e)) = res {
- error!("Worker exited with error: {}", e);
- }
- }
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
- if closed || *stop_signal_2.borrow() {
- shutdown_timer += 1;
- if shutdown_timer >= 10 {
- break;
- }
- }
- }
- }
- }
- });
-
- let (queue_in, queue_out) = mpsc::unbounded_channel();
- let queue_out = Arc::new(Mutex::new(queue_out));
-
- for i in 0..n_runners {
- let queue_out = queue_out.clone();
- let stop_signal = stop_signal.clone();
-
- worker_in
- .send(tokio::spawn(async move {
- loop {
- let (job, cancellable) = {
- select! {
- item = wait_job(&queue_out).fuse() => match item {
- // We received a task, process it
- Some(x) => x,
- // We received a signal that no more tasks will ever be sent
- // because the sending side was dropped. Exit now.
- None => break,
- },
- _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
- if *stop_signal.borrow() {
- // Nothing has been going on for 5 secs, and we are shutting
- // down. Exit now.
- break;
- } else {
- // Nothing is going on but we don't want to exit.
- continue;
- }
- }
- }
- };
- if cancellable && *stop_signal.borrow() {
- continue;
- }
- if let Err(e) = job.await {
- error!("Job failed: {}", e)
- }
- }
- info!("Background worker {} exiting", i);
- }))
- .unwrap();
- }
-
- let bgrunner = Arc::new(Self {
- stop_signal,
- queue_in,
- worker_in,
- });
- (bgrunner, await_all_done)
- }
-
- /// Spawn a task to be run in background
- pub fn spawn<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, false))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- /// Spawn a task to be run in background. It may get discarded before running if spawned while
- /// the runner is stopping
- pub fn spawn_cancellable<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, true))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- pub fn spawn_worker<F, T>(&self, name: String, worker: F)
- where
- F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
- T: Future<Output = ()> + Send + 'static,
- {
- let stop_signal = self.stop_signal.clone();
- let task = tokio::spawn(async move {
- info!("Worker started: {}", name);
- worker(stop_signal).await;
- info!("Worker exited: {}", name);
- });
- self.worker_in
- .send(task)
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-}
-
-async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
- q.lock().await.recv().await
-}
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
new file mode 100644
index 00000000..2568ea11
--- /dev/null
+++ b/src/util/background/job_worker.rs
@@ -0,0 +1,48 @@
+//! Job worker: a generic worker that just processes incoming
+//! jobs one by one
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use tokio::sync::{mpsc, Mutex};
+
+use crate::background::worker::*;
+use crate::background::*;
+
+pub(crate) struct JobWorker {
+ pub(crate) index: usize,
+ pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
+ pub(crate) next_job: Option<Job>,
+}
+
+#[async_trait]
+impl Worker for JobWorker {
+ fn name(&self) -> String {
+ format!("Job worker #{}", self.index)
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match self.next_job.take() {
+ None => return Ok(WorkerState::Idle),
+ Some(job) => {
+ job.await?;
+ Ok(WorkerState::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ loop {
+ match self.job_chan.lock().await.recv().await {
+ Some((job, cancellable)) => {
+ if cancellable && *must_exit.borrow() {
+ continue;
+ }
+ self.next_job = Some(job);
+ return WorkerState::Busy;
+ }
+ None => return WorkerState::Done,
+ }
+ }
+ }
+}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
new file mode 100644
index 00000000..619f5068
--- /dev/null
+++ b/src/util/background/mod.rs
@@ -0,0 +1,117 @@
+//! Job runner for futures and async functions
+
+pub mod job_worker;
+pub mod worker;
+
+use core::future::Future;
+
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch, Mutex};
+
+use crate::error::Error;
+use worker::WorkerProcessor;
+pub use worker::{Worker, WorkerState};
+
+pub(crate) type JobOutput = Result<(), Error>;
+pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
+
+/// Job runner for futures and async functions
+pub struct BackgroundRunner {
+ send_job: mpsc::UnboundedSender<(Job, bool)>,
+ send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+}
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub struct WorkerInfo {
+ pub name: String,
+ pub info: Option<String>,
+ pub state: WorkerState,
+ pub errors: usize,
+ pub consecutive_errors: usize,
+ pub last_error: Option<(String, u64)>,
+}
+
+impl BackgroundRunner {
+ /// Create a new BackgroundRunner
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
+
+ let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
+ let mut worker_processor =
+ WorkerProcessor::new(worker_out, stop_signal, worker_info.clone());
+
+ let await_all_done = tokio::spawn(async move {
+ worker_processor.run().await;
+ });
+
+ let (send_job, queue_out) = mpsc::unbounded_channel();
+ let queue_out = Arc::new(Mutex::new(queue_out));
+
+ for i in 0..n_runners {
+ let queue_out = queue_out.clone();
+
+ send_worker
+ .send(Box::new(job_worker::JobWorker {
+ index: i,
+ job_chan: queue_out.clone(),
+ next_job: None,
+ }))
+ .ok()
+ .unwrap();
+ }
+
+ let bgrunner = Arc::new(Self {
+ send_job,
+ send_worker,
+ worker_info,
+ });
+ (bgrunner, await_all_done)
+ }
+
+ pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> {
+ self.worker_info.lock().unwrap().clone()
+ }
+
+ /// Spawn a task to be run in background
+ pub fn spawn<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, false))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ /// Spawn a task to be run in background. It may get discarded before running if spawned while
+ /// the runner is stopping
+ pub fn spawn_cancellable<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, true))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ pub fn spawn_worker<W>(&self, worker: W)
+ where
+ W: Worker + 'static,
+ {
+ self.send_worker
+ .send(Box::new(worker))
+ .ok()
+ .expect("Could not put worker in queue");
+ }
+}
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
new file mode 100644
index 00000000..7f573a07
--- /dev/null
+++ b/src/util/background/worker.rs
@@ -0,0 +1,261 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use futures::future::*;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use serde::{Deserialize, Serialize};
+use tokio::select;
+use tokio::sync::{mpsc, watch};
+use tracing::*;
+
+use crate::background::WorkerInfo;
+use crate::error::Error;
+use crate::time::now_msec;
+
+#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
+pub enum WorkerState {
+ Busy,
+ Throttled(f32),
+ Idle,
+ Done,
+}
+
+impl std::fmt::Display for WorkerState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ WorkerState::Busy => write!(f, "Busy"),
+ WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t),
+ WorkerState::Idle => write!(f, "Idle"),
+ WorkerState::Done => write!(f, "Done"),
+ }
+ }
+}
+
+#[async_trait]
+pub trait Worker: Send {
+ fn name(&self) -> String;
+
+ fn info(&self) -> Option<String> {
+ None
+ }
+
+ /// Work: do a basic unit of work, if one is available (otherwise, should return
+ /// WorkerState::Idle immediately). We will do our best to not interrupt this future in the
+ /// middle of processing, it will only be interrupted at the last minute when Garage is trying
+ /// to exit and this hasn't returned yet. This function may return an error to indicate that
+ /// its unit of work could not be processed due to an error: the error will be logged and
+ /// .work() will be called again after a short delay.
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
+
+ /// Wait for work: await for some task to become available. This future can be interrupted in
+ /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
+ /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
+ /// it to check if we are exiting.
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
+}
+
+pub(crate) struct WorkerProcessor {
+ stop_signal: watch::Receiver<bool>,
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+}
+
+impl WorkerProcessor {
+ pub(crate) fn new(
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ stop_signal: watch::Receiver<bool>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+ ) -> Self {
+ Self {
+ stop_signal,
+ worker_chan,
+ worker_info,
+ }
+ }
+
+ pub(crate) async fn run(&mut self) {
+ let mut workers = FuturesUnordered::new();
+ let mut next_task_id = 1;
+
+ while !*self.stop_signal.borrow() {
+ let await_next_worker = async {
+ if workers.is_empty() {
+ futures::future::pending().await
+ } else {
+ workers.next().await
+ }
+ };
+ select! {
+ new_worker_opt = self.worker_chan.recv() => {
+ if let Some(new_worker) = new_worker_opt {
+ let task_id = next_task_id;
+ next_task_id += 1;
+ let stop_signal = self.stop_signal.clone();
+ let stop_signal_worker = self.stop_signal.clone();
+ let mut worker = WorkerHandler {
+ task_id,
+ stop_signal,
+ stop_signal_worker,
+ worker: new_worker,
+ state: WorkerState::Busy,
+ errors: 0,
+ consecutive_errors: 0,
+ last_error: None,
+ };
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ worker = await_next_worker => {
+ if let Some(mut worker) = worker {
+ trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state);
+
+ // Save worker info
+ let mut wi = self.worker_info.lock().unwrap();
+ match wi.get_mut(&worker.task_id) {
+ Some(i) => {
+ i.state = worker.state;
+ i.info = worker.worker.info();
+ i.errors = worker.errors;
+ i.consecutive_errors = worker.consecutive_errors;
+ if worker.last_error.is_some() {
+ i.last_error = worker.last_error.take();
+ }
+ }
+ None => {
+ wi.insert(worker.task_id, WorkerInfo {
+ name: worker.worker.name(),
+ state: worker.state,
+ info: worker.worker.info(),
+ errors: worker.errors,
+ consecutive_errors: worker.consecutive_errors,
+ last_error: worker.last_error.take(),
+ });
+ }
+ }
+
+ if worker.state == WorkerState::Done {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ } else {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ }
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+
+ // We are exiting, drain everything
+ let drain_half_time = Instant::now() + Duration::from_secs(5);
+ let drain_everything = async move {
+ while let Some(mut worker) = workers.next().await {
+ if worker.state == WorkerState::Done {
+ info!(
+ "Worker {} (TID {}) exited",
+ worker.worker.name(),
+ worker.task_id
+ );
+ } else if Instant::now() > drain_half_time {
+ warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
+ } else {
+ workers.push(
+ async move {
+ worker.step().await;
+ worker
+ }
+ .boxed(),
+ );
+ }
+ }
+ };
+
+ select! {
+ _ = drain_everything => {
+ info!("All workers exited peacefully \\o/");
+ }
+ _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ error!("Some workers could not exit in time, we are cancelling some things in the middle");
+ }
+ }
+ }
+}
+
+struct WorkerHandler {
+ task_id: usize,
+ stop_signal: watch::Receiver<bool>,
+ stop_signal_worker: watch::Receiver<bool>,
+ worker: Box<dyn Worker>,
+ state: WorkerState,
+ errors: usize,
+ consecutive_errors: usize,
+ last_error: Option<(String, u64)>,
+}
+
+impl WorkerHandler {
+ async fn step(&mut self) {
+ match self.state {
+ WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
+ Ok(s) => {
+ self.state = s;
+ self.consecutive_errors = 0;
+ }
+ Err(e) => {
+ error!(
+ "Error in worker {} (TID {}): {}",
+ self.worker.name(),
+ self.task_id,
+ e
+ );
+ self.errors += 1;
+ self.consecutive_errors += 1;
+ self.last_error = Some((format!("{}", e), now_msec()));
+ // Sleep a bit so that error won't repeat immediately, exponential backoff
+ // strategy (min 1sec, max ~60sec)
+ self.state = WorkerState::Throttled(
+ (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
+ );
+ }
+ },
+ WorkerState::Throttled(delay) => {
+ // Sleep for given delay and go back to busy state
+ if !*self.stop_signal.borrow() {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+ self.state = WorkerState::Busy;
+ }
+ WorkerState::Idle => {
+ if *self.stop_signal.borrow() {
+ select! {
+ new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
+ self.state = new_st;
+ }
+ _ = tokio::time::sleep(Duration::from_secs(1)) => {
+ // stay in Idle state
+ }
+ }
+ } else {
+ select! {
+ new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
+ self.state = new_st;
+ }
+ _ = self.stop_signal.changed() => {
+ // stay in Idle state
+ }
+ }
+ }
+ }
+ WorkerState::Done => unreachable!(),
+ }
+ }
+}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 8ca6e310..fce151af 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -11,7 +11,6 @@ pub mod error;
pub mod formater;
pub mod metrics;
pub mod persister;
-//pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;
diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs
index 28711387..fdb2918b 100644
--- a/src/util/tranquilizer.rs
+++ b/src/util/tranquilizer.rs
@@ -3,6 +3,8 @@ use std::time::{Duration, Instant};
use tokio::time::sleep;
+use crate::background::WorkerState;
+
/// A tranquilizer is a helper object that is used to make
/// background operations not take up too much time.
///
@@ -33,7 +35,7 @@ impl Tranquilizer {
}
}
- pub async fn tranquilize(&mut self, tranquility: u32) {
+ fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> {
let observation = Instant::now() - self.last_step_begin;
self.observations.push_back(observation);
@@ -45,13 +47,32 @@ impl Tranquilizer {
if !self.observations.is_empty() {
let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32);
+ Some(delay)
+ } else {
+ None
+ }
+ }
+
+ pub async fn tranquilize(&mut self, tranquility: u32) {
+ if let Some(delay) = self.tranquilize_internal(tranquility) {
sleep(delay).await;
+ self.reset();
}
+ }
- self.reset();
+ #[must_use]
+ pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState {
+ match self.tranquilize_internal(tranquility) {
+ Some(delay) => WorkerState::Throttled(delay.as_secs_f32()),
+ None => WorkerState::Busy,
+ }
}
pub fn reset(&mut self) {
self.last_step_begin = Instant::now();
}
+
+ pub fn clear(&mut self) {
+ self.observations.clear();
+ }
}