aboutsummaryrefslogtreecommitdiff
path: root/src/store/repair.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-23 18:36:12 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-23 18:36:12 +0000
commit4ef84a0558c0bf6641094e762ede0c962781204d (patch)
tree8e6014ee12541c7085b371e99941c5c3da53f2b0 /src/store/repair.rs
parent44a1089d9569b442c098c2ceebb3f691816e52d2 (diff)
downloadgarage-4ef84a0558c0bf6641094e762ede0c962781204d.tar.gz
garage-4ef84a0558c0bf6641094e762ede0c962781204d.zip
Move repair to separate file
Diffstat (limited to 'src/store/repair.rs')
-rw-r--r--src/store/repair.rs184
1 files changed, 184 insertions, 0 deletions
diff --git a/src/store/repair.rs b/src/store/repair.rs
new file mode 100644
index 00000000..39c57fc1
--- /dev/null
+++ b/src/store/repair.rs
@@ -0,0 +1,184 @@
+use std::sync::Arc;
+
+use tokio::sync::watch;
+
+use crate::error::Error;
+use crate::server::Garage;
+use crate::table::*;
+
+use crate::store::block_ref_table::*;
+use crate::store::version_table::*;
+
+use crate::*;
+
+pub struct Repair {
+ pub garage: Arc<Garage>,
+}
+
+impl Repair {
+ pub async fn repair_worker(
+ &self,
+ opt: RepairOpt,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true);
+
+ if todo(RepairWhat::Tables) {
+ info!("Launching a full sync of tables");
+ self.garage
+ .bucket_table
+ .syncer
+ .load_full()
+ .unwrap()
+ .add_full_scan()
+ .await;
+ self.garage
+ .object_table
+ .syncer
+ .load_full()
+ .unwrap()
+ .add_full_scan()
+ .await;
+ self.garage
+ .version_table
+ .syncer
+ .load_full()
+ .unwrap()
+ .add_full_scan()
+ .await;
+ self.garage
+ .block_ref_table
+ .syncer
+ .load_full()
+ .unwrap()
+ .add_full_scan()
+ .await;
+ }
+
+ // TODO: wait for full sync to finish before proceeding to the rest?
+
+ if todo(RepairWhat::Versions) {
+ info!("Repairing the versions table");
+ self.repair_versions(&must_exit).await?;
+ }
+
+ if todo(RepairWhat::BlockRefs) {
+ info!("Repairing the block refs table");
+ self.repair_block_ref(&must_exit).await?;
+ }
+
+ if opt.what.is_none() {
+ info!("Repairing the RC");
+ self.repair_rc(&must_exit).await?;
+ }
+
+ if todo(RepairWhat::Blocks) {
+ info!("Repairing the stored blocks");
+ self.garage
+ .block_manager
+ .repair_data_store(&must_exit)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+ let mut pos = vec![];
+
+ while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? {
+ pos = item_key.to_vec();
+
+ let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
+ if version.deleted {
+ continue;
+ }
+ let object = self
+ .garage
+ .object_table
+ .get(&version.bucket, &version.key)
+ .await?;
+ let version_exists = match object {
+ Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid),
+ None => {
+ warn!(
+ "Repair versions: object for version {:?} not found",
+ version
+ );
+ false
+ }
+ };
+ if !version_exists {
+ info!("Repair versions: marking version as deleted: {:?}", version);
+ self.garage
+ .version_table
+ .insert(&Version::new(
+ version.uuid,
+ version.bucket,
+ version.key,
+ true,
+ vec![],
+ ))
+ .await?;
+ }
+
+ if *must_exit.borrow() {
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+ let mut pos = vec![];
+
+ while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? {
+ pos = item_key.to_vec();
+
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
+ if block_ref.deleted {
+ continue;
+ }
+ let version = self
+ .garage
+ .version_table
+ .get(&block_ref.version, &EmptyKey)
+ .await?;
+ let ref_exists = match version {
+ Some(v) => !v.deleted,
+ None => {
+ warn!(
+ "Block ref repair: version for block ref {:?} not found",
+ block_ref
+ );
+ false
+ }
+ };
+ if !ref_exists {
+ info!(
+ "Repair block ref: marking block_ref as deleted: {:?}",
+ block_ref
+ );
+ self.garage
+ .block_ref_table
+ .insert(&BlockRef {
+ block: block_ref.block,
+ version: block_ref.version,
+ deleted: true,
+ })
+ .await?;
+ }
+
+ if *must_exit.borrow() {
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+ // TODO
+ warn!("repair_rc: not implemented");
+ Ok(())
+ }
+}