aboutsummaryrefslogtreecommitdiff
path: root/src/store
diff options
context:
space:
mode:
Diffstat (limited to 'src/store')
-rw-r--r--src/store/key_table.rs3
-rw-r--r--src/store/mod.rs1
-rw-r--r--src/store/repair.rs184
3 files changed, 187 insertions, 1 deletions
diff --git a/src/store/key_table.rs b/src/store/key_table.rs
index 7476622f..6c3f96d6 100644
--- a/src/store/key_table.rs
+++ b/src/store/key_table.rs
@@ -30,7 +30,8 @@ impl Key {
authorized_buckets: vec![],
};
for b in buckets {
- ret.add_bucket(b);
+ ret.add_bucket(b)
+ .expect("Duplicate AllowedBucket in Key constructor");
}
ret
}
diff --git a/src/store/mod.rs b/src/store/mod.rs
index b6a8dc46..962264c4 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -3,4 +3,5 @@ pub mod block_ref_table;
pub mod bucket_table;
pub mod key_table;
pub mod object_table;
+pub mod repair;
pub mod version_table;
diff --git a/src/store/repair.rs b/src/store/repair.rs
new file mode 100644
index 00000000..39c57fc1
--- /dev/null
+++ b/src/store/repair.rs
@@ -0,0 +1,184 @@
+use std::sync::Arc;
+
+use tokio::sync::watch;
+
+use crate::error::Error;
+use crate::server::Garage;
+use crate::table::*;
+
+use crate::store::block_ref_table::*;
+use crate::store::version_table::*;
+
+use crate::*;
+
+pub struct Repair {
+ pub garage: Arc<Garage>,
+}
+
+impl Repair {
+ pub async fn repair_worker(
+ &self,
+ opt: RepairOpt,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true);
+
+ if todo(RepairWhat::Tables) {
+ info!("Launching a full sync of tables");
+ self.garage
+ .bucket_table
+ .syncer
+ .load_full()
+ .unwrap()
+ .add_full_scan()
+ .await;
+ self.garage
+ .object_table
+ .syncer
+ .load_full()
+ .unwrap()
+ .add_full_scan()
+ .await;
+ self.garage
+ .version_table
+ .syncer
+ .load_full()
+ .unwrap()
+ .add_full_scan()
+ .await;
+ self.garage
+ .block_ref_table
+ .syncer
+ .load_full()
+ .unwrap()
+ .add_full_scan()
+ .await;
+ }
+
+ // TODO: wait for full sync to finish before proceeding to the rest?
+
+ if todo(RepairWhat::Versions) {
+ info!("Repairing the versions table");
+ self.repair_versions(&must_exit).await?;
+ }
+
+ if todo(RepairWhat::BlockRefs) {
+ info!("Repairing the block refs table");
+ self.repair_block_ref(&must_exit).await?;
+ }
+
+ if opt.what.is_none() {
+ info!("Repairing the RC");
+ self.repair_rc(&must_exit).await?;
+ }
+
+ if todo(RepairWhat::Blocks) {
+ info!("Repairing the stored blocks");
+ self.garage
+ .block_manager
+ .repair_data_store(&must_exit)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+ let mut pos = vec![];
+
+ while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? {
+ pos = item_key.to_vec();
+
+ let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
+ if version.deleted {
+ continue;
+ }
+ let object = self
+ .garage
+ .object_table
+ .get(&version.bucket, &version.key)
+ .await?;
+ let version_exists = match object {
+ Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid),
+ None => {
+ warn!(
+ "Repair versions: object for version {:?} not found",
+ version
+ );
+ false
+ }
+ };
+ if !version_exists {
+ info!("Repair versions: marking version as deleted: {:?}", version);
+ self.garage
+ .version_table
+ .insert(&Version::new(
+ version.uuid,
+ version.bucket,
+ version.key,
+ true,
+ vec![],
+ ))
+ .await?;
+ }
+
+ if *must_exit.borrow() {
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+ let mut pos = vec![];
+
+ while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? {
+ pos = item_key.to_vec();
+
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
+ if block_ref.deleted {
+ continue;
+ }
+ let version = self
+ .garage
+ .version_table
+ .get(&block_ref.version, &EmptyKey)
+ .await?;
+ let ref_exists = match version {
+ Some(v) => !v.deleted,
+ None => {
+ warn!(
+ "Block ref repair: version for block ref {:?} not found",
+ block_ref
+ );
+ false
+ }
+ };
+ if !ref_exists {
+ info!(
+ "Repair block ref: marking block_ref as deleted: {:?}",
+ block_ref
+ );
+ self.garage
+ .block_ref_table
+ .insert(&BlockRef {
+ block: block_ref.block,
+ version: block_ref.version,
+ deleted: true,
+ })
+ .await?;
+ }
+
+ if *must_exit.borrow() {
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+ // TODO
+ warn!("repair_rc: not implemented");
+ Ok(())
+ }
+}