diff options
author | Alex <alex@adnab.me> | 2023-01-04 13:47:42 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-01-04 13:47:42 +0000 |
commit | 329c0e64f9044511f1a0d46b1b3ed99bdd890630 (patch) | |
tree | 222ec61f03c5abcca6678239111e94ad4dcfeb67 /src/block | |
parent | 4eb8ca3a528dae2848141f5cc3eb607eb7d40114 (diff) | |
parent | 29dbcb82780dcdb6f2a01a9da5122e70abaf93bf (diff) | |
download | garage-329c0e64f9044511f1a0d46b1b3ed99bdd890630.tar.gz garage-329c0e64f9044511f1a0d46b1b3ed99bdd890630.zip |
Merge pull request 'Improve `garage worker set` and add `garage worker get`' (#464) from worker-get into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/464
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/manager.rs | 31 | ||||
-rw-r--r-- | src/block/repair.rs | 72 | ||||
-rw-r--r-- | src/block/resync.rs | 108 |
3 files changed, 125 insertions, 86 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 1b5a5df0..19841d64 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -23,10 +23,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; -use garage_util::background::BackgroundRunner; +use garage_util::background::{vars, BackgroundRunner}; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; +use garage_util::persister::PersisterShared; +use garage_util::time::msec_to_rfc3339; use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; @@ -89,6 +91,7 @@ pub struct BlockManager { pub(crate) metrics: BlockManagerMetrics, + pub scrub_persister: PersisterShared<ScrubWorkerPersisted>, tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>, } @@ -128,6 +131,8 @@ impl BlockManager { let metrics = BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); + let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); + let block_manager = Arc::new(Self { replication, data_dir, @@ -138,6 +143,7 @@ impl BlockManager { system, endpoint, metrics, + scrub_persister, tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); @@ -155,7 +161,28 @@ impl BlockManager { // Spawn scrub worker let (scrub_tx, scrub_rx) = mpsc::channel(1); self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); - bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); + bg.spawn_worker(ScrubWorker::new( + self.clone(), + scrub_rx, + self.scrub_persister.clone(), + )); + } + + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + self.resync.register_bg_vars(vars); + + vars.register_rw( + &self.scrub_persister, + "scrub-tranquility", + |p| p.get_with(|x| x.tranquility), + |p, tranquility| p.set_with(|x| x.tranquility = tranquility), + ); + vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| { + p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub)) + }); + vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| { + p.get_with(|x| x.corruptions_detected) + }); } /// Ask nodes that might have a (possibly compressed) block for it diff --git a/src/block/repair.rs b/src/block/repair.rs index a6ded65a..064cc005 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -13,7 +13,7 @@ use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -168,17 +168,25 @@ pub struct ScrubWorker { work: ScrubWorkerState, tranquilizer: Tranquilizer, - persister: Persister<ScrubWorkerPersisted>, - persisted: ScrubWorkerPersisted, + persister: PersisterShared<ScrubWorkerPersisted>, } #[derive(Serialize, Deserialize)] -struct ScrubWorkerPersisted { - tranquility: u32, - time_last_complete_scrub: u64, - corruptions_detected: u64, +pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) corruptions_detected: u64, } impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} +impl Default for ScrubWorkerPersisted { + fn default() -> Self { + ScrubWorkerPersisted { + time_last_complete_scrub: 0, + tranquility: INITIAL_SCRUB_TRANQUILITY, + corruptions_detected: 0, + } + } +} enum ScrubWorkerState { Running(BlockStoreIterator), @@ -198,27 +206,20 @@ pub enum ScrubWorkerCommand { Pause(Duration), Resume, Cancel, - SetTranquility(u32), } impl ScrubWorker { - pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self { - let persister = Persister::new(&manager.system.metadata_dir, "scrub_info"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ScrubWorkerPersisted { - time_last_complete_scrub: 0, - tranquility: INITIAL_SCRUB_TRANQUILITY, - corruptions_detected: 0, - }, - }; + pub(crate) fn new( + manager: Arc<BlockManager>, + rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, + persister: PersisterShared<ScrubWorkerPersisted>, + ) -> Self { Self { manager, rx_cmd, work: ScrubWorkerState::Finished, tranquilizer: Tranquilizer::new(30), persister, - persisted, } } @@ -267,12 +268,6 @@ impl ScrubWorker { } } } - ScrubWorkerCommand::SetTranquility(t) => { - self.persisted.tranquility = t; - if let Err(e) = self.persister.save_async(&self.persisted).await { - error!("Could not save new tranquilitiy value: {}", e); - } - } } } } @@ -284,9 +279,18 @@ impl Worker for ScrubWorker { } fn status(&self) -> WorkerStatus { + let (corruptions_detected, tranquility, time_last_complete_scrub) = + self.persister.get_with(|p| { + ( + p.corruptions_detected, + p.tranquility, + p.time_last_complete_scrub, + ) + }); + let mut s = WorkerStatus { - persistent_errors: Some(self.persisted.corruptions_detected), - tranquility: Some(self.persisted.tranquility), + persistent_errors: Some(corruptions_detected), + tranquility: Some(tranquility), ..Default::default() }; match &self.work { @@ -300,7 +304,7 @@ impl Worker for ScrubWorker { ScrubWorkerState::Finished => { s.freeform = vec![format!( "Last scrub completed at {}", - msec_to_rfc3339(self.persisted.time_last_complete_scrub) + msec_to_rfc3339(time_last_complete_scrub) )]; } } @@ -321,18 +325,17 @@ impl Worker for ScrubWorker { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); - self.persisted.corruptions_detected += 1; - self.persister.save_async(&self.persisted).await?; + self.persister.set_with(|p| p.corruptions_detected += 1)?; } Err(e) => return Err(e), _ => (), }; Ok(self .tranquilizer - .tranquilize_worker(self.persisted.tranquility)) + .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - self.persisted.time_last_complete_scrub = now_msec(); - self.persister.save_async(&self.persisted).await?; + self.persister + .set_with(|p| p.time_last_complete_scrub = now_msec())?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); Ok(WorkerState::Idle) @@ -347,7 +350,8 @@ impl Worker for ScrubWorker { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( - self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, + self.persister.get_with(|p| p.time_last_complete_scrub) + + SCRUB_INTERVAL.as_millis() as u64, ScrubWorkerCommand::Start, ), }; diff --git a/src/block/resync.rs b/src/block/resync.rs index 9c7b3b0e..ea280ad4 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -3,7 +3,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::Duration; -use arc_swap::ArcSwap; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -22,7 +21,7 @@ use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2; pub struct BlockResyncManager { pub(crate) queue: CountedTree, - pub(crate) notify: Notify, + pub(crate) notify: Arc<Notify>, pub(crate) errors: CountedTree, busy_set: BusySet, - persister: Persister<ResyncPersistedConfig>, - persisted: ArcSwap<ResyncPersistedConfig>, + persister: PersisterShared<ResyncPersistedConfig>, } #[derive(Serialize, Deserialize, Clone, Copy)] @@ -64,6 +62,14 @@ struct ResyncPersistedConfig { tranquility: u32, } impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {} +impl Default for ResyncPersistedConfig { + fn default() -> Self { + ResyncPersistedConfig { + n_workers: 1, + tranquility: INITIAL_RESYNC_TRANQUILITY, + } + } +} enum ResyncIterResult { BusyDidSomething, @@ -91,22 +97,14 @@ impl BlockResyncManager { .expect("Unable to open block_local_resync_errors tree"); let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors"); - let persister = Persister::new(&system.metadata_dir, "resync_cfg"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ResyncPersistedConfig { - n_workers: 1, - tranquility: INITIAL_RESYNC_TRANQUILITY, - }, - }; + let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg"); Self { queue, - notify: Notify::new(), + notify: Arc::new(Notify::new()), errors, busy_set: Arc::new(Mutex::new(HashSet::new())), persister, - persisted: ArcSwap::new(Arc::new(persisted)), } } @@ -142,6 +140,38 @@ impl BlockResyncManager { ))) } + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-worker-count", + |p| p.get_with(|x| x.n_workers), + move |p, n_workers| { + if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { + return Err(Error::Message(format!( + "Invalid number of resync workers, must be between 1 and {}", + MAX_RESYNC_WORKERS + ))); + } + p.set_with(|x| x.n_workers = n_workers)?; + notify.notify_waiters(); + Ok(()) + }, + ); + + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-tranquility", + |p| p.get_with(|x| x.tranquility), + move |p, tranquility| { + p.set_with(|x| x.tranquility = tranquility)?; + notify.notify_waiters(); + Ok(()) + }, + ); + } + // ---- Resync loop ---- // This part manages a queue of blocks that need to be @@ -436,33 +466,6 @@ impl BlockResyncManager { Ok(()) } - - async fn update_persisted( - &self, - update: impl Fn(&mut ResyncPersistedConfig), - ) -> Result<(), Error> { - let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref(); - update(&mut cfg); - self.persister.save_async(&cfg).await?; - self.persisted.store(Arc::new(cfg)); - self.notify.notify_waiters(); - Ok(()) - } - - pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> { - if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { - return Err(Error::Message(format!( - "Invalid number of resync workers, must be between 1 and {}", - MAX_RESYNC_WORKERS - ))); - } - self.update_persisted(|cfg| cfg.n_workers = n_workers).await - } - - pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> { - self.update_persisted(|cfg| cfg.tranquility = tranquility) - .await - } } impl Drop for BusyBlock { @@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker { manager: Arc<BlockManager>, tranquilizer: Tranquilizer, next_delay: Duration, + persister: PersisterShared<ResyncPersistedConfig>, } impl ResyncWorker { pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self { + let persister = manager.resync.persister.clone(); Self { index, manager, tranquilizer: Tranquilizer::new(30), next_delay: Duration::from_secs(10), + persister, } } } @@ -497,9 +503,9 @@ impl Worker for ResyncWorker { } fn status(&self) -> WorkerStatus { - let persisted = self.manager.resync.persisted.load(); + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); - if self.index >= persisted.n_workers { + if self.index >= n_workers { return WorkerStatus { freeform: vec!["This worker is currently disabled".into()], ..Default::default() @@ -508,22 +514,24 @@ impl Worker for ResyncWorker { WorkerStatus { queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), - tranquility: Some(persisted.tranquility), + tranquility: Some(tranquility), persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - if self.index >= self.manager.resync.persisted.load().n_workers { + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); + + if self.index >= n_workers { return Ok(WorkerState::Idle); } self.tranquilizer.reset(); match self.manager.resync.resync_iter(&self.manager).await { - Ok(ResyncIterResult::BusyDidSomething) => Ok(self - .tranquilizer - .tranquilize_worker(self.manager.resync.persisted.load().tranquility)), + Ok(ResyncIterResult::BusyDidSomething) => { + Ok(self.tranquilizer.tranquilize_worker(tranquility)) + } Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy), Ok(ResyncIterResult::IdleFor(delay)) => { self.next_delay = delay; @@ -542,7 +550,7 @@ impl Worker for ResyncWorker { } async fn wait_for_work(&mut self) -> WorkerState { - while self.index >= self.manager.resync.persisted.load().n_workers { + while self.index >= self.persister.get_with(|x| x.n_workers) { self.manager.resync.notify.notified().await } |