diff options
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r-- | src/block/repair.rs | 129 |
1 files changed, 72 insertions, 57 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs index e2884b69..064cc005 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -13,7 +13,7 @@ use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -53,7 +53,7 @@ impl Worker for RepairWorker { "Block repair worker".into() } - fn info(&self) -> Option<String> { + fn status(&self) -> WorkerStatus { match self.block_iter.as_ref() { None => { let idx_bytes = self @@ -66,9 +66,20 @@ impl Worker for RepairWorker { } else { idx_bytes }; - Some(format!("Phase 1: {}", hex::encode(idx_bytes))) + WorkerStatus { + progress: Some("0.00%".into()), + freeform: vec![format!( + "Currently in phase 1, iterator position: {}", + hex::encode(idx_bytes) + )], + ..Default::default() + } } - Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)), + Some(bi) => WorkerStatus { + progress: Some(format!("{:.2}%", bi.progress() * 100.)), + freeform: vec!["Currently in phase 2".into()], + ..Default::default() + }, } } @@ -137,7 +148,7 @@ impl Worker for RepairWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } @@ -157,15 +168,24 @@ pub struct ScrubWorker { work: ScrubWorkerState, tranquilizer: Tranquilizer, - persister: Persister<ScrubWorkerPersisted>, - persisted: ScrubWorkerPersisted, + persister: PersisterShared<ScrubWorkerPersisted>, } #[derive(Serialize, Deserialize)] -struct ScrubWorkerPersisted { - tranquility: u32, - time_last_complete_scrub: u64, - corruptions_detected: u64, +pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) corruptions_detected: u64, +} +impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} +impl Default for ScrubWorkerPersisted { + fn default() -> Self { + ScrubWorkerPersisted { + time_last_complete_scrub: 0, + tranquility: INITIAL_SCRUB_TRANQUILITY, + corruptions_detected: 0, + } + } } enum ScrubWorkerState { @@ -186,27 +206,20 @@ pub enum ScrubWorkerCommand { Pause(Duration), Resume, Cancel, - SetTranquility(u32), } impl ScrubWorker { - pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self { - let persister = Persister::new(&manager.system.metadata_dir, "scrub_info"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ScrubWorkerPersisted { - time_last_complete_scrub: 0, - tranquility: INITIAL_SCRUB_TRANQUILITY, - corruptions_detected: 0, - }, - }; + pub(crate) fn new( + manager: Arc<BlockManager>, + rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, + persister: PersisterShared<ScrubWorkerPersisted>, + ) -> Self { Self { manager, rx_cmd, work: ScrubWorkerState::Finished, tranquilizer: Tranquilizer::new(30), persister, - persisted, } } @@ -255,12 +268,6 @@ impl ScrubWorker { } } } - ScrubWorkerCommand::SetTranquility(t) => { - self.persisted.tranquility = t; - if let Err(e) = self.persister.save_async(&self.persisted).await { - error!("Could not save new tranquilitiy value: {}", e); - } - } } } } @@ -271,29 +278,37 @@ impl Worker for ScrubWorker { "Block scrub worker".into() } - fn info(&self) -> Option<String> { - let s = match &self.work { - ScrubWorkerState::Running(bsi) => format!( - "{:.2}% done (tranquility = {})", - bsi.progress() * 100., - self.persisted.tranquility - ), - ScrubWorkerState::Paused(bsi, rt) => { - format!( - "Paused, {:.2}% done, resumes at {}", - bsi.progress() * 100., - msec_to_rfc3339(*rt) + fn status(&self) -> WorkerStatus { + let (corruptions_detected, tranquility, time_last_complete_scrub) = + self.persister.get_with(|p| { + ( + p.corruptions_detected, + p.tranquility, + p.time_last_complete_scrub, ) - } - ScrubWorkerState::Finished => format!( - "Last completed scrub: {}", - msec_to_rfc3339(self.persisted.time_last_complete_scrub) - ), + }); + + let mut s = WorkerStatus { + persistent_errors: Some(corruptions_detected), + tranquility: Some(tranquility), + ..Default::default() }; - Some(format!( - "{} ; corruptions detected: {}", - s, self.persisted.corruptions_detected - )) + match &self.work { + ScrubWorkerState::Running(bsi) => { + s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + } + ScrubWorkerState::Paused(bsi, rt) => { + s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))]; + } + ScrubWorkerState::Finished => { + s.freeform = vec![format!( + "Last scrub completed at {}", + msec_to_rfc3339(time_last_complete_scrub) + )]; + } + } + s } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { @@ -310,18 +325,17 @@ impl Worker for ScrubWorker { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); - self.persisted.corruptions_detected += 1; - self.persister.save_async(&self.persisted).await?; + self.persister.set_with(|p| p.corruptions_detected += 1)?; } Err(e) => return Err(e), _ => (), }; Ok(self .tranquilizer - .tranquilize_worker(self.persisted.tranquility)) + .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - self.persisted.time_last_complete_scrub = now_msec(); - self.persister.save_async(&self.persisted).await?; + self.persister + .set_with(|p| p.time_last_complete_scrub = now_msec())?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); Ok(WorkerState::Idle) @@ -331,12 +345,13 @@ impl Worker for ScrubWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { let (wait_until, command) = match &self.work { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( - self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, + self.persister.get_with(|p| p.time_last_complete_scrub) + + SCRUB_INTERVAL.as_millis() as u64, ScrubWorkerCommand::Start, ), }; |