diff options
Diffstat (limited to 'src/store/repair.rs')
-rw-r--r-- | src/store/repair.rs | 184 |
1 files changed, 0 insertions, 184 deletions
diff --git a/src/store/repair.rs b/src/store/repair.rs deleted file mode 100644 index 39c57fc1..00000000 --- a/src/store/repair.rs +++ /dev/null @@ -1,184 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::watch; - -use crate::error::Error; -use crate::server::Garage; -use crate::table::*; - -use crate::store::block_ref_table::*; -use crate::store::version_table::*; - -use crate::*; - -pub struct Repair { - pub garage: Arc<Garage>, -} - -impl Repair { - pub async fn repair_worker( - &self, - opt: RepairOpt, - must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { - let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); - - if todo(RepairWhat::Tables) { - info!("Launching a full sync of tables"); - self.garage - .bucket_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .object_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .version_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .block_ref_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - } - - // TODO: wait for full sync to finish before proceeding to the rest? - - if todo(RepairWhat::Versions) { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - - if todo(RepairWhat::BlockRefs) { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - - if opt.what.is_none() { - info!("Repairing the RC"); - self.repair_rc(&must_exit).await?; - } - - if todo(RepairWhat::Blocks) { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - - Ok(()) - } - - async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - let mut pos = vec![]; - - while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? { - pos = item_key.to_vec(); - - let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; - if version.deleted { - continue; - } - let object = self - .garage - .object_table - .get(&version.bucket, &version.key) - .await?; - let version_exists = match object { - Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid), - None => { - warn!( - "Repair versions: object for version {:?} not found", - version - ); - false - } - }; - if !version_exists { - info!("Repair versions: marking version as deleted: {:?}", version); - self.garage - .version_table - .insert(&Version::new( - version.uuid, - version.bucket, - version.key, - true, - vec![], - )) - .await?; - } - - if *must_exit.borrow() { - break; - } - } - Ok(()) - } - - async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - let mut pos = vec![]; - - while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? { - pos = item_key.to_vec(); - - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; - if block_ref.deleted { - continue; - } - let version = self - .garage - .version_table - .get(&block_ref.version, &EmptyKey) - .await?; - let ref_exists = match version { - Some(v) => !v.deleted, - None => { - warn!( - "Block ref repair: version for block ref {:?} not found", - block_ref - ); - false - } - }; - if !ref_exists { - info!( - "Repair block ref: marking block_ref as deleted: {:?}", - block_ref - ); - self.garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true, - }) - .await?; - } - - if *must_exit.borrow() { - break; - } - } - Ok(()) - } - - async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - // TODO - warn!("repair_rc: not implemented"); - Ok(()) - } -} |