From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:51:16 +0100 Subject: Refactor background runner and get rid of job worker --- src/table/gc.rs | 6 ++---- src/table/sync.rs | 4 ++-- src/table/table.rs | 19 ++++++++----------- 3 files changed, 12 insertions(+), 17 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index c83c2050..1fc16364 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -69,10 +69,8 @@ where gc } - pub(crate) fn spawn_workers(self: &Arc) { - self.system - .background - .spawn_worker(GcWorker::new(self.clone())); + pub(crate) fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { + bg.spawn_worker(GcWorker::new(self.clone())); } async fn gc_loop_iter(&self) -> Result, Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index 7008a383..1e7618ca 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -87,12 +87,12 @@ where syncer } - pub(crate) fn spawn_workers(self: &Arc) { + pub(crate) fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); self.add_full_sync_tx .store(Some(Arc::new(add_full_sync_tx))); - self.system.background.spawn_worker(SyncWorker { + bg.spawn_worker(SyncWorker { syncer: self.clone(), ring_recv: self.system.ring.clone(), ring: self.system.ring.borrow().clone(), diff --git a/src/table/table.rs b/src/table/table.rs index cb200ef2..4d93102e 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -14,6 +14,7 @@ use opentelemetry::{ use garage_db as db; +use garage_util::background::{self, BackgroundRunner}; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -96,13 +97,11 @@ where table } - pub fn spawn_workers(self: &Arc) { - self.merkle_updater.spawn_workers(&self.system.background); - self.syncer.spawn_workers(); - self.gc.spawn_workers(); - self.system - .background - .spawn_worker(InsertQueueWorker(self.clone())); + pub fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { + self.merkle_updater.spawn_workers(bg); + self.syncer.spawn_workers(bg); + self.gc.spawn_workers(bg); + bg.spawn_worker(InsertQueueWorker(self.clone())); } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { @@ -276,9 +275,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system - .background - .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); + background::spawn(async move { self2.repair_on_read(&who[..], ent2).await }); } } @@ -375,7 +372,7 @@ where .into_iter() .map(|k| ret.get(&k).unwrap().clone()) .collect::>(); - self.system.background.spawn_cancellable(async move { + background::spawn(async move { for v in to_repair { self2.repair_on_read(&who[..], v).await?; } -- cgit v1.2.3