aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/Cargo.toml14
-rw-r--r--src/block/manager.rs79
-rw-r--r--src/block/metrics.rs17
-rw-r--r--src/block/rc.rs14
-rw-r--r--src/block/repair.rs163
-rw-r--r--src/block/resync.rs108
6 files changed, 278 insertions, 117 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 1e4eb64e..c6985754 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_block"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,10 +14,10 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.1", path = "../db" }
-garage_rpc = { version = "0.8.1", path = "../rpc" }
-garage_util = { version = "0.8.1", path = "../util" }
-garage_table = { version = "0.8.1", path = "../table" }
+garage_db = { version = "0.8.2", path = "../db" }
+garage_rpc = { version = "0.8.2", path = "../rpc" }
+garage_util = { version = "0.8.2", path = "../util" }
+garage_table = { version = "0.8.2", path = "../table" }
opentelemetry = "0.17"
@@ -25,11 +25,11 @@ arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
hex = "0.4"
-tracing = "0.1.30"
+tracing = "0.1"
rand = "0.8"
async-compression = { version = "0.3", features = ["tokio", "zstd"] }
-zstd = { version = "0.9", default-features = false }
+zstd = { version = "0.12", default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 1b5a5df0..26278974 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -6,6 +6,7 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes;
+use rand::prelude::*;
use serde::{Deserialize, Serialize};
use futures::Stream;
@@ -23,10 +24,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
-use garage_util::background::BackgroundRunner;
+use garage_util::background::{vars, BackgroundRunner};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
+use garage_util::persister::PersisterShared;
+use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
@@ -89,6 +92,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics,
+ pub scrub_persister: PersisterShared<ScrubWorkerPersisted>,
tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
@@ -125,8 +129,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
- let metrics =
- BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
+ let metrics = BlockManagerMetrics::new(
+ compression_level,
+ rc.rc.clone(),
+ resync.queue.clone(),
+ resync.errors.clone(),
+ );
+
+ let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
let block_manager = Arc::new(Self {
replication,
@@ -138,9 +148,11 @@ impl BlockManager {
system,
endpoint,
metrics,
+ scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
+ block_manager.scrub_persister.set_with(|_| ()).unwrap();
block_manager
}
@@ -155,7 +167,31 @@ impl BlockManager {
// Spawn scrub worker
let (scrub_tx, scrub_rx) = mpsc::channel(1);
self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
- bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx));
+ bg.spawn_worker(ScrubWorker::new(
+ self.clone(),
+ scrub_rx,
+ self.scrub_persister.clone(),
+ ));
+ }
+
+ pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
+ self.resync.register_bg_vars(vars);
+
+ vars.register_rw(
+ &self.scrub_persister,
+ "scrub-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ |p, tranquility| p.set_with(|x| x.tranquility = tranquility),
+ );
+ vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
+ p.get_with(|x| x.corruptions_detected)
+ });
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -649,14 +685,21 @@ impl BlockManagerLocked {
}
};
- let mut path2 = path.clone();
- path2.set_extension("tmp");
- let mut f = fs::File::create(&path2).await?;
+ let mut path_tmp = path.clone();
+ let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
+ path_tmp.set_extension(tmp_extension);
+
+ let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
+
+ let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?;
f.sync_all().await?;
drop(f);
- fs::rename(path2, path).await?;
+ fs::rename(path_tmp, path).await?;
+
+ delete_on_drop.cancel();
+
if let Some(to_delete) = to_delete {
fs::remove_file(to_delete).await?;
}
@@ -722,3 +765,23 @@ async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
.concat()
.into())
}
+
+struct DeleteOnDrop(Option<PathBuf>);
+
+impl DeleteOnDrop {
+ fn cancel(&mut self) {
+ drop(self.0.take());
+ }
+}
+
+impl Drop for DeleteOnDrop {
+ fn drop(&mut self) {
+ if let Some(path) = self.0.take() {
+ tokio::spawn(async move {
+ if let Err(e) = fs::remove_file(&path).await {
+ debug!("DeleteOnDrop failed for {}: {}", path.display(), e);
+ }
+ });
+ }
+ }
+}
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index fbef95af..6659df32 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -5,6 +5,7 @@ use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics {
+ pub(crate) _compression_level: ValueObserver<u64>,
pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
@@ -25,9 +26,23 @@ pub struct BlockManagerMetrics {
}
impl BlockManagerMetrics {
- pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
+ pub fn new(
+ compression_level: Option<i32>,
+ rc_tree: db::Tree,
+ resync_queue: CountedTree,
+ resync_errors: CountedTree,
+ ) -> Self {
let meter = global::meter("garage_model/block");
Self {
+ _compression_level: meter
+ .u64_value_observer("block.compression_level", move |observer| {
+ match compression_level {
+ Some(v) => observer.observe(v as u64, &[]),
+ None => observer.observe(0_u64, &[]),
+ }
+ })
+ .with_description("Garage compression level for node")
+ .init(),
_rc_size: meter
.u64_value_observer("block.rc_size", move |observer| {
if let Ok(Some(v)) = rc_tree.fast_len() {
diff --git a/src/block/rc.rs b/src/block/rc.rs
index 8dae3960..94cb5eea 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -24,9 +24,9 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
- let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
match old_rc.increment().serialize() {
- Some(x) => tx.insert(&self.rc, &hash, x)?,
+ Some(x) => tx.insert(&self.rc, hash, x)?,
None => unreachable!(),
};
Ok(old_rc.is_zero())
@@ -39,10 +39,10 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
- let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
+ let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement();
match new_rc.serialize() {
- Some(x) => tx.insert(&self.rc, &hash, x)?,
- None => tx.remove(&self.rc, &hash)?,
+ Some(x) => tx.insert(&self.rc, hash, x)?,
+ None => tx.remove(&self.rc, hash)?,
};
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
}
@@ -57,10 +57,10 @@ impl BlockRc {
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
self.rc.db().transaction(|mut tx| {
- let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+ let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
match rcval {
RcEntry::Deletable { at_time } if now > at_time => {
- tx.remove(&self.rc, &hash)?;
+ tx.remove(&self.rc, hash)?;
}
_ => (),
};
diff --git a/src/block/repair.rs b/src/block/repair.rs
index a6ded65a..c89484d9 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
+use rand::Rng;
use tokio::fs;
use tokio::select;
use tokio::sync::mpsc;
@@ -13,14 +13,14 @@ use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::persister::Persister;
+use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
use crate::manager::*;
-// Full scrub every 30 days
-const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30);
+// Full scrub every 25 days with a random element of 10 days mixed in below
+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 25);
// Scrub tranquility is initially set to 4, but can be changed in the CLI
// and the updated version is persisted over Garage restarts
const INITIAL_SCRUB_TRANQUILITY: u32 = 4;
@@ -161,6 +161,51 @@ impl Worker for RepairWorker {
// and whose parameter (esp. speed) can be controlled at runtime.
// ---- ---- ----
+mod v081 {
+ use serde::{Deserialize, Serialize};
+
+ #[derive(Serialize, Deserialize)]
+ pub struct ScrubWorkerPersisted {
+ pub tranquility: u32,
+ pub(crate) time_last_complete_scrub: u64,
+ pub(crate) corruptions_detected: u64,
+ }
+
+ impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
+}
+
+mod v082 {
+ use serde::{Deserialize, Serialize};
+
+ use super::v081;
+
+ #[derive(Serialize, Deserialize)]
+ pub struct ScrubWorkerPersisted {
+ pub tranquility: u32,
+ pub(crate) time_last_complete_scrub: u64,
+ pub(crate) time_next_run_scrub: u64,
+ pub(crate) corruptions_detected: u64,
+ }
+
+ impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
+ type Previous = v081::ScrubWorkerPersisted;
+ const VERSION_MARKER: &'static [u8] = b"G082bswp";
+
+ fn migrate(old: v081::ScrubWorkerPersisted) -> ScrubWorkerPersisted {
+ use crate::repair::randomize_next_scrub_run_time;
+
+ ScrubWorkerPersisted {
+ tranquility: old.tranquility,
+ time_last_complete_scrub: old.time_last_complete_scrub,
+ time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub),
+ corruptions_detected: old.corruptions_detected,
+ }
+ }
+ }
+}
+
+pub use v082::*;
+
pub struct ScrubWorker {
manager: Arc<BlockManager>,
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
@@ -168,17 +213,33 @@ pub struct ScrubWorker {
work: ScrubWorkerState,
tranquilizer: Tranquilizer,
- persister: Persister<ScrubWorkerPersisted>,
- persisted: ScrubWorkerPersisted,
+ persister: PersisterShared<ScrubWorkerPersisted>,
+}
+
+fn randomize_next_scrub_run_time(timestamp: u64) -> u64 {
+ // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to
+ // balance scrub load across different cluster nodes.
+
+ let next_run_timestamp = timestamp
+ + SCRUB_INTERVAL
+ .saturating_add(Duration::from_secs(
+ rand::thread_rng().gen_range(0..3600 * 24 * 10),
+ ))
+ .as_millis() as u64;
+
+ next_run_timestamp
}
-#[derive(Serialize, Deserialize)]
-struct ScrubWorkerPersisted {
- tranquility: u32,
- time_last_complete_scrub: u64,
- corruptions_detected: u64,
+impl Default for ScrubWorkerPersisted {
+ fn default() -> Self {
+ ScrubWorkerPersisted {
+ time_last_complete_scrub: 0,
+ time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
+ tranquility: INITIAL_SCRUB_TRANQUILITY,
+ corruptions_detected: 0,
+ }
+ }
}
-impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
enum ScrubWorkerState {
Running(BlockStoreIterator),
@@ -198,27 +259,20 @@ pub enum ScrubWorkerCommand {
Pause(Duration),
Resume,
Cancel,
- SetTranquility(u32),
}
impl ScrubWorker {
- pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
- let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
- let persisted = match persister.load() {
- Ok(v) => v,
- Err(_) => ScrubWorkerPersisted {
- time_last_complete_scrub: 0,
- tranquility: INITIAL_SCRUB_TRANQUILITY,
- corruptions_detected: 0,
- },
- };
+ pub(crate) fn new(
+ manager: Arc<BlockManager>,
+ rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+ persister: PersisterShared<ScrubWorkerPersisted>,
+ ) -> Self {
Self {
manager,
rx_cmd,
work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30),
persister,
- persisted,
}
}
@@ -227,6 +281,7 @@ impl ScrubWorker {
ScrubWorkerCommand::Start => {
self.work = match std::mem::take(&mut self.work) {
ScrubWorkerState::Finished => {
+ info!("Scrub worker initializing, now performing datastore scrub");
let iterator = BlockStoreIterator::new(&self.manager);
ScrubWorkerState::Running(iterator)
}
@@ -267,12 +322,6 @@ impl ScrubWorker {
}
}
}
- ScrubWorkerCommand::SetTranquility(t) => {
- self.persisted.tranquility = t;
- if let Err(e) = self.persister.save_async(&self.persisted).await {
- error!("Could not save new tranquilitiy value: {}", e);
- }
- }
}
}
}
@@ -284,9 +333,19 @@ impl Worker for ScrubWorker {
}
fn status(&self) -> WorkerStatus {
+ let (corruptions_detected, tranquility, time_last_complete_scrub, time_next_run_scrub) =
+ self.persister.get_with(|p| {
+ (
+ p.corruptions_detected,
+ p.tranquility,
+ p.time_last_complete_scrub,
+ p.time_next_run_scrub,
+ )
+ });
+
let mut s = WorkerStatus {
- persistent_errors: Some(self.persisted.corruptions_detected),
- tranquility: Some(self.persisted.tranquility),
+ persistent_errors: Some(corruptions_detected),
+ tranquility: Some(tranquility),
..Default::default()
};
match &self.work {
@@ -298,10 +357,16 @@ impl Worker for ScrubWorker {
s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
}
ScrubWorkerState::Finished => {
- s.freeform = vec![format!(
- "Last scrub completed at {}",
- msec_to_rfc3339(self.persisted.time_last_complete_scrub)
- )];
+ s.freeform = vec![
+ format!(
+ "Last scrub completed at {}",
+ msec_to_rfc3339(time_last_complete_scrub),
+ ),
+ format!(
+ "Next scrub scheduled for {}",
+ msec_to_rfc3339(time_next_run_scrub)
+ ),
+ ];
}
}
s
@@ -321,20 +386,30 @@ impl Worker for ScrubWorker {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
- self.persisted.corruptions_detected += 1;
- self.persister.save_async(&self.persisted).await?;
+ self.persister.set_with(|p| p.corruptions_detected += 1)?;
}
Err(e) => return Err(e),
_ => (),
};
Ok(self
.tranquilizer
- .tranquilize_worker(self.persisted.tranquility))
+ .tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
- self.persisted.time_last_complete_scrub = now_msec();
- self.persister.save_async(&self.persisted).await?;
+ let now = now_msec();
+ let next_scrub_timestamp = randomize_next_scrub_run_time(now);
+
+ self.persister.set_with(|p| {
+ p.time_last_complete_scrub = now;
+ p.time_next_run_scrub = next_scrub_timestamp;
+ })?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
+
+ info!(
+ "Datastore scrub completed, next scrub scheduled for {}",
+ msec_to_rfc3339(next_scrub_timestamp)
+ );
+
Ok(WorkerState::Idle)
}
}
@@ -347,7 +422,7 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
- self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64,
+ self.persister.get_with(|p| p.time_next_run_scrub),
ScrubWorkerCommand::Start,
),
};
@@ -462,11 +537,11 @@ impl BlockStoreIterator {
let ent_type = data_dir_ent.file_type().await?;
let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
+ if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
let path = data_dir_ent.path();
self.path.push(ReadingDir::Pending(path));
} else if name.len() == 64 {
- if let Ok(h) = hex::decode(&name) {
+ if let Ok(h) = hex::decode(name) {
let mut hash = [0u8; 32];
hash.copy_from_slice(&h);
return Ok(Some(hash.into()));
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c7b3b0e..ea280ad4 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use arc_swap::ArcSwap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -22,7 +21,7 @@ use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::persister::Persister;
+use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
pub struct BlockResyncManager {
pub(crate) queue: CountedTree,
- pub(crate) notify: Notify,
+ pub(crate) notify: Arc<Notify>,
pub(crate) errors: CountedTree,
busy_set: BusySet,
- persister: Persister<ResyncPersistedConfig>,
- persisted: ArcSwap<ResyncPersistedConfig>,
+ persister: PersisterShared<ResyncPersistedConfig>,
}
#[derive(Serialize, Deserialize, Clone, Copy)]
@@ -64,6 +62,14 @@ struct ResyncPersistedConfig {
tranquility: u32,
}
impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
+impl Default for ResyncPersistedConfig {
+ fn default() -> Self {
+ ResyncPersistedConfig {
+ n_workers: 1,
+ tranquility: INITIAL_RESYNC_TRANQUILITY,
+ }
+ }
+}
enum ResyncIterResult {
BusyDidSomething,
@@ -91,22 +97,14 @@ impl BlockResyncManager {
.expect("Unable to open block_local_resync_errors tree");
let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors");
- let persister = Persister::new(&system.metadata_dir, "resync_cfg");
- let persisted = match persister.load() {
- Ok(v) => v,
- Err(_) => ResyncPersistedConfig {
- n_workers: 1,
- tranquility: INITIAL_RESYNC_TRANQUILITY,
- },
- };
+ let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg");
Self {
queue,
- notify: Notify::new(),
+ notify: Arc::new(Notify::new()),
errors,
busy_set: Arc::new(Mutex::new(HashSet::new())),
persister,
- persisted: ArcSwap::new(Arc::new(persisted)),
}
}
@@ -142,6 +140,38 @@ impl BlockResyncManager {
)))
}
+ pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
+ let notify = self.notify.clone();
+ vars.register_rw(
+ &self.persister,
+ "resync-worker-count",
+ |p| p.get_with(|x| x.n_workers),
+ move |p, n_workers| {
+ if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
+ return Err(Error::Message(format!(
+ "Invalid number of resync workers, must be between 1 and {}",
+ MAX_RESYNC_WORKERS
+ )));
+ }
+ p.set_with(|x| x.n_workers = n_workers)?;
+ notify.notify_waiters();
+ Ok(())
+ },
+ );
+
+ let notify = self.notify.clone();
+ vars.register_rw(
+ &self.persister,
+ "resync-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ move |p, tranquility| {
+ p.set_with(|x| x.tranquility = tranquility)?;
+ notify.notify_waiters();
+ Ok(())
+ },
+ );
+ }
+
// ---- Resync loop ----
// This part manages a queue of blocks that need to be
@@ -436,33 +466,6 @@ impl BlockResyncManager {
Ok(())
}
-
- async fn update_persisted(
- &self,
- update: impl Fn(&mut ResyncPersistedConfig),
- ) -> Result<(), Error> {
- let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref();
- update(&mut cfg);
- self.persister.save_async(&cfg).await?;
- self.persisted.store(Arc::new(cfg));
- self.notify.notify_waiters();
- Ok(())
- }
-
- pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> {
- if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
- return Err(Error::Message(format!(
- "Invalid number of resync workers, must be between 1 and {}",
- MAX_RESYNC_WORKERS
- )));
- }
- self.update_persisted(|cfg| cfg.n_workers = n_workers).await
- }
-
- pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> {
- self.update_persisted(|cfg| cfg.tranquility = tranquility)
- .await
- }
}
impl Drop for BusyBlock {
@@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker {
manager: Arc<BlockManager>,
tranquilizer: Tranquilizer,
next_delay: Duration,
+ persister: PersisterShared<ResyncPersistedConfig>,
}
impl ResyncWorker {
pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self {
+ let persister = manager.resync.persister.clone();
Self {
index,
manager,
tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10),
+ persister,
}
}
}
@@ -497,9 +503,9 @@ impl Worker for ResyncWorker {
}
fn status(&self) -> WorkerStatus {
- let persisted = self.manager.resync.persisted.load();
+ let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
- if self.index >= persisted.n_workers {
+ if self.index >= n_workers {
return WorkerStatus {
freeform: vec!["This worker is currently disabled".into()],
..Default::default()
@@ -508,22 +514,24 @@ impl Worker for ResyncWorker {
WorkerStatus {
queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
- tranquility: Some(persisted.tranquility),
+ tranquility: Some(tranquility),
persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- if self.index >= self.manager.resync.persisted.load().n_workers {
+ let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
+
+ if self.index >= n_workers {
return Ok(WorkerState::Idle);
}
self.tranquilizer.reset();
match self.manager.resync.resync_iter(&self.manager).await {
- Ok(ResyncIterResult::BusyDidSomething) => Ok(self
- .tranquilizer
- .tranquilize_worker(self.manager.resync.persisted.load().tranquility)),
+ Ok(ResyncIterResult::BusyDidSomething) => {
+ Ok(self.tranquilizer.tranquilize_worker(tranquility))
+ }
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay;
@@ -542,7 +550,7 @@ impl Worker for ResyncWorker {
}
async fn wait_for_work(&mut self) -> WorkerState {
- while self.index >= self.manager.resync.persisted.load().n_workers {
+ while self.index >= self.persister.get_with(|x| x.n_workers) {
self.manager.resync.notify.notified().await
}