aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-09-05 17:37:45 +0200
committerAlex Auvolat <alex@adnab.me>2023-09-06 16:35:28 +0200
commite30865984a5f23f046396ca192c1930314b50115 (patch)
treee5f0a6511c74b7ddb726733747eadbb7284ab129
parent55c514999eef17d7764040cde1b7b38ca111d24c (diff)
downloadgarage-e30865984a5f23f046396ca192c1930314b50115.tar.gz
garage-e30865984a5f23f046396ca192c1930314b50115.zip
block manager: scrub checkpointing
-rw-r--r--src/block/repair.rs158
1 files changed, 108 insertions, 50 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 0e7fe0df..a7c90d4f 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -176,7 +176,9 @@ mod v081 {
}
mod v082 {
+ use garage_util::data::Hash;
use serde::{Deserialize, Serialize};
+ use std::path::PathBuf;
use super::v081;
@@ -186,6 +188,27 @@ mod v082 {
pub(crate) time_last_complete_scrub: u64,
pub(crate) time_next_run_scrub: u64,
pub(crate) corruptions_detected: u64,
+ #[serde(default)]
+ pub(crate) checkpoint: Option<BlockStoreIterator>,
+ }
+
+ #[derive(Serialize, Deserialize, Clone)]
+ pub struct BlockStoreIterator {
+ pub todo: Vec<BsiTodo>,
+ }
+
+ #[derive(Serialize, Deserialize, Clone)]
+ pub enum BsiTodo {
+ Directory {
+ path: PathBuf,
+ progress_min: u64,
+ progress_max: u64,
+ },
+ File {
+ path: PathBuf,
+ hash: Hash,
+ progress: u64,
+ },
}
impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
@@ -200,6 +223,7 @@ mod v082 {
time_last_complete_scrub: old.time_last_complete_scrub,
time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub),
corruptions_detected: old.corruptions_detected,
+ checkpoint: None,
}
}
}
@@ -236,14 +260,23 @@ impl Default for ScrubWorkerPersisted {
time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
+ checkpoint: None,
}
}
}
#[derive(Default)]
enum ScrubWorkerState {
- Running(BlockStoreIterator),
- Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ Running {
+ iterator: BlockStoreIterator,
+ // time of the last checkpoint
+ t_cp: u64,
+ },
+ Paused {
+ iterator: BlockStoreIterator,
+ // time at which the scrub should be resumed
+ t_resume: u64,
+ },
#[default]
Finished,
}
@@ -262,10 +295,17 @@ impl ScrubWorker {
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
persister: PersisterShared<ScrubWorkerPersisted>,
) -> Self {
+ let work = match persister.get_with(|x| x.checkpoint.clone()) {
+ None => ScrubWorkerState::Finished,
+ Some(iterator) => ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ },
+ };
Self {
manager,
rx_cmd,
- work: ScrubWorkerState::Finished,
+ work,
tranquilizer: Tranquilizer::new(30),
persister,
}
@@ -278,7 +318,16 @@ impl ScrubWorker {
ScrubWorkerState::Finished => {
info!("Scrub worker initializing, now performing datastore scrub");
let iterator = BlockStoreIterator::new(&self.manager);
- ScrubWorkerState::Running(iterator)
+ if let Err(e) = self
+ .persister
+ .set_with(|x| x.checkpoint = Some(iterator.clone()))
+ {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
+ ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ }
}
work => {
error!("Cannot start scrub worker: already running!");
@@ -288,8 +337,18 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Pause(dur) => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
- ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+ ScrubWorkerState::Running { iterator, .. }
+ | ScrubWorkerState::Paused { iterator, .. } => {
+ if let Err(e) = self
+ .persister
+ .set_with(|x| x.checkpoint = Some(iterator.clone()))
+ {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
+ ScrubWorkerState::Paused {
+ iterator,
+ t_resume: now_msec() + dur.as_millis() as u64,
+ }
}
work => {
error!("Cannot pause scrub worker: not running!");
@@ -299,7 +358,10 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Resume => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
+ ScrubWorkerState::Paused { iterator, .. } => ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ },
work => {
error!("Cannot resume scrub worker: not paused!");
work
@@ -308,7 +370,7 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Cancel => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
+ ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => {
ScrubWorkerState::Finished
}
work => {
@@ -344,12 +406,15 @@ impl Worker for ScrubWorker {
..Default::default()
};
match &self.work {
- ScrubWorkerState::Running(bsi) => {
- s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ ScrubWorkerState::Running { iterator, .. } => {
+ s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
}
- ScrubWorkerState::Paused(bsi, rt) => {
- s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
- s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
+ ScrubWorkerState::Paused { iterator, t_resume } => {
+ s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
+ s.freeform = vec![format!(
+ "Scrub paused, resumes at {}",
+ msec_to_rfc3339(*t_resume)
+ )];
}
ScrubWorkerState::Finished => {
s.freeform = vec![
@@ -375,9 +440,11 @@ impl Worker for ScrubWorker {
};
match &mut self.work {
- ScrubWorkerState::Running(bsi) => {
+ ScrubWorkerState::Running { iterator, t_cp } => {
self.tranquilizer.reset();
- if let Some((_path, hash)) = bsi.next().await? {
+ let now = now_msec();
+
+ if let Some((_path, hash)) = iterator.next().await? {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
@@ -386,16 +453,23 @@ impl Worker for ScrubWorker {
Err(e) => return Err(e),
_ => (),
};
+
+ if now - *t_cp > 60 * 1000 {
+ self.persister
+ .set_with(|p| p.checkpoint = Some(iterator.clone()))?;
+ *t_cp = now;
+ }
+
Ok(self
.tranquilizer
.tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
- let now = now_msec();
let next_scrub_timestamp = randomize_next_scrub_run_time(now);
self.persister.set_with(|p| {
p.time_last_complete_scrub = now;
p.time_next_run_scrub = next_scrub_timestamp;
+ p.checkpoint = None;
})?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
@@ -414,8 +488,8 @@ impl Worker for ScrubWorker {
async fn wait_for_work(&mut self) -> WorkerState {
let (wait_until, command) = match &self.work {
- ScrubWorkerState::Running(_) => return WorkerState::Busy,
- ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
+ ScrubWorkerState::Running { .. } => return WorkerState::Busy,
+ ScrubWorkerState::Paused { t_resume, .. } => (*t_resume, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
self.persister.get_with(|p| p.time_next_run_scrub),
ScrubWorkerCommand::Start,
@@ -438,7 +512,7 @@ impl Worker for ScrubWorker {
}
match &self.work {
- ScrubWorkerState::Running(_) => WorkerState::Busy,
+ ScrubWorkerState::Running { .. } => WorkerState::Busy,
_ => WorkerState::Idle,
}
}
@@ -450,23 +524,6 @@ impl Worker for ScrubWorker {
const PROGRESS_FP: u64 = 1_000_000_000;
-struct BlockStoreIterator {
- todo: Vec<BsiTodo>,
-}
-
-enum BsiTodo {
- Directory {
- path: PathBuf,
- progress_min: u64,
- progress_max: u64,
- },
- File {
- path: PathBuf,
- filename: String,
- progress: u64,
- },
-}
-
impl BlockStoreIterator {
fn new(manager: &BlockManager) -> Self {
let min_cap = manager
@@ -551,11 +608,18 @@ impl BlockStoreIterator {
progress_max: 0,
});
} else if ft.is_file() {
- self.todo.push(BsiTodo::File {
- path: ent.path(),
- filename: name,
- progress: 0,
- });
+ let filename = name.split_once('.').map(|(f, _)| f).unwrap_or(&name);
+ if filename.len() == 64 {
+ if let Ok(h) = hex::decode(filename) {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&h);
+ self.todo.push(BsiTodo::File {
+ path: ent.path(),
+ hash: hash.into(),
+ progress: 0,
+ });
+ }
+ }
}
}
@@ -582,20 +646,14 @@ impl BlockStoreIterator {
self.todo[istart..].reverse();
debug_assert!(self.progress_invariant());
}
- Some(BsiTodo::File { path, filename, .. }) => {
- let filename = filename.strip_suffix(".zst").unwrap_or(&filename);
- if filename.len() == 64 {
- if let Ok(h) = hex::decode(filename) {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&h);
- return Ok(Some((path, hash.into())));
- }
- }
+ Some(BsiTodo::File { path, hash, .. }) => {
+ return Ok(Some((path, hash)));
}
}
}
}
+ // for debug_assert!
fn progress_invariant(&self) -> bool {
let iter = self.todo.iter().map(|x| match x {
BsiTodo::Directory { progress_min, .. } => progress_min,