From 936b6cb563b9dc8bb5c879f8bd6b89574f016f03 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 17:29:11 +0100 Subject: When saving block, delete .tmp file if we could not complete --- src/block/manager.rs | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index 1b5a5df0..6d3131d2 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -6,6 +6,7 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use async_trait::async_trait; use bytes::Bytes; +use rand::prelude::*; use serde::{Deserialize, Serialize}; use futures::Stream; @@ -649,14 +650,21 @@ impl BlockManagerLocked { } }; - let mut path2 = path.clone(); - path2.set_extension("tmp"); - let mut f = fs::File::create(&path2).await?; + let mut path_tmp = path.clone(); + let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); + path_tmp.set_extension(tmp_extension); + + let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone())); + + let mut f = fs::File::create(&path_tmp).await?; f.write_all(data).await?; f.sync_all().await?; drop(f); - fs::rename(path2, path).await?; + fs::rename(path_tmp, path).await?; + + delete_on_drop.cancel(); + if let Some(to_delete) = to_delete { fs::remove_file(to_delete).await?; } @@ -722,3 +730,23 @@ async fn read_stream_to_end(mut stream: ByteStream) -> Result { .concat() .into()) } + +struct DeleteOnDrop(Option); + +impl DeleteOnDrop { + fn cancel(&mut self) { + drop(self.0.take()); + } +} + +impl Drop for DeleteOnDrop { + fn drop(&mut self) { + if let Some(path) = self.0.take() { + tokio::spawn(async move { + if let Err(e) = fs::remove_file(&path).await { + debug!("DeleteOnDrop failed for {}: {}", path.display(), e); + } + }); + } + } +} -- cgit v1.2.3 From f3f27293df83986ba29fb03f8af26a2177518e20 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 4 Jan 2023 13:07:13 +0100 Subject: Uniform framework for bg variable management --- src/block/manager.rs | 31 ++++++++++++++- src/block/repair.rs | 72 ++++++++++++++++++---------------- src/block/resync.rs | 108 +++++++++++++++++++++++++++------------------------ 3 files changed, 125 insertions(+), 86 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index 1b5a5df0..19841d64 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -23,10 +23,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; -use garage_util::background::BackgroundRunner; +use garage_util::background::{vars, BackgroundRunner}; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; +use garage_util::persister::PersisterShared; +use garage_util::time::msec_to_rfc3339; use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; @@ -89,6 +91,7 @@ pub struct BlockManager { pub(crate) metrics: BlockManagerMetrics, + pub scrub_persister: PersisterShared, tx_scrub_command: ArcSwapOption>, } @@ -128,6 +131,8 @@ impl BlockManager { let metrics = BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); + let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); + let block_manager = Arc::new(Self { replication, data_dir, @@ -138,6 +143,7 @@ impl BlockManager { system, endpoint, metrics, + scrub_persister, tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); @@ -155,7 +161,28 @@ 
impl BlockManager { // Spawn scrub worker let (scrub_tx, scrub_rx) = mpsc::channel(1); self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); - bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); + bg.spawn_worker(ScrubWorker::new( + self.clone(), + scrub_rx, + self.scrub_persister.clone(), + )); + } + + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + self.resync.register_bg_vars(vars); + + vars.register_rw( + &self.scrub_persister, + "scrub-tranquility", + |p| p.get_with(|x| x.tranquility), + |p, tranquility| p.set_with(|x| x.tranquility = tranquility), + ); + vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| { + p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub)) + }); + vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| { + p.get_with(|x| x.corruptions_detected) + }); } /// Ask nodes that might have a (possibly compressed) block for it diff --git a/src/block/repair.rs b/src/block/repair.rs index a6ded65a..064cc005 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -13,7 +13,7 @@ use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -168,17 +168,25 @@ pub struct ScrubWorker { work: ScrubWorkerState, tranquilizer: Tranquilizer, - persister: Persister, - persisted: ScrubWorkerPersisted, + persister: PersisterShared, } #[derive(Serialize, Deserialize)] -struct ScrubWorkerPersisted { - tranquility: u32, - time_last_complete_scrub: u64, - corruptions_detected: u64, +pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) corruptions_detected: u64, } impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} +impl Default for ScrubWorkerPersisted { + fn default() -> Self { + ScrubWorkerPersisted { + time_last_complete_scrub: 0, + tranquility: INITIAL_SCRUB_TRANQUILITY, + corruptions_detected: 0, + } + } +} enum ScrubWorkerState { Running(BlockStoreIterator), @@ -198,27 +206,20 @@ pub enum ScrubWorkerCommand { Pause(Duration), Resume, Cancel, - SetTranquility(u32), } impl ScrubWorker { - pub fn new(manager: Arc, rx_cmd: mpsc::Receiver) -> Self { - let persister = Persister::new(&manager.system.metadata_dir, "scrub_info"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ScrubWorkerPersisted { - time_last_complete_scrub: 0, - tranquility: INITIAL_SCRUB_TRANQUILITY, - corruptions_detected: 0, - }, - }; + pub(crate) fn new( + manager: Arc, + rx_cmd: mpsc::Receiver, + persister: PersisterShared, + ) -> Self { Self { manager, rx_cmd, work: ScrubWorkerState::Finished, tranquilizer: Tranquilizer::new(30), persister, - persisted, } } @@ -267,12 +268,6 @@ impl ScrubWorker { } } } - ScrubWorkerCommand::SetTranquility(t) => { - self.persisted.tranquility = t; - if let Err(e) = self.persister.save_async(&self.persisted).await { - error!("Could not save new tranquilitiy value: {}", e); - } - } } } } @@ -284,9 +279,18 @@ impl Worker for ScrubWorker { } fn status(&self) -> WorkerStatus { + let (corruptions_detected, tranquility, time_last_complete_scrub) = + self.persister.get_with(|p| { + ( + p.corruptions_detected, + p.tranquility, + p.time_last_complete_scrub, + ) + }); + let mut s = WorkerStatus { - persistent_errors: Some(self.persisted.corruptions_detected), - tranquility: Some(self.persisted.tranquility), + persistent_errors: 
Some(corruptions_detected), + tranquility: Some(tranquility), ..Default::default() }; match &self.work { @@ -300,7 +304,7 @@ impl Worker for ScrubWorker { ScrubWorkerState::Finished => { s.freeform = vec![format!( "Last scrub completed at {}", - msec_to_rfc3339(self.persisted.time_last_complete_scrub) + msec_to_rfc3339(time_last_complete_scrub) )]; } } @@ -321,18 +325,17 @@ impl Worker for ScrubWorker { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); - self.persisted.corruptions_detected += 1; - self.persister.save_async(&self.persisted).await?; + self.persister.set_with(|p| p.corruptions_detected += 1)?; } Err(e) => return Err(e), _ => (), }; Ok(self .tranquilizer - .tranquilize_worker(self.persisted.tranquility)) + .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - self.persisted.time_last_complete_scrub = now_msec(); - self.persister.save_async(&self.persisted).await?; + self.persister + .set_with(|p| p.time_last_complete_scrub = now_msec())?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); Ok(WorkerState::Idle) @@ -347,7 +350,8 @@ impl Worker for ScrubWorker { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( - self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, + self.persister.get_with(|p| p.time_last_complete_scrub) + + SCRUB_INTERVAL.as_millis() as u64, ScrubWorkerCommand::Start, ), }; diff --git a/src/block/resync.rs b/src/block/resync.rs index 9c7b3b0e..ea280ad4 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -3,7 +3,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::Duration; -use arc_swap::ArcSwap; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -22,7 +21,7 @@ use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2; pub struct BlockResyncManager { pub(crate) queue: CountedTree, - pub(crate) notify: Notify, + pub(crate) notify: Arc, pub(crate) errors: CountedTree, busy_set: BusySet, - persister: Persister, - persisted: ArcSwap, + persister: PersisterShared, } #[derive(Serialize, Deserialize, Clone, Copy)] @@ -64,6 +62,14 @@ struct ResyncPersistedConfig { tranquility: u32, } impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {} +impl Default for ResyncPersistedConfig { + fn default() -> Self { + ResyncPersistedConfig { + n_workers: 1, + tranquility: INITIAL_RESYNC_TRANQUILITY, + } + } +} enum ResyncIterResult { BusyDidSomething, @@ -91,22 +97,14 @@ impl BlockResyncManager { .expect("Unable to open block_local_resync_errors tree"); let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors"); - let persister = Persister::new(&system.metadata_dir, "resync_cfg"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ResyncPersistedConfig { - n_workers: 1, - tranquility: INITIAL_RESYNC_TRANQUILITY, - }, - }; + let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg"); Self { queue, - notify: Notify::new(), + notify: Arc::new(Notify::new()), errors, busy_set: 
Arc::new(Mutex::new(HashSet::new())), persister, - persisted: ArcSwap::new(Arc::new(persisted)), } } @@ -142,6 +140,38 @@ impl BlockResyncManager { ))) } + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-worker-count", + |p| p.get_with(|x| x.n_workers), + move |p, n_workers| { + if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { + return Err(Error::Message(format!( + "Invalid number of resync workers, must be between 1 and {}", + MAX_RESYNC_WORKERS + ))); + } + p.set_with(|x| x.n_workers = n_workers)?; + notify.notify_waiters(); + Ok(()) + }, + ); + + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-tranquility", + |p| p.get_with(|x| x.tranquility), + move |p, tranquility| { + p.set_with(|x| x.tranquility = tranquility)?; + notify.notify_waiters(); + Ok(()) + }, + ); + } + // ---- Resync loop ---- // This part manages a queue of blocks that need to be @@ -436,33 +466,6 @@ impl BlockResyncManager { Ok(()) } - - async fn update_persisted( - &self, - update: impl Fn(&mut ResyncPersistedConfig), - ) -> Result<(), Error> { - let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref(); - update(&mut cfg); - self.persister.save_async(&cfg).await?; - self.persisted.store(Arc::new(cfg)); - self.notify.notify_waiters(); - Ok(()) - } - - pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> { - if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { - return Err(Error::Message(format!( - "Invalid number of resync workers, must be between 1 and {}", - MAX_RESYNC_WORKERS - ))); - } - self.update_persisted(|cfg| cfg.n_workers = n_workers).await - } - - pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> { - self.update_persisted(|cfg| cfg.tranquility = tranquility) - .await - } } impl Drop for BusyBlock { @@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker { manager: Arc, tranquilizer: Tranquilizer, next_delay: Duration, + persister: PersisterShared, } impl ResyncWorker { pub(crate) fn new(index: usize, manager: Arc) -> Self { + let persister = manager.resync.persister.clone(); Self { index, manager, tranquilizer: Tranquilizer::new(30), next_delay: Duration::from_secs(10), + persister, } } } @@ -497,9 +503,9 @@ impl Worker for ResyncWorker { } fn status(&self) -> WorkerStatus { - let persisted = self.manager.resync.persisted.load(); + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); - if self.index >= persisted.n_workers { + if self.index >= n_workers { return WorkerStatus { freeform: vec!["This worker is currently disabled".into()], ..Default::default() @@ -508,22 +514,24 @@ impl Worker for ResyncWorker { WorkerStatus { queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), - tranquility: Some(persisted.tranquility), + tranquility: Some(tranquility), persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - if self.index >= self.manager.resync.persisted.load().n_workers { + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); + + if self.index >= n_workers { return Ok(WorkerState::Idle); } self.tranquilizer.reset(); match self.manager.resync.resync_iter(&self.manager).await { - Ok(ResyncIterResult::BusyDidSomething) => Ok(self - .tranquilizer - .tranquilize_worker(self.manager.resync.persisted.load().tranquility)), + 
Ok(ResyncIterResult::BusyDidSomething) => { + Ok(self.tranquilizer.tranquilize_worker(tranquility)) + } Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy), Ok(ResyncIterResult::IdleFor(delay)) => { self.next_delay = delay; @@ -542,7 +550,7 @@ impl Worker for ResyncWorker { } async fn wait_for_work(&mut self) -> WorkerState { - while self.index >= self.manager.resync.persisted.load().n_workers { + while self.index >= self.persister.get_with(|x| x.n_workers) { self.manager.resync.notify.notified().await } -- cgit v1.2.3 From 4cfb469d2bcce5bd41fe0af0022b34716ac7279a Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 9 Jan 2023 18:49:33 +0000 Subject: block/metrics.rs: Added compression_level metric. --- src/block/manager.rs | 8 ++++++-- src/block/metrics.rs | 17 ++++++++++++++++- 2 files changed, 22 insertions(+), 3 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index 1655be06..051a9f93 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -129,8 +129,12 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); - let metrics = - BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); + let metrics = BlockManagerMetrics::new( + compression_level, + rc.rc.clone(), + resync.queue.clone(), + resync.errors.clone(), + ); let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); diff --git a/src/block/metrics.rs b/src/block/metrics.rs index fbef95af..500022fc 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -5,6 +5,7 @@ use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct BlockManagerMetrics { + pub(crate) _compression_level: ValueObserver, pub(crate) _rc_size: ValueObserver, pub(crate) _resync_queue_len: ValueObserver, pub(crate) _resync_errored_blocks: ValueObserver, @@ -25,9 +26,23 @@ pub struct BlockManagerMetrics { } impl BlockManagerMetrics { - pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self { + pub fn new( + compression_level: Option, + rc_tree: db::Tree, + resync_queue: CountedTree, + resync_errors: CountedTree, + ) -> Self { let meter = global::meter("garage_model/block"); Self { + _compression_level: meter + .u64_value_observer("block.compression_level", move |observer| { + match compression_level { + Some(v) => observer.observe(v as u64, &[]), + None => observer.observe(0 as u64, &[]), + } + }) + .with_description("Garage compression level for node") + .init(), _rc_size: meter .u64_value_observer("block.rc_size", move |observer| { if let Ok(Some(v)) = rc_tree.fast_len() { -- cgit v1.2.3 From 5c3075fe0163f869a0d6cf76f8e5a86a441fa132 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 23 Jan 2023 18:08:14 +0000 Subject: Cargo.toml: Updated zstd from 0.9 to 0.12. 
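A quick way to sanity-check a zstd-rs major version bump like this one is a compress/decompress roundtrip against the crate's high-level helpers. The sketch below is illustrative only and is not taken from this repository; zstd::encode_all and zstd::decode_all are the crate's whole-buffer entry points, and compression level 3 is arbitrary.

    // Illustrative roundtrip check for the zstd 0.9 -> 0.12 bump.
    fn zstd_roundtrip(data: &[u8]) -> std::io::Result<()> {
        let compressed = zstd::encode_all(data, 3)?; // whole-buffer compression, level 3
        let decompressed = zstd::decode_all(&compressed[..])?;
        assert_eq!(decompressed, data);
        Ok(())
    }
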
--- src/block/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/block') diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 1e4eb64e..48bcd181 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -29,7 +29,7 @@ tracing = "0.1.30" rand = "0.8" async-compression = { version = "0.3", features = ["tokio", "zstd"] } -zstd = { version = "0.9", default-features = false } +zstd = { version = "0.12", default-features = false } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" -- cgit v1.2.3 From 20c1cdf662a0b5368a8020526f43e08baedfedaa Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 23 Jan 2023 22:27:44 +0000 Subject: Cargo.toml: Loosen tracing dependency to just 0.1. --- src/block/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/block') diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 48bcd181..a7e8bc2c 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -25,7 +25,7 @@ arc-swap = "1.5" async-trait = "0.1.7" bytes = "1.0" hex = "0.4" -tracing = "0.1.30" +tracing = "0.1" rand = "0.8" async-compression = { version = "0.3", features = ["tokio", "zstd"] } -- cgit v1.2.3 From 8e93d6997415d60ba5c371da8b27065a57254a8c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 26 Jan 2023 17:26:32 +0100 Subject: More clippy fixes --- src/block/metrics.rs | 2 +- src/block/rc.rs | 14 +++++++------- src/block/repair.rs | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) (limited to 'src/block') diff --git a/src/block/metrics.rs b/src/block/metrics.rs index 500022fc..6659df32 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -38,7 +38,7 @@ impl BlockManagerMetrics { .u64_value_observer("block.compression_level", move |observer| { match compression_level { Some(v) => observer.observe(v as u64, &[]), - None => observer.observe(0 as u64, &[]), + None => observer.observe(0_u64, &[]), } }) .with_description("Garage compression level for node") diff --git a/src/block/rc.rs b/src/block/rc.rs index 8dae3960..94cb5eea 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -24,9 +24,9 @@ impl BlockRc { tx: &mut db::Transaction, hash: &Hash, ) -> db::TxOpResult { - let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); + let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?); match old_rc.increment().serialize() { - Some(x) => tx.insert(&self.rc, &hash, x)?, + Some(x) => tx.insert(&self.rc, hash, x)?, None => unreachable!(), }; Ok(old_rc.is_zero()) @@ -39,10 +39,10 @@ impl BlockRc { tx: &mut db::Transaction, hash: &Hash, ) -> db::TxOpResult { - let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); + let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement(); match new_rc.serialize() { - Some(x) => tx.insert(&self.rc, &hash, x)?, - None => tx.remove(&self.rc, &hash)?, + Some(x) => tx.insert(&self.rc, hash, x)?, + None => tx.remove(&self.rc, hash)?, }; Ok(matches!(new_rc, RcEntry::Deletable { .. 
})) } @@ -57,10 +57,10 @@ impl BlockRc { pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { let now = now_msec(); self.rc.db().transaction(|mut tx| { - let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); + let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?); match rcval { RcEntry::Deletable { at_time } if now > at_time => { - tx.remove(&self.rc, &hash)?; + tx.remove(&self.rc, hash)?; } _ => (), }; diff --git a/src/block/repair.rs b/src/block/repair.rs index 064cc005..d4593dbf 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -466,11 +466,11 @@ impl BlockStoreIterator { let ent_type = data_dir_ent.file_type().await?; let name = name.strip_suffix(".zst").unwrap_or(&name); - if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { + if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() { let path = data_dir_ent.path(); self.path.push(ReadingDir::Pending(path)); } else if name.len() == 64 { - if let Ok(h) = hex::decode(&name) { + if let Ok(h) = hex::decode(name) { let mut hash = [0u8; 32]; hash.copy_from_slice(&h); return Ok(Some(hash.into())); -- cgit v1.2.3 From 53d09eb00f29fe5c7a8a14fa22ce6cfc64bb0b14 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Sat, 4 Mar 2023 16:16:10 +0000 Subject: block/repair.rs: Added function and time_next_run_scrub with a random element of 10 days to SCRUB_INTERVAL to help balance scrub load across cluster. --- src/block/repair.rs | 48 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 11 deletions(-) (limited to 'src/block') diff --git a/src/block/repair.rs b/src/block/repair.rs index d4593dbf..10d46291 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::select; @@ -19,8 +20,8 @@ use garage_util::tranquilizer::Tranquilizer; use crate::manager::*; -// Full scrub every 30 days -const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); +// Full scrub every 25 days with a random element of 10 days mixed in below +const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 25); // Scrub tranquility is initially set to 4, but can be changed in the CLI // and the updated version is persisted over Garage restarts const INITIAL_SCRUB_TRANQUILITY: u32 = 4; @@ -175,13 +176,30 @@ pub struct ScrubWorker { pub struct ScrubWorkerPersisted { pub tranquility: u32, pub(crate) time_last_complete_scrub: u64, + pub(crate) time_next_run_scrub: u64, pub(crate) corruptions_detected: u64, } + +fn randomize_next_scrub_run_time() -> u64 { + // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to + // balance scrub load across different cluster nodes. 
+ + let next_run_timestamp = now_msec() + + SCRUB_INTERVAL + .saturating_add(Duration::from_secs( + rand::thread_rng().gen_range(0..3600 * 24 * 10), + )) + .as_millis() as u64; + + next_run_timestamp +} + impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} impl Default for ScrubWorkerPersisted { fn default() -> Self { ScrubWorkerPersisted { time_last_complete_scrub: 0, + time_next_run_scrub: randomize_next_scrub_run_time(), tranquility: INITIAL_SCRUB_TRANQUILITY, corruptions_detected: 0, } @@ -279,12 +297,13 @@ impl Worker for ScrubWorker { } fn status(&self) -> WorkerStatus { - let (corruptions_detected, tranquility, time_last_complete_scrub) = + let (corruptions_detected, tranquility, time_last_complete_scrub, time_next_run_scrub) = self.persister.get_with(|p| { ( p.corruptions_detected, p.tranquility, p.time_last_complete_scrub, + p.time_next_run_scrub, ) }); @@ -302,10 +321,16 @@ impl Worker for ScrubWorker { s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))]; } ScrubWorkerState::Finished => { - s.freeform = vec![format!( - "Last scrub completed at {}", - msec_to_rfc3339(time_last_complete_scrub) - )]; + s.freeform = vec![ + format!( + "Last scrub completed at {}", + msec_to_rfc3339(time_last_complete_scrub), + ), + format!( + "Next scrub scheduled for {}", + msec_to_rfc3339(time_next_run_scrub) + ), + ]; } } s @@ -334,8 +359,10 @@ impl Worker for ScrubWorker { .tranquilizer .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - self.persister - .set_with(|p| p.time_last_complete_scrub = now_msec())?; + self.persister.set_with(|p| { + p.time_last_complete_scrub = now_msec(); + p.time_next_run_scrub = randomize_next_scrub_run_time(); + })?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); Ok(WorkerState::Idle) @@ -350,8 +377,7 @@ impl Worker for ScrubWorker { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( - self.persister.get_with(|p| p.time_last_complete_scrub) - + SCRUB_INTERVAL.as_millis() as u64, + self.persister.get_with(|p| p.time_next_run_scrub), ScrubWorkerCommand::Start, ), }; -- cgit v1.2.3 From 148b66b843be7f79f15b0a3805f38a0c3e944214 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 6 Mar 2023 12:31:03 +0000 Subject: block/manager.rs: Display scrub-next-run. --- src/block/manager.rs | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index 051a9f93..bfa66069 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -185,6 +185,9 @@ impl BlockManager { vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| { p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub)) }); + vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| { + p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub)) + }); vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| { p.get_with(|x| x.corruptions_detected) }); -- cgit v1.2.3 From b70cc0a94087bfd70931ff6741299823b48ad291 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Thu, 9 Mar 2023 15:34:14 +0000 Subject: block/repair.rs: Added migration for ScrubWorkerPersisted's time_next_run_scrub. Fixes: #520. 
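For context on the migration below: when loading the persisted scrub state, the new layout is tried first, and on failure the previous layout is decoded and converted, synthesizing the new time_next_run_scrub field. The sketch that follows shows only that fallback shape; the struct names and the use of serde_json are illustrative assumptions, while Garage itself goes through the garage_util::migrate traits and its own encoding, as the diff shows.

    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct ScrubStateV081 {
        tranquility: u32,
        time_last_complete_scrub: u64,
        corruptions_detected: u64,
    }

    #[derive(Serialize, Deserialize)]
    struct ScrubStateV082 {
        tranquility: u32,
        time_last_complete_scrub: u64,
        time_next_run_scrub: u64,
        corruptions_detected: u64,
    }

    fn load_scrub_state(bytes: &[u8]) -> Option<ScrubStateV082> {
        // Newest layout first ...
        if let Ok(state) = serde_json::from_slice::<ScrubStateV082>(bytes) {
            return Some(state);
        }
        // ... otherwise decode the previous layout and fill in the new field.
        let old: ScrubStateV081 = serde_json::from_slice(bytes).ok()?;
        Some(ScrubStateV082 {
            tranquility: old.tranquility,
            time_last_complete_scrub: old.time_last_complete_scrub,
            // the real patch derives this via randomize_next_scrub_run_time()
            time_next_run_scrub: old.time_last_complete_scrub,
            corruptions_detected: old.corruptions_detected,
        })
    }
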
--- src/block/repair.rs | 54 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 10 deletions(-) (limited to 'src/block') diff --git a/src/block/repair.rs b/src/block/repair.rs index 10d46291..006cc866 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -5,7 +5,6 @@ use std::time::Duration; use async_trait::async_trait; use rand::Rng; -use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::select; use tokio::sync::mpsc; @@ -162,6 +161,50 @@ impl Worker for RepairWorker { // and whose parameter (esp. speed) can be controlled at runtime. // ---- ---- ---- +mod v081 { + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize)] + pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) corruptions_detected: u64, + } + + impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} +} + +mod v082 { + use serde::{Deserialize, Serialize}; + + use super::v081; + + #[derive(Serialize, Deserialize)] + pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) time_next_run_scrub: u64, + pub(crate) corruptions_detected: u64, + } + + impl garage_util::migrate::Migrate for ScrubWorkerPersisted { + type Previous = v081::ScrubWorkerPersisted; + + fn migrate(old: v081::ScrubWorkerPersisted) -> ScrubWorkerPersisted { + use crate::repair::randomize_next_scrub_run_time; + + ScrubWorkerPersisted { + tranquility: old.tranquility, + time_last_complete_scrub: old.time_last_complete_scrub, + time_next_run_scrub: randomize_next_scrub_run_time(), + corruptions_detected: old.corruptions_detected, + } + } + } +} + +pub use v082::*; + pub struct ScrubWorker { manager: Arc, rx_cmd: mpsc::Receiver, @@ -172,14 +215,6 @@ pub struct ScrubWorker { persister: PersisterShared, } -#[derive(Serialize, Deserialize)] -pub struct ScrubWorkerPersisted { - pub tranquility: u32, - pub(crate) time_last_complete_scrub: u64, - pub(crate) time_next_run_scrub: u64, - pub(crate) corruptions_detected: u64, -} - fn randomize_next_scrub_run_time() -> u64 { // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to // balance scrub load across different cluster nodes. @@ -194,7 +229,6 @@ fn randomize_next_scrub_run_time() -> u64 { next_run_timestamp } -impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} impl Default for ScrubWorkerPersisted { fn default() -> Self { ScrubWorkerPersisted { -- cgit v1.2.3 From 7b65dd24e2bacc20afa747dc2b1f3fb81249f688 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Thu, 9 Mar 2023 16:32:22 +0000 Subject: block/repair.rs: Added a timestamp argument to randomize_next_scrub_run_time(). 
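Taking the reference timestamp as an argument lets the migration schedule the next scrub relative to the node's last completed scrub, while the Default impl keeps scheduling relative to now_msec(). A minimal standalone sketch of the scheduling rule follows; the constants match src/block/repair.rs, and the main function with its sample timestamp is purely illustrative.

    use std::time::Duration;

    use rand::Rng;

    // Full scrub every 25 days, plus up to 10 random days of jitter to
    // spread scrub load across cluster nodes.
    const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 25);

    fn randomize_next_scrub_run_time(timestamp: u64) -> u64 {
        let jitter = Duration::from_secs(rand::thread_rng().gen_range(0..3600 * 24 * 10));
        timestamp + SCRUB_INTERVAL.saturating_add(jitter).as_millis() as u64
    }

    fn main() {
        let last_complete_ms = 1_678_000_000_000_u64; // illustrative completion time, in ms
        println!("next scrub at {} ms", randomize_next_scrub_run_time(last_complete_ms));
    }
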
--- src/block/repair.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/block') diff --git a/src/block/repair.rs b/src/block/repair.rs index 006cc866..5476bf8a 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -196,7 +196,7 @@ mod v082 { ScrubWorkerPersisted { tranquility: old.tranquility, time_last_complete_scrub: old.time_last_complete_scrub, - time_next_run_scrub: randomize_next_scrub_run_time(), + time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub), corruptions_detected: old.corruptions_detected, } } @@ -215,11 +215,11 @@ pub struct ScrubWorker { persister: PersisterShared, } -fn randomize_next_scrub_run_time() -> u64 { +fn randomize_next_scrub_run_time(timestamp: u64) -> u64 { // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to // balance scrub load across different cluster nodes. - let next_run_timestamp = now_msec() + let next_run_timestamp = timestamp + SCRUB_INTERVAL .saturating_add(Duration::from_secs( rand::thread_rng().gen_range(0..3600 * 24 * 10), @@ -233,7 +233,7 @@ impl Default for ScrubWorkerPersisted { fn default() -> Self { ScrubWorkerPersisted { time_last_complete_scrub: 0, - time_next_run_scrub: randomize_next_scrub_run_time(), + time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), tranquility: INITIAL_SCRUB_TRANQUILITY, corruptions_detected: 0, } @@ -395,7 +395,7 @@ impl Worker for ScrubWorker { } else { self.persister.set_with(|p| { p.time_last_complete_scrub = now_msec(); - p.time_next_run_scrub = randomize_next_scrub_run_time(); + p.time_next_run_scrub = randomize_next_scrub_run_time(now_msec()); })?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); -- cgit v1.2.3 From d218f475cbe6450f0402424e01b99c4630cb8a2f Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Thu, 9 Mar 2023 17:08:47 +0000 Subject: block/manager.rs: Set defaults for scrub_persister. 
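The one-line change below calls set_with with a no-op closure right after the manager is built, which forces the persister to write its current state to disk; on a node without an existing scrub_info file that state is the Default, so the freshly randomized time_next_run_scrub is persisted immediately rather than only after the first real mutation. A toy stand-in for the idea follows; PersistedValue, the JSON encoding and the field layout are illustrative assumptions, not Garage's actual PersisterShared API.

    use std::sync::Mutex;

    struct PersistedValue<T> {
        value: Mutex<T>,
        path: std::path::PathBuf,
    }

    impl<T: serde::Serialize> PersistedValue<T> {
        // Mutate the value under the lock, then write it back to disk.
        // Calling this with a no-op closure therefore just persists whatever
        // is currently held, e.g. the Default state right after startup.
        fn set_with(&self, f: impl FnOnce(&mut T)) -> std::io::Result<()> {
            let mut value = self.value.lock().unwrap();
            f(&mut value);
            std::fs::write(&self.path, serde_json::to_vec(&*value).expect("serializable"))
        }
    }
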
--- src/block/manager.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index bfa66069..26278974 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -152,6 +152,7 @@ impl BlockManager { tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager.scrub_persister.set_with(|_| ()).unwrap(); block_manager } -- cgit v1.2.3 From 0a1ddcf6301359cde654003b46636497f6a417a4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 13 Mar 2023 18:44:09 +0100 Subject: Prepare for v0.8.2 --- src/block/Cargo.toml | 10 +++++----- src/block/repair.rs | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) (limited to 'src/block') diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index a7e8bc2c..c6985754 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_block" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -14,10 +14,10 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.1", path = "../db" } -garage_rpc = { version = "0.8.1", path = "../rpc" } -garage_util = { version = "0.8.1", path = "../util" } -garage_table = { version = "0.8.1", path = "../table" } +garage_db = { version = "0.8.2", path = "../db" } +garage_rpc = { version = "0.8.2", path = "../rpc" } +garage_util = { version = "0.8.2", path = "../util" } +garage_table = { version = "0.8.2", path = "../table" } opentelemetry = "0.17" diff --git a/src/block/repair.rs b/src/block/repair.rs index 5476bf8a..37ccd59a 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -189,6 +189,7 @@ mod v082 { impl garage_util::migrate::Migrate for ScrubWorkerPersisted { type Previous = v081::ScrubWorkerPersisted; + const VERSION_MARKER: &'static [u8] = b"G082bswp"; fn migrate(old: v081::ScrubWorkerPersisted) -> ScrubWorkerPersisted { use crate::repair::randomize_next_scrub_run_time; -- cgit v1.2.3 From fb3bd11dce1f1a9fb8f60b93894300ce4fc72520 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Sun, 23 Apr 2023 19:32:28 +0100 Subject: block/repair.rs: Added log entries for scrub start/finish. 
--- src/block/repair.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'src/block') diff --git a/src/block/repair.rs b/src/block/repair.rs index 37ccd59a..c89484d9 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -281,6 +281,7 @@ impl ScrubWorker { ScrubWorkerCommand::Start => { self.work = match std::mem::take(&mut self.work) { ScrubWorkerState::Finished => { + info!("Scrub worker initializing, now performing datastore scrub"); let iterator = BlockStoreIterator::new(&self.manager); ScrubWorkerState::Running(iterator) } @@ -394,12 +395,21 @@ impl Worker for ScrubWorker { .tranquilizer .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { + let now = now_msec(); + let next_scrub_timestamp = randomize_next_scrub_run_time(now); + self.persister.set_with(|p| { - p.time_last_complete_scrub = now_msec(); - p.time_next_run_scrub = randomize_next_scrub_run_time(now_msec()); + p.time_last_complete_scrub = now; + p.time_next_run_scrub = next_scrub_timestamp; })?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); + + info!( + "Datastore scrub completed, next scrub scheduled for {}", + msec_to_rfc3339(next_scrub_timestamp) + ); + Ok(WorkerState::Idle) } } -- cgit v1.2.3