diff options
author | Alex Auvolat <alex@adnab.me> | 2023-09-05 17:37:45 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-09-06 16:35:28 +0200 |
commit | e30865984a5f23f046396ca192c1930314b50115 (patch) | |
tree | e5f0a6511c74b7ddb726733747eadbb7284ab129 /src/block/repair.rs | |
parent | 55c514999eef17d7764040cde1b7b38ca111d24c (diff) | |
download | garage-e30865984a5f23f046396ca192c1930314b50115.tar.gz garage-e30865984a5f23f046396ca192c1930314b50115.zip |
block manager: scrub checkpointing
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r-- | src/block/repair.rs | 158 |
1 files changed, 108 insertions, 50 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs index 0e7fe0df..a7c90d4f 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -176,7 +176,9 @@ mod v081 { } mod v082 { + use garage_util::data::Hash; use serde::{Deserialize, Serialize}; + use std::path::PathBuf; use super::v081; @@ -186,6 +188,27 @@ mod v082 { pub(crate) time_last_complete_scrub: u64, pub(crate) time_next_run_scrub: u64, pub(crate) corruptions_detected: u64, + #[serde(default)] + pub(crate) checkpoint: Option<BlockStoreIterator>, + } + + #[derive(Serialize, Deserialize, Clone)] + pub struct BlockStoreIterator { + pub todo: Vec<BsiTodo>, + } + + #[derive(Serialize, Deserialize, Clone)] + pub enum BsiTodo { + Directory { + path: PathBuf, + progress_min: u64, + progress_max: u64, + }, + File { + path: PathBuf, + hash: Hash, + progress: u64, + }, } impl garage_util::migrate::Migrate for ScrubWorkerPersisted { @@ -200,6 +223,7 @@ mod v082 { time_last_complete_scrub: old.time_last_complete_scrub, time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub), corruptions_detected: old.corruptions_detected, + checkpoint: None, } } } @@ -236,14 +260,23 @@ impl Default for ScrubWorkerPersisted { time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), tranquility: INITIAL_SCRUB_TRANQUILITY, corruptions_detected: 0, + checkpoint: None, } } } #[derive(Default)] enum ScrubWorkerState { - Running(BlockStoreIterator), - Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub + Running { + iterator: BlockStoreIterator, + // time of the last checkpoint + t_cp: u64, + }, + Paused { + iterator: BlockStoreIterator, + // time at which the scrub should be resumed + t_resume: u64, + }, #[default] Finished, } @@ -262,10 +295,17 @@ impl ScrubWorker { rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, persister: PersisterShared<ScrubWorkerPersisted>, ) -> Self { + let work = match persister.get_with(|x| x.checkpoint.clone()) { + None => ScrubWorkerState::Finished, + Some(iterator) => ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + }, + }; Self { manager, rx_cmd, - work: ScrubWorkerState::Finished, + work, tranquilizer: Tranquilizer::new(30), persister, } @@ -278,7 +318,16 @@ impl ScrubWorker { ScrubWorkerState::Finished => { info!("Scrub worker initializing, now performing datastore scrub"); let iterator = BlockStoreIterator::new(&self.manager); - ScrubWorkerState::Running(iterator) + if let Err(e) = self + .persister + .set_with(|x| x.checkpoint = Some(iterator.clone())) + { + error!("Could not save scrub checkpoint: {}", e); + } + ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + } } work => { error!("Cannot start scrub worker: already running!"); @@ -288,8 +337,18 @@ impl ScrubWorker { } ScrubWorkerCommand::Pause(dur) => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => { - ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64) + ScrubWorkerState::Running { iterator, .. } + | ScrubWorkerState::Paused { iterator, .. } => { + if let Err(e) = self + .persister + .set_with(|x| x.checkpoint = Some(iterator.clone())) + { + error!("Could not save scrub checkpoint: {}", e); + } + ScrubWorkerState::Paused { + iterator, + t_resume: now_msec() + dur.as_millis() as u64, + } } work => { error!("Cannot pause scrub worker: not running!"); @@ -299,7 +358,10 @@ impl ScrubWorker { } ScrubWorkerCommand::Resume => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it), + ScrubWorkerState::Paused { iterator, .. } => ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + }, work => { error!("Cannot resume scrub worker: not paused!"); work @@ -308,7 +370,7 @@ impl ScrubWorker { } ScrubWorkerCommand::Cancel => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => { + ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => { ScrubWorkerState::Finished } work => { @@ -344,12 +406,15 @@ impl Worker for ScrubWorker { ..Default::default() }; match &self.work { - ScrubWorkerState::Running(bsi) => { - s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + ScrubWorkerState::Running { iterator, .. } => { + s.progress = Some(format!("{:.2}%", iterator.progress() * 100.)); } - ScrubWorkerState::Paused(bsi, rt) => { - s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); - s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))]; + ScrubWorkerState::Paused { iterator, t_resume } => { + s.progress = Some(format!("{:.2}%", iterator.progress() * 100.)); + s.freeform = vec![format!( + "Scrub paused, resumes at {}", + msec_to_rfc3339(*t_resume) + )]; } ScrubWorkerState::Finished => { s.freeform = vec![ @@ -375,9 +440,11 @@ impl Worker for ScrubWorker { }; match &mut self.work { - ScrubWorkerState::Running(bsi) => { + ScrubWorkerState::Running { iterator, t_cp } => { self.tranquilizer.reset(); - if let Some((_path, hash)) = bsi.next().await? { + let now = now_msec(); + + if let Some((_path, hash)) = iterator.next().await? { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); @@ -386,16 +453,23 @@ impl Worker for ScrubWorker { Err(e) => return Err(e), _ => (), }; + + if now - *t_cp > 60 * 1000 { + self.persister + .set_with(|p| p.checkpoint = Some(iterator.clone()))?; + *t_cp = now; + } + Ok(self .tranquilizer .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - let now = now_msec(); let next_scrub_timestamp = randomize_next_scrub_run_time(now); self.persister.set_with(|p| { p.time_last_complete_scrub = now; p.time_next_run_scrub = next_scrub_timestamp; + p.checkpoint = None; })?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); @@ -414,8 +488,8 @@ impl Worker for ScrubWorker { async fn wait_for_work(&mut self) -> WorkerState { let (wait_until, command) = match &self.work { - ScrubWorkerState::Running(_) => return WorkerState::Busy, - ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), + ScrubWorkerState::Running { .. } => return WorkerState::Busy, + ScrubWorkerState::Paused { t_resume, .. } => (*t_resume, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( self.persister.get_with(|p| p.time_next_run_scrub), ScrubWorkerCommand::Start, @@ -438,7 +512,7 @@ impl Worker for ScrubWorker { } match &self.work { - ScrubWorkerState::Running(_) => WorkerState::Busy, + ScrubWorkerState::Running { .. } => WorkerState::Busy, _ => WorkerState::Idle, } } @@ -450,23 +524,6 @@ impl Worker for ScrubWorker { const PROGRESS_FP: u64 = 1_000_000_000; -struct BlockStoreIterator { - todo: Vec<BsiTodo>, -} - -enum BsiTodo { - Directory { - path: PathBuf, - progress_min: u64, - progress_max: u64, - }, - File { - path: PathBuf, - filename: String, - progress: u64, - }, -} - impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { let min_cap = manager @@ -551,11 +608,18 @@ impl BlockStoreIterator { progress_max: 0, }); } else if ft.is_file() { - self.todo.push(BsiTodo::File { - path: ent.path(), - filename: name, - progress: 0, - }); + let filename = name.split_once('.').map(|(f, _)| f).unwrap_or(&name); + if filename.len() == 64 { + if let Ok(h) = hex::decode(filename) { + let mut hash = [0u8; 32]; + hash.copy_from_slice(&h); + self.todo.push(BsiTodo::File { + path: ent.path(), + hash: hash.into(), + progress: 0, + }); + } + } } } @@ -582,20 +646,14 @@ impl BlockStoreIterator { self.todo[istart..].reverse(); debug_assert!(self.progress_invariant()); } - Some(BsiTodo::File { path, filename, .. }) => { - let filename = filename.strip_suffix(".zst").unwrap_or(&filename); - if filename.len() == 64 { - if let Ok(h) = hex::decode(filename) { - let mut hash = [0u8; 32]; - hash.copy_from_slice(&h); - return Ok(Some((path, hash.into()))); - } - } + Some(BsiTodo::File { path, hash, .. }) => { + return Ok(Some((path, hash))); } } } } + // for debug_assert! fn progress_invariant(&self) -> bool { let iter = self.todo.iter().map(|x| match x { BsiTodo::Directory { progress_min, .. } => progress_min, |