aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs62
-rw-r--r--src/block/repair.rs26
-rw-r--r--src/garage/admin.rs19
-rw-r--r--src/garage/cli/structs.rs16
-rw-r--r--src/model/garage.rs1
-rw-r--r--src/util/config.rs7
6 files changed, 108 insertions, 23 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 017ba9da..ef48107f 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,7 +3,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
-use arc_swap::ArcSwapOption;
+use arc_swap::{ArcSwap, ArcSwapOption};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -25,6 +25,7 @@ use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
+use garage_util::persister::Persister;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -55,6 +56,10 @@ const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The maximum retry delay is 60 seconds * 2^6 = 60 seconds << 6 = 64 minutes (~1 hour)
const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
+// Resync tranquility is initially set to 2, but can be changed in the CLI
+// and the updated version is persisted over Garage restarts
+const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
+
// The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves
// to delete the block locally.
@@ -90,7 +95,6 @@ pub struct BlockManager {
pub data_dir: PathBuf,
compression_level: Option<i32>,
- background_tranquility: u32,
mutation_lock: Mutex<BlockManagerLocked>,
@@ -100,6 +104,9 @@ pub struct BlockManager {
resync_notify: Notify,
resync_errors: CountedTree,
+ resync_persister: Persister<ResyncPersistedConfig>,
+ resync_persisted: ArcSwap<ResyncPersistedConfig>,
+
pub(crate) system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
@@ -124,7 +131,6 @@ impl BlockManager {
db: &db::Db,
data_dir: PathBuf,
compression_level: Option<i32>,
- background_tranquility: u32,
replication: TableShardedReplication,
system: Arc<System>,
) -> Arc<Self> {
@@ -145,6 +151,14 @@ impl BlockManager {
let resync_errors =
CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
+ let resync_persister = Persister::new(&system.metadata_dir, "resync_cfg");
+ let resync_persisted = match resync_persister.load() {
+ Ok(v) => v,
+ Err(_) => ResyncPersistedConfig {
+ tranquility: INITIAL_RESYNC_TRANQUILITY,
+ },
+ };
+
let endpoint = system
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
@@ -157,12 +171,13 @@ impl BlockManager {
replication,
data_dir,
compression_level,
- background_tranquility,
mutation_lock: Mutex::new(manager_locked),
rc,
resync_queue,
resync_notify: Notify::new(),
resync_errors,
+ resync_persister,
+ resync_persisted: ArcSwap::new(Arc::new(resync_persisted)),
system,
endpoint,
metrics,
@@ -716,6 +731,23 @@ impl BlockManager {
Ok(())
}
+
+ async fn update_resync_persisted(
+ &self,
+ update: impl Fn(&mut ResyncPersistedConfig),
+ ) -> Result<(), Error> {
+ let mut cfg: ResyncPersistedConfig = *self.resync_persisted.load().as_ref();
+ update(&mut cfg);
+ self.resync_persister.save_async(&cfg).await?;
+ self.resync_persisted.store(Arc::new(cfg));
+ self.resync_notify.notify_one();
+ Ok(())
+ }
+
+ pub async fn set_resync_tranquility(&self, tranquility: u32) -> Result<(), Error> {
+ self.update_resync_persisted(|cfg| cfg.tranquility = tranquility)
+ .await
+ }
}
#[async_trait]
@@ -734,6 +766,11 @@ impl EndpointHandler<BlockRpc> for BlockManager {
}
}
+#[derive(Serialize, Deserialize, Clone, Copy)]
+struct ResyncPersistedConfig {
+ tranquility: u32,
+}
+
struct ResyncWorker {
manager: Arc<BlockManager>,
tranquilizer: Tranquilizer,
@@ -758,19 +795,22 @@ impl Worker for ResyncWorker {
fn info(&self) -> Option<String> {
let mut ret = vec![];
+ ret.push(format!(
+ "tranquility = {}",
+ self.manager.resync_persisted.load().tranquility
+ ));
+
let qlen = self.manager.resync_queue_len().unwrap_or(0);
- let elen = self.manager.resync_errors_len().unwrap_or(0);
if qlen > 0 {
ret.push(format!("{} blocks in queue", qlen));
}
+
+ let elen = self.manager.resync_errors_len().unwrap_or(0);
if elen > 0 {
ret.push(format!("{} blocks in error state", elen));
}
- if !ret.is_empty() {
- Some(ret.join(", "))
- } else {
- None
- }
+
+ Some(ret.join(", "))
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@@ -778,7 +818,7 @@ impl Worker for ResyncWorker {
match self.manager.resync_iter().await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self
.tranquilizer
- .tranquilize_worker(self.manager.background_tranquility)),
+ .tranquilize_worker(self.manager.resync_persisted.load().tranquility)),
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay;
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 07ff6772..18e1de95 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -19,7 +19,17 @@ use garage_util::tranquilizer::Tranquilizer;
use crate::manager::*;
-const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days
+// Full scrub every 30 days
+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30);
+// Scrub tranquility is initially set to 4, but can be changed in the CLI
+// and the updated version is persisted over Garage restarts
+const INITIAL_SCRUB_TRANQUILITY: u32 = 4;
+
+// ---- ---- ----
+// FIRST KIND OF REPAIR: FINDING MISSING BLOCKS/USELESS BLOCKS
+// This is a one-shot repair operation that can be launched,
+// checks everything, and then exits.
+// ---- ---- ----
pub struct RepairWorker {
manager: Arc<BlockManager>,
@@ -128,7 +138,13 @@ impl Worker for RepairWorker {
}
}
-// ----
+// ---- ---- ----
+// SECOND KIND OF REPAIR: SCRUBBING THE DATASTORE
+// This is significantly more complex than the process above,
+// as it is a continuously-running task that triggers automatically
+// every SCRUB_INTERVAL, but can also be triggered manually
+// and whose parameter (esp. speed) can be controlled at runtime.
+// ---- ---- ----
pub struct ScrubWorker {
manager: Arc<BlockManager>,
@@ -176,7 +192,7 @@ impl ScrubWorker {
Ok(v) => v,
Err(_) => ScrubWorkerPersisted {
time_last_complete_scrub: 0,
- tranquility: 4,
+ tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
},
};
@@ -343,7 +359,9 @@ impl Worker for ScrubWorker {
}
}
-// ----
+// ---- ---- ----
+// UTILITY FOR ENUMERATING THE BLOCK STORE
+// ---- ---- ----
struct BlockStoreIterator {
path: Vec<ReadingDir>,
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 71ee608c..1d80889c 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -15,6 +15,8 @@ use garage_table::*;
use garage_rpc::*;
+use garage_block::repair::ScrubWorkerCommand;
+
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
@@ -836,6 +838,23 @@ impl AdminRpcHandler {
let workers = self.garage.background.get_worker_info();
Ok(AdminRpc::WorkerList(workers, opt))
}
+ WorkerCmd::Set { opt } => match opt {
+ WorkerSetCmd::ScrubTranquility { tranquility } => {
+ let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility);
+ self.garage
+ .block_manager
+ .send_scrub_command(scrub_command)
+ .await;
+ Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
+ }
+ WorkerSetCmd::ResyncTranquility { tranquility } => {
+ self.garage
+ .block_manager
+ .set_resync_tranquility(tranquility)
+ .await?;
+ Ok(AdminRpc::Ok("Resync tranquility updated".into()))
+ }
+ },
}
}
}
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 9274f80f..1fba934f 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -501,6 +501,12 @@ pub enum WorkerCmd {
#[structopt(flatten)]
opt: WorkerListOpt,
},
+ /// Set worker parameter
+ #[structopt(name = "set", version = version::garage())]
+ Set {
+ #[structopt(subcommand)]
+ opt: WorkerSetCmd,
+ },
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
@@ -512,3 +518,13 @@ pub struct WorkerListOpt {
#[structopt(short = "e", long = "errors")]
pub errors: bool,
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum WorkerSetCmd {
+ /// Set tranquility of scrub operations
+ #[structopt(name = "scrub-tranquility", version = version::garage())]
+ ScrubTranquility { tranquility: u32 },
+ /// Set tranquility of resync operations
+ #[structopt(name = "resync-tranquility", version = version::garage())]
+ ResyncTranquility { tranquility: u32 },
+}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 15769a17..4dd24582 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -159,7 +159,6 @@ impl Garage {
&db,
config.data_dir.clone(),
config.compression_level,
- config.block_manager_background_tranquility,
data_rep_param,
system.clone(),
);
diff --git a/src/util/config.rs b/src/util/config.rs
index e8ef4fdd..a2bb8fb3 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -23,10 +23,6 @@ pub struct Config {
#[serde(default = "default_block_size")]
pub block_size: usize,
- /// Size of data blocks to save to disk
- #[serde(default = "default_block_manager_background_tranquility")]
- pub block_manager_background_tranquility: u32,
-
/// Replication mode. Supported values:
/// - none, 1 -> no replication
/// - 2 -> 2-way replication
@@ -147,9 +143,6 @@ fn default_sled_flush_every_ms() -> u64 {
fn default_block_size() -> usize {
1048576
}
-fn default_block_manager_background_tranquility() -> u32 {
- 2
-}
/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {