aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-21 12:37:52 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-21 12:37:52 +0200
commite12bc3b5959c0aa5ae3c8a746c62bab2e7343a62 (patch)
tree1a748d618917cbcf4e45b3756a0fcb62bb6ae42c
parent77e3fd6db2c9cd3a10889bd071e95ef839cfbefc (diff)
downloadgarage-e12bc3b5959c0aa5ae3c8a746c62bab2e7343a62.tar.gz
garage-e12bc3b5959c0aa5ae3c8a746c62bab2e7343a62.zip
First try on background worker manager
-rw-r--r--Cargo.lock3
-rw-r--r--src/db/Cargo.toml2
-rw-r--r--src/db/sqlite_adapter.rs2
-rw-r--r--src/util/Cargo.toml1
-rw-r--r--src/util/background.rs160
-rw-r--r--src/util/background/job_worker.rs52
-rw-r--r--src/util/background/mod.rs91
-rw-r--r--src/util/background/worker.rs145
-rw-r--r--src/util/lib.rs1
9 files changed, 293 insertions, 164 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ecdf8a57..486ad4a1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1065,11 +1065,11 @@ dependencies = [
"err-derive 0.3.1",
"heed",
"hexdump",
- "log",
"mktemp",
"pretty_env_logger",
"rusqlite",
"sled",
+ "tracing",
]
[[package]]
@@ -1258,6 +1258,7 @@ dependencies = [
name = "garage_util"
version = "0.7.0"
dependencies = [
+ "async-trait",
"blake2",
"chrono",
"err-derive 0.3.1",
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 6d8f64be..f697054b 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -19,7 +19,7 @@ required-features = ["cli"]
[dependencies]
err-derive = "0.3"
hexdump = "0.1"
-log = "0.4"
+tracing = "0.1.30"
heed = "0.11"
rusqlite = { version = "0.27", features = ["bundled"] }
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 68d96ca0..97a78b07 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -6,7 +6,7 @@ use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex, MutexGuard};
-use log::trace;
+use tracing::trace;
use rusqlite::{params, Connection, Rows, Statement, Transaction};
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 5d073436..57c70ffb 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
+async-trait = "0.1"
blake2 = "0.9"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
diff --git a/src/util/background.rs b/src/util/background.rs
deleted file mode 100644
index d35425f5..00000000
--- a/src/util/background.rs
+++ /dev/null
@@ -1,160 +0,0 @@
-//! Job runner for futures and async functions
-use core::future::Future;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::time::Duration;
-
-use futures::future::*;
-use futures::select;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
-use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
-
-use crate::error::Error;
-
-type JobOutput = Result<(), Error>;
-type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
-
-/// Job runner for futures and async functions
-pub struct BackgroundRunner {
- stop_signal: watch::Receiver<bool>,
- queue_in: mpsc::UnboundedSender<(Job, bool)>,
- worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
-}
-
-impl BackgroundRunner {
- /// Create a new BackgroundRunner
- pub fn new(
- n_runners: usize,
- stop_signal: watch::Receiver<bool>,
- ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
- let (worker_in, mut worker_out) = mpsc::unbounded_channel();
-
- let stop_signal_2 = stop_signal.clone();
- let await_all_done = tokio::spawn(async move {
- let mut workers = FuturesUnordered::new();
- let mut shutdown_timer = 0;
- loop {
- let closed = match worker_out.try_recv() {
- Ok(wkr) => {
- workers.push(wkr);
- false
- }
- Err(TryRecvError::Empty) => false,
- Err(TryRecvError::Disconnected) => true,
- };
- select! {
- res = workers.next() => {
- if let Some(Err(e)) = res {
- error!("Worker exited with error: {}", e);
- }
- }
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
- if closed || *stop_signal_2.borrow() {
- shutdown_timer += 1;
- if shutdown_timer >= 10 {
- break;
- }
- }
- }
- }
- }
- });
-
- let (queue_in, queue_out) = mpsc::unbounded_channel();
- let queue_out = Arc::new(Mutex::new(queue_out));
-
- for i in 0..n_runners {
- let queue_out = queue_out.clone();
- let stop_signal = stop_signal.clone();
-
- worker_in
- .send(tokio::spawn(async move {
- loop {
- let (job, cancellable) = {
- select! {
- item = wait_job(&queue_out).fuse() => match item {
- // We received a task, process it
- Some(x) => x,
- // We received a signal that no more tasks will ever be sent
- // because the sending side was dropped. Exit now.
- None => break,
- },
- _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
- if *stop_signal.borrow() {
- // Nothing has been going on for 5 secs, and we are shutting
- // down. Exit now.
- break;
- } else {
- // Nothing is going on but we don't want to exit.
- continue;
- }
- }
- }
- };
- if cancellable && *stop_signal.borrow() {
- continue;
- }
- if let Err(e) = job.await {
- error!("Job failed: {}", e)
- }
- }
- info!("Background worker {} exiting", i);
- }))
- .unwrap();
- }
-
- let bgrunner = Arc::new(Self {
- stop_signal,
- queue_in,
- worker_in,
- });
- (bgrunner, await_all_done)
- }
-
- /// Spawn a task to be run in background
- pub fn spawn<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, false))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- /// Spawn a task to be run in background. It may get discarded before running if spawned while
- /// the runner is stopping
- pub fn spawn_cancellable<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, true))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- pub fn spawn_worker<F, T>(&self, name: String, worker: F)
- where
- F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
- T: Future<Output = ()> + Send + 'static,
- {
- let stop_signal = self.stop_signal.clone();
- let task = tokio::spawn(async move {
- info!("Worker started: {}", name);
- worker(stop_signal).await;
- info!("Worker exited: {}", name);
- });
- self.worker_in
- .send(task)
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-}
-
-async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
- q.lock().await.recv().await
-}
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
new file mode 100644
index 00000000..8cc660f8
--- /dev/null
+++ b/src/util/background/job_worker.rs
@@ -0,0 +1,52 @@
+//! Job worker: a generic worker that just processes incoming
+//! jobs one by one
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use tokio::sync::{mpsc, Mutex};
+
+use crate::background::worker::*;
+use crate::background::*;
+
+pub(crate) struct JobWorker {
+ pub(crate) index: usize,
+ pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
+ pub(crate) next_job: Option<Job>,
+}
+
+#[async_trait]
+impl Worker for JobWorker {
+ fn name(&self) -> String {
+ format!("Job worker #{}", self.index)
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ match self.next_job.take() {
+ None => return Ok(WorkerStatus::Idle),
+ Some(job) => {
+ job.await?;
+ Ok(WorkerStatus::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus {
+ loop {
+ match self.job_chan.lock().await.recv().await {
+ Some((job, cancellable)) => {
+ if cancellable && *must_exit.borrow() {
+ // skip job
+ continue;
+ }
+ self.next_job = Some(job);
+ return WorkerStatus::Busy
+ }
+ None => return WorkerStatus::Done,
+ }
+ }
+ }
+}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
new file mode 100644
index 00000000..97d25784
--- /dev/null
+++ b/src/util/background/mod.rs
@@ -0,0 +1,91 @@
+//! Job runner for futures and async functions
+
+pub mod job_worker;
+pub mod worker;
+
+use core::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use tokio::sync::{mpsc, watch, Mutex};
+
+use crate::error::Error;
+use worker::{Worker, WorkerProcessor};
+
+pub(crate) type JobOutput = Result<(), Error>;
+pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
+
+/// Job runner for futures and async functions
+pub struct BackgroundRunner {
+ send_job: mpsc::UnboundedSender<(Job, bool)>,
+ send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
+}
+
+impl BackgroundRunner {
+ /// Create a new BackgroundRunner
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
+
+ let await_all_done =
+ tokio::spawn(
+ async move { WorkerProcessor::new(worker_out, stop_signal).run().await },
+ );
+
+ let (send_job, queue_out) = mpsc::unbounded_channel();
+ let queue_out = Arc::new(Mutex::new(queue_out));
+
+ for i in 0..n_runners {
+ let queue_out = queue_out.clone();
+
+ send_worker.send(Box::new(job_worker::JobWorker {
+ index: i,
+ job_chan: queue_out.clone(),
+ next_job: None,
+ })).ok().unwrap();
+ }
+
+ let bgrunner = Arc::new(Self {
+ send_job,
+ send_worker,
+ });
+ (bgrunner, await_all_done)
+ }
+
+ /// Spawn a task to be run in background
+ pub fn spawn<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, false))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ /// Spawn a task to be run in background. It may get discarded before running if spawned while
+ /// the runner is stopping
+ pub fn spawn_cancellable<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, true))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ pub fn spawn_worker<W>(&self, worker: W)
+ where
+ W: Worker + 'static,
+ {
+ self.send_worker
+ .send(Box::new(worker))
+ .ok()
+ .expect("Could not put worker in queue");
+ }
+}
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
new file mode 100644
index 00000000..a173902c
--- /dev/null
+++ b/src/util/background/worker.rs
@@ -0,0 +1,145 @@
+use std::time::{Duration, Instant};
+
+use tracing::*;
+use async_trait::async_trait;
+use futures::future::*;
+use tokio::select;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use tokio::sync::{mpsc, watch};
+
+use crate::error::Error;
+
+#[derive(PartialEq, Copy, Clone)]
+pub enum WorkerStatus {
+ Busy,
+ Idle,
+ Done,
+}
+
+#[async_trait]
+pub trait Worker: Send {
+ fn name(&self) -> String;
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;
+ async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus;
+}
+
+pub(crate) struct WorkerProcessor {
+ stop_signal: watch::Receiver<bool>,
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+}
+
+impl WorkerProcessor {
+ pub(crate) fn new(
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ stop_signal: watch::Receiver<bool>,
+ ) -> Self {
+ Self {
+ stop_signal,
+ worker_chan,
+ }
+ }
+
+ pub(crate) async fn run(&mut self) {
+ let mut workers = FuturesUnordered::new();
+ let mut next_task_id = 1;
+
+ while !*self.stop_signal.borrow() {
+ let await_next_worker = async {
+ if workers.is_empty() {
+ futures::future::pending().await
+ } else {
+ workers.next().await
+ }
+ };
+ select! {
+ new_worker_opt = self.worker_chan.recv() => {
+ if let Some(new_worker) = new_worker_opt {
+ let task_id = next_task_id;
+ next_task_id += 1;
+ let stop_signal = self.stop_signal.clone();
+ workers.push(async move {
+ let mut worker = WorkerHandler {
+ task_id,
+ stop_signal,
+ worker: new_worker,
+ status: WorkerStatus::Busy,
+ };
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ worker = await_next_worker => {
+ if let Some(mut worker) = worker {
+ // TODO save new worker status somewhere
+ if worker.status == WorkerStatus::Done {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ } else {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ }
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+
+ // We are exiting, drain everything
+ let drain_half_time = Instant::now() + Duration::from_secs(5);
+ let drain_everything = async move {
+ while let Some(mut worker) = workers.next().await {
+ if worker.status == WorkerStatus::Busy
+ || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time)
+ {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ } else {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ }
+ }
+ };
+
+ select! {
+ _ = drain_everything => {
+ info!("All workers exited in time \\o/");
+ }
+ _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ warn!("Some workers could not exit in time, we are cancelling some things in the middle.");
+ }
+ }
+ }
+}
+
+// TODO add tranquilizer
+struct WorkerHandler {
+ task_id: usize,
+ stop_signal: watch::Receiver<bool>,
+ worker: Box<dyn Worker>,
+ status: WorkerStatus,
+}
+
+impl WorkerHandler {
+ async fn step(&mut self) {
+ match self.status {
+ WorkerStatus::Busy => {
+ match self.worker.work(&mut self.stop_signal).await {
+ Ok(s) => {
+ self.status = s;
+ }
+ Err(e) => {
+ error!("Error in worker {}: {}", self.worker.name(), e);
+ }
+ }
+ }
+ WorkerStatus::Idle => {
+ self.status = self.worker.wait_for_work(&mut self.stop_signal).await;
+ }
+ WorkerStatus::Done => unreachable!()
+ }
+ }
+}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 8ca6e310..fce151af 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -11,7 +11,6 @@ pub mod error;
pub mod formater;
pub mod metrics;
pub mod persister;
-//pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;