From a855c54bdb1a6912e99a6d64ee97bc63c700f29f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 21 Jun 2022 15:27:58 +0200 Subject: Online repair new workers, except blocks and scrub --- src/garage/repair/online.rs | 228 +++++++++++++++++++++++++++----------------- 1 file changed, 141 insertions(+), 87 deletions(-) (limited to 'src/garage/repair/online.rs') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index d6a71742..e6fcd705 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use async_trait::async_trait; use tokio::sync::watch; use garage_model::garage::Garage; @@ -7,83 +8,103 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_table::*; +use garage_util::background::*; use garage_util::error::Error; use crate::*; -pub struct OnlineRepair { - pub garage: Arc, -} - -impl OnlineRepair { - pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver) { - if let Err(e) = self.repair_worker_aux(opt, must_exit).await { - warn!("Repair worker failed with error: {}", e); +pub fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result<(), Error> { + match opt.what { + RepairWhat::Tables => { + info!("Launching a full sync of tables"); + garage.bucket_table.syncer.add_full_sync(); + garage.object_table.syncer.add_full_sync(); + garage.version_table.syncer.add_full_sync(); + garage.block_ref_table.syncer.add_full_sync(); + garage.key_table.syncer.add_full_sync(); + } + RepairWhat::Versions => { + info!("Repairing the versions table"); + garage + .background + .spawn_worker(RepairVersionsWorker::new(garage.clone())); + } + RepairWhat::BlockRefs => { + info!("Repairing the block refs table"); + garage + .background + .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + } + RepairWhat::Blocks => { + unimplemented!() + /* + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + */ + } + RepairWhat::Scrub { tranquility } => { + unimplemented!() + /* + info!("Verifying integrity of stored blocks"); + self.garage + .block_manager + .scrub_data_store(&must_exit, tranquility) + .await?; + */ } } + Ok(()) +} - async fn repair_worker_aux( - &self, - opt: RepairOpt, - must_exit: watch::Receiver, - ) -> Result<(), Error> { - match opt.what { - RepairWhat::Tables => { - info!("Launching a full sync of tables"); - self.garage.bucket_table.syncer.add_full_sync(); - self.garage.object_table.syncer.add_full_sync(); - self.garage.version_table.syncer.add_full_sync(); - self.garage.block_ref_table.syncer.add_full_sync(); - self.garage.key_table.syncer.add_full_sync(); - } - RepairWhat::Versions => { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - RepairWhat::BlockRefs => { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - RepairWhat::Blocks => { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - RepairWhat::Scrub { tranquility } => { - info!("Verifying integrity of stored blocks"); - self.garage - .block_manager - .scrub_data_store(&must_exit, tranquility) - .await?; - } +// ---- + +struct RepairVersionsWorker { + garage: Arc, + pos: Vec, + iter: usize, +} + +impl RepairVersionsWorker { + fn new(garage: Arc) -> Self { + Self { + garage, + pos: vec![], + iter: 0, } - Ok(()) } +} - async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; - - while !*must_exit.borrow() { - let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; +#[async_trait] +impl Worker for RepairVersionsWorker { + fn name(&self) -> String { + "Version repair worker".into() + } - i += 1; - if i % 1000 == 0 { - info!("repair_versions: {}", i); + async fn work( + &mut self, + _must_exit: &mut watch::Receiver, + ) -> Result { + let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => { + self.pos = k; + v } - - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; - if version.deleted.get() { - continue; + None => { + info!("repair_versions: finished, done {}", self.iter); + return Ok(WorkerStatus::Done); } + }; + + self.iter += 1; + if self.iter % 1000 == 0 { + info!("repair_versions: {}", self.iter); + } + + let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; + if !version.deleted.get() { let object = self .garage .object_table @@ -109,32 +130,61 @@ impl OnlineRepair { .await?; } } - info!("repair_versions: finished, done {}", i); - Ok(()) + + Ok(WorkerStatus::Busy) } - async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; - - while !*must_exit.borrow() { - let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerStatus { + unreachable!() + } +} - i += 1; - if i % 1000 == 0 { - info!("repair_block_ref: {}", i); - } +// ---- + +struct RepairBlockrefsWorker { + garage: Arc, + pos: Vec, + iter: usize, +} + +impl RepairBlockrefsWorker { + fn new(garage: Arc) -> Self { + Self { + garage, + pos: vec![], + iter: 0, + } + } +} - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; - if block_ref.deleted.get() { - continue; +#[async_trait] +impl Worker for RepairBlockrefsWorker { + fn name(&self) -> String { + "Block refs repair worker".into() + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver, + ) -> Result { + let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => { + self.pos = k; + v + } + None => { + info!("repair_block_ref: finished, done {}", self.iter); + return Ok(WorkerStatus::Done); } + }; + + self.iter += 1; + if self.iter % 1000 == 0 { + info!("repair_block_ref: {}", self.iter); + } + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; + if !block_ref.deleted.get() { let version = self .garage .version_table @@ -157,7 +207,11 @@ impl OnlineRepair { .await?; } } - info!("repair_block_ref: finished, done {}", i); - Ok(()) + + Ok(WorkerStatus::Busy) + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerStatus { + unreachable!() } } -- cgit v1.2.3