From 2183518edccadef47cdeaf6476033b52d8832d6e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:28:07 +0100 Subject: Spawn all background workers in a separate step --- src/block/manager.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index 28523a93..ffb9de9a 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -87,7 +88,7 @@ pub struct BlockManager { pub(crate) metrics: BlockManagerMetrics, - tx_scrub_command: mpsc::Sender, + tx_scrub_command: ArcSwapOption>, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -126,8 +127,6 @@ impl BlockManager { let metrics = BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); - let (scrub_tx, scrub_rx) = mpsc::channel(1); - let block_manager = Arc::new(Self { replication, data_dir, @@ -138,21 +137,26 @@ impl BlockManager { system, endpoint, metrics, - tx_scrub_command: scrub_tx, + tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager + } + + pub fn spawn_workers(self: &Arc) { // Spawn a bunch of resync workers for index in 0..MAX_RESYNC_WORKERS { - let worker = ResyncWorker::new(index, block_manager.clone()); - block_manager.system.background.spawn_worker(worker); + let worker = ResyncWorker::new(index, self.clone()); + self.system.background.spawn_worker(worker); } // Spawn scrub worker - let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx); - block_manager.system.background.spawn_worker(scrub_worker); - - block_manager + let (scrub_tx, scrub_rx) = mpsc::channel(1); + self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); + self.system + .background + .spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); } /// Ask nodes that might have a (possibly compressed) block for it @@ -325,8 +329,11 @@ impl BlockManager { } /// Send command to start/stop/manager scrub worker - pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) { - let _ = self.tx_scrub_command.send(cmd).await; + pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> { + let tx = self.tx_scrub_command.load(); + let tx = tx.as_ref().ok_or_message("scrub worker is not running")?; + tx.send(cmd).await.ok_or_message("send error")?; + Ok(()) } /// Get the reference count of a block -- cgit v1.2.3 From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:51:16 +0100 Subject: Refactor background runner and get rid of job worker --- src/block/manager.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index ffb9de9a..1b5a5df0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -23,6 +23,7 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; @@ -144,19 +145,17 @@ impl BlockManager { block_manager } - pub fn spawn_workers(self: &Arc) { + pub fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { // Spawn a bunch of resync workers for index in 0..MAX_RESYNC_WORKERS { let worker = ResyncWorker::new(index, self.clone()); - self.system.background.spawn_worker(worker); + bg.spawn_worker(worker); } // Spawn scrub worker let (scrub_tx, scrub_rx) = mpsc::channel(1); self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); - self.system - .background - .spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); + bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); } /// Ask nodes that might have a (possibly compressed) block for it -- cgit v1.2.3 From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 15:25:29 +0100 Subject: Simplified and more aggressive worker exit logic --- src/block/repair.rs | 4 ++-- src/block/resync.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/block') diff --git a/src/block/repair.rs b/src/block/repair.rs index 1878027e..f5515d4e 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -148,7 +148,7 @@ impl Worker for RepairWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } @@ -341,7 +341,7 @@ impl Worker for ScrubWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { let (wait_until, command) = match &self.work { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), diff --git a/src/block/resync.rs b/src/block/resync.rs index 8231b55d..51bb9846 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -540,7 +540,7 @@ impl Worker for ResyncWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { while self.index >= self.manager.resync.persisted.load().n_workers { self.manager.resync.notify.notified().await } -- cgit v1.2.3