diff options
Diffstat (limited to 'src/garage/repair')
-rw-r--r-- | src/garage/repair/online.rs | 222 |
1 files changed, 137 insertions, 85 deletions
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index d6a71742..e33cf097 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -1,89 +1,110 @@ use std::sync::Arc; +use std::time::Duration; +use async_trait::async_trait; use tokio::sync::watch; +use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_table::*; +use garage_util::background::*; use garage_util::error::Error; use crate::*; -pub struct OnlineRepair { - pub garage: Arc<Garage>, -} - -impl OnlineRepair { - pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) { - if let Err(e) = self.repair_worker_aux(opt, must_exit).await { - warn!("Repair worker failed with error: {}", e); +pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) { + match opt.what { + RepairWhat::Tables => { + info!("Launching a full sync of tables"); + garage.bucket_table.syncer.add_full_sync(); + garage.object_table.syncer.add_full_sync(); + garage.version_table.syncer.add_full_sync(); + garage.block_ref_table.syncer.add_full_sync(); + garage.key_table.syncer.add_full_sync(); + } + RepairWhat::Versions => { + info!("Repairing the versions table"); + garage + .background + .spawn_worker(RepairVersionsWorker::new(garage.clone())); + } + RepairWhat::BlockRefs => { + info!("Repairing the block refs table"); + garage + .background + .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + } + RepairWhat::Blocks => { + info!("Repairing the stored blocks"); + garage + .background + .spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); + } + RepairWhat::Scrub { cmd } => { + let cmd = match cmd { + ScrubCmd::Start => ScrubWorkerCommand::Start, + ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)), + ScrubCmd::Resume => ScrubWorkerCommand::Resume, + ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, + ScrubCmd::SetTranquility { tranquility } => { + ScrubWorkerCommand::SetTranquility(tranquility) + } + }; + info!("Sending command to scrub worker: {:?}", cmd); + garage.block_manager.send_scrub_command(cmd).await; } } +} - async fn repair_worker_aux( - &self, - opt: RepairOpt, - must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { - match opt.what { - RepairWhat::Tables => { - info!("Launching a full sync of tables"); - self.garage.bucket_table.syncer.add_full_sync(); - self.garage.object_table.syncer.add_full_sync(); - self.garage.version_table.syncer.add_full_sync(); - self.garage.block_ref_table.syncer.add_full_sync(); - self.garage.key_table.syncer.add_full_sync(); - } - RepairWhat::Versions => { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - RepairWhat::BlockRefs => { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - RepairWhat::Blocks => { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - RepairWhat::Scrub { tranquility } => { - info!("Verifying integrity of stored blocks"); - self.garage - .block_manager - .scrub_data_store(&must_exit, tranquility) - .await?; - } +// ---- + +struct RepairVersionsWorker { + garage: Arc<Garage>, + pos: Vec<u8>, + counter: usize, +} + +impl RepairVersionsWorker { + fn new(garage: Arc<Garage>) -> Self { + Self { + garage, + pos: vec![], + counter: 0, } - Ok(()) } +} - async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; +#[async_trait] +impl Worker for RepairVersionsWorker { + fn name(&self) -> String { + "Version repair worker".into() + } - while !*must_exit.borrow() { - let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; + fn info(&self) -> Option<String> { + Some(format!("{} items done", self.counter)) + } - i += 1; - if i % 1000 == 0 { - info!("repair_versions: {}", i); + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => { + self.pos = k; + v } - - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; - if version.deleted.get() { - continue; + None => { + info!("repair_versions: finished, done {}", self.counter); + return Ok(WorkerState::Done); } + }; + + self.counter += 1; + + let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; + if !version.deleted.get() { let object = self .garage .object_table @@ -109,32 +130,59 @@ impl OnlineRepair { .await?; } } - info!("repair_versions: finished, done {}", i); - Ok(()) + + Ok(WorkerState::Busy) } - async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { + unreachable!() + } +} - while !*must_exit.borrow() { - let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; +// ---- - i += 1; - if i % 1000 == 0 { - info!("repair_block_ref: {}", i); - } +struct RepairBlockrefsWorker { + garage: Arc<Garage>, + pos: Vec<u8>, + counter: usize, +} - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; - if block_ref.deleted.get() { - continue; +impl RepairBlockrefsWorker { + fn new(garage: Arc<Garage>) -> Self { + Self { + garage, + pos: vec![], + counter: 0, + } + } +} + +#[async_trait] +impl Worker for RepairBlockrefsWorker { + fn name(&self) -> String { + "Block refs repair worker".into() + } + + fn info(&self) -> Option<String> { + Some(format!("{} items done", self.counter)) + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => { + self.pos = k; + v } + None => { + info!("repair_block_ref: finished, done {}", self.counter); + return Ok(WorkerState::Done); + } + }; + + self.counter += 1; + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; + if !block_ref.deleted.get() { let version = self .garage .version_table @@ -157,7 +205,11 @@ impl OnlineRepair { .await?; } } - info!("repair_block_ref: finished, done {}", i); - Ok(()) + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { + unreachable!() } } |