diff options
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/Cargo.toml | 1 | ||||
-rw-r--r-- | src/util/background.rs | 160 | ||||
-rw-r--r-- | src/util/background/job_worker.rs | 51 | ||||
-rw-r--r-- | src/util/background/mod.rs | 117 | ||||
-rw-r--r-- | src/util/background/worker.rs | 261 | ||||
-rw-r--r-- | src/util/lib.rs | 1 | ||||
-rw-r--r-- | src/util/tranquilizer.rs | 25 |
7 files changed, 453 insertions, 163 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 5d073436..57c70ffb 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -16,6 +16,7 @@ path = "lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } +async-trait = "0.1" blake2 = "0.9" err-derive = "0.3" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } diff --git a/src/util/background.rs b/src/util/background.rs deleted file mode 100644 index d35425f5..00000000 --- a/src/util/background.rs +++ /dev/null @@ -1,160 +0,0 @@ -//! Job runner for futures and async functions -use core::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; - -use futures::future::*; -use futures::select; -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex}; - -use crate::error::Error; - -type JobOutput = Result<(), Error>; -type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; - -/// Job runner for futures and async functions -pub struct BackgroundRunner { - stop_signal: watch::Receiver<bool>, - queue_in: mpsc::UnboundedSender<(Job, bool)>, - worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>, -} - -impl BackgroundRunner { - /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver<bool>, - ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { - let (worker_in, mut worker_out) = mpsc::unbounded_channel(); - - let stop_signal_2 = stop_signal.clone(); - let await_all_done = tokio::spawn(async move { - let mut workers = FuturesUnordered::new(); - let mut shutdown_timer = 0; - loop { - let closed = match worker_out.try_recv() { - Ok(wkr) => { - workers.push(wkr); - false - } - Err(TryRecvError::Empty) => false, - Err(TryRecvError::Disconnected) => true, - }; - select! { - res = workers.next() => { - if let Some(Err(e)) = res { - error!("Worker exited with error: {}", e); - } - } - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { - if closed || *stop_signal_2.borrow() { - shutdown_timer += 1; - if shutdown_timer >= 10 { - break; - } - } - } - } - } - }); - - let (queue_in, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - let stop_signal = stop_signal.clone(); - - worker_in - .send(tokio::spawn(async move { - loop { - let (job, cancellable) = { - select! { - item = wait_job(&queue_out).fuse() => match item { - // We received a task, process it - Some(x) => x, - // We received a signal that no more tasks will ever be sent - // because the sending side was dropped. Exit now. - None => break, - }, - _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => { - if *stop_signal.borrow() { - // Nothing has been going on for 5 secs, and we are shutting - // down. Exit now. - break; - } else { - // Nothing is going on but we don't want to exit. - continue; - } - } - } - }; - if cancellable && *stop_signal.borrow() { - continue; - } - if let Err(e) = job.await { - error!("Job failed: {}", e) - } - } - info!("Background worker {} exiting", i); - })) - .unwrap(); - } - - let bgrunner = Arc::new(Self { - stop_signal, - queue_in, - worker_in, - }); - (bgrunner, await_all_done) - } - - /// Spawn a task to be run in background - pub fn spawn<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.queue_in - .send((boxed, false)) - .map_err(|_| "could not put job in queue") - .unwrap(); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.queue_in - .send((boxed, true)) - .map_err(|_| "could not put job in queue") - .unwrap(); - } - - pub fn spawn_worker<F, T>(&self, name: String, worker: F) - where - F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static, - T: Future<Output = ()> + Send + 'static, - { - let stop_signal = self.stop_signal.clone(); - let task = tokio::spawn(async move { - info!("Worker started: {}", name); - worker(stop_signal).await; - info!("Worker exited: {}", name); - }); - self.worker_in - .send(task) - .map_err(|_| "could not put job in queue") - .unwrap(); - } -} - -async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> { - q.lock().await.recv().await -} diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs new file mode 100644 index 00000000..6754382a --- /dev/null +++ b/src/util/background/job_worker.rs @@ -0,0 +1,51 @@ +//! Job worker: a generic worker that just processes incoming +//! jobs one by one + +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::{mpsc, Mutex}; + +use crate::background::worker::*; +use crate::background::*; + +pub(crate) struct JobWorker { + pub(crate) index: usize, + pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>, + pub(crate) next_job: Option<Job>, +} + +#[async_trait] +impl Worker for JobWorker { + fn name(&self) -> String { + format!("Job worker #{}", self.index) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerState, Error> { + match self.next_job.take() { + None => return Ok(WorkerState::Idle), + Some(job) => { + job.await?; + Ok(WorkerState::Busy) + } + } + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + loop { + match self.job_chan.lock().await.recv().await { + Some((job, cancellable)) => { + if cancellable && *must_exit.borrow() { + continue; + } + self.next_job = Some(job); + return WorkerState::Busy; + } + None => return WorkerState::Done, + } + } + } +} diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs new file mode 100644 index 00000000..619f5068 --- /dev/null +++ b/src/util/background/mod.rs @@ -0,0 +1,117 @@ +//! Job runner for futures and async functions + +pub mod job_worker; +pub mod worker; + +use core::future::Future; + +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, watch, Mutex}; + +use crate::error::Error; +use worker::WorkerProcessor; +pub use worker::{Worker, WorkerState}; + +pub(crate) type JobOutput = Result<(), Error>; +pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; + +/// Job runner for futures and async functions +pub struct BackgroundRunner { + send_job: mpsc::UnboundedSender<(Job, bool)>, + send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct WorkerInfo { + pub name: String, + pub info: Option<String>, + pub state: WorkerState, + pub errors: usize, + pub consecutive_errors: usize, + pub last_error: Option<(String, u64)>, +} + +impl BackgroundRunner { + /// Create a new BackgroundRunner + pub fn new( + n_runners: usize, + stop_signal: watch::Receiver<bool>, + ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { + let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); + + let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); + let mut worker_processor = + WorkerProcessor::new(worker_out, stop_signal, worker_info.clone()); + + let await_all_done = tokio::spawn(async move { + worker_processor.run().await; + }); + + let (send_job, queue_out) = mpsc::unbounded_channel(); + let queue_out = Arc::new(Mutex::new(queue_out)); + + for i in 0..n_runners { + let queue_out = queue_out.clone(); + + send_worker + .send(Box::new(job_worker::JobWorker { + index: i, + job_chan: queue_out.clone(), + next_job: None, + })) + .ok() + .unwrap(); + } + + let bgrunner = Arc::new(Self { + send_job, + send_worker, + worker_info, + }); + (bgrunner, await_all_done) + } + + pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> { + self.worker_info.lock().unwrap().clone() + } + + /// Spawn a task to be run in background + pub fn spawn<T>(&self, job: T) + where + T: Future<Output = JobOutput> + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, false)) + .ok() + .expect("Could not put job in queue"); + } + + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping + pub fn spawn_cancellable<T>(&self, job: T) + where + T: Future<Output = JobOutput> + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, true)) + .ok() + .expect("Could not put job in queue"); + } + + pub fn spawn_worker<W>(&self, worker: W) + where + W: Worker + 'static, + { + self.send_worker + .send(Box::new(worker)) + .ok() + .expect("Could not put worker in queue"); + } +} diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs new file mode 100644 index 00000000..7f573a07 --- /dev/null +++ b/src/util/background/worker.rs @@ -0,0 +1,261 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use futures::future::*; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use tokio::select; +use tokio::sync::{mpsc, watch}; +use tracing::*; + +use crate::background::WorkerInfo; +use crate::error::Error; +use crate::time::now_msec; + +#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] +pub enum WorkerState { + Busy, + Throttled(f32), + Idle, + Done, +} + +impl std::fmt::Display for WorkerState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + WorkerState::Busy => write!(f, "Busy"), + WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t), + WorkerState::Idle => write!(f, "Idle"), + WorkerState::Done => write!(f, "Done"), + } + } +} + +#[async_trait] +pub trait Worker: Send { + fn name(&self) -> String; + + fn info(&self) -> Option<String> { + None + } + + /// Work: do a basic unit of work, if one is available (otherwise, should return + /// WorkerState::Idle immediately). We will do our best to not interrupt this future in the + /// middle of processing, it will only be interrupted at the last minute when Garage is trying + /// to exit and this hasn't returned yet. This function may return an error to indicate that + /// its unit of work could not be processed due to an error: the error will be logged and + /// .work() will be called again after a short delay. + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>; + + /// Wait for work: await for some task to become available. This future can be interrupted in + /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we + /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows + /// it to check if we are exiting. + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState; +} + +pub(crate) struct WorkerProcessor { + stop_signal: watch::Receiver<bool>, + worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, +} + +impl WorkerProcessor { + pub(crate) fn new( + worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, + stop_signal: watch::Receiver<bool>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, + ) -> Self { + Self { + stop_signal, + worker_chan, + worker_info, + } + } + + pub(crate) async fn run(&mut self) { + let mut workers = FuturesUnordered::new(); + let mut next_task_id = 1; + + while !*self.stop_signal.borrow() { + let await_next_worker = async { + if workers.is_empty() { + futures::future::pending().await + } else { + workers.next().await + } + }; + select! { + new_worker_opt = self.worker_chan.recv() => { + if let Some(new_worker) = new_worker_opt { + let task_id = next_task_id; + next_task_id += 1; + let stop_signal = self.stop_signal.clone(); + let stop_signal_worker = self.stop_signal.clone(); + let mut worker = WorkerHandler { + task_id, + stop_signal, + stop_signal_worker, + worker: new_worker, + state: WorkerState::Busy, + errors: 0, + consecutive_errors: 0, + last_error: None, + }; + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } + } + worker = await_next_worker => { + if let Some(mut worker) = worker { + trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state); + + // Save worker info + let mut wi = self.worker_info.lock().unwrap(); + match wi.get_mut(&worker.task_id) { + Some(i) => { + i.state = worker.state; + i.info = worker.worker.info(); + i.errors = worker.errors; + i.consecutive_errors = worker.consecutive_errors; + if worker.last_error.is_some() { + i.last_error = worker.last_error.take(); + } + } + None => { + wi.insert(worker.task_id, WorkerInfo { + name: worker.worker.name(), + state: worker.state, + info: worker.worker.info(), + errors: worker.errors, + consecutive_errors: worker.consecutive_errors, + last_error: worker.last_error.take(), + }); + } + } + + if worker.state == WorkerState::Done { + info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + } else { + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } + } + } + _ = self.stop_signal.changed() => (), + } + } + + // We are exiting, drain everything + let drain_half_time = Instant::now() + Duration::from_secs(5); + let drain_everything = async move { + while let Some(mut worker) = workers.next().await { + if worker.state == WorkerState::Done { + info!( + "Worker {} (TID {}) exited", + worker.worker.name(), + worker.task_id + ); + } else if Instant::now() > drain_half_time { + warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state); + } else { + workers.push( + async move { + worker.step().await; + worker + } + .boxed(), + ); + } + } + }; + + select! { + _ = drain_everything => { + info!("All workers exited peacefully \\o/"); + } + _ = tokio::time::sleep(Duration::from_secs(9)) => { + error!("Some workers could not exit in time, we are cancelling some things in the middle"); + } + } + } +} + +struct WorkerHandler { + task_id: usize, + stop_signal: watch::Receiver<bool>, + stop_signal_worker: watch::Receiver<bool>, + worker: Box<dyn Worker>, + state: WorkerState, + errors: usize, + consecutive_errors: usize, + last_error: Option<(String, u64)>, +} + +impl WorkerHandler { + async fn step(&mut self) { + match self.state { + WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await { + Ok(s) => { + self.state = s; + self.consecutive_errors = 0; + } + Err(e) => { + error!( + "Error in worker {} (TID {}): {}", + self.worker.name(), + self.task_id, + e + ); + self.errors += 1; + self.consecutive_errors += 1; + self.last_error = Some((format!("{}", e), now_msec())); + // Sleep a bit so that error won't repeat immediately, exponential backoff + // strategy (min 1sec, max ~60sec) + self.state = WorkerState::Throttled( + (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32), + ); + } + }, + WorkerState::Throttled(delay) => { + // Sleep for given delay and go back to busy state + if !*self.stop_signal.borrow() { + select! { + _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (), + _ = self.stop_signal.changed() => (), + } + } + self.state = WorkerState::Busy; + } + WorkerState::Idle => { + if *self.stop_signal.borrow() { + select! { + new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { + self.state = new_st; + } + _ = tokio::time::sleep(Duration::from_secs(1)) => { + // stay in Idle state + } + } + } else { + select! { + new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { + self.state = new_st; + } + _ = self.stop_signal.changed() => { + // stay in Idle state + } + } + } + } + WorkerState::Done => unreachable!(), + } + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index 8ca6e310..fce151af 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -11,7 +11,6 @@ pub mod error; pub mod formater; pub mod metrics; pub mod persister; -//pub mod sled_counter; pub mod time; pub mod token_bucket; pub mod tranquilizer; diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs index 28711387..fdb2918b 100644 --- a/src/util/tranquilizer.rs +++ b/src/util/tranquilizer.rs @@ -3,6 +3,8 @@ use std::time::{Duration, Instant}; use tokio::time::sleep; +use crate::background::WorkerState; + /// A tranquilizer is a helper object that is used to make /// background operations not take up too much time. /// @@ -33,7 +35,7 @@ impl Tranquilizer { } } - pub async fn tranquilize(&mut self, tranquility: u32) { + fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> { let observation = Instant::now() - self.last_step_begin; self.observations.push_back(observation); @@ -45,13 +47,32 @@ impl Tranquilizer { if !self.observations.is_empty() { let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32); + Some(delay) + } else { + None + } + } + + pub async fn tranquilize(&mut self, tranquility: u32) { + if let Some(delay) = self.tranquilize_internal(tranquility) { sleep(delay).await; + self.reset(); } + } - self.reset(); + #[must_use] + pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState { + match self.tranquilize_internal(tranquility) { + Some(delay) => WorkerState::Throttled(delay.as_secs_f32()), + None => WorkerState::Busy, + } } pub fn reset(&mut self) { self.last_step_begin = Instant::now(); } + + pub fn clear(&mut self) { + self.observations.clear(); + } } |