aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-12-14 12:51:16 +0100
committerAlex Auvolat <alex@adnab.me>2022-12-14 12:51:42 +0100
commitd56c472712df7c064387429a5af73d3bc0eb438d (patch)
treed3d73490ae3167a228ee91f69a583242c572cd9f /src/table
parent2183518edccadef47cdeaf6476033b52d8832d6e (diff)
downloadgarage-d56c472712df7c064387429a5af73d3bc0eb438d.tar.gz
garage-d56c472712df7c064387429a5af73d3bc0eb438d.zip
Refactor background runner and get rid of job worker
Diffstat (limited to 'src/table')
-rw-r--r--src/table/gc.rs6
-rw-r--r--src/table/sync.rs4
-rw-r--r--src/table/table.rs19
3 files changed, 12 insertions, 17 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index c83c2050..1fc16364 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -69,10 +69,8 @@ where
gc
}
- pub(crate) fn spawn_workers(self: &Arc<Self>) {
- self.system
- .background
- .spawn_worker(GcWorker::new(self.clone()));
+ pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ bg.spawn_worker(GcWorker::new(self.clone()));
}
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 7008a383..1e7618ca 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -87,12 +87,12 @@ where
syncer
}
- pub(crate) fn spawn_workers(self: &Arc<Self>) {
+ pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
self.add_full_sync_tx
.store(Some(Arc::new(add_full_sync_tx)));
- self.system.background.spawn_worker(SyncWorker {
+ bg.spawn_worker(SyncWorker {
syncer: self.clone(),
ring_recv: self.system.ring.clone(),
ring: self.system.ring.borrow().clone(),
diff --git a/src/table/table.rs b/src/table/table.rs
index cb200ef2..4d93102e 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -14,6 +14,7 @@ use opentelemetry::{
use garage_db as db;
+use garage_util::background::{self, BackgroundRunner};
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -96,13 +97,11 @@ where
table
}
- pub fn spawn_workers(self: &Arc<Self>) {
- self.merkle_updater.spawn_workers(&self.system.background);
- self.syncer.spawn_workers();
- self.gc.spawn_workers();
- self.system
- .background
- .spawn_worker(InsertQueueWorker(self.clone()));
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ self.merkle_updater.spawn_workers(bg);
+ self.syncer.spawn_workers(bg);
+ self.gc.spawn_workers(bg);
+ bg.spawn_worker(InsertQueueWorker(self.clone()));
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
@@ -276,9 +275,7 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.system
- .background
- .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
+ background::spawn(async move { self2.repair_on_read(&who[..], ent2).await });
}
}
@@ -375,7 +372,7 @@ where
.into_iter()
.map(|k| ret.get(&k).unwrap().clone())
.collect::<Vec<_>>();
- self.system.background.spawn_cancellable(async move {
+ background::spawn(async move {
for v in to_repair {
self2.repair_on_read(&who[..], v).await?;
}