aboutsummaryrefslogtreecommitdiff
path: root/src/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-19 20:36:36 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-19 20:36:36 +0000
commit5ae32972efaba357ecc0027fe852d710b16b6d0e (patch)
treec49938469ac6f01c1501a79f96260269a410b739 /src/block.rs
parenta54f3158f12cbc69107b0a65af6c2e56fda5b2d7 (diff)
downloadgarage-5ae32972efaba357ecc0027fe852d710b16b6d0e.tar.gz
garage-5ae32972efaba357ecc0027fe852d710b16b6d0e.zip
Implement repair command
Diffstat (limited to 'src/block.rs')
-rw-r--r--src/block.rs84
1 files changed, 84 insertions, 0 deletions
diff --git a/src/block.rs b/src/block.rs
index 489dc33e..4ad74d76 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -17,6 +17,7 @@ use crate::membership::System;
use crate::rpc_client::*;
use crate::rpc_server::*;
+use crate::block_ref_table::*;
use crate::server::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
@@ -356,6 +357,89 @@ impl BlockManager {
.await?;
Ok(())
}
+
+ pub async fn launch_repair(self: &Arc<Self>) -> Result<(), Error> {
+ let self2 = self.clone();
+ self.system
+ .background
+ .spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await })
+ .await;
+ Ok(())
+ }
+
+ pub async fn repair_worker(
+ self: Arc<Self>,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ // 1. Repair blocks from RC table
+ let garage = self.garage.load_full().unwrap();
+ let mut last_hash = None;
+ let mut i = 0usize;
+ for entry in garage.block_ref_table.store.iter() {
+ let (_k, v_bytes) = entry?;
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
+ if Some(&block_ref.block) == last_hash.as_ref() {
+ continue;
+ }
+ if !block_ref.deleted {
+ last_hash = Some(block_ref.block.clone());
+ self.put_to_resync(&block_ref.block, 0)?;
+ }
+ i += 1;
+ if i & 0xFF == 0 && *must_exit.borrow() {
+ return Ok(());
+ }
+ }
+
+ // 2. Repair blocks actually on disk
+ let mut ls_data_dir = fs::read_dir(&self.data_dir).await?;
+ while let Some(data_dir_ent) = ls_data_dir.next().await {
+ let data_dir_ent = data_dir_ent?;
+ let dir_name = data_dir_ent.file_name();
+ let dir_name = match dir_name.into_string() {
+ Ok(x) => x,
+ Err(_) => continue,
+ };
+ if dir_name.len() != 2 || hex::decode(&dir_name).is_err() {
+ continue;
+ }
+
+ let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
+ Err(e) => {
+ eprintln!(
+ "Warning: could not list dir {:?}: {}",
+ data_dir_ent.path().to_str(),
+ e
+ );
+ continue;
+ }
+ Ok(x) => x,
+ };
+ while let Some(file) = ls_data_dir_2.next().await {
+ let file = file?;
+ let file_name = file.file_name();
+ let file_name = match file_name.into_string() {
+ Ok(x) => x,
+ Err(_) => continue,
+ };
+ if file_name.len() != 64 {
+ continue;
+ }
+ let hash_bytes = match hex::decode(&file_name) {
+ Ok(h) => h,
+ Err(_) => continue,
+ };
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&hash_bytes[..]);
+ self.put_to_resync(&hash.into(), 0)?;
+
+ if *must_exit.borrow() {
+ return Ok(());
+ }
+ }
+ }
+ Ok(())
+ }
}
fn u64_from_bytes(bytes: &[u8]) -> u64 {