aboutsummaryrefslogtreecommitdiff
path: root/src/block/repair.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r--src/block/repair.rs410
1 files changed, 314 insertions, 96 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 71093d69..77ee0d14 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
+use crate::block::*;
use crate::manager::*;
// Full scrub every 25 days with a random element of 10 days mixed in below
@@ -136,7 +137,7 @@ impl Worker for RepairWorker {
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
- if let Some(hash) = bi.next().await? {
+ if let Some((_path, hash)) = bi.next().await? {
self.manager
.resync
.put_to_resync(&hash, Duration::from_secs(0))?;
@@ -175,7 +176,9 @@ mod v081 {
}
mod v082 {
+ use garage_util::data::Hash;
use serde::{Deserialize, Serialize};
+ use std::path::PathBuf;
use super::v081;
@@ -185,6 +188,27 @@ mod v082 {
pub(crate) time_last_complete_scrub: u64,
pub(crate) time_next_run_scrub: u64,
pub(crate) corruptions_detected: u64,
+ #[serde(default)]
+ pub(crate) checkpoint: Option<BlockStoreIterator>,
+ }
+
+ #[derive(Serialize, Deserialize, Clone)]
+ pub struct BlockStoreIterator {
+ pub todo: Vec<BsiTodo>,
+ }
+
+ #[derive(Serialize, Deserialize, Clone)]
+ pub enum BsiTodo {
+ Directory {
+ path: PathBuf,
+ progress_min: u64,
+ progress_max: u64,
+ },
+ File {
+ path: PathBuf,
+ hash: Hash,
+ progress: u64,
+ },
}
impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
@@ -199,6 +223,7 @@ mod v082 {
time_last_complete_scrub: old.time_last_complete_scrub,
time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub),
corruptions_detected: old.corruptions_detected,
+ checkpoint: None,
}
}
}
@@ -235,14 +260,23 @@ impl Default for ScrubWorkerPersisted {
time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
+ checkpoint: None,
}
}
}
#[derive(Default)]
enum ScrubWorkerState {
- Running(BlockStoreIterator),
- Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ Running {
+ iterator: BlockStoreIterator,
+ // time of the last checkpoint
+ t_cp: u64,
+ },
+ Paused {
+ iterator: BlockStoreIterator,
+ // time at which the scrub should be resumed
+ t_resume: u64,
+ },
#[default]
Finished,
}
@@ -261,10 +295,17 @@ impl ScrubWorker {
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
persister: PersisterShared<ScrubWorkerPersisted>,
) -> Self {
+ let work = match persister.get_with(|x| x.checkpoint.clone()) {
+ None => ScrubWorkerState::Finished,
+ Some(iterator) => ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ },
+ };
Self {
manager,
rx_cmd,
- work: ScrubWorkerState::Finished,
+ work,
tranquilizer: Tranquilizer::new(30),
persister,
}
@@ -277,7 +318,16 @@ impl ScrubWorker {
ScrubWorkerState::Finished => {
info!("Scrub worker initializing, now performing datastore scrub");
let iterator = BlockStoreIterator::new(&self.manager);
- ScrubWorkerState::Running(iterator)
+ if let Err(e) = self
+ .persister
+ .set_with(|x| x.checkpoint = Some(iterator.clone()))
+ {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
+ ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ }
}
work => {
error!("Cannot start scrub worker: already running!");
@@ -287,8 +337,18 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Pause(dur) => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
- ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+ ScrubWorkerState::Running { iterator, .. }
+ | ScrubWorkerState::Paused { iterator, .. } => {
+ if let Err(e) = self
+ .persister
+ .set_with(|x| x.checkpoint = Some(iterator.clone()))
+ {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
+ ScrubWorkerState::Paused {
+ iterator,
+ t_resume: now_msec() + dur.as_millis() as u64,
+ }
}
work => {
error!("Cannot pause scrub worker: not running!");
@@ -298,7 +358,10 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Resume => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
+ ScrubWorkerState::Paused { iterator, .. } => ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ },
work => {
error!("Cannot resume scrub worker: not paused!");
work
@@ -307,7 +370,10 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Cancel => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
+ ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => {
+ if let Err(e) = self.persister.set_with(|x| x.checkpoint = None) {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
ScrubWorkerState::Finished
}
work => {
@@ -343,12 +409,15 @@ impl Worker for ScrubWorker {
..Default::default()
};
match &self.work {
- ScrubWorkerState::Running(bsi) => {
- s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ ScrubWorkerState::Running { iterator, .. } => {
+ s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
}
- ScrubWorkerState::Paused(bsi, rt) => {
- s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
- s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
+ ScrubWorkerState::Paused { iterator, t_resume } => {
+ s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
+ s.freeform = vec![format!(
+ "Scrub paused, resumes at {}",
+ msec_to_rfc3339(*t_resume)
+ )];
}
ScrubWorkerState::Finished => {
s.freeform = vec![
@@ -374,9 +443,11 @@ impl Worker for ScrubWorker {
};
match &mut self.work {
- ScrubWorkerState::Running(bsi) => {
+ ScrubWorkerState::Running { iterator, t_cp } => {
self.tranquilizer.reset();
- if let Some(hash) = bsi.next().await? {
+ let now = now_msec();
+
+ if let Some((_path, hash)) = iterator.next().await? {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
@@ -385,16 +456,23 @@ impl Worker for ScrubWorker {
Err(e) => return Err(e),
_ => (),
};
+
+ if now - *t_cp > 60 * 1000 {
+ self.persister
+ .set_with(|p| p.checkpoint = Some(iterator.clone()))?;
+ *t_cp = now;
+ }
+
Ok(self
.tranquilizer
.tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
- let now = now_msec();
let next_scrub_timestamp = randomize_next_scrub_run_time(now);
self.persister.set_with(|p| {
p.time_last_complete_scrub = now;
p.time_next_run_scrub = next_scrub_timestamp;
+ p.checkpoint = None;
})?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
@@ -413,8 +491,8 @@ impl Worker for ScrubWorker {
async fn wait_for_work(&mut self) -> WorkerState {
let (wait_until, command) = match &self.work {
- ScrubWorkerState::Running(_) => return WorkerState::Busy,
- ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
+ ScrubWorkerState::Running { .. } => return WorkerState::Busy,
+ ScrubWorkerState::Paused { t_resume, .. } => (*t_resume, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
self.persister.get_with(|p| p.time_next_run_scrub),
ScrubWorkerCommand::Start,
@@ -437,110 +515,250 @@ impl Worker for ScrubWorker {
}
match &self.work {
- ScrubWorkerState::Running(_) => WorkerState::Busy,
+ ScrubWorkerState::Running { .. } => WorkerState::Busy,
_ => WorkerState::Idle,
}
}
}
// ---- ---- ----
-// UTILITY FOR ENUMERATING THE BLOCK STORE
+// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS
+// between multiple storage locations.
+// This is a one-shot repair operation that can be launched,
+// checks everything, and then exits.
// ---- ---- ----
-struct BlockStoreIterator {
- path: Vec<ReadingDir>,
+pub struct RebalanceWorker {
+ manager: Arc<BlockManager>,
+ block_iter: BlockStoreIterator,
+ t_started: u64,
+ t_finished: Option<u64>,
+ moved: usize,
+ moved_bytes: u64,
}
-enum ReadingDir {
- Pending(PathBuf),
- Read {
- subpaths: Vec<fs::DirEntry>,
- pos: usize,
- },
+impl RebalanceWorker {
+ pub fn new(manager: Arc<BlockManager>) -> Self {
+ let block_iter = BlockStoreIterator::new(&manager);
+ Self {
+ manager,
+ block_iter,
+ t_started: now_msec(),
+ t_finished: None,
+ moved: 0,
+ moved_bytes: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RebalanceWorker {
+ fn name(&self) -> String {
+ "Block rebalance worker".into()
+ }
+
+ fn status(&self) -> WorkerStatus {
+ let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
+ let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
+ let mut freeform = vec![
+ format!("Blocks moved: {}", self.moved),
+ format!(
+ "Bytes moved: {} ({}/s)",
+ bytesize::ByteSize::b(self.moved_bytes),
+ bytesize::ByteSize::b(rate)
+ ),
+ format!("Started: {}", msec_to_rfc3339(self.t_started)),
+ ];
+ if let Some(t_fin) = self.t_finished {
+ freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin)))
+ }
+ WorkerStatus {
+ progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
+ freeform,
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if let Some((path, hash)) = self.block_iter.next().await? {
+ let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
+ if path.ancestors().all(|x| x != prim_loc) {
+ let block_path = match path.extension() {
+ None => DataBlockPath::Plain(path.clone()),
+ Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
+ _ => {
+ warn!("not rebalancing file: {}", path.to_string_lossy());
+ return Ok(WorkerState::Busy);
+ }
+ };
+ // block is not in its primary location,
+ // move it there (reading and re-writing does the trick)
+ debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc);
+ let block_len = self.manager.fix_block_location(&hash, block_path).await?;
+ self.moved += 1;
+ self.moved_bytes += block_len as u64;
+ }
+ Ok(WorkerState::Busy)
+ } else {
+ // all blocks are in their primary location:
+ // - the ones we moved now are
+ // - the ones written in the meantime always were, because we only
+ // write to primary locations
+ // so we can safely remove all secondary locations from the data layout
+ let new_layout = self
+ .manager
+ .data_layout
+ .load_full()
+ .without_secondary_locations();
+ self.manager
+ .data_layout_persister
+ .save_async(&new_layout)
+ .await?;
+ self.manager.data_layout.store(Arc::new(new_layout));
+ self.t_finished = Some(now_msec());
+ Ok(WorkerState::Done)
+ }
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ unreachable!()
+ }
}
+// ---- ---- ----
+// UTILITY FOR ENUMERATING THE BLOCK STORE
+// ---- ---- ----
+
+const PROGRESS_FP: u64 = 1_000_000_000;
+
impl BlockStoreIterator {
fn new(manager: &BlockManager) -> Self {
- let root_dir = manager.data_dir.clone();
- Self {
- path: vec![ReadingDir::Pending(root_dir)],
+ let data_layout = manager.data_layout.load_full();
+
+ let mut dir_cap = vec![0; data_layout.data_dirs.len()];
+ for prim in data_layout.part_prim.iter() {
+ dir_cap[*prim as usize] += 1;
+ }
+ for sec_vec in data_layout.part_sec.iter() {
+ for sec in sec_vec.iter() {
+ dir_cap[*sec as usize] += 1;
+ }
}
+ let sum_cap = dir_cap.iter().sum::<usize>() as u64;
+
+ let mut cum_cap = 0;
+ let mut todo = vec![];
+ for (dir, cap) in data_layout.data_dirs.iter().zip(dir_cap.into_iter()) {
+ let progress_min = (cum_cap * PROGRESS_FP) / sum_cap;
+ let progress_max = ((cum_cap + cap as u64) * PROGRESS_FP) / sum_cap;
+ cum_cap += cap as u64;
+
+ todo.push(BsiTodo::Directory {
+ path: dir.path.clone(),
+ progress_min,
+ progress_max,
+ });
+ }
+ // entries are processed back-to-front (because of .pop()),
+ // so reverse entries to process them in increasing progress bounds
+ todo.reverse();
+
+ let ret = Self { todo };
+ debug_assert!(ret.progress_invariant());
+
+ ret
}
/// Returns progress done, between 0 and 1
fn progress(&self) -> f32 {
- if self.path.is_empty() {
- 1.0
- } else {
- let mut ret = 0.0;
- let mut next_div = 1;
- for p in self.path.iter() {
- match p {
- ReadingDir::Pending(_) => break,
- ReadingDir::Read { subpaths, pos } => {
- next_div *= subpaths.len();
- ret += ((*pos - 1) as f32) / (next_div as f32);
- }
- }
- }
- ret
- }
+ self.todo
+ .last()
+ .map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => *progress_min,
+ BsiTodo::File { progress, .. } => *progress,
+ })
+ .map(|x| x as f32 / PROGRESS_FP as f32)
+ .unwrap_or(1.0)
}
- async fn next(&mut self) -> Result<Option<Hash>, Error> {
+ async fn next(&mut self) -> Result<Option<(PathBuf, Hash)>, Error> {
loop {
- let last_path = match self.path.last_mut() {
+ match self.todo.pop() {
None => return Ok(None),
- Some(lp) => lp,
- };
-
- if let ReadingDir::Pending(path) = last_path {
- let mut reader = fs::read_dir(&path).await?;
- let mut subpaths = vec![];
- while let Some(ent) = reader.next_entry().await? {
- subpaths.push(ent);
- }
- *last_path = ReadingDir::Read { subpaths, pos: 0 };
- }
+ Some(BsiTodo::Directory {
+ path,
+ progress_min,
+ progress_max,
+ }) => {
+ let istart = self.todo.len();
+
+ let mut reader = fs::read_dir(&path).await?;
+ while let Some(ent) = reader.next_entry().await? {
+ let name = if let Ok(n) = ent.file_name().into_string() {
+ n
+ } else {
+ continue;
+ };
+ let ft = ent.file_type().await?;
+ if ft.is_dir() && hex::decode(&name).is_ok() {
+ self.todo.push(BsiTodo::Directory {
+ path: ent.path(),
+ progress_min: 0,
+ progress_max: 0,
+ });
+ } else if ft.is_file() {
+ let filename = name.split_once('.').map(|(f, _)| f).unwrap_or(&name);
+ if filename.len() == 64 {
+ if let Ok(h) = hex::decode(filename) {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&h);
+ self.todo.push(BsiTodo::File {
+ path: ent.path(),
+ hash: hash.into(),
+ progress: 0,
+ });
+ }
+ }
+ }
+ }
- let (subpaths, pos) = match *last_path {
- ReadingDir::Read {
- ref subpaths,
- ref mut pos,
- } => (subpaths, pos),
- ReadingDir::Pending(_) => unreachable!(),
- };
-
- let data_dir_ent = match subpaths.get(*pos) {
- None => {
- self.path.pop();
- continue;
- }
- Some(ent) => {
- *pos += 1;
- ent
+ let count = self.todo.len() - istart;
+ for (i, ent) in self.todo[istart..].iter_mut().enumerate() {
+ let p1 = progress_min
+ + ((progress_max - progress_min) * i as u64) / count as u64;
+ let p2 = progress_min
+ + ((progress_max - progress_min) * (i + 1) as u64) / count as u64;
+ match ent {
+ BsiTodo::Directory {
+ progress_min,
+ progress_max,
+ ..
+ } => {
+ *progress_min = p1;
+ *progress_max = p2;
+ }
+ BsiTodo::File { progress, .. } => {
+ *progress = p1;
+ }
+ }
+ }
+ self.todo[istart..].reverse();
+ debug_assert!(self.progress_invariant());
}
- };
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
- let path = data_dir_ent.path();
- self.path.push(ReadingDir::Pending(path));
- } else if name.len() == 64 {
- if let Ok(h) = hex::decode(name) {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&h);
- return Ok(Some(hash.into()));
+ Some(BsiTodo::File { path, hash, .. }) => {
+ return Ok(Some((path, hash)));
}
}
}
}
+
+ // for debug_assert!
+ fn progress_invariant(&self) -> bool {
+ let iter = self.todo.iter().map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => progress_min,
+ BsiTodo::File { progress, .. } => progress,
+ });
+ let iter_1 = iter.clone().skip(1);
+ iter.zip(iter_1).all(|(prev, next)| prev >= next)
+ }
}