diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-28 16:59:19 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-28 16:59:19 +0200 |
commit | b053fc051842677dbabe1cab294af74ed26932a4 (patch) | |
tree | 68600a49ad5a10e21e21a0a2429aa0305c12f11c /src | |
parent | f1c972289d53e956a5837dcae2a53cc13f61ec7b (diff) | |
download | garage-b053fc051842677dbabe1cab294af74ed26932a4.tar.gz garage-b053fc051842677dbabe1cab294af74ed26932a4.zip |
Persist scrub worker thing in Persister
Diffstat (limited to 'src')
-rw-r--r-- | src/block/manager.rs | 11 | ||||
-rw-r--r-- | src/block/repair.rs | 89 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 2 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 2 | ||||
-rw-r--r-- | src/rpc/system.rs | 6 | ||||
-rw-r--r-- | src/util/background/worker.rs | 13 |
6 files changed, 76 insertions, 47 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 015ac71b..36166ae3 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -88,8 +88,6 @@ pub struct BlockManager { pub replication: TableShardedReplication, /// Directory in which block are stored pub data_dir: PathBuf, - /// State store (only used by scrub worker to store time of last scrub) - pub(crate) state_variables_store: db::Tree, compression_level: Option<i32>, background_tranquility: u32, @@ -102,7 +100,7 @@ pub struct BlockManager { resync_notify: Notify, resync_errors: CountedTree, - system: Arc<System>, + pub(crate) system: Arc<System>, endpoint: Arc<Endpoint<BlockRpc, Self>>, metrics: BlockManagerMetrics, @@ -147,10 +145,6 @@ impl BlockManager { let resync_errors = CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors"); - let state_variables_store = db - .open_tree("state_variables") - .expect("Unable to open state_variables tree"); - let endpoint = system .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); @@ -169,7 +163,6 @@ impl BlockManager { resync_queue, resync_notify: Notify::new(), resync_errors, - state_variables_store, system, endpoint, metrics, @@ -514,7 +507,7 @@ impl BlockManager { // Launch a background worker for data store scrubs let (scrub_tx, scrub_rx) = mpsc::channel(1); self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); - let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx, 4); + let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx); self.system.background.spawn_worker(scrub_worker); } diff --git a/src/block/repair.rs b/src/block/repair.rs index 8335de51..27ed05c2 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -1,10 +1,10 @@ use core::ops::Bound; -use std::convert::TryInto; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::select; use tokio::sync::mpsc; @@ -13,13 +13,13 @@ use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::persister::Persister; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; use crate::manager::*; const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days -const TIME_LAST_COMPLETE_SCRUB: &[u8] = b"time_last_complete_scrub"; pub struct RepairWorker { manager: Arc<BlockManager>, @@ -139,8 +139,14 @@ pub struct ScrubWorker { work: ScrubWorkerState, tranquilizer: Tranquilizer, - tranquility: u32, + persister: Persister<ScrubWorkerPersisted>, + persisted: ScrubWorkerPersisted, +} + +#[derive(Serialize, Deserialize)] +struct ScrubWorkerPersisted { + tranquility: u32, time_last_complete_scrub: u64, } @@ -156,6 +162,7 @@ impl Default for ScrubWorkerState { } } +#[derive(Debug)] pub enum ScrubWorkerCommand { Start, Pause(Duration), @@ -165,30 +172,26 @@ pub enum ScrubWorkerCommand { } impl ScrubWorker { - pub fn new( - manager: Arc<BlockManager>, - rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, - tranquility: u32, - ) -> Self { - let time_last_complete_scrub = match manager - .state_variables_store - .get(TIME_LAST_COMPLETE_SCRUB) - .expect("DB error when initializing scrub worker") - { - Some(v) => u64::from_be_bytes(v.try_into().unwrap()), - None => 0, + pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self { + let persister = Persister::new(&manager.system.metadata_dir, "scrub_info"); + let persisted = match persister.load() { + Ok(v) => v, + Err(_) => ScrubWorkerPersisted { + time_last_complete_scrub: 0, + tranquility: 4, + }, }; Self { manager, rx_cmd, work: ScrubWorkerState::Finished, tranquilizer: Tranquilizer::new(30), - tranquility, - time_last_complete_scrub, + persister, + persisted, } } - fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) { + async fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) { match cmd { ScrubWorkerCommand::Start => { self.work = match std::mem::take(&mut self.work) { @@ -234,7 +237,10 @@ impl ScrubWorker { } } ScrubWorkerCommand::SetTranquility(t) => { - self.tranquility = t; + self.persisted.tranquility = t; + if let Err(e) = self.persister.save_async(&self.persisted).await { + error!("Could not save new tranquilitiy value: {}", e); + } } } } @@ -248,13 +254,17 @@ impl Worker for ScrubWorker { fn info(&self) -> Option<String> { match &self.work { - ScrubWorkerState::Running(bsi) => Some(format!("{:.2}% done", bsi.progress() * 100.)), + ScrubWorkerState::Running(bsi) => Some(format!( + "{:.2}% done (tranquility = {})", + bsi.progress() * 100., + self.persisted.tranquility + )), ScrubWorkerState::Paused(_bsi, rt) => { Some(format!("Paused, resumes at {}", msec_to_rfc3339(*rt))) } ScrubWorkerState::Finished => Some(format!( "Last completed scrub: {}", - msec_to_rfc3339(self.time_last_complete_scrub) + msec_to_rfc3339(self.persisted.time_last_complete_scrub) )), } } @@ -264,7 +274,7 @@ impl Worker for ScrubWorker { _must_exit: &mut watch::Receiver<bool>, ) -> Result<WorkerStatus, Error> { match self.rx_cmd.try_recv() { - Ok(cmd) => self.handle_cmd(cmd), + Ok(cmd) => self.handle_cmd(cmd).await, Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done), Err(mpsc::error::TryRecvError::Empty) => (), }; @@ -274,13 +284,12 @@ impl Worker for ScrubWorker { self.tranquilizer.reset(); if let Some(hash) = bsi.next().await? { let _ = self.manager.read_block(&hash).await; - Ok(self.tranquilizer.tranquilize_worker(self.tranquility)) + Ok(self + .tranquilizer + .tranquilize_worker(self.persisted.tranquility)) } else { - self.time_last_complete_scrub = now_msec(); // TODO save to file - self.manager.state_variables_store.insert( - TIME_LAST_COMPLETE_SCRUB, - u64::to_be_bytes(self.time_last_complete_scrub), - )?; + self.persisted.time_last_complete_scrub = now_msec(); + self.persister.save_async(&self.persisted).await?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); Ok(WorkerStatus::Idle) @@ -294,23 +303,35 @@ impl Worker for ScrubWorker { match &self.work { ScrubWorkerState::Running(_) => return WorkerStatus::Busy, ScrubWorkerState::Paused(_, resume_time) => { - let delay = Duration::from_millis(resume_time - now_msec()); + let now = now_msec(); + if now >= *resume_time { + self.handle_cmd(ScrubWorkerCommand::Resume).await; + return WorkerStatus::Busy; + } + let delay = Duration::from_millis(*resume_time - now); select! { - _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume), + _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume).await, cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { - self.handle_cmd(cmd); + self.handle_cmd(cmd).await; } else { return WorkerStatus::Done; } } } ScrubWorkerState::Finished => { + let now = now_msec(); + if now - self.persisted.time_last_complete_scrub + >= SCRUB_INTERVAL.as_millis() as u64 + { + self.handle_cmd(ScrubWorkerCommand::Start).await; + return WorkerStatus::Busy; + } let delay = SCRUB_INTERVAL - - Duration::from_secs(now_msec() - self.time_last_complete_scrub); + - Duration::from_millis(now - self.persisted.time_last_complete_scrub); select! { - _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start), + _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start).await, cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { - self.handle_cmd(cmd); + self.handle_cmd(cmd).await; } else { return WorkerStatus::Done; } diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index fc5a9932..8be56138 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -263,7 +263,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { continue; } - table.push(format!("{}\t{:?}\t{}", tid, info.status, info.name)); + table.push(format!("{}\t{}\t{}", tid, info.status, info.name)); if let Some(i) = &info.info { table.push(format!("\t\t {}", i)); } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 8207a8b4..d4366486 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -46,7 +46,6 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) { )); } RepairWhat::Scrub { cmd } => { - info!("Verifying integrity of stored blocks"); let cmd = match cmd { ScrubCmd::Start => ScrubWorkerCommand::Start, ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)), @@ -56,6 +55,7 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) { ScrubWorkerCommand::SetTranquility(tranquility) } }; + info!("Sending command to scrub worker: {:?}", cmd); garage.block_manager.send_scrub_command(cmd).await; } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 1d7c3ea4..77b79864 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -104,6 +104,9 @@ pub struct System { /// The job runner of this node pub background: Arc<BackgroundRunner>, + + /// Path to metadata directory (usefull) + pub metadata_dir: PathBuf, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -295,6 +298,7 @@ impl System { ring, update_ring: Mutex::new(update_ring), background, + metadata_dir: config.metadata_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); sys diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index c08a0aaa..7fd63c2b 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -15,7 +15,7 @@ use crate::background::WorkerInfo; use crate::error::Error; use crate::time::now_msec; -#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] pub enum WorkerStatus { Busy, Throttled(f32), @@ -23,6 +23,17 @@ pub enum WorkerStatus { Done, } +impl std::fmt::Display for WorkerStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + WorkerStatus::Busy => write!(f, "Busy"), + WorkerStatus::Throttled(t) => write!(f, "Thr:{:.3}", t), + WorkerStatus::Idle => write!(f, "Idle"), + WorkerStatus::Done => write!(f, "Done"), + } + } +} + #[async_trait] pub trait Worker: Send { fn name(&self) -> String; |