aboutsummaryrefslogtreecommitdiff
path: root/src/block/repair.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r--src/block/repair.rs129
1 files changed, 72 insertions, 57 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs
index e2884b69..064cc005 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -13,7 +13,7 @@ use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::persister::Persister;
+use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -53,7 +53,7 @@ impl Worker for RepairWorker {
"Block repair worker".into()
}
- fn info(&self) -> Option<String> {
+ fn status(&self) -> WorkerStatus {
match self.block_iter.as_ref() {
None => {
let idx_bytes = self
@@ -66,9 +66,20 @@ impl Worker for RepairWorker {
} else {
idx_bytes
};
- Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
+ WorkerStatus {
+ progress: Some("0.00%".into()),
+ freeform: vec![format!(
+ "Currently in phase 1, iterator position: {}",
+ hex::encode(idx_bytes)
+ )],
+ ..Default::default()
+ }
}
- Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
+ Some(bi) => WorkerStatus {
+ progress: Some(format!("{:.2}%", bi.progress() * 100.)),
+ freeform: vec!["Currently in phase 2".into()],
+ ..Default::default()
+ },
}
}
@@ -137,7 +148,7 @@ impl Worker for RepairWorker {
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
unreachable!()
}
}
@@ -157,15 +168,24 @@ pub struct ScrubWorker {
work: ScrubWorkerState,
tranquilizer: Tranquilizer,
- persister: Persister<ScrubWorkerPersisted>,
- persisted: ScrubWorkerPersisted,
+ persister: PersisterShared<ScrubWorkerPersisted>,
}
#[derive(Serialize, Deserialize)]
-struct ScrubWorkerPersisted {
- tranquility: u32,
- time_last_complete_scrub: u64,
- corruptions_detected: u64,
+pub struct ScrubWorkerPersisted {
+ pub tranquility: u32,
+ pub(crate) time_last_complete_scrub: u64,
+ pub(crate) corruptions_detected: u64,
+}
+impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
+impl Default for ScrubWorkerPersisted {
+ fn default() -> Self {
+ ScrubWorkerPersisted {
+ time_last_complete_scrub: 0,
+ tranquility: INITIAL_SCRUB_TRANQUILITY,
+ corruptions_detected: 0,
+ }
+ }
}
enum ScrubWorkerState {
@@ -186,27 +206,20 @@ pub enum ScrubWorkerCommand {
Pause(Duration),
Resume,
Cancel,
- SetTranquility(u32),
}
impl ScrubWorker {
- pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
- let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
- let persisted = match persister.load() {
- Ok(v) => v,
- Err(_) => ScrubWorkerPersisted {
- time_last_complete_scrub: 0,
- tranquility: INITIAL_SCRUB_TRANQUILITY,
- corruptions_detected: 0,
- },
- };
+ pub(crate) fn new(
+ manager: Arc<BlockManager>,
+ rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+ persister: PersisterShared<ScrubWorkerPersisted>,
+ ) -> Self {
Self {
manager,
rx_cmd,
work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30),
persister,
- persisted,
}
}
@@ -255,12 +268,6 @@ impl ScrubWorker {
}
}
}
- ScrubWorkerCommand::SetTranquility(t) => {
- self.persisted.tranquility = t;
- if let Err(e) = self.persister.save_async(&self.persisted).await {
- error!("Could not save new tranquilitiy value: {}", e);
- }
- }
}
}
}
@@ -271,29 +278,37 @@ impl Worker for ScrubWorker {
"Block scrub worker".into()
}
- fn info(&self) -> Option<String> {
- let s = match &self.work {
- ScrubWorkerState::Running(bsi) => format!(
- "{:.2}% done (tranquility = {})",
- bsi.progress() * 100.,
- self.persisted.tranquility
- ),
- ScrubWorkerState::Paused(bsi, rt) => {
- format!(
- "Paused, {:.2}% done, resumes at {}",
- bsi.progress() * 100.,
- msec_to_rfc3339(*rt)
+ fn status(&self) -> WorkerStatus {
+ let (corruptions_detected, tranquility, time_last_complete_scrub) =
+ self.persister.get_with(|p| {
+ (
+ p.corruptions_detected,
+ p.tranquility,
+ p.time_last_complete_scrub,
)
- }
- ScrubWorkerState::Finished => format!(
- "Last completed scrub: {}",
- msec_to_rfc3339(self.persisted.time_last_complete_scrub)
- ),
+ });
+
+ let mut s = WorkerStatus {
+ persistent_errors: Some(corruptions_detected),
+ tranquility: Some(tranquility),
+ ..Default::default()
};
- Some(format!(
- "{} ; corruptions detected: {}",
- s, self.persisted.corruptions_detected
- ))
+ match &self.work {
+ ScrubWorkerState::Running(bsi) => {
+ s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ }
+ ScrubWorkerState::Paused(bsi, rt) => {
+ s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
+ }
+ ScrubWorkerState::Finished => {
+ s.freeform = vec![format!(
+ "Last scrub completed at {}",
+ msec_to_rfc3339(time_last_complete_scrub)
+ )];
+ }
+ }
+ s
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@@ -310,18 +325,17 @@ impl Worker for ScrubWorker {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
- self.persisted.corruptions_detected += 1;
- self.persister.save_async(&self.persisted).await?;
+ self.persister.set_with(|p| p.corruptions_detected += 1)?;
}
Err(e) => return Err(e),
_ => (),
};
Ok(self
.tranquilizer
- .tranquilize_worker(self.persisted.tranquility))
+ .tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
- self.persisted.time_last_complete_scrub = now_msec();
- self.persister.save_async(&self.persisted).await?;
+ self.persister
+ .set_with(|p| p.time_last_complete_scrub = now_msec())?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
Ok(WorkerState::Idle)
@@ -331,12 +345,13 @@ impl Worker for ScrubWorker {
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
let (wait_until, command) = match &self.work {
ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
- self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64,
+ self.persister.get_with(|p| p.time_last_complete_scrub)
+ + SCRUB_INTERVAL.as_millis() as u64,
ScrubWorkerCommand::Start,
),
};