diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/gc.rs | 6 | ||||
-rw-r--r-- | src/table/sync.rs | 4 | ||||
-rw-r--r-- | src/table/table.rs | 19 |
3 files changed, 12 insertions, 17 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index c83c2050..1fc16364 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -69,10 +69,8 @@ where gc } - pub(crate) fn spawn_workers(self: &Arc<Self>) { - self.system - .background - .spawn_worker(GcWorker::new(self.clone())); + pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { + bg.spawn_worker(GcWorker::new(self.clone())); } async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index 7008a383..1e7618ca 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -87,12 +87,12 @@ where syncer } - pub(crate) fn spawn_workers(self: &Arc<Self>) { + pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); self.add_full_sync_tx .store(Some(Arc::new(add_full_sync_tx))); - self.system.background.spawn_worker(SyncWorker { + bg.spawn_worker(SyncWorker { syncer: self.clone(), ring_recv: self.system.ring.clone(), ring: self.system.ring.borrow().clone(), diff --git a/src/table/table.rs b/src/table/table.rs index cb200ef2..4d93102e 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -14,6 +14,7 @@ use opentelemetry::{ use garage_db as db; +use garage_util::background::{self, BackgroundRunner}; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -96,13 +97,11 @@ where table } - pub fn spawn_workers(self: &Arc<Self>) { - self.merkle_updater.spawn_workers(&self.system.background); - self.syncer.spawn_workers(); - self.gc.spawn_workers(); - self.system - .background - .spawn_worker(InsertQueueWorker(self.clone())); + pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { + self.merkle_updater.spawn_workers(bg); + self.syncer.spawn_workers(bg); + self.gc.spawn_workers(bg); + bg.spawn_worker(InsertQueueWorker(self.clone())); } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { @@ -276,9 +275,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system - .background - .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); + background::spawn(async move { self2.repair_on_read(&who[..], ent2).await }); } } @@ -375,7 +372,7 @@ where .into_iter() .map(|k| ret.get(&k).unwrap().clone()) .collect::<Vec<_>>(); - self.system.background.spawn_cancellable(async move { + background::spawn(async move { for v in to_repair { self2.repair_on_read(&who[..], v).await?; } |