diff options
Diffstat (limited to 'src/garage/repair/online.rs')
-rw-r--r-- | src/garage/repair/online.rs | 79 |
1 files changed, 38 insertions, 41 deletions
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 42221c2a..627e3bf3 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -12,38 +12,37 @@ use garage_model::s3::version_table::*; use garage_table::*; use garage_util::background::*; use garage_util::error::Error; +use garage_util::migrate::Migrate; use crate::*; -pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) { +pub async fn launch_online_repair( + garage: &Arc<Garage>, + bg: &BackgroundRunner, + opt: RepairOpt, +) -> Result<(), Error> { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); - garage.bucket_table.syncer.add_full_sync(); - garage.object_table.syncer.add_full_sync(); - garage.version_table.syncer.add_full_sync(); - garage.block_ref_table.syncer.add_full_sync(); - garage.key_table.syncer.add_full_sync(); + garage.bucket_table.syncer.add_full_sync()?; + garage.object_table.syncer.add_full_sync()?; + garage.version_table.syncer.add_full_sync()?; + garage.block_ref_table.syncer.add_full_sync()?; + garage.key_table.syncer.add_full_sync()?; } RepairWhat::Versions => { info!("Repairing the versions table"); - garage - .background - .spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - garage - .background - .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); - garage - .background - .spawn_worker(garage_block::repair::RepairWorker::new( - garage.block_manager.clone(), - )); + bg.spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); } RepairWhat::Scrub { cmd } => { let cmd = match cmd { @@ -56,9 +55,10 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) { } }; info!("Sending command to scrub worker: {:?}", cmd); - garage.block_manager.send_scrub_command(cmd).await; + garage.block_manager.send_scrub_command(cmd).await?; } } + Ok(()) } // ---- @@ -93,20 +93,15 @@ impl Worker for RepairVersionsWorker { } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => { - self.pos = k; - v - } + let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), None => { info!("repair_versions: finished, done {}", self.counter); return Ok(WorkerState::Done); } }; - self.counter += 1; - - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; + let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; if !version.deleted.get() { let object = self .garage @@ -134,10 +129,13 @@ impl Worker for RepairVersionsWorker { } } + self.counter += 1; + self.pos = next_pos; + Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } @@ -174,20 +172,16 @@ impl Worker for RepairBlockrefsWorker { } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => { - self.pos = k; - v - } - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); - } - }; - - self.counter += 1; + let (item_bytes, next_pos) = + match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), + None => { + info!("repair_block_ref: finished, done {}", self.counter); + return Ok(WorkerState::Done); + } + }; - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; + let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; if !block_ref.deleted.get() { let version = self .garage @@ -212,10 +206,13 @@ impl Worker for RepairBlockrefsWorker { } } + self.counter += 1; + self.pos = next_pos; + Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } |