aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs62
1 files changed, 51 insertions, 11 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 017ba9da..ef48107f 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,7 +3,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
-use arc_swap::ArcSwapOption;
+use arc_swap::{ArcSwap, ArcSwapOption};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -25,6 +25,7 @@ use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
+use garage_util::persister::Persister;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -55,6 +56,10 @@ const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The maximum retry delay is 60 seconds * 2^6 = 60 seconds << 6 = 64 minutes (~1 hour)
const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
+// Resync tranquility is initially set to 2, but can be changed in the CLI
+// and the updated version is persisted over Garage restarts
+const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
+
// The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves
// to delete the block locally.
@@ -90,7 +95,6 @@ pub struct BlockManager {
pub data_dir: PathBuf,
compression_level: Option<i32>,
- background_tranquility: u32,
mutation_lock: Mutex<BlockManagerLocked>,
@@ -100,6 +104,9 @@ pub struct BlockManager {
resync_notify: Notify,
resync_errors: CountedTree,
+ resync_persister: Persister<ResyncPersistedConfig>,
+ resync_persisted: ArcSwap<ResyncPersistedConfig>,
+
pub(crate) system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
@@ -124,7 +131,6 @@ impl BlockManager {
db: &db::Db,
data_dir: PathBuf,
compression_level: Option<i32>,
- background_tranquility: u32,
replication: TableShardedReplication,
system: Arc<System>,
) -> Arc<Self> {
@@ -145,6 +151,14 @@ impl BlockManager {
let resync_errors =
CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
+ let resync_persister = Persister::new(&system.metadata_dir, "resync_cfg");
+ let resync_persisted = match resync_persister.load() {
+ Ok(v) => v,
+ Err(_) => ResyncPersistedConfig {
+ tranquility: INITIAL_RESYNC_TRANQUILITY,
+ },
+ };
+
let endpoint = system
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
@@ -157,12 +171,13 @@ impl BlockManager {
replication,
data_dir,
compression_level,
- background_tranquility,
mutation_lock: Mutex::new(manager_locked),
rc,
resync_queue,
resync_notify: Notify::new(),
resync_errors,
+ resync_persister,
+ resync_persisted: ArcSwap::new(Arc::new(resync_persisted)),
system,
endpoint,
metrics,
@@ -716,6 +731,23 @@ impl BlockManager {
Ok(())
}
+
+ async fn update_resync_persisted(
+ &self,
+ update: impl Fn(&mut ResyncPersistedConfig),
+ ) -> Result<(), Error> {
+ let mut cfg: ResyncPersistedConfig = *self.resync_persisted.load().as_ref();
+ update(&mut cfg);
+ self.resync_persister.save_async(&cfg).await?;
+ self.resync_persisted.store(Arc::new(cfg));
+ self.resync_notify.notify_one();
+ Ok(())
+ }
+
+ pub async fn set_resync_tranquility(&self, tranquility: u32) -> Result<(), Error> {
+ self.update_resync_persisted(|cfg| cfg.tranquility = tranquility)
+ .await
+ }
}
#[async_trait]
@@ -734,6 +766,11 @@ impl EndpointHandler<BlockRpc> for BlockManager {
}
}
+#[derive(Serialize, Deserialize, Clone, Copy)]
+struct ResyncPersistedConfig {
+ tranquility: u32,
+}
+
struct ResyncWorker {
manager: Arc<BlockManager>,
tranquilizer: Tranquilizer,
@@ -758,19 +795,22 @@ impl Worker for ResyncWorker {
fn info(&self) -> Option<String> {
let mut ret = vec![];
+ ret.push(format!(
+ "tranquility = {}",
+ self.manager.resync_persisted.load().tranquility
+ ));
+
let qlen = self.manager.resync_queue_len().unwrap_or(0);
- let elen = self.manager.resync_errors_len().unwrap_or(0);
if qlen > 0 {
ret.push(format!("{} blocks in queue", qlen));
}
+
+ let elen = self.manager.resync_errors_len().unwrap_or(0);
if elen > 0 {
ret.push(format!("{} blocks in error state", elen));
}
- if !ret.is_empty() {
- Some(ret.join(", "))
- } else {
- None
- }
+
+ Some(ret.join(", "))
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@@ -778,7 +818,7 @@ impl Worker for ResyncWorker {
match self.manager.resync_iter().await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self
.tranquilizer
- .tranquilize_worker(self.manager.background_tranquility)),
+ .tranquilize_worker(self.manager.resync_persisted.load().tranquility)),
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay;