From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:51:16 +0100 Subject: Refactor background runner and get rid of job worker --- src/block/manager.rs | 9 +++--- src/garage/admin.rs | 15 ++++++--- src/garage/repair/offline.rs | 17 ++--------- src/garage/repair/online.rs | 22 ++++++-------- src/garage/server.rs | 12 ++++---- src/model/garage.rs | 35 +++++++++------------ src/model/index_counter.rs | 5 +-- src/model/s3/object_table.rs | 2 -- src/model/s3/version_table.rs | 2 -- src/rpc/rpc_helper.rs | 18 +++-------- src/rpc/system.rs | 24 ++++++--------- src/table/gc.rs | 6 ++-- src/table/sync.rs | 4 +-- src/table/table.rs | 19 +++++------- src/util/background/job_worker.rs | 48 ----------------------------- src/util/background/mod.rs | 64 ++++++++------------------------------- 16 files changed, 89 insertions(+), 213 deletions(-) delete mode 100644 src/util/background/job_worker.rs diff --git a/src/block/manager.rs b/src/block/manager.rs index ffb9de9a..1b5a5df0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -23,6 +23,7 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; @@ -144,19 +145,17 @@ impl BlockManager { block_manager } - pub fn spawn_workers(self: &Arc) { + pub fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { // Spawn a bunch of resync workers for index in 0..MAX_RESYNC_WORKERS { let worker = ResyncWorker::new(index, self.clone()); - self.system.background.spawn_worker(worker); + bg.spawn_worker(worker); } // Spawn scrub worker let (scrub_tx, scrub_rx) = mpsc::channel(1); self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); - self.system - .background - .spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); + bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); } /// Ask nodes that might have a (possibly compressed) block for it diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 96d838d5..c669b5e6 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_util::background::BackgroundRunner; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -74,13 +75,18 @@ impl Rpc for AdminRpc { pub struct AdminRpcHandler { garage: Arc, + background: Arc, endpoint: Arc>, } impl AdminRpcHandler { - pub fn new(garage: Arc) -> Arc { + pub fn new(garage: Arc, background: Arc) -> Arc { let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { garage, endpoint }); + let admin = Arc::new(Self { + garage, + background, + endpoint, + }); admin.endpoint.set_handler(admin.clone()); admin } @@ -759,7 +765,7 @@ impl AdminRpcHandler { ))) } } else { - launch_online_repair(self.garage.clone(), opt).await?; + launch_online_repair(&self.garage, &self.background, opt).await?; Ok(AdminRpc::Ok(format!( "Repair launched on {:?}", self.garage.system.id @@ -925,12 +931,11 @@ impl AdminRpcHandler { async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result { match cmd { WorkerOperation::List { opt } => { - let workers = self.garage.background.get_worker_info(); + let workers = self.background.get_worker_info(); Ok(AdminRpc::WorkerList(workers, *opt)) } WorkerOperation::Info { tid } => { let info = self - .garage .background .get_worker_info() .get(tid) diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs index 7760a8bd..25193e4a 100644 --- a/src/garage/repair/offline.rs +++ b/src/garage/repair/offline.rs @@ -1,8 +1,5 @@ use std::path::PathBuf; -use tokio::sync::watch; - -use garage_util::background::*; use garage_util::config::*; use garage_util::error::*; @@ -20,12 +17,8 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu info!("Loading configuration..."); let config = read_config(config_file)?; - info!("Initializing background runner..."); - let (done_tx, done_rx) = watch::channel(false); - let (background, await_background_done) = BackgroundRunner::new(16, done_rx); - info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), background)?; + let garage = Garage::new(config)?; info!("Launching repair operation..."); match opt.what { @@ -43,13 +36,7 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu } } - info!("Repair operation finished, shutting down Garage internals..."); - done_tx.send(true).unwrap(); - drop(garage); - - await_background_done.await?; - - info!("Cleaning up..."); + info!("Repair operation finished, shutting down..."); Ok(()) } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 2a8e6298..4b4118a8 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -15,7 +15,11 @@ use garage_util::error::Error; use crate::*; -pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result<(), Error> { +pub async fn launch_online_repair( + garage: &Arc, + bg: &BackgroundRunner, + opt: RepairOpt, +) -> Result<(), Error> { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); @@ -27,23 +31,17 @@ pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result } RepairWhat::Versions => { info!("Repairing the versions table"); - garage - .background - .spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - garage - .background - .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); - garage - .background - .spawn_worker(garage_block::repair::RepairWorker::new( - garage.block_manager.clone(), - )); + bg.spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); } RepairWhat::Scrub { cmd } => { let cmd = match cmd { diff --git a/src/garage/server.rs b/src/garage/server.rs index 8e29f6ec..16f1b625 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -35,15 +35,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { #[cfg(feature = "metrics")] let metrics_exporter = opentelemetry_prometheus::exporter().init(); + info!("Initializing Garage main data store..."); + let garage = Garage::new(config.clone())?; + info!("Initializing background runner..."); let watch_cancel = watch_shutdown_signal(); - let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); - - info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), background)?; + let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone()); info!("Spawning Garage workers..."); - garage.spawn_workers(); + garage.spawn_workers(&background); if config.admin.trace_sink.is_some() { info!("Initialize tracing..."); @@ -66,7 +66,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); - AdminRpcHandler::new(garage.clone()); + AdminRpcHandler::new(garage.clone(), background.clone()); // ---- Launch public-facing API servers ---- diff --git a/src/model/garage.rs b/src/model/garage.rs index 9ae6af82..5bea6b4f 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -39,8 +39,6 @@ pub struct Garage { /// The local database pub db: db::Db, - /// A background job runner - pub background: Arc, /// The membership manager pub system: Arc, /// The block manager @@ -78,7 +76,7 @@ pub struct GarageK2V { impl Garage { /// Create and run garage - pub fn new(config: Config, background: Arc) -> Result, Error> { + pub fn new(config: Config) -> Result, Error> { // Create meta dir and data dir if they don't exist already std::fs::create_dir_all(&config.metadata_dir) .ok_or_message("Unable to create Garage metadata directory")?; @@ -167,7 +165,7 @@ impl Garage { .expect("Invalid replication_mode in config file."); info!("Initialize membership management system..."); - let system = System::new(network_key, background.clone(), replication_mode, &config)?; + let system = System::new(network_key, replication_mode, &config)?; let data_rep_param = TableShardedReplication { system: system.clone(), @@ -225,7 +223,6 @@ impl Garage { info!("Initialize version_table..."); let version_table = Table::new( VersionTable { - background: background.clone(), block_ref_table: block_ref_table.clone(), }, meta_rep_param.clone(), @@ -240,7 +237,6 @@ impl Garage { #[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { - background: background.clone(), version_table: version_table.clone(), object_counter_table: object_counter_table.clone(), }, @@ -258,7 +254,6 @@ impl Garage { config, replication_mode, db, - background, system, block_manager, bucket_table, @@ -273,20 +268,20 @@ impl Garage { })) } - pub fn spawn_workers(&self) { - self.block_manager.spawn_workers(); + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.block_manager.spawn_workers(bg); - self.bucket_table.spawn_workers(); - self.bucket_alias_table.spawn_workers(); - self.key_table.spawn_workers(); + self.bucket_table.spawn_workers(bg); + self.bucket_alias_table.spawn_workers(bg); + self.key_table.spawn_workers(bg); - self.object_table.spawn_workers(); - self.object_counter_table.spawn_workers(); - self.version_table.spawn_workers(); - self.block_ref_table.spawn_workers(); + self.object_table.spawn_workers(bg); + self.object_counter_table.spawn_workers(bg); + self.version_table.spawn_workers(bg); + self.block_ref_table.spawn_workers(bg); #[cfg(feature = "k2v")] - self.k2v.spawn_workers(); + self.k2v.spawn_workers(bg); } pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { @@ -324,8 +319,8 @@ impl GarageK2V { } } - pub fn spawn_workers(&self) { - self.item_table.spawn_workers(); - self.counter_table.spawn_workers(); + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.item_table.spawn_workers(bg); + self.counter_table.spawn_workers(bg); } } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index d907e947..6303ea3e 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -9,6 +9,7 @@ use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -164,8 +165,8 @@ impl IndexCounter { }) } - pub fn spawn_workers(&self) { - self.table.spawn_workers(); + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.table.spawn_workers(bg); } pub fn count( diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 05b27fb4..1b2f0014 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use garage_db as db; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; @@ -221,7 +220,6 @@ impl Crdt for Object { } pub struct ObjectTable { - pub background: Arc, pub version_table: Arc>, pub object_counter_table: Arc>, } diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 0cfaa954..0486512b 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use garage_db as db; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; @@ -127,7 +126,6 @@ impl Crdt for Version { } pub struct VersionTable { - pub background: Arc, pub block_ref_table: Arc>, } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 949aced6..1ec250c3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -5,7 +5,6 @@ use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; -use futures_util::future::FutureExt; use tokio::select; use tokio::sync::watch; @@ -24,7 +23,6 @@ pub use netapp::message::{ use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -94,7 +92,6 @@ pub struct RpcHelper(Arc); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc, - background: Arc, ring: watch::Receiver>, metrics: RpcMetrics, rpc_timeout: Duration, @@ -104,7 +101,6 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc, - background: Arc, ring: watch::Receiver>, rpc_timeout: Option, ) -> Self { @@ -113,7 +109,6 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - background, ring, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), @@ -377,16 +372,13 @@ impl RpcHelper { if !resp_stream.is_empty() { // Continue remaining requests in background. - // Continue the remaining requests immediately using tokio::spawn - // but enqueue a task in the background runner - // to ensure that the process won't exit until the requests are done - // (if we had just enqueued the resp_stream.collect directly in the background runner, - // the requests might have been put on hold in the background runner's queue, - // in which case they might timeout or otherwise fail) - let wait_finished_fut = tokio::spawn(async move { + // Note: these requests can get interrupted on process shutdown, + // we must not count on them being executed for certain. + // For all background things that have to happen with certainty, + // they have to be put in a proper queue that is persisted to disk. + tokio::spawn(async move { resp_stream.collect::>>().await; }); - self.0.background.spawn(wait_finished_fut.map(|_| Ok(()))); } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 2c6f14fd..e14adf2a 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -21,7 +21,7 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; -use garage_util::background::BackgroundRunner; +use garage_util::background::{self}; use garage_util::config::Config; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; @@ -110,9 +110,6 @@ pub struct System { pub ring: watch::Receiver>, update_ring: Mutex>>, - /// The job runner of this node - pub background: Arc, - /// Path to metadata directory pub metadata_dir: PathBuf, } @@ -232,7 +229,6 @@ impl System { /// Create this node's membership manager pub fn new( network_key: NetworkKey, - background: Arc, replication_mode: ReplicationMode, config: &Config, ) -> Result, Error> { @@ -354,7 +350,6 @@ impl System { rpc: RpcHelper::new( netapp.id.into(), fullmesh, - background.clone(), ring.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), @@ -372,7 +367,6 @@ impl System { ring, update_ring: Mutex::new(update_ring), - background, metadata_dir: config.metadata_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); @@ -578,7 +572,7 @@ impl System { } /// Save network configuration to disc - async fn save_cluster_layout(self: Arc) -> Result<(), Error> { + async fn save_cluster_layout(&self) -> Result<(), Error> { let ring: Arc = self.ring.borrow().clone(); self.persist_cluster_layout .save_async(&ring.layout) @@ -631,7 +625,7 @@ impl System { || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash { let self2 = self.clone(); - self.background.spawn_cancellable(async move { + background::spawn(async move { self2.pull_cluster_layout(from).await; Ok(()) }); @@ -676,7 +670,7 @@ impl System { drop(update_ring); let self2 = self.clone(); - self.background.spawn_cancellable(async move { + background::spawn(async move { self2 .rpc .broadcast( @@ -687,7 +681,8 @@ impl System { .await?; Ok(()) }); - self.background.spawn(self.clone().save_cluster_layout()); + + self.save_cluster_layout().await?; } Ok(SystemRpc::Ok) @@ -773,7 +768,7 @@ impl System { } for (node_id, node_addr) in ping_list { - tokio::spawn( + background::spawn( self.netapp .clone() .try_connect(node_addr, node_id) @@ -787,11 +782,10 @@ impl System { } #[cfg(feature = "consul-discovery")] - self.background.spawn(self.clone().advertise_to_consul()); + background::spawn(self.clone().advertise_to_consul()); #[cfg(feature = "kubernetes-discovery")] - self.background - .spawn(self.clone().advertise_to_kubernetes()); + background::spawn(self.clone().advertise_to_kubernetes()); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { diff --git a/src/table/gc.rs b/src/table/gc.rs index c83c2050..1fc16364 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -69,10 +69,8 @@ where gc } - pub(crate) fn spawn_workers(self: &Arc) { - self.system - .background - .spawn_worker(GcWorker::new(self.clone())); + pub(crate) fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { + bg.spawn_worker(GcWorker::new(self.clone())); } async fn gc_loop_iter(&self) -> Result, Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index 7008a383..1e7618ca 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -87,12 +87,12 @@ where syncer } - pub(crate) fn spawn_workers(self: &Arc) { + pub(crate) fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); self.add_full_sync_tx .store(Some(Arc::new(add_full_sync_tx))); - self.system.background.spawn_worker(SyncWorker { + bg.spawn_worker(SyncWorker { syncer: self.clone(), ring_recv: self.system.ring.clone(), ring: self.system.ring.borrow().clone(), diff --git a/src/table/table.rs b/src/table/table.rs index cb200ef2..4d93102e 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -14,6 +14,7 @@ use opentelemetry::{ use garage_db as db; +use garage_util::background::{self, BackgroundRunner}; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -96,13 +97,11 @@ where table } - pub fn spawn_workers(self: &Arc) { - self.merkle_updater.spawn_workers(&self.system.background); - self.syncer.spawn_workers(); - self.gc.spawn_workers(); - self.system - .background - .spawn_worker(InsertQueueWorker(self.clone())); + pub fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { + self.merkle_updater.spawn_workers(bg); + self.syncer.spawn_workers(bg); + self.gc.spawn_workers(bg); + bg.spawn_worker(InsertQueueWorker(self.clone())); } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { @@ -276,9 +275,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system - .background - .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); + background::spawn(async move { self2.repair_on_read(&who[..], ent2).await }); } } @@ -375,7 +372,7 @@ where .into_iter() .map(|k| ret.get(&k).unwrap().clone()) .collect::>(); - self.system.background.spawn_cancellable(async move { + background::spawn(async move { for v in to_repair { self2.repair_on_read(&who[..], v).await?; } diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs deleted file mode 100644 index 2568ea11..00000000 --- a/src/util/background/job_worker.rs +++ /dev/null @@ -1,48 +0,0 @@ -//! Job worker: a generic worker that just processes incoming -//! jobs one by one - -use std::sync::Arc; - -use async_trait::async_trait; -use tokio::sync::{mpsc, Mutex}; - -use crate::background::worker::*; -use crate::background::*; - -pub(crate) struct JobWorker { - pub(crate) index: usize, - pub(crate) job_chan: Arc>>, - pub(crate) next_job: Option, -} - -#[async_trait] -impl Worker for JobWorker { - fn name(&self) -> String { - format!("Job worker #{}", self.index) - } - - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - match self.next_job.take() { - None => return Ok(WorkerState::Idle), - Some(job) => { - job.await?; - Ok(WorkerState::Busy) - } - } - } - - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - loop { - match self.job_chan.lock().await.recv().await { - Some((job, cancellable)) => { - if cancellable && *must_exit.borrow() { - continue; - } - self.next_job = Some(job); - return WorkerState::Busy; - } - None => return WorkerState::Done, - } - } - } -} diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index fd9258b8..0bb4fb67 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,27 +1,23 @@ //! Job runner for futures and async functions -pub mod job_worker; pub mod worker; use core::future::Future; use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, watch, Mutex}; +use tokio::sync::{mpsc, watch}; use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; pub(crate) type JobOutput = Result<(), Error>; -pub(crate) type Job = Pin + Send>>; /// Job runner for futures and async functions pub struct BackgroundRunner { - send_job: mpsc::UnboundedSender<(Job, bool)>, send_worker: mpsc::UnboundedSender>, worker_info: Arc>>, } @@ -49,10 +45,7 @@ pub struct WorkerStatus { impl BackgroundRunner { /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver, - ) -> (Arc, tokio::task::JoinHandle<()>) { + pub fn new(stop_signal: watch::Receiver) -> (Arc, tokio::task::JoinHandle<()>) { let (send_worker, worker_out) = mpsc::unbounded_channel::>(); let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); @@ -63,24 +56,7 @@ impl BackgroundRunner { worker_processor.run().await; }); - let (send_job, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - - send_worker - .send(Box::new(job_worker::JobWorker { - index: i, - job_chan: queue_out.clone(), - next_job: None, - })) - .ok() - .unwrap(); - } - let bgrunner = Arc::new(Self { - send_job, send_worker, worker_info, }); @@ -91,31 +67,6 @@ impl BackgroundRunner { self.worker_info.lock().unwrap().clone() } - /// Spawn a task to be run in background - pub fn spawn(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, false)) - .ok() - .expect("Could not put job in queue"); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, true)) - .ok() - .expect("Could not put job in queue"); - } - pub fn spawn_worker(&self, worker: W) where W: Worker + 'static, @@ -126,3 +77,14 @@ impl BackgroundRunner { .expect("Could not put worker in queue"); } } + +pub fn spawn(job: T) +where + T: Future + Send + 'static, +{ + tokio::spawn(async move { + if let Err(e) = job.await { + error!("{}", e); + } + }); +} -- cgit v1.2.3