diff options
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/Cargo.toml | 12 | ||||
-rw-r--r-- | src/block/manager.rs | 30 | ||||
-rw-r--r-- | src/block/repair.rs | 14 |
3 files changed, 35 insertions, 21 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index c6985754..df16959b 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -14,10 +14,10 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.2", path = "../db" } -garage_rpc = { version = "0.8.2", path = "../rpc" } -garage_util = { version = "0.8.2", path = "../util" } -garage_table = { version = "0.8.2", path = "../table" } +garage_db.workspace = true +garage_rpc.workspace = true +garage_util.workspace = true +garage_table.workspace = true opentelemetry = "0.17" @@ -28,7 +28,7 @@ hex = "0.4" tracing = "0.1" rand = "0.8" -async-compression = { version = "0.3", features = ["tokio", "zstd"] } +async-compression = { version = "0.4", features = ["tokio", "zstd"] } zstd = { version = "0.12", default-features = false } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } @@ -37,7 +37,7 @@ serde_bytes = "0.11" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -tokio-util = { version = "0.6", features = ["io"] } +tokio-util = { version = "0.7", features = ["io"] } [features] system-libs = [ "zstd/pkg-config" ] diff --git a/src/block/manager.rs b/src/block/manager.rs index 26278974..3ece9a8a 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -600,12 +600,32 @@ impl BlockManager { /// Utility: check if block is stored compressed. Error if block is not stored async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> { let mut path = self.block_path(hash); - path.set_extension("zst"); - if fs::metadata(&path).await.is_ok() { - return Ok(true); + + // If compression is disabled on node - check for the raw block + // first and then a compressed one (as compression may have been + // previously enabled). + match self.compression_level { + None => { + if fs::metadata(&path).await.is_ok() { + return Ok(false); + } + + path.set_extension("zst"); + + fs::metadata(&path).await.map(|_| true).map_err(Into::into) + } + _ => { + path.set_extension("zst"); + + if fs::metadata(&path).await.is_ok() { + return Ok(true); + } + + path.set_extension(""); + + fs::metadata(&path).await.map(|_| false).map_err(Into::into) + } } - path.set_extension(""); - fs::metadata(&path).await.map(|_| false).map_err(Into::into) } async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { diff --git a/src/block/repair.rs b/src/block/repair.rs index c89484d9..71093d69 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -220,14 +220,12 @@ fn randomize_next_scrub_run_time(timestamp: u64) -> u64 { // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to // balance scrub load across different cluster nodes. - let next_run_timestamp = timestamp + timestamp + SCRUB_INTERVAL .saturating_add(Duration::from_secs( rand::thread_rng().gen_range(0..3600 * 24 * 10), )) - .as_millis() as u64; - - next_run_timestamp + .as_millis() as u64 } impl Default for ScrubWorkerPersisted { @@ -241,18 +239,14 @@ impl Default for ScrubWorkerPersisted { } } +#[derive(Default)] enum ScrubWorkerState { Running(BlockStoreIterator), Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub + #[default] Finished, } -impl Default for ScrubWorkerState { - fn default() -> Self { - ScrubWorkerState::Finished - } -} - #[derive(Debug)] pub enum ScrubWorkerCommand { Start, |