From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:51:16 +0100 Subject: Refactor background runner and get rid of job worker --- src/garage/admin.rs | 15 ++++++++++----- src/garage/repair/offline.rs | 17 ++--------------- src/garage/repair/online.rs | 22 ++++++++++------------ src/garage/server.rs | 12 ++++++------ 4 files changed, 28 insertions(+), 38 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 96d838d5..c669b5e6 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_util::background::BackgroundRunner; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -74,13 +75,18 @@ impl Rpc for AdminRpc { pub struct AdminRpcHandler { garage: Arc, + background: Arc, endpoint: Arc>, } impl AdminRpcHandler { - pub fn new(garage: Arc) -> Arc { + pub fn new(garage: Arc, background: Arc) -> Arc { let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { garage, endpoint }); + let admin = Arc::new(Self { + garage, + background, + endpoint, + }); admin.endpoint.set_handler(admin.clone()); admin } @@ -759,7 +765,7 @@ impl AdminRpcHandler { ))) } } else { - launch_online_repair(self.garage.clone(), opt).await?; + launch_online_repair(&self.garage, &self.background, opt).await?; Ok(AdminRpc::Ok(format!( "Repair launched on {:?}", self.garage.system.id @@ -925,12 +931,11 @@ impl AdminRpcHandler { async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result { match cmd { WorkerOperation::List { opt } => { - let workers = self.garage.background.get_worker_info(); + let workers = self.background.get_worker_info(); Ok(AdminRpc::WorkerList(workers, *opt)) } WorkerOperation::Info { tid } => { let info = self - .garage .background .get_worker_info() .get(tid) diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs index 7760a8bd..25193e4a 100644 --- a/src/garage/repair/offline.rs +++ b/src/garage/repair/offline.rs @@ -1,8 +1,5 @@ use std::path::PathBuf; -use tokio::sync::watch; - -use garage_util::background::*; use garage_util::config::*; use garage_util::error::*; @@ -20,12 +17,8 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu info!("Loading configuration..."); let config = read_config(config_file)?; - info!("Initializing background runner..."); - let (done_tx, done_rx) = watch::channel(false); - let (background, await_background_done) = BackgroundRunner::new(16, done_rx); - info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), background)?; + let garage = Garage::new(config)?; info!("Launching repair operation..."); match opt.what { @@ -43,13 +36,7 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu } } - info!("Repair operation finished, shutting down Garage internals..."); - done_tx.send(true).unwrap(); - drop(garage); - - await_background_done.await?; - - info!("Cleaning up..."); + info!("Repair operation finished, shutting down..."); Ok(()) } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 2a8e6298..4b4118a8 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -15,7 +15,11 @@ use garage_util::error::Error; use crate::*; -pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result<(), Error> { +pub async fn launch_online_repair( + garage: &Arc, + bg: &BackgroundRunner, + opt: RepairOpt, +) -> Result<(), Error> { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); @@ -27,23 +31,17 @@ pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result } RepairWhat::Versions => { info!("Repairing the versions table"); - garage - .background - .spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - garage - .background - .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); - garage - .background - .spawn_worker(garage_block::repair::RepairWorker::new( - garage.block_manager.clone(), - )); + bg.spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); } RepairWhat::Scrub { cmd } => { let cmd = match cmd { diff --git a/src/garage/server.rs b/src/garage/server.rs index 8e29f6ec..16f1b625 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -35,15 +35,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { #[cfg(feature = "metrics")] let metrics_exporter = opentelemetry_prometheus::exporter().init(); + info!("Initializing Garage main data store..."); + let garage = Garage::new(config.clone())?; + info!("Initializing background runner..."); let watch_cancel = watch_shutdown_signal(); - let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); - - info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), background)?; + let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone()); info!("Spawning Garage workers..."); - garage.spawn_workers(); + garage.spawn_workers(&background); if config.admin.trace_sink.is_some() { info!("Initialize tracing..."); @@ -66,7 +66,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); - AdminRpcHandler::new(garage.clone()); + AdminRpcHandler::new(garage.clone(), background.clone()); // ---- Launch public-facing API servers ---- -- cgit v1.2.3