diff options
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r-- | src/block/repair.rs | 163 |
1 files changed, 119 insertions, 44 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs index a6ded65a..c89484d9 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use serde::{Deserialize, Serialize}; +use rand::Rng; use tokio::fs; use tokio::select; use tokio::sync::mpsc; @@ -13,14 +13,14 @@ use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; use crate::manager::*; -// Full scrub every 30 days -const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); +// Full scrub every 25 days with a random element of 10 days mixed in below +const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 25); // Scrub tranquility is initially set to 4, but can be changed in the CLI // and the updated version is persisted over Garage restarts const INITIAL_SCRUB_TRANQUILITY: u32 = 4; @@ -161,6 +161,51 @@ impl Worker for RepairWorker { // and whose parameter (esp. speed) can be controlled at runtime. // ---- ---- ---- +mod v081 { + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize)] + pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) corruptions_detected: u64, + } + + impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} +} + +mod v082 { + use serde::{Deserialize, Serialize}; + + use super::v081; + + #[derive(Serialize, Deserialize)] + pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) time_next_run_scrub: u64, + pub(crate) corruptions_detected: u64, + } + + impl garage_util::migrate::Migrate for ScrubWorkerPersisted { + type Previous = v081::ScrubWorkerPersisted; + const VERSION_MARKER: &'static [u8] = b"G082bswp"; + + fn migrate(old: v081::ScrubWorkerPersisted) -> ScrubWorkerPersisted { + use crate::repair::randomize_next_scrub_run_time; + + ScrubWorkerPersisted { + tranquility: old.tranquility, + time_last_complete_scrub: old.time_last_complete_scrub, + time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub), + corruptions_detected: old.corruptions_detected, + } + } + } +} + +pub use v082::*; + pub struct ScrubWorker { manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, @@ -168,17 +213,33 @@ pub struct ScrubWorker { work: ScrubWorkerState, tranquilizer: Tranquilizer, - persister: Persister<ScrubWorkerPersisted>, - persisted: ScrubWorkerPersisted, + persister: PersisterShared<ScrubWorkerPersisted>, +} + +fn randomize_next_scrub_run_time(timestamp: u64) -> u64 { + // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to + // balance scrub load across different cluster nodes. + + let next_run_timestamp = timestamp + + SCRUB_INTERVAL + .saturating_add(Duration::from_secs( + rand::thread_rng().gen_range(0..3600 * 24 * 10), + )) + .as_millis() as u64; + + next_run_timestamp } -#[derive(Serialize, Deserialize)] -struct ScrubWorkerPersisted { - tranquility: u32, - time_last_complete_scrub: u64, - corruptions_detected: u64, +impl Default for ScrubWorkerPersisted { + fn default() -> Self { + ScrubWorkerPersisted { + time_last_complete_scrub: 0, + time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), + tranquility: INITIAL_SCRUB_TRANQUILITY, + corruptions_detected: 0, + } + } } -impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} enum ScrubWorkerState { Running(BlockStoreIterator), @@ -198,27 +259,20 @@ pub enum ScrubWorkerCommand { Pause(Duration), Resume, Cancel, - SetTranquility(u32), } impl ScrubWorker { - pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self { - let persister = Persister::new(&manager.system.metadata_dir, "scrub_info"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ScrubWorkerPersisted { - time_last_complete_scrub: 0, - tranquility: INITIAL_SCRUB_TRANQUILITY, - corruptions_detected: 0, - }, - }; + pub(crate) fn new( + manager: Arc<BlockManager>, + rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, + persister: PersisterShared<ScrubWorkerPersisted>, + ) -> Self { Self { manager, rx_cmd, work: ScrubWorkerState::Finished, tranquilizer: Tranquilizer::new(30), persister, - persisted, } } @@ -227,6 +281,7 @@ impl ScrubWorker { ScrubWorkerCommand::Start => { self.work = match std::mem::take(&mut self.work) { ScrubWorkerState::Finished => { + info!("Scrub worker initializing, now performing datastore scrub"); let iterator = BlockStoreIterator::new(&self.manager); ScrubWorkerState::Running(iterator) } @@ -267,12 +322,6 @@ impl ScrubWorker { } } } - ScrubWorkerCommand::SetTranquility(t) => { - self.persisted.tranquility = t; - if let Err(e) = self.persister.save_async(&self.persisted).await { - error!("Could not save new tranquilitiy value: {}", e); - } - } } } } @@ -284,9 +333,19 @@ impl Worker for ScrubWorker { } fn status(&self) -> WorkerStatus { + let (corruptions_detected, tranquility, time_last_complete_scrub, time_next_run_scrub) = + self.persister.get_with(|p| { + ( + p.corruptions_detected, + p.tranquility, + p.time_last_complete_scrub, + p.time_next_run_scrub, + ) + }); + let mut s = WorkerStatus { - persistent_errors: Some(self.persisted.corruptions_detected), - tranquility: Some(self.persisted.tranquility), + persistent_errors: Some(corruptions_detected), + tranquility: Some(tranquility), ..Default::default() }; match &self.work { @@ -298,10 +357,16 @@ impl Worker for ScrubWorker { s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))]; } ScrubWorkerState::Finished => { - s.freeform = vec![format!( - "Last scrub completed at {}", - msec_to_rfc3339(self.persisted.time_last_complete_scrub) - )]; + s.freeform = vec![ + format!( + "Last scrub completed at {}", + msec_to_rfc3339(time_last_complete_scrub), + ), + format!( + "Next scrub scheduled for {}", + msec_to_rfc3339(time_next_run_scrub) + ), + ]; } } s @@ -321,20 +386,30 @@ impl Worker for ScrubWorker { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); - self.persisted.corruptions_detected += 1; - self.persister.save_async(&self.persisted).await?; + self.persister.set_with(|p| p.corruptions_detected += 1)?; } Err(e) => return Err(e), _ => (), }; Ok(self .tranquilizer - .tranquilize_worker(self.persisted.tranquility)) + .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - self.persisted.time_last_complete_scrub = now_msec(); - self.persister.save_async(&self.persisted).await?; + let now = now_msec(); + let next_scrub_timestamp = randomize_next_scrub_run_time(now); + + self.persister.set_with(|p| { + p.time_last_complete_scrub = now; + p.time_next_run_scrub = next_scrub_timestamp; + })?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); + + info!( + "Datastore scrub completed, next scrub scheduled for {}", + msec_to_rfc3339(next_scrub_timestamp) + ); + Ok(WorkerState::Idle) } } @@ -347,7 +422,7 @@ impl Worker for ScrubWorker { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( - self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, + self.persister.get_with(|p| p.time_next_run_scrub), ScrubWorkerCommand::Start, ), }; @@ -462,11 +537,11 @@ impl BlockStoreIterator { let ent_type = data_dir_ent.file_type().await?; let name = name.strip_suffix(".zst").unwrap_or(&name); - if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { + if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() { let path = data_dir_ent.path(); self.path.push(ReadingDir::Pending(path)); } else if name.len() == 64 { - if let Ok(h) = hex::decode(&name) { + if let Ok(h) = hex::decode(name) { let mut hash = [0u8; 32]; hash.copy_from_slice(&h); return Ok(Some(hash.into())); |