aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-01-04 13:07:13 +0100
committerAlex Auvolat <alex@adnab.me>2023-01-04 13:07:13 +0100
commitf3f27293df83986ba29fb03f8af26a2177518e20 (patch)
treeaebfa6b6936a974d40846a4c56b17aefac4f2526 /src/block
parent13c554988623663a9416439baf4f85f6fa91e502 (diff)
downloadgarage-f3f27293df83986ba29fb03f8af26a2177518e20.tar.gz
garage-f3f27293df83986ba29fb03f8af26a2177518e20.zip
Uniform framework for bg variable management
Diffstat (limited to 'src/block')
-rw-r--r--src/block/manager.rs31
-rw-r--r--src/block/repair.rs72
-rw-r--r--src/block/resync.rs108
3 files changed, 125 insertions, 86 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 1b5a5df0..19841d64 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -23,10 +23,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
-use garage_util::background::BackgroundRunner;
+use garage_util::background::{vars, BackgroundRunner};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
+use garage_util::persister::PersisterShared;
+use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
@@ -89,6 +91,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics,
+ pub scrub_persister: PersisterShared<ScrubWorkerPersisted>,
tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
@@ -128,6 +131,8 @@ impl BlockManager {
let metrics =
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
+ let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
+
let block_manager = Arc::new(Self {
replication,
data_dir,
@@ -138,6 +143,7 @@ impl BlockManager {
system,
endpoint,
metrics,
+ scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
@@ -155,7 +161,28 @@ impl BlockManager {
// Spawn scrub worker
let (scrub_tx, scrub_rx) = mpsc::channel(1);
self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
- bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx));
+ bg.spawn_worker(ScrubWorker::new(
+ self.clone(),
+ scrub_rx,
+ self.scrub_persister.clone(),
+ ));
+ }
+
+ pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
+ self.resync.register_bg_vars(vars);
+
+ vars.register_rw(
+ &self.scrub_persister,
+ "scrub-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ |p, tranquility| p.set_with(|x| x.tranquility = tranquility),
+ );
+ vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
+ p.get_with(|x| x.corruptions_detected)
+ });
}
/// Ask nodes that might have a (possibly compressed) block for it
diff --git a/src/block/repair.rs b/src/block/repair.rs
index a6ded65a..064cc005 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -13,7 +13,7 @@ use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::persister::Persister;
+use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -168,17 +168,25 @@ pub struct ScrubWorker {
work: ScrubWorkerState,
tranquilizer: Tranquilizer,
- persister: Persister<ScrubWorkerPersisted>,
- persisted: ScrubWorkerPersisted,
+ persister: PersisterShared<ScrubWorkerPersisted>,
}
#[derive(Serialize, Deserialize)]
-struct ScrubWorkerPersisted {
- tranquility: u32,
- time_last_complete_scrub: u64,
- corruptions_detected: u64,
+pub struct ScrubWorkerPersisted {
+ pub tranquility: u32,
+ pub(crate) time_last_complete_scrub: u64,
+ pub(crate) corruptions_detected: u64,
}
impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
+impl Default for ScrubWorkerPersisted {
+ fn default() -> Self {
+ ScrubWorkerPersisted {
+ time_last_complete_scrub: 0,
+ tranquility: INITIAL_SCRUB_TRANQUILITY,
+ corruptions_detected: 0,
+ }
+ }
+}
enum ScrubWorkerState {
Running(BlockStoreIterator),
@@ -198,27 +206,20 @@ pub enum ScrubWorkerCommand {
Pause(Duration),
Resume,
Cancel,
- SetTranquility(u32),
}
impl ScrubWorker {
- pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
- let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
- let persisted = match persister.load() {
- Ok(v) => v,
- Err(_) => ScrubWorkerPersisted {
- time_last_complete_scrub: 0,
- tranquility: INITIAL_SCRUB_TRANQUILITY,
- corruptions_detected: 0,
- },
- };
+ pub(crate) fn new(
+ manager: Arc<BlockManager>,
+ rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+ persister: PersisterShared<ScrubWorkerPersisted>,
+ ) -> Self {
Self {
manager,
rx_cmd,
work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30),
persister,
- persisted,
}
}
@@ -267,12 +268,6 @@ impl ScrubWorker {
}
}
}
- ScrubWorkerCommand::SetTranquility(t) => {
- self.persisted.tranquility = t;
- if let Err(e) = self.persister.save_async(&self.persisted).await {
- error!("Could not save new tranquilitiy value: {}", e);
- }
- }
}
}
}
@@ -284,9 +279,18 @@ impl Worker for ScrubWorker {
}
fn status(&self) -> WorkerStatus {
+ let (corruptions_detected, tranquility, time_last_complete_scrub) =
+ self.persister.get_with(|p| {
+ (
+ p.corruptions_detected,
+ p.tranquility,
+ p.time_last_complete_scrub,
+ )
+ });
+
let mut s = WorkerStatus {
- persistent_errors: Some(self.persisted.corruptions_detected),
- tranquility: Some(self.persisted.tranquility),
+ persistent_errors: Some(corruptions_detected),
+ tranquility: Some(tranquility),
..Default::default()
};
match &self.work {
@@ -300,7 +304,7 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Finished => {
s.freeform = vec![format!(
"Last scrub completed at {}",
- msec_to_rfc3339(self.persisted.time_last_complete_scrub)
+ msec_to_rfc3339(time_last_complete_scrub)
)];
}
}
@@ -321,18 +325,17 @@ impl Worker for ScrubWorker {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
- self.persisted.corruptions_detected += 1;
- self.persister.save_async(&self.persisted).await?;
+ self.persister.set_with(|p| p.corruptions_detected += 1)?;
}
Err(e) => return Err(e),
_ => (),
};
Ok(self
.tranquilizer
- .tranquilize_worker(self.persisted.tranquility))
+ .tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
- self.persisted.time_last_complete_scrub = now_msec();
- self.persister.save_async(&self.persisted).await?;
+ self.persister
+ .set_with(|p| p.time_last_complete_scrub = now_msec())?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
Ok(WorkerState::Idle)
@@ -347,7 +350,8 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
- self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64,
+ self.persister.get_with(|p| p.time_last_complete_scrub)
+ + SCRUB_INTERVAL.as_millis() as u64,
ScrubWorkerCommand::Start,
),
};
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c7b3b0e..ea280ad4 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use arc_swap::ArcSwap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -22,7 +21,7 @@ use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::persister::Persister;
+use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
pub struct BlockResyncManager {
pub(crate) queue: CountedTree,
- pub(crate) notify: Notify,
+ pub(crate) notify: Arc<Notify>,
pub(crate) errors: CountedTree,
busy_set: BusySet,
- persister: Persister<ResyncPersistedConfig>,
- persisted: ArcSwap<ResyncPersistedConfig>,
+ persister: PersisterShared<ResyncPersistedConfig>,
}
#[derive(Serialize, Deserialize, Clone, Copy)]
@@ -64,6 +62,14 @@ struct ResyncPersistedConfig {
tranquility: u32,
}
impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
+impl Default for ResyncPersistedConfig {
+ fn default() -> Self {
+ ResyncPersistedConfig {
+ n_workers: 1,
+ tranquility: INITIAL_RESYNC_TRANQUILITY,
+ }
+ }
+}
enum ResyncIterResult {
BusyDidSomething,
@@ -91,22 +97,14 @@ impl BlockResyncManager {
.expect("Unable to open block_local_resync_errors tree");
let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors");
- let persister = Persister::new(&system.metadata_dir, "resync_cfg");
- let persisted = match persister.load() {
- Ok(v) => v,
- Err(_) => ResyncPersistedConfig {
- n_workers: 1,
- tranquility: INITIAL_RESYNC_TRANQUILITY,
- },
- };
+ let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg");
Self {
queue,
- notify: Notify::new(),
+ notify: Arc::new(Notify::new()),
errors,
busy_set: Arc::new(Mutex::new(HashSet::new())),
persister,
- persisted: ArcSwap::new(Arc::new(persisted)),
}
}
@@ -142,6 +140,38 @@ impl BlockResyncManager {
)))
}
+ pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
+ let notify = self.notify.clone();
+ vars.register_rw(
+ &self.persister,
+ "resync-worker-count",
+ |p| p.get_with(|x| x.n_workers),
+ move |p, n_workers| {
+ if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
+ return Err(Error::Message(format!(
+ "Invalid number of resync workers, must be between 1 and {}",
+ MAX_RESYNC_WORKERS
+ )));
+ }
+ p.set_with(|x| x.n_workers = n_workers)?;
+ notify.notify_waiters();
+ Ok(())
+ },
+ );
+
+ let notify = self.notify.clone();
+ vars.register_rw(
+ &self.persister,
+ "resync-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ move |p, tranquility| {
+ p.set_with(|x| x.tranquility = tranquility)?;
+ notify.notify_waiters();
+ Ok(())
+ },
+ );
+ }
+
// ---- Resync loop ----
// This part manages a queue of blocks that need to be
@@ -436,33 +466,6 @@ impl BlockResyncManager {
Ok(())
}
-
- async fn update_persisted(
- &self,
- update: impl Fn(&mut ResyncPersistedConfig),
- ) -> Result<(), Error> {
- let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref();
- update(&mut cfg);
- self.persister.save_async(&cfg).await?;
- self.persisted.store(Arc::new(cfg));
- self.notify.notify_waiters();
- Ok(())
- }
-
- pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> {
- if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
- return Err(Error::Message(format!(
- "Invalid number of resync workers, must be between 1 and {}",
- MAX_RESYNC_WORKERS
- )));
- }
- self.update_persisted(|cfg| cfg.n_workers = n_workers).await
- }
-
- pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> {
- self.update_persisted(|cfg| cfg.tranquility = tranquility)
- .await
- }
}
impl Drop for BusyBlock {
@@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker {
manager: Arc<BlockManager>,
tranquilizer: Tranquilizer,
next_delay: Duration,
+ persister: PersisterShared<ResyncPersistedConfig>,
}
impl ResyncWorker {
pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self {
+ let persister = manager.resync.persister.clone();
Self {
index,
manager,
tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10),
+ persister,
}
}
}
@@ -497,9 +503,9 @@ impl Worker for ResyncWorker {
}
fn status(&self) -> WorkerStatus {
- let persisted = self.manager.resync.persisted.load();
+ let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
- if self.index >= persisted.n_workers {
+ if self.index >= n_workers {
return WorkerStatus {
freeform: vec!["This worker is currently disabled".into()],
..Default::default()
@@ -508,22 +514,24 @@ impl Worker for ResyncWorker {
WorkerStatus {
queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
- tranquility: Some(persisted.tranquility),
+ tranquility: Some(tranquility),
persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- if self.index >= self.manager.resync.persisted.load().n_workers {
+ let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
+
+ if self.index >= n_workers {
return Ok(WorkerState::Idle);
}
self.tranquilizer.reset();
match self.manager.resync.resync_iter(&self.manager).await {
- Ok(ResyncIterResult::BusyDidSomething) => Ok(self
- .tranquilizer
- .tranquilize_worker(self.manager.resync.persisted.load().tranquility)),
+ Ok(ResyncIterResult::BusyDidSomething) => {
+ Ok(self.tranquilizer.tranquilize_worker(tranquility))
+ }
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay;
@@ -542,7 +550,7 @@ impl Worker for ResyncWorker {
}
async fn wait_for_work(&mut self) -> WorkerState {
- while self.index >= self.manager.resync.persisted.load().n_workers {
+ while self.index >= self.persister.get_with(|x| x.n_workers) {
self.manager.resync.notify.notified().await
}