aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs68
1 files changed, 37 insertions, 31 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 18fadf85..eeacf8b9 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -22,7 +22,7 @@ use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
use garage_db as db;
use garage_util::background::{vars, BackgroundRunner};
-use garage_util::config::DataDirEnum;
+use garage_util::config::Config;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@@ -84,6 +84,7 @@ pub struct BlockManager {
data_fsync: bool,
compression_level: Option<i32>,
+ disable_scrub: bool,
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
@@ -119,9 +120,7 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
db: &db::Db,
- data_dir: DataDirEnum,
- data_fsync: bool,
- compression_level: Option<i32>,
+ config: &Config,
replication: TableShardedReplication,
system: Arc<System>,
) -> Result<Arc<Self>, Error> {
@@ -131,11 +130,13 @@ impl BlockManager {
let data_layout = match data_layout_persister.load() {
Ok(mut layout) => {
layout
- .update(&data_dir)
+ .update(&config.data_dir)
.ok_or_message("invalid data_dir config")?;
layout
}
- Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
+ Err(_) => {
+ DataLayout::initialize(&config.data_dir).ok_or_message("invalid data_dir config")?
+ }
};
data_layout_persister
.save(&data_layout)
@@ -154,7 +155,7 @@ impl BlockManager {
.endpoint("garage_block/manager.rs/Rpc".to_string());
let metrics = BlockManagerMetrics::new(
- compression_level,
+ config.compression_level,
rc.rc.clone(),
resync.queue.clone(),
resync.errors.clone(),
@@ -166,8 +167,9 @@ impl BlockManager {
replication,
data_layout: ArcSwap::new(Arc::new(data_layout)),
data_layout_persister,
- data_fsync,
- compression_level,
+ data_fsync: config.data_fsync,
+ disable_scrub: config.disable_scrub,
+ compression_level: config.compression_level,
mutation_lock: vec![(); MUTEX_COUNT]
.iter()
.map(|_| Mutex::new(BlockManagerLocked()))
@@ -194,33 +196,37 @@ impl BlockManager {
}
// Spawn scrub worker
- let (scrub_tx, scrub_rx) = mpsc::channel(1);
- self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
- bg.spawn_worker(ScrubWorker::new(
- self.clone(),
- scrub_rx,
- self.scrub_persister.clone(),
- ));
+ if !self.disable_scrub {
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ bg.spawn_worker(ScrubWorker::new(
+ self.clone(),
+ scrub_rx,
+ self.scrub_persister.clone(),
+ ));
+ }
}
pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
self.resync.register_bg_vars(vars);
- vars.register_rw(
- &self.scrub_persister,
- "scrub-tranquility",
- |p| p.get_with(|x| x.tranquility),
- |p, tranquility| p.set_with(|x| x.tranquility = tranquility),
- );
- vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
- p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
- });
- vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
- p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
- });
- vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
- p.get_with(|x| x.corruptions_detected)
- });
+ if !self.disable_scrub {
+ vars.register_rw(
+ &self.scrub_persister,
+ "scrub-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ |p, tranquility| p.set_with(|x| x.tranquility = tranquility),
+ );
+ vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
+ p.get_with(|x| x.corruptions_detected)
+ });
+ }
}
/// Ask nodes that might have a (possibly compressed) block for it