aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/Cargo.toml12
-rw-r--r--src/block/manager.rs30
-rw-r--r--src/block/repair.rs14
3 files changed, 35 insertions, 21 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index c6985754..df16959b 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -14,10 +14,10 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.2", path = "../db" }
-garage_rpc = { version = "0.8.2", path = "../rpc" }
-garage_util = { version = "0.8.2", path = "../util" }
-garage_table = { version = "0.8.2", path = "../table" }
+garage_db.workspace = true
+garage_rpc.workspace = true
+garage_util.workspace = true
+garage_table.workspace = true
opentelemetry = "0.17"
@@ -28,7 +28,7 @@ hex = "0.4"
tracing = "0.1"
rand = "0.8"
-async-compression = { version = "0.3", features = ["tokio", "zstd"] }
+async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.12", default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
@@ -37,7 +37,7 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-tokio-util = { version = "0.6", features = ["io"] }
+tokio-util = { version = "0.7", features = ["io"] }
[features]
system-libs = [ "zstd/pkg-config" ]
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 26278974..3ece9a8a 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -600,12 +600,32 @@ impl BlockManager {
/// Utility: check if block is stored compressed. Error if block is not stored
async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
let mut path = self.block_path(hash);
- path.set_extension("zst");
- if fs::metadata(&path).await.is_ok() {
- return Ok(true);
+
+ // If compression is disabled on node - check for the raw block
+ // first and then a compressed one (as compression may have been
+ // previously enabled).
+ match self.compression_level {
+ None => {
+ if fs::metadata(&path).await.is_ok() {
+ return Ok(false);
+ }
+
+ path.set_extension("zst");
+
+ fs::metadata(&path).await.map(|_| true).map_err(Into::into)
+ }
+ _ => {
+ path.set_extension("zst");
+
+ if fs::metadata(&path).await.is_ok() {
+ return Ok(true);
+ }
+
+ path.set_extension("");
+
+ fs::metadata(&path).await.map(|_| false).map_err(Into::into)
+ }
}
- path.set_extension("");
- fs::metadata(&path).await.map(|_| false).map_err(Into::into)
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
diff --git a/src/block/repair.rs b/src/block/repair.rs
index c89484d9..71093d69 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -220,14 +220,12 @@ fn randomize_next_scrub_run_time(timestamp: u64) -> u64 {
// Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to
// balance scrub load across different cluster nodes.
- let next_run_timestamp = timestamp
+ timestamp
+ SCRUB_INTERVAL
.saturating_add(Duration::from_secs(
rand::thread_rng().gen_range(0..3600 * 24 * 10),
))
- .as_millis() as u64;
-
- next_run_timestamp
+ .as_millis() as u64
}
impl Default for ScrubWorkerPersisted {
@@ -241,18 +239,14 @@ impl Default for ScrubWorkerPersisted {
}
}
+#[derive(Default)]
enum ScrubWorkerState {
Running(BlockStoreIterator),
Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ #[default]
Finished,
}
-impl Default for ScrubWorkerState {
- fn default() -> Self {
- ScrubWorkerState::Finished
- }
-}
-
#[derive(Debug)]
pub enum ScrubWorkerCommand {
Start,