aboutsummaryrefslogtreecommitdiff
path: root/src/block/resync.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-01-04 13:07:13 +0100
committerAlex Auvolat <alex@adnab.me>2023-01-04 13:07:13 +0100
commitf3f27293df83986ba29fb03f8af26a2177518e20 (patch)
treeaebfa6b6936a974d40846a4c56b17aefac4f2526 /src/block/resync.rs
parent13c554988623663a9416439baf4f85f6fa91e502 (diff)
downloadgarage-f3f27293df83986ba29fb03f8af26a2177518e20.tar.gz
garage-f3f27293df83986ba29fb03f8af26a2177518e20.zip
Uniform framework for bg variable management
Diffstat (limited to 'src/block/resync.rs')
-rw-r--r--src/block/resync.rs108
1 files changed, 58 insertions, 50 deletions
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c7b3b0e..ea280ad4 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use arc_swap::ArcSwap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -22,7 +21,7 @@ use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::persister::Persister;
+use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
pub struct BlockResyncManager {
pub(crate) queue: CountedTree,
- pub(crate) notify: Notify,
+ pub(crate) notify: Arc<Notify>,
pub(crate) errors: CountedTree,
busy_set: BusySet,
- persister: Persister<ResyncPersistedConfig>,
- persisted: ArcSwap<ResyncPersistedConfig>,
+ persister: PersisterShared<ResyncPersistedConfig>,
}
#[derive(Serialize, Deserialize, Clone, Copy)]
@@ -64,6 +62,14 @@ struct ResyncPersistedConfig {
tranquility: u32,
}
impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
+impl Default for ResyncPersistedConfig {
+ fn default() -> Self {
+ ResyncPersistedConfig {
+ n_workers: 1,
+ tranquility: INITIAL_RESYNC_TRANQUILITY,
+ }
+ }
+}
enum ResyncIterResult {
BusyDidSomething,
@@ -91,22 +97,14 @@ impl BlockResyncManager {
.expect("Unable to open block_local_resync_errors tree");
let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors");
- let persister = Persister::new(&system.metadata_dir, "resync_cfg");
- let persisted = match persister.load() {
- Ok(v) => v,
- Err(_) => ResyncPersistedConfig {
- n_workers: 1,
- tranquility: INITIAL_RESYNC_TRANQUILITY,
- },
- };
+ let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg");
Self {
queue,
- notify: Notify::new(),
+ notify: Arc::new(Notify::new()),
errors,
busy_set: Arc::new(Mutex::new(HashSet::new())),
persister,
- persisted: ArcSwap::new(Arc::new(persisted)),
}
}
@@ -142,6 +140,38 @@ impl BlockResyncManager {
)))
}
+ pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
+ let notify = self.notify.clone();
+ vars.register_rw(
+ &self.persister,
+ "resync-worker-count",
+ |p| p.get_with(|x| x.n_workers),
+ move |p, n_workers| {
+ if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
+ return Err(Error::Message(format!(
+ "Invalid number of resync workers, must be between 1 and {}",
+ MAX_RESYNC_WORKERS
+ )));
+ }
+ p.set_with(|x| x.n_workers = n_workers)?;
+ notify.notify_waiters();
+ Ok(())
+ },
+ );
+
+ let notify = self.notify.clone();
+ vars.register_rw(
+ &self.persister,
+ "resync-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ move |p, tranquility| {
+ p.set_with(|x| x.tranquility = tranquility)?;
+ notify.notify_waiters();
+ Ok(())
+ },
+ );
+ }
+
// ---- Resync loop ----
// This part manages a queue of blocks that need to be
@@ -436,33 +466,6 @@ impl BlockResyncManager {
Ok(())
}
-
- async fn update_persisted(
- &self,
- update: impl Fn(&mut ResyncPersistedConfig),
- ) -> Result<(), Error> {
- let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref();
- update(&mut cfg);
- self.persister.save_async(&cfg).await?;
- self.persisted.store(Arc::new(cfg));
- self.notify.notify_waiters();
- Ok(())
- }
-
- pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> {
- if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
- return Err(Error::Message(format!(
- "Invalid number of resync workers, must be between 1 and {}",
- MAX_RESYNC_WORKERS
- )));
- }
- self.update_persisted(|cfg| cfg.n_workers = n_workers).await
- }
-
- pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> {
- self.update_persisted(|cfg| cfg.tranquility = tranquility)
- .await
- }
}
impl Drop for BusyBlock {
@@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker {
manager: Arc<BlockManager>,
tranquilizer: Tranquilizer,
next_delay: Duration,
+ persister: PersisterShared<ResyncPersistedConfig>,
}
impl ResyncWorker {
pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self {
+ let persister = manager.resync.persister.clone();
Self {
index,
manager,
tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10),
+ persister,
}
}
}
@@ -497,9 +503,9 @@ impl Worker for ResyncWorker {
}
fn status(&self) -> WorkerStatus {
- let persisted = self.manager.resync.persisted.load();
+ let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
- if self.index >= persisted.n_workers {
+ if self.index >= n_workers {
return WorkerStatus {
freeform: vec!["This worker is currently disabled".into()],
..Default::default()
@@ -508,22 +514,24 @@ impl Worker for ResyncWorker {
WorkerStatus {
queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
- tranquility: Some(persisted.tranquility),
+ tranquility: Some(tranquility),
persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- if self.index >= self.manager.resync.persisted.load().n_workers {
+ let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
+
+ if self.index >= n_workers {
return Ok(WorkerState::Idle);
}
self.tranquilizer.reset();
match self.manager.resync.resync_iter(&self.manager).await {
- Ok(ResyncIterResult::BusyDidSomething) => Ok(self
- .tranquilizer
- .tranquilize_worker(self.manager.resync.persisted.load().tranquility)),
+ Ok(ResyncIterResult::BusyDidSomething) => {
+ Ok(self.tranquilizer.tranquilize_worker(tranquility))
+ }
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay;
@@ -542,7 +550,7 @@ impl Worker for ResyncWorker {
}
async fn wait_for_work(&mut self) -> WorkerState {
- while self.index >= self.manager.resync.persisted.load().n_workers {
+ while self.index >= self.persister.get_with(|x| x.n_workers) {
self.manager.resync.notify.notified().await
}