aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrinity Pointard <trinity.pointard@gmail.com>2021-06-24 01:34:28 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-27 10:31:03 +0200
commit28c015d9ffcb3255295572465654fdca680b1964 (patch)
treeda668e3594cfaf5bc59435127bf1390b94f98aab
parent4e8af1d95645447ee879c9681775c35db1764d58 (diff)
downloadgarage-28c015d9ffcb3255295572465654fdca680b1964.tar.gz
garage-28c015d9ffcb3255295572465654fdca680b1964.zip
add cli parameter to verify local bloc integrity
reuse code for listing local blocks add disk i/o speed limit on integrity check
-rw-r--r--src/garage/cli/structs.rs7
-rw-r--r--src/garage/repair.rs8
-rw-r--r--src/model/block.rs121
3 files changed, 107 insertions, 29 deletions
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index f134cd49..588900a3 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -282,6 +282,13 @@ pub enum RepairWhat {
/// Only redo the propagation of version deletions to the block ref table (extremely slow)
#[structopt(name = "block_refs")]
BlockRefs,
+ /// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
+ #[structopt(name = "blocks_integrity")]
+ BlockIntegrity {
+ /// Limit on i/o speed, in B/s
+ #[structopt(name = "limit")]
+ limit: Option<usize>,
+ },
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 8200f1f0..a67bf2e5 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -63,6 +63,14 @@ impl Repair {
.await?;
}
+ if let Some(RepairWhat::BlockIntegrity { limit }) = opt.what {
+ info!("Verifying integrity of stored blocks");
+ self.garage
+ .block_manager
+ .verify_data_store_integrity(&must_exit, limit)
+ .await?;
+ }
+
Ok(())
}
diff --git a/src/model/block.rs b/src/model/block.rs
index d1ea1512..c43c0b97 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
+use tokio::time::Instant;
use garage_util::data::*;
use garage_util::error::Error;
@@ -197,10 +198,15 @@ impl BlockManager {
}
// 2. Repair blocks actually on disk
- self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
- .await?;
-
- Ok(())
+ // Lists all blocks on disk and adds them to the resync queue.
+ // This allows us to find blocks we are storing but don't actually need,
+ // so that we can offload them if necessary and then delete them locally.
+ self.for_each_file(
+ (),
+ move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) },
+ must_exit,
+ )
+ .await
}
/// Get lenght of resync queue
@@ -485,50 +491,107 @@ impl BlockManager {
Ok(())
}
- fn repair_aux_read_dir_rec<'a>(
+ async fn for_each_file<F, Fut, State>(
+ &self,
+ state: State,
+ mut f: F,
+ must_exit: &watch::Receiver<bool>,
+ ) -> Result<(), Error>
+ where
+ F: FnMut(State, Hash) -> Fut + Send,
+ Fut: Future<Output = Result<State, Error>> + Send,
+ State: Send,
+ {
+ self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
+ .await
+ .map(|_| ())
+ }
+
+ fn for_each_file_rec<'a, F, Fut, State>(
&'a self,
path: &'a Path,
+ mut state: State,
+ f: &'a mut F,
must_exit: &'a watch::Receiver<bool>,
- ) -> BoxFuture<'a, Result<(), Error>> {
- // Lists all blocks on disk and adds them to the resync queue.
- // This allows us to find blocks we are storing but don't actually need,
- // so that we can offload them if necessary and then delete them locally.
+ ) -> BoxFuture<'a, Result<State, Error>>
+ where
+ F: FnMut(State, Hash) -> Fut + Send,
+ Fut: Future<Output = Result<State, Error>> + Send,
+ State: Send + 'a,
+ {
async move {
let mut ls_data_dir = fs::read_dir(path).await?;
- loop {
- let data_dir_ent = ls_data_dir.next_entry().await?;
- let data_dir_ent = match data_dir_ent {
- Some(x) => x,
- None => break,
- };
+ while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
+ if *must_exit.borrow() {
+ break;
+ }
+
let name = data_dir_ent.file_name();
- let name = match name.into_string() {
- Ok(x) => x,
- Err(_) => continue,
+ let name = if let Ok(n) = name.into_string() {
+ n
+ } else {
+ continue;
};
let ent_type = data_dir_ent.file_type().await?;
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
- self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit)
+ state = self
+ .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
.await?;
} else if name.len() == 64 {
- let hash_bytes = match hex::decode(&name) {
- Ok(h) => h,
- Err(_) => continue,
+ let hash_bytes = if let Ok(h) = hex::decode(&name) {
+ h
+ } else {
+ continue;
};
let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]);
- self.put_to_resync(&hash.into(), Duration::from_secs(0))?;
- }
-
- if *must_exit.borrow() {
- break;
+ state = f(state, hash.into()).await?;
}
}
- Ok(())
+ Ok(state)
}
.boxed()
}
+
+ /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
+ /// this function.
+ pub async fn verify_data_store_integrity(
+ &self,
+ must_exit: &watch::Receiver<bool>,
+ speed_limit: Option<usize>,
+ ) -> Result<(), Error> {
+ let last_refill = Instant::now();
+ let token_left = speed_limit.unwrap_or(0);
+ self.for_each_file(
+ (last_refill, token_left),
+ move |(last_refill, token_left), hash| {
+ async move {
+ let len = match self.read_block(&hash).await {
+ Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
+ Ok(_) => unreachable!(),
+ Err(_) => 0, // resync and warn message made by read_block if necessary
+ };
+
+ if let Some(speed_limit) = speed_limit {
+ // throttling logic
+ if let Some(t) = token_left.checked_sub(len) {
+ // token bucket not empty yet
+ Ok((last_refill, t))
+ } else {
+ // token bucket empty. Sleep and refill
+ tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await;
+ Ok((Instant::now(), speed_limit))
+ }
+ } else {
+ Ok((last_refill, token_left)) // actually not used
+ }
+ }
+ },
+ must_exit,
+ )
+ .await
+ }
}
#[async_trait]
@@ -598,7 +661,7 @@ impl BlockManagerLocked {
);
let path = mgr.block_path(hash);
let mut path2 = path.clone();
- path2.set_extension(".corrupted");
+ path2.set_extension("corrupted");
fs::rename(path, path2).await?;
mgr.put_to_resync(hash, Duration::from_millis(0))?;
Ok(())