From 2183518edccadef47cdeaf6476033b52d8832d6e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:28:07 +0100 Subject: Spawn all background workers in a separate step --- src/garage/repair/online.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'src/garage/repair/online.rs') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 42221c2a..2a8e6298 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -15,15 +15,15 @@ use garage_util::error::Error; use crate::*; -pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) { +pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result<(), Error> { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); - garage.bucket_table.syncer.add_full_sync(); - garage.object_table.syncer.add_full_sync(); - garage.version_table.syncer.add_full_sync(); - garage.block_ref_table.syncer.add_full_sync(); - garage.key_table.syncer.add_full_sync(); + garage.bucket_table.syncer.add_full_sync()?; + garage.object_table.syncer.add_full_sync()?; + garage.version_table.syncer.add_full_sync()?; + garage.block_ref_table.syncer.add_full_sync()?; + garage.key_table.syncer.add_full_sync()?; } RepairWhat::Versions => { info!("Repairing the versions table"); @@ -56,9 +56,10 @@ pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) { } }; info!("Sending command to scrub worker: {:?}", cmd); - garage.block_manager.send_scrub_command(cmd).await; + garage.block_manager.send_scrub_command(cmd).await?; } } + Ok(()) } // ---- -- cgit v1.2.3 From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:51:16 +0100 Subject: Refactor background runner and get rid of job worker --- src/garage/repair/online.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) (limited to 'src/garage/repair/online.rs') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 2a8e6298..4b4118a8 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -15,7 +15,11 @@ use garage_util::error::Error; use crate::*; -pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result<(), Error> { +pub async fn launch_online_repair( + garage: &Arc, + bg: &BackgroundRunner, + opt: RepairOpt, +) -> Result<(), Error> { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); @@ -27,23 +31,17 @@ pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result } RepairWhat::Versions => { info!("Repairing the versions table"); - garage - .background - .spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - garage - .background - .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); - garage - .background - .spawn_worker(garage_block::repair::RepairWorker::new( - garage.block_manager.clone(), - )); + bg.spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); } RepairWhat::Scrub { cmd } => { let cmd = match cmd { -- cgit v1.2.3 From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 15:25:29 +0100 Subject: Simplified and more aggressive worker exit logic --- src/garage/repair/online.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/garage/repair/online.rs') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 4b4118a8..cd7de49b 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -136,7 +136,7 @@ impl Worker for RepairVersionsWorker { Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } @@ -214,7 +214,7 @@ impl Worker for RepairBlockrefsWorker { Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } -- cgit v1.2.3 From 1fcd0b371ba1fa94cd6efad5aa9a236b2e58c922 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 16:31:31 +0100 Subject: online repair workers: retry on error --- src/garage/repair/online.rs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) (limited to 'src/garage/repair/online.rs') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index cd7de49b..6e8ec2d3 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -92,19 +92,14 @@ impl Worker for RepairVersionsWorker { } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => { - self.pos = k; - v - } + let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), None => { info!("repair_versions: finished, done {}", self.counter); return Ok(WorkerState::Done); } }; - self.counter += 1; - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; if !version.deleted.get() { let object = self @@ -133,6 +128,9 @@ impl Worker for RepairVersionsWorker { } } + self.counter += 1; + self.pos = next_pos; + Ok(WorkerState::Busy) } @@ -173,19 +171,14 @@ impl Worker for RepairBlockrefsWorker { } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => { - self.pos = k; - v - } + let (item_bytes, next_pos) = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), None => { info!("repair_block_ref: finished, done {}", self.counter); return Ok(WorkerState::Done); } }; - self.counter += 1; - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; if !block_ref.deleted.get() { let version = self @@ -211,6 +204,9 @@ impl Worker for RepairBlockrefsWorker { } } + self.counter += 1; + self.pos = next_pos; + Ok(WorkerState::Busy) } -- cgit v1.2.3 From 6b857a9b8cd454721a2a1e69e108794be07d36a2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 2 Jan 2023 13:50:42 +0100 Subject: cargo fmt --- src/garage/repair/online.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'src/garage/repair/online.rs') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 6e8ec2d3..7120972c 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -171,13 +171,14 @@ impl Worker for RepairBlockrefsWorker { } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => (v, k), - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); - } - }; + let (item_bytes, next_pos) = + match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), + None => { + info!("repair_block_ref: finished, done {}", self.counter); + return Ok(WorkerState::Done); + } + }; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; if !block_ref.deleted.get() { -- cgit v1.2.3