From 77e3fd6db2c9cd3a10889bd071e95ef839cfbefc Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 15 Jun 2022 20:20:28 +0200 Subject: improve internal item counter mechanisms and implement bucket quotas (#326) - [x] Refactoring of internal counting API - [x] Repair procedure for counters (it's an offline procedure!!!) - [x] New counter for objects in buckets - [x] Add quotas to buckets struct - [x] Add CLI to manage bucket quotas - [x] Add admin API to manage bucket quotas - [x] Apply quotas by adding checks on put operations - [x] Proof-read Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/326 Co-authored-by: Alex Co-committed-by: Alex --- src/garage/repair.rs | 163 --------------------------------------------------- 1 file changed, 163 deletions(-) delete mode 100644 src/garage/repair.rs (limited to 'src/garage/repair.rs') diff --git a/src/garage/repair.rs b/src/garage/repair.rs deleted file mode 100644 index 17e14b8b..00000000 --- a/src/garage/repair.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::watch; - -use garage_model::garage::Garage; -use garage_model::s3::block_ref_table::*; -use garage_model::s3::object_table::*; -use garage_model::s3::version_table::*; -use garage_table::*; -use garage_util::error::Error; - -use crate::*; - -pub struct Repair { - pub garage: Arc, -} - -impl Repair { - pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver) { - if let Err(e) = self.repair_worker_aux(opt, must_exit).await { - warn!("Repair worker failed with error: {}", e); - } - } - - async fn repair_worker_aux( - &self, - opt: RepairOpt, - must_exit: watch::Receiver, - ) -> Result<(), Error> { - match opt.what { - RepairWhat::Tables => { - info!("Launching a full sync of tables"); - self.garage.bucket_table.syncer.add_full_sync(); - self.garage.object_table.syncer.add_full_sync(); - self.garage.version_table.syncer.add_full_sync(); - self.garage.block_ref_table.syncer.add_full_sync(); - self.garage.key_table.syncer.add_full_sync(); - } - RepairWhat::Versions => { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - RepairWhat::BlockRefs => { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - RepairWhat::Blocks => { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - RepairWhat::Scrub { tranquility } => { - info!("Verifying integrity of stored blocks"); - self.garage - .block_manager - .scrub_data_store(&must_exit, tranquility) - .await?; - } - } - Ok(()) - } - - async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; - - while !*must_exit.borrow() { - let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; - - i += 1; - if i % 1000 == 0 { - info!("repair_versions: {}", i); - } - - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; - if version.deleted.get() { - continue; - } - let object = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await?; - let version_exists = match object { - Some(o) => o - .versions() - .iter() - .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), - None => false, - }; - if !version_exists { - info!("Repair versions: marking version as deleted: {:?}", version); - self.garage - .version_table - .insert(&Version::new( - version.uuid, - version.bucket_id, - version.key, - true, - )) - .await?; - } - } - info!("repair_versions: finished, done {}", i); - Ok(()) - } - - async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; - - while !*must_exit.borrow() { - let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; - - i += 1; - if i % 1000 == 0 { - info!("repair_block_ref: {}", i); - } - - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; - if block_ref.deleted.get() { - continue; - } - let version = self - .garage - .version_table - .get(&block_ref.version, &EmptyKey) - .await?; - // The version might not exist if it has been GC'ed - let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); - if !ref_exists { - info!( - "Repair block ref: marking block_ref as deleted: {:?}", - block_ref - ); - self.garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true.into(), - }) - .await?; - } - } - info!("repair_block_ref: finished, done {}", i); - Ok(()) - } -} -- cgit v1.2.3