aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/Cargo.toml1
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs326
-rw-r--r--src/block/repair.rs468
4 files changed, 585 insertions, 211 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 80346aca..2555a44a 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -21,6 +21,7 @@ garage_table = { version = "0.7.0", path = "../table" }
opentelemetry = "0.17"
+arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
hex = "0.4"
diff --git a/src/block/lib.rs b/src/block/lib.rs
index dc685657..ebdb95d8 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -2,6 +2,7 @@
extern crate tracing;
pub mod manager;
+pub mod repair;
mod block;
mod metrics;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 32ba0431..36166ae3 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,18 +1,17 @@
-use core::ops::Bound;
-
use std::convert::TryInto;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::*;
-use futures::select;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::sync::{watch, Mutex, Notify};
+use tokio::select;
+use tokio::sync::{mpsc, watch, Mutex, Notify};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -22,6 +21,7 @@ use opentelemetry::{
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@@ -36,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::metrics::*;
use crate::rc::*;
+use crate::repair::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -93,16 +94,18 @@ pub struct BlockManager {
mutation_lock: Mutex<BlockManagerLocked>,
- rc: BlockRc,
+ pub(crate) rc: BlockRc,
resync_queue: CountedTree,
resync_notify: Notify,
resync_errors: CountedTree,
- system: Arc<System>,
+ pub(crate) system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
metrics: BlockManagerMetrics,
+
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
// This custom struct contains functions that must only be ran
@@ -110,6 +113,12 @@ pub struct BlockManager {
// it INSIDE a Mutex.
struct BlockManagerLocked();
+enum ResyncIterResult { // Outcome of one `resync_iter` call; ResyncWorker::work maps it to a WorkerStatus
+ BusyDidSomething, // a queue entry was processed; work() then applies the tranquilizer
+ BusyDidNothing, // the head queue entry was removed without processing a block; work() reports Busy
+ IdleFor(Duration), // nothing to do right now; work() stores the duration as the next wait delay
+}
+
impl BlockManager {
pub fn new(
db: &db::Db,
@@ -157,10 +166,11 @@ impl BlockManager {
system,
endpoint,
metrics,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
- block_manager.clone().spawn_background_worker();
+ block_manager.clone().spawn_background_workers();
block_manager
}
@@ -218,90 +228,6 @@ impl BlockManager {
Ok(())
}
- /// Launch the repair procedure on the data store
- ///
- /// This will list all blocks locally present, as well as those
- /// that are required because of refcount > 0, and will try
- /// to fix any mismatch between the two.
- pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- // 1. Repair blocks from RC table.
- let mut next_start: Option<Hash> = None;
- loop {
- // We have to do this complicated two-step process where we first read a bunch
- // of hashes from the RC table, and then insert them in the to-resync queue,
- // because of SQLite. Basically, as long as we have an iterator on a DB table,
- // we can't do anything else on the DB. The naive approach (which we had previously)
- // of just iterating on the RC table and inserting items one to one in the resync
- // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
- // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
- // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
- // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
- let mut batch_of_hashes = vec![];
- let start_bound = match next_start.as_ref() {
- None => Bound::Unbounded,
- Some(x) => Bound::Excluded(x.as_slice()),
- };
- for entry in self
- .rc
- .rc
- .range::<&[u8], _>((start_bound, Bound::Unbounded))?
- {
- let (hash, _) = entry?;
- let hash = Hash::try_from(&hash[..]).unwrap();
- batch_of_hashes.push(hash);
- if batch_of_hashes.len() >= 1000 {
- break;
- }
- }
- if batch_of_hashes.is_empty() {
- break;
- }
-
- for hash in batch_of_hashes.into_iter() {
- self.put_to_resync(&hash, Duration::from_secs(0))?;
- next_start = Some(hash)
- }
-
- if *must_exit.borrow() {
- return Ok(());
- }
- }
-
- // 2. Repair blocks actually on disk
- // Lists all blocks on disk and adds them to the resync queue.
- // This allows us to find blocks we are storing but don't actually need,
- // so that we can offload them if necessary and then delete them locally.
- self.for_each_file(
- (),
- move |_, hash| async move {
- self.put_to_resync(&hash, Duration::from_secs(0))
- .map_err(Into::into)
- },
- must_exit,
- )
- .await
- }
-
- /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
- /// this function.
- pub async fn scrub_data_store(
- &self,
- must_exit: &watch::Receiver<bool>,
- tranquility: u32,
- ) -> Result<(), Error> {
- let tranquilizer = Tranquilizer::new(30);
- self.for_each_file(
- tranquilizer,
- move |mut tranquilizer, hash| async move {
- let _ = self.read_block(&hash).await;
- tranquilizer.tranquilize(tranquility).await;
- Ok(tranquilizer)
- },
- must_exit,
- )
- .await
- }
-
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> Result<usize, Error> {
// This currently can't return an error because the CountedTree hack
@@ -321,6 +247,17 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
+    /// Send a command to start, pause, resume, cancel or tune the scrub worker.
+    ///
+    /// The command goes through the channel stored in `tx_scrub_command`.
+    /// Delivery problems are logged instead of panicking or being silently
+    /// dropped: the sender may not be installed, or the worker may have
+    /// dropped its receiving end.
+    pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
+        // Clone the sender out of the ArcSwapOption so that no arc-swap guard
+        // is held across the `.await` below.
+        match self.tx_scrub_command.load_full() {
+            Some(tx) => {
+                if let Err(e) = tx.send(cmd).await {
+                    error!("Could not send command to scrub worker: {}", e);
+                }
+            }
+            None => error!("Could not send command to scrub worker: worker not started"),
+        }
+    }
+
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
@@ -390,7 +327,7 @@ impl BlockManager {
}
/// Read block from disk, verifying it's integrity
- async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
+ pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
@@ -554,18 +491,27 @@ impl BlockManager {
// for times that are earlier than the exponential back-off delay
// is a natural condition that is handled properly).
- fn spawn_background_worker(self: Arc<Self>) {
+ fn spawn_background_workers(self: Arc<Self>) {
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
+ let worker = ResyncWorker {
+ manager: self.clone(),
+ tranquilizer: Tranquilizer::new(30),
+ next_delay: Duration::from_secs(10),
+ };
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
- background.spawn_worker("block resync worker".into(), move |must_exit| {
- self.resync_loop(must_exit)
- });
+ background.spawn_worker(worker);
});
+
+ // Launch a background worker for data store scrubs
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx);
+ self.system.background.spawn_worker(scrub_worker);
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
+ pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
@@ -579,37 +525,7 @@ impl BlockManager {
Ok(())
}
- async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- let mut tranquilizer = Tranquilizer::new(30);
-
- while !*must_exit.borrow() {
- match self.resync_iter(&mut must_exit).await {
- Ok(true) => {
- tranquilizer.tranquilize(self.background_tranquility).await;
- }
- Ok(false) => {
- tranquilizer.reset();
- }
- Err(e) => {
- // The errors that we have here are only Sled errors
- // We don't really know how to handle them so just ¯\_(ツ)_/¯
- // (there is kind of an assumption that Sled won't error on us,
- // if it does there is not much we can do -- TODO should we just panic?)
- error!(
- "Could not do a resync iteration: {} (this is a very bad error)",
- e
- );
- tranquilizer.reset();
- }
- }
- }
- }
-
- // The result of resync_iter is:
- // - Ok(true) -> a block was processed (successfully or not)
- // - Ok(false) -> no block was processed, but we are ready for the next iteration
- // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
- async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
+ async fn resync_iter(&self) -> Result<ResyncIterResult, db::Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@@ -629,7 +545,7 @@ impl BlockManager {
// (we want to do the remove after the insert to ensure
// that the item is not lost if we crash in-between)
self.resync_queue.remove(time_bytes)?;
- return Ok(false);
+ return Ok(ResyncIterResult::BusyDidNothing);
}
}
@@ -676,15 +592,11 @@ impl BlockManager {
self.resync_queue.remove(time_bytes)?;
}
- Ok(true)
+ Ok(ResyncIterResult::BusyDidSomething)
} else {
- let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(ResyncIterResult::IdleFor(Duration::from_millis(
+ time_msec - now,
+ )))
}
} else {
// Here we wait either for a notification that an item has been
@@ -693,13 +605,7 @@ impl BlockManager {
// between the time we checked the queue and the first poll
// to resync_notify.notified(): if that happens, we'll just loop
// back 10 seconds later, which is fine.
- let delay = tokio::time::sleep(Duration::from_secs(10));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(ResyncIterResult::IdleFor(Duration::from_secs(10)))
}
}
@@ -814,72 +720,6 @@ impl BlockManager {
Ok(())
}
-
- // ---- Utility: iteration on files in the data directory ----
-
- async fn for_each_file<F, Fut, State>(
- &self,
- state: State,
- mut f: F,
- must_exit: &watch::Receiver<bool>,
- ) -> Result<(), Error>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send,
- {
- self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
- .await
- .map(|_| ())
- }
-
- fn for_each_file_rec<'a, F, Fut, State>(
- &'a self,
- path: &'a Path,
- mut state: State,
- f: &'a mut F,
- must_exit: &'a watch::Receiver<bool>,
- ) -> BoxFuture<'a, Result<State, Error>>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send + 'a,
- {
- async move {
- let mut ls_data_dir = fs::read_dir(path).await?;
- while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
- if *must_exit.borrow() {
- break;
- }
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
- state = self
- .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
- .await?;
- } else if name.len() == 64 {
- let hash_bytes = if let Ok(h) = hex::decode(&name) {
- h
- } else {
- continue;
- };
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&hash_bytes[..]);
- state = f(state, hash.into()).await?;
- }
- }
- Ok(state)
- }
- .boxed()
- }
}
#[async_trait]
@@ -898,6 +738,70 @@ impl EndpointHandler<BlockRpc> for BlockManager {
}
}
+struct ResyncWorker { // Background worker that repeatedly calls BlockManager::resync_iter
+ manager: Arc<BlockManager>,
+ tranquilizer: Tranquilizer, // paces iterations that did something, using manager.background_tranquility
+ next_delay: Duration, // how long wait_for_work sleeps; updated from ResyncIterResult::IdleFor
+}
+
+#[async_trait]
+impl Worker for ResyncWorker {
+    /// Name under which this worker is reported.
+    fn name(&self) -> String {
+        String::from("Block resync worker")
+    }
+
+    /// Summarize the resync queue length and the number of blocks in error
+    /// state. Returns `None` when both are zero; a counter that cannot be
+    /// read is treated as zero.
+    fn info(&self) -> Option<String> {
+        let queued = self.manager.resync_queue_len().unwrap_or(0);
+        let errored = self.manager.resync_errors_len().unwrap_or(0);
+
+        let mut parts = Vec::with_capacity(2);
+        if queued > 0 {
+            parts.push(format!("{} blocks in queue", queued));
+        }
+        if errored > 0 {
+            parts.push(format!("{} blocks in error state", errored));
+        }
+
+        if parts.is_empty() {
+            None
+        } else {
+            Some(parts.join(", "))
+        }
+    }
+
+    /// Run one resync iteration and translate its outcome into a worker status.
+    async fn work(
+        &mut self,
+        _must_exit: &mut watch::Receiver<bool>,
+    ) -> Result<WorkerStatus, Error> {
+        self.tranquilizer.reset();
+
+        // The errors that we have here are only Sled errors
+        // We don't really know how to handle them so just ¯\_(ツ)_/¯
+        // (there is kind of an assumption that Sled won't error on us,
+        // if it does there is not much we can do -- TODO should we just panic?)
+        // Here we just give the error to the worker manager,
+        // it will print it to the logs and increment a counter
+        let outcome = self.manager.resync_iter().await?;
+
+        let status = match outcome {
+            ResyncIterResult::BusyDidSomething => self
+                .tranquilizer
+                .tranquilize_worker(self.manager.background_tranquility),
+            ResyncIterResult::BusyDidNothing => WorkerStatus::Busy,
+            ResyncIterResult::IdleFor(delay) => {
+                // Remember how long wait_for_work should sleep.
+                self.next_delay = delay;
+                WorkerStatus::Idle
+            }
+        };
+        Ok(status)
+    }
+
+    /// Sleep for `next_delay`, or until the resync queue is notified,
+    /// whichever comes first, then report that there may be work to do.
+    async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+        select! {
+            _ = tokio::time::sleep(self.next_delay) => (),
+            _ = self.manager.resync_notify.notified() => (),
+        };
+        WorkerStatus::Busy
+    }
+}
+
struct BlockStatus {
exists: bool,
needed: RcEntry,
diff --git a/src/block/repair.rs b/src/block/repair.rs
new file mode 100644
index 00000000..284a8846
--- /dev/null
+++ b/src/block/repair.rs
@@ -0,0 +1,468 @@
+use core::ops::Bound;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use tokio::fs;
+use tokio::select;
+use tokio::sync::mpsc;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::persister::Persister;
+use garage_util::time::*;
+use garage_util::tranquilizer::Tranquilizer;
+
+use crate::manager::*;
+
+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days
+
+pub struct RepairWorker { // Two-phase repair job: queues RC-table hashes, then on-disk blocks, for resync
+ manager: Arc<BlockManager>,
+ next_start: Option<Hash>, // phase 1 cursor: last RC-table hash already put in the resync queue
+ block_iter: Option<BlockStoreIterator>, // None during phase 1; Some once phase 2 (data directory walk) has begun
+}
+
+impl RepairWorker {
+ pub fn new(manager: Arc<BlockManager>) -> Self { // starts in phase 1, from the beginning of the RC table
+ Self {
+ manager,
+ next_start: None,
+ block_iter: None,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RepairWorker {
+    /// Name under which this worker is reported.
+    fn name(&self) -> String {
+        String::from("Block repair worker")
+    }
+
+    /// Report the current phase: in phase 1, the first (up to) four bytes of
+    /// the RC-table cursor in hex; in phase 2, the percentage of the block
+    /// store already walked.
+    fn info(&self) -> Option<String> {
+        let status = if let Some(bi) = self.block_iter.as_ref() {
+            format!("Phase 2: {:.2}% done", bi.progress() * 100.)
+        } else {
+            let cursor: &[u8] = match self.next_start.as_ref() {
+                Some(hash) => hash.as_slice(),
+                None => &[],
+            };
+            let shown = cursor.len().min(4);
+            format!("Phase 1: {}", hex::encode(&cursor[..shown]))
+        };
+        Some(status)
+    }
+
+    /// Perform one unit of repair work: either queue a batch of up to 1000
+    /// hashes from the RC table (phase 1), or queue the next block found on
+    /// disk (phase 2). Returns `Done` once the on-disk walk is exhausted.
+    async fn work(
+        &mut self,
+        _must_exit: &mut watch::Receiver<bool>,
+    ) -> Result<WorkerStatus, Error> {
+        if let Some(bi) = self.block_iter.as_mut() {
+            // Phase 2: Repair blocks actually on disk
+            // Lists all blocks on disk and adds them to the resync queue.
+            // This allows us to find blocks we are storing but don't actually need,
+            // so that we can offload them if necessary and then delete them locally.
+            return match bi.next().await? {
+                Some(hash) => {
+                    self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+                    Ok(WorkerStatus::Busy)
+                }
+                None => Ok(WorkerStatus::Done),
+            };
+        }
+
+        // Phase 1: Repair blocks from RC table.
+
+        // We have to do this complicated two-step process where we first read a bunch
+        // of hashes from the RC table, and then insert them in the to-resync queue,
+        // because of SQLite. Basically, as long as we have an iterator on a DB table,
+        // we can't do anything else on the DB. The naive approach (which we had previously)
+        // of just iterating on the RC table and inserting items one to one in the resync
+        // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
+        // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
+        // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
+        // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
+        // TODO: maybe do this with tokio::task::spawn_blocking ?
+        let lower = match self.next_start.as_ref() {
+            Some(cursor) => Bound::Excluded(cursor.as_slice()),
+            None => Bound::Unbounded,
+        };
+        let mut batch = vec![];
+        for entry in self
+            .manager
+            .rc
+            .rc
+            .range::<&[u8], _>((lower, Bound::Unbounded))?
+        {
+            let (key, _) = entry?;
+            batch.push(Hash::try_from(&key[..]).unwrap());
+            if batch.len() >= 1000 {
+                break;
+            }
+        }
+
+        if batch.is_empty() {
+            // move on to phase 2
+            self.block_iter = Some(BlockStoreIterator::new(&self.manager));
+            return Ok(WorkerStatus::Busy);
+        }
+
+        // Advance the cursor hash by hash, so that it always points at the
+        // last hash that was actually queued, even if queueing fails midway.
+        for hash in batch {
+            self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+            self.next_start = Some(hash);
+        }
+
+        Ok(WorkerStatus::Busy)
+    }
+
+    /// Never expected to be called: `work` only ever returns `Busy` or `Done`.
+    async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+        unreachable!()
+    }
+}
+
+// ----
+
+pub struct ScrubWorker { // Background worker that reads every block on disk to detect corruption
+ manager: Arc<BlockManager>,
+ rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, // control channel, polled in work() and awaited in wait_for_work()
+
+ work: ScrubWorkerState, // current scrub state (running / paused / finished)
+ tranquilizer: Tranquilizer, // paces block reads according to persisted.tranquility
+
+ persister: Persister<ScrubWorkerPersisted>, // stores `persisted` under the name "scrub_info" in the metadata dir
+ persisted: ScrubWorkerPersisted, // in-memory copy of the persisted scrub info
+}
+
+#[derive(Serialize, Deserialize)]
+struct ScrubWorkerPersisted { // Scrub info saved through the Persister
+ tranquility: u32, // value passed to the tranquilizer after each scrubbed block
+ time_last_complete_scrub: u64, // now_msec() timestamp of the last completed scrub; 0 if no saved info was loaded
+ corruptions_detected: u64, // incremented each time read_block reports Error::CorruptData during a scrub
+}
+
+enum ScrubWorkerState { // State machine driven by ScrubWorkerCommand and by work()/wait_for_work()
+ Running(BlockStoreIterator), // a scrub is in progress; the iterator holds its position
+ Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ Finished, // no scrub in progress
+}
+
+impl Default for ScrubWorkerState { // needed so that handle_cmd can std::mem::take the current state
+ fn default() -> Self {
+ ScrubWorkerState::Finished
+ }
+}
+
+#[derive(Debug)]
+pub enum ScrubWorkerCommand { // Commands accepted by ScrubWorker::handle_cmd
+ Start, // begin a new scrub; only accepted in the Finished state
+ Pause(Duration), // pause a running (or already paused) scrub until now + duration
+ Resume, // resume a paused scrub
+ Cancel, // abandon a running or paused scrub
+ SetTranquility(u32), // change the tranquility value and persist it
+}
+
+impl ScrubWorker {
+    /// Create a scrub worker in the `Finished` state.
+    ///
+    /// Previously saved scrub info is loaded from the "scrub_info" persister
+    /// in the metadata directory; if loading fails for any reason, defaults
+    /// are used (tranquility 4, no completed scrub, no corruption seen).
+    pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
+        let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
+        let persisted = persister.load().unwrap_or_else(|_| ScrubWorkerPersisted {
+            time_last_complete_scrub: 0,
+            tranquility: 4,
+            corruptions_detected: 0,
+        });
+        Self {
+            manager,
+            rx_cmd,
+            work: ScrubWorkerState::Finished,
+            tranquilizer: Tranquilizer::new(30),
+            persister,
+            persisted,
+        }
+    }
+
+    /// Apply a command to the scrub state machine.
+    ///
+    /// A command that is not valid in the current state is logged as an
+    /// error and leaves the state unchanged.
+    async fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
+        // Tranquility changes do not touch the state machine: handle them
+        // first so that the state is never taken out across an `.await`.
+        if let ScrubWorkerCommand::SetTranquility(t) = cmd {
+            self.persisted.tranquility = t;
+            if let Err(e) = self.persister.save_async(&self.persisted).await {
+                error!("Could not save new tranquilitiy value: {}", e);
+            }
+            return;
+        }
+
+        // Take the current state out (leaving `Finished` behind, via
+        // `Default`) and compute its successor from (command, state).
+        let previous = std::mem::take(&mut self.work);
+        self.work = match (cmd, previous) {
+            (ScrubWorkerCommand::Start, ScrubWorkerState::Finished) => {
+                ScrubWorkerState::Running(BlockStoreIterator::new(&self.manager))
+            }
+            (ScrubWorkerCommand::Start, work) => {
+                error!("Cannot start scrub worker: already running!");
+                work
+            }
+
+            (ScrubWorkerCommand::Pause(dur), ScrubWorkerState::Running(it))
+            | (ScrubWorkerCommand::Pause(dur), ScrubWorkerState::Paused(it, _)) => {
+                ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+            }
+            (ScrubWorkerCommand::Pause(_), work) => {
+                error!("Cannot pause scrub worker: not running!");
+                work
+            }
+
+            (ScrubWorkerCommand::Resume, ScrubWorkerState::Paused(it, _)) => {
+                ScrubWorkerState::Running(it)
+            }
+            (ScrubWorkerCommand::Resume, work) => {
+                error!("Cannot resume scrub worker: not paused!");
+                work
+            }
+
+            (ScrubWorkerCommand::Cancel, ScrubWorkerState::Running(_))
+            | (ScrubWorkerCommand::Cancel, ScrubWorkerState::Paused(_, _)) => {
+                ScrubWorkerState::Finished
+            }
+            (ScrubWorkerCommand::Cancel, work) => {
+                error!("Cannot cancel scrub worker: not running!");
+                work
+            }
+
+            // Already handled by the early return above; keep the state as is.
+            (ScrubWorkerCommand::SetTranquility(_), work) => work,
+        };
+    }
+}
+
+#[async_trait]
+impl Worker for ScrubWorker {
+    /// Name under which this worker is reported.
+    fn name(&self) -> String {
+        "Block scrub worker".into()
+    }
+
+    /// Describe the current scrub state (progress, pause deadline, or time of
+    /// the last completed scrub), followed by the corruption counter.
+    fn info(&self) -> Option<String> {
+        let s = match &self.work {
+            ScrubWorkerState::Running(bsi) => format!(
+                "{:.2}% done (tranquility = {})",
+                bsi.progress() * 100.,
+                self.persisted.tranquility
+            ),
+            ScrubWorkerState::Paused(_bsi, rt) => {
+                format!("Paused, resumes at {}", msec_to_rfc3339(*rt))
+            }
+            ScrubWorkerState::Finished => format!(
+                "Last completed scrub: {}",
+                msec_to_rfc3339(self.persisted.time_last_complete_scrub)
+            ),
+        };
+        Some(format!(
+            "{} ; corruptions detected: {}",
+            s, self.persisted.corruptions_detected
+        ))
+    }
+
+    /// Apply at most one pending command, then, if a scrub is running, read
+    /// and verify the next block.
+    ///
+    /// Returns `Done` when the command channel is disconnected, `Idle` when
+    /// no scrub is running (or the scrub just completed), and otherwise the
+    /// status chosen by the tranquilizer. A read error other than
+    /// `Error::CorruptData` is returned to the caller.
+    async fn work(
+        &mut self,
+        _must_exit: &mut watch::Receiver<bool>,
+    ) -> Result<WorkerStatus, Error> {
+        match self.rx_cmd.try_recv() {
+            Ok(cmd) => self.handle_cmd(cmd).await,
+            Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done),
+            Err(mpsc::error::TryRecvError::Empty) => (),
+        };
+
+        match &mut self.work {
+            ScrubWorkerState::Running(bsi) => {
+                self.tranquilizer.reset();
+                if let Some(hash) = bsi.next().await? {
+                    match self.manager.read_block(&hash).await {
+                        Err(Error::CorruptData(_)) => {
+                            error!("Found corrupt data block during scrub: {:?}", hash);
+                            self.persisted.corruptions_detected += 1;
+                            self.persister.save_async(&self.persisted).await?;
+                        }
+                        Err(e) => return Err(e),
+                        _ => (),
+                    };
+                    Ok(self
+                        .tranquilizer
+                        .tranquilize_worker(self.persisted.tranquility))
+                } else {
+                    // The iterator is exhausted: record completion time and go idle.
+                    self.persisted.time_last_complete_scrub = now_msec();
+                    self.persister.save_async(&self.persisted).await?;
+                    self.work = ScrubWorkerState::Finished;
+                    self.tranquilizer.clear();
+                    Ok(WorkerStatus::Idle)
+                }
+            }
+            _ => Ok(WorkerStatus::Idle),
+        }
+    }
+
+    /// Wait until there is something to do: either the pause deadline or the
+    /// next scheduled scrub (`SCRUB_INTERVAL` after the last completed one)
+    /// is reached, or a command arrives on the channel.
+    ///
+    /// Returns `Done` if the command channel is closed, `Busy` if a scrub is
+    /// running afterwards, and `Idle` otherwise.
+    async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+        // Work out how long to wait and which command to apply on timeout.
+        // All time arithmetic saturates, so a clock that is behind the
+        // persisted timestamp cannot underflow.
+        let (delay, on_timeout) = match &self.work {
+            ScrubWorkerState::Running(_) => return WorkerStatus::Busy,
+            ScrubWorkerState::Paused(_, resume_time) => (
+                Duration::from_millis(resume_time.saturating_sub(now_msec())),
+                ScrubWorkerCommand::Resume,
+            ),
+            ScrubWorkerState::Finished => {
+                let elapsed = Duration::from_millis(
+                    now_msec().saturating_sub(self.persisted.time_last_complete_scrub),
+                );
+                (
+                    SCRUB_INTERVAL.checked_sub(elapsed).unwrap_or_default(),
+                    ScrubWorkerCommand::Start,
+                )
+            }
+        };
+
+        // Deadline already reached: act immediately, without polling the channel.
+        if delay == Duration::from_secs(0) {
+            self.handle_cmd(on_timeout).await;
+            return WorkerStatus::Busy;
+        }
+
+        select! {
+            _ = tokio::time::sleep(delay) => self.handle_cmd(on_timeout).await,
+            cmd = self.rx_cmd.recv() => match cmd {
+                Some(cmd) => self.handle_cmd(cmd).await,
+                None => return WorkerStatus::Done,
+            },
+        }
+
+        match &self.work {
+            ScrubWorkerState::Running(_) => WorkerStatus::Busy,
+            _ => WorkerStatus::Idle,
+        }
+    }
+}
+
+// ----
+
+struct BlockStoreIterator { // Resumable depth-first walk over the block files of the data directory
+ path: Vec<ReadingDir>, // stack of directories being traversed; the last element is the current one
+}
+
+enum ReadingDir { // One level of the traversal stack
+ Pending(PathBuf), // directory not listed yet
+ Read { // directory whose entries have been listed
+ subpaths: Vec<fs::DirEntry>,
+ pos: usize, // index of the next entry of `subpaths` to visit
+ },
+}
+
+impl BlockStoreIterator {
+    /// Create an iterator positioned at the root of the manager's data directory.
+    /// Nothing is read from disk until `next` is called.
+    fn new(manager: &BlockManager) -> Self {
+        let root_dir = manager.data_dir.clone();
+        Self {
+            path: vec![ReadingDir::Pending(root_dir)],
+        }
+    }
+
+    /// Returns progress done, between 0 and 1
+    ///
+    /// The estimate is built from the position reached in each directory of
+    /// the traversal stack, each level weighing `1 / (product of the entry
+    /// counts of the levels above and including it)`.
+    fn progress(&self) -> f32 {
+        if self.path.is_empty() {
+            return 1.0;
+        }
+
+        let mut ret = 0.0;
+        let mut next_div = 1;
+        for p in self.path.iter() {
+            match p {
+                ReadingDir::Pending(_) => break,
+                ReadingDir::Read { subpaths, pos } => {
+                    next_div *= subpaths.len();
+                    if next_div == 0 {
+                        // Empty directory: it contributes nothing, and
+                        // dividing by zero must be avoided.
+                        break;
+                    }
+                    // `pos` is the index of the *next* entry, hence the -1;
+                    // saturate so that a position of 0 cannot underflow.
+                    ret += (pos.saturating_sub(1) as f32) / (next_div as f32);
+                }
+            }
+        }
+        ret
+    }
+
+    /// Return the hash of the next block file found on disk, or `None` once
+    /// the whole data directory has been walked.
+    ///
+    /// Sub-directories whose name is two hex characters are descended into;
+    /// files whose name (minus an optional `.zst` suffix) is 64 hex
+    /// characters are returned as hashes; anything else is skipped.
+    /// I/O errors while listing a directory or reading an entry's type are
+    /// returned to the caller.
+    async fn next(&mut self) -> Result<Option<Hash>, Error> {
+        loop {
+            let last_path = match self.path.last_mut() {
+                None => return Ok(None),
+                Some(lp) => lp,
+            };
+
+            // List the directory on first visit.
+            if let ReadingDir::Pending(path) = last_path {
+                let mut reader = fs::read_dir(&path).await?;
+                let mut subpaths = vec![];
+                while let Some(ent) = reader.next_entry().await? {
+                    subpaths.push(ent);
+                }
+                *last_path = ReadingDir::Read { subpaths, pos: 0 };
+            }
+
+            let (subpaths, pos) = match *last_path {
+                ReadingDir::Read {
+                    ref subpaths,
+                    ref mut pos,
+                } => (subpaths, pos),
+                ReadingDir::Pending(_) => unreachable!(),
+            };
+
+            // Take the next entry, or go back up one level if this directory
+            // is exhausted.
+            let data_dir_ent = match subpaths.get(*pos) {
+                None => {
+                    self.path.pop();
+                    continue;
+                }
+                Some(ent) => {
+                    *pos += 1;
+                    ent
+                }
+            };
+
+            // Entries whose name is not valid Unicode cannot be block files.
+            let name = data_dir_ent.file_name();
+            let name = if let Ok(n) = name.into_string() {
+                n
+            } else {
+                continue;
+            };
+            let ent_type = data_dir_ent.file_type().await?;
+
+            let name = name.strip_suffix(".zst").unwrap_or(&name);
+            if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
+                let path = data_dir_ent.path();
+                self.path.push(ReadingDir::Pending(path));
+            } else if name.len() == 64 {
+                let hash_bytes = if let Ok(h) = hex::decode(&name) {
+                    h
+                } else {
+                    continue;
+                };
+                // 64 hex characters always decode to exactly 32 bytes.
+                let mut hash = [0u8; 32];
+                hash.copy_from_slice(&hash_bytes[..]);
+                return Ok(Some(hash.into()));
+            }
+        }
+    }
+}