diff options
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r-- | src/block/repair.rs | 176 |
1 files changed, 163 insertions, 13 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs index a2a8443e..8335de51 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -1,20 +1,26 @@ use core::ops::Bound; +use std::convert::TryInto; use std::path::PathBuf; - use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use tokio::fs; +use tokio::select; +use tokio::sync::mpsc; use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; use crate::manager::*; +const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days +const TIME_LAST_COMPLETE_SCRUB: &[u8] = b"time_last_complete_scrub"; + pub struct RepairWorker { manager: Arc<BlockManager>, next_start: Option<Hash>, @@ -129,19 +135,107 @@ impl Worker for RepairWorker { pub struct ScrubWorker { manager: Arc<BlockManager>, - iterator: BlockStoreIterator, + rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, + + work: ScrubWorkerState, tranquilizer: Tranquilizer, tranquility: u32, + + time_last_complete_scrub: u64, +} + +enum ScrubWorkerState { + Running(BlockStoreIterator), + Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub + Finished, +} + +impl Default for ScrubWorkerState { + fn default() -> Self { + ScrubWorkerState::Finished + } +} + +pub enum ScrubWorkerCommand { + Start, + Pause(Duration), + Resume, + Cancel, + SetTranquility(u32), } impl ScrubWorker { - pub fn new(manager: Arc<BlockManager>, tranquility: u32) -> Self { - let iterator = BlockStoreIterator::new(&manager); + pub fn new( + manager: Arc<BlockManager>, + rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, + tranquility: u32, + ) -> Self { + let time_last_complete_scrub = match manager + .state_variables_store + .get(TIME_LAST_COMPLETE_SCRUB) + .expect("DB error when initializing scrub worker") + { + Some(v) => u64::from_be_bytes(v.try_into().unwrap()), + None => 0, + }; Self { manager, - iterator, + rx_cmd, + work: ScrubWorkerState::Finished, tranquilizer: Tranquilizer::new(30), tranquility, + time_last_complete_scrub, + } + } + + fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) { + match cmd { + ScrubWorkerCommand::Start => { + self.work = match std::mem::take(&mut self.work) { + ScrubWorkerState::Finished => { + let iterator = BlockStoreIterator::new(&self.manager); + ScrubWorkerState::Running(iterator) + } + work => { + error!("Cannot start scrub worker: already running!"); + work + } + }; + } + ScrubWorkerCommand::Pause(dur) => { + self.work = match std::mem::take(&mut self.work) { + ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => { + ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64) + } + work => { + error!("Cannot pause scrub worker: not running!"); + work + } + }; + } + ScrubWorkerCommand::Resume => { + self.work = match std::mem::take(&mut self.work) { + ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it), + work => { + error!("Cannot resume scrub worker: not paused!"); + work + } + }; + } + ScrubWorkerCommand::Cancel => { + self.work = match std::mem::take(&mut self.work) { + ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => { + ScrubWorkerState::Finished + } + work => { + error!("Cannot cancel scrub worker: not running!"); + work + } + } + } + ScrubWorkerCommand::SetTranquility(t) => { + self.tranquility = t; + } } } } @@ -153,24 +247,80 @@ impl Worker for ScrubWorker { } fn info(&self) -> Option<String> { - Some(format!("{:.2}% done", self.iterator.progress() * 100.)) + match &self.work { + ScrubWorkerState::Running(bsi) => Some(format!("{:.2}% done", bsi.progress() * 100.)), + ScrubWorkerState::Paused(_bsi, rt) => { + Some(format!("Paused, resumes at {}", msec_to_rfc3339(*rt))) + } + ScrubWorkerState::Finished => Some(format!( + "Last completed scrub: {}", + msec_to_rfc3339(self.time_last_complete_scrub) + )), + } } async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, ) -> Result<WorkerStatus, Error> { - self.tranquilizer.reset(); - if let Some(hash) = self.iterator.next().await? { - let _ = self.manager.read_block(&hash).await; - Ok(self.tranquilizer.tranquilize_worker(self.tranquility)) - } else { - Ok(WorkerStatus::Done) + match self.rx_cmd.try_recv() { + Ok(cmd) => self.handle_cmd(cmd), + Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done), + Err(mpsc::error::TryRecvError::Empty) => (), + }; + + match &mut self.work { + ScrubWorkerState::Running(bsi) => { + self.tranquilizer.reset(); + if let Some(hash) = bsi.next().await? { + let _ = self.manager.read_block(&hash).await; + Ok(self.tranquilizer.tranquilize_worker(self.tranquility)) + } else { + self.time_last_complete_scrub = now_msec(); // TODO save to file + self.manager.state_variables_store.insert( + TIME_LAST_COMPLETE_SCRUB, + u64::to_be_bytes(self.time_last_complete_scrub), + )?; + self.work = ScrubWorkerState::Finished; + self.tranquilizer.clear(); + Ok(WorkerStatus::Idle) + } + } + _ => Ok(WorkerStatus::Idle), } } async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { - unreachable!() + match &self.work { + ScrubWorkerState::Running(_) => return WorkerStatus::Busy, + ScrubWorkerState::Paused(_, resume_time) => { + let delay = Duration::from_millis(resume_time - now_msec()); + select! { + _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume), + cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { + self.handle_cmd(cmd); + } else { + return WorkerStatus::Done; + } + } + } + ScrubWorkerState::Finished => { + let delay = SCRUB_INTERVAL + - Duration::from_secs(now_msec() - self.time_last_complete_scrub); + select! { + _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start), + cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { + self.handle_cmd(cmd); + } else { + return WorkerStatus::Done; + } + } + } + } + match &self.work { + ScrubWorkerState::Running(_) => WorkerStatus::Busy, + _ => WorkerStatus::Idle, + } } } |