From 75a0e013725f984077a6d0fe85138afee82cebcc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 19:21:35 +0200 Subject: fix online repair --- src/garage/repair/online.rs | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) (limited to 'src/garage/repair') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 0e14ed51..9b6b3cad 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -107,28 +107,29 @@ impl Worker for RepairVersionsWorker { let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; if !version.deleted.get() { - let object = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await?; - let version_exists = match object { - Some(o) => o - .versions() - .iter() - .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), - None => false, + let version_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => { + let object = self.garage.object_table.get(&bucket_id, &key).await?; + match object { + Some(o) => o.versions().iter().any(|x| { + x.uuid == version.uuid && x.state != ObjectVersionState::Aborted + }), + None => false, + } + } + VersionBacklink::MultipartUpload { upload_id } => { + let mpu = self.garage.mpu_table.get(&upload_id, &EmptyKey).await?; + match mpu { + Some(u) => !u.deleted.get(), + None => false, + } + } }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); self.garage .version_table - .insert(&Version::new( - version.uuid, - version.bucket_id, - version.key, - true, - )) + .insert(&Version::new(version.uuid, version.backlink, true)) .await?; } } -- cgit v1.2.3 From 058518c22b701d5d2dc3e838518d88ce9a4cc875 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:36:48 +0200 Subject: refactor repair workers with a trait --- src/garage/repair/online.rs | 149 ++++++++++++++++++++++++-------------------- 1 file changed, 81 insertions(+), 68 deletions(-) (limited to 'src/garage/repair') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9b6b3cad..6d8a91fe 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -5,11 +5,15 @@ use async_trait::async_trait; use tokio::sync::watch; use garage_block::repair::ScrubWorkerCommand; + use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; + +use garage_table::replication::*; use garage_table::*; + use garage_util::background::*; use garage_util::error::Error; use garage_util::migrate::Migrate; @@ -32,11 +36,11 @@ pub async fn launch_online_repair( } RepairWhat::Versions => { info!("Repairing the versions table"); - bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); @@ -67,49 +71,100 @@ pub async fn launch_online_repair( // ---- -struct RepairVersionsWorker { +#[async_trait] +trait TableRepair: Send + Sync + 'static { + type T: TableSchema; + + fn table(garage: &Garage) -> &Table; + + async fn process( + &mut self, + garage: &Garage, + entry: <::T as TableSchema>::E, + ) -> Result; +} + +struct TableRepairWorker { garage: Arc, pos: Vec, counter: usize, + repairs: usize, + inner: T, } -impl RepairVersionsWorker { - fn new(garage: Arc) -> Self { +impl TableRepairWorker { + fn new(garage: Arc, inner: R) -> Self { Self { garage, + inner, pos: vec![], counter: 0, + repairs: 0, } } } #[async_trait] -impl Worker for RepairVersionsWorker { +impl Worker for TableRepairWorker { fn name(&self) -> String { - "Version repair worker".into() + format!("{} repair worker", R::T::TABLE_NAME) } fn status(&self) -> WorkerStatus { WorkerStatus { - progress: Some(self.counter.to_string()), + progress: Some(format!("{} ({})", self.counter, self.repairs)), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { Some((k, v)) => (v, k), None => { - info!("repair_versions: finished, done {}", self.counter); + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); return Ok(WorkerState::Done); } }; - let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; + let entry = ::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; + } + + self.counter += 1; + self.pos = next_pos; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + +// ---- + +struct RepairVersions; + +#[async_trait] +impl TableRepair for RepairVersions { + type T = VersionTable; + + fn table(garage: &Garage) -> &Table { + &garage.version_table + } + + async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { let version_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => { - let object = self.garage.object_table.get(&bucket_id, &key).await?; + let object = garage.object_table.get(&bucket_id, &key).await?; match object { Some(o) => o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted @@ -118,7 +173,7 @@ impl Worker for RepairVersionsWorker { } } VersionBacklink::MultipartUpload { upload_id } => { - let mpu = self.garage.mpu_table.get(&upload_id, &EmptyKey).await?; + let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; match mpu { Some(u) => !u.deleted.get(), None => false, @@ -127,69 +182,33 @@ impl Worker for RepairVersionsWorker { }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); - self.garage + garage .version_table .insert(&Version::new(version.uuid, version.backlink, true)) .await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; - - Ok(WorkerState::Busy) - } - - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + Ok(false) } } // ---- -struct RepairBlockrefsWorker { - garage: Arc, - pos: Vec, - counter: usize, -} - -impl RepairBlockrefsWorker { - fn new(garage: Arc) -> Self { - Self { - garage, - pos: vec![], - counter: 0, - } - } -} +struct RepairBlockRefs; #[async_trait] -impl Worker for RepairBlockrefsWorker { - fn name(&self) -> String { - "Block refs repair worker".into() - } +impl TableRepair for RepairBlockRefs { + type T = BlockRefTable; - fn status(&self) -> WorkerStatus { - WorkerStatus { - progress: Some(self.counter.to_string()), - ..Default::default() - } + fn table(garage: &Garage) -> &Table { + &garage.block_ref_table } - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = - match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => (v, k), - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); - } - }; - - let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; + async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result { if !block_ref.deleted.get() { - let version = self - .garage + let version = garage .version_table .get(&block_ref.version, &EmptyKey) .await?; @@ -200,7 +219,7 @@ impl Worker for RepairBlockrefsWorker { "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - self.garage + garage .block_ref_table .insert(&BlockRef { block: block_ref.block, @@ -208,16 +227,10 @@ impl Worker for RepairBlockrefsWorker { deleted: true.into(), }) .await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; - - Ok(WorkerState::Busy) - } - - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + Ok(false) } } -- cgit v1.2.3 From 4ea53dc75930d813b84b79c3427b194b6e664ce7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:45:44 +0200 Subject: Add multipart upload repair --- src/garage/repair/online.rs | 102 +++++++++++++++++++++++++++++++------------- 1 file changed, 72 insertions(+), 30 deletions(-) (limited to 'src/garage/repair') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 6d8a91fe..9de6166d 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -8,6 +8,7 @@ use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; @@ -38,6 +39,10 @@ pub async fn launch_online_repair( info!("Repairing the versions table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } + RepairWhat::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); + } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); @@ -162,25 +167,26 @@ impl TableRepair for RepairVersions { async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { - let version_exists = match &version.backlink { - VersionBacklink::Object { bucket_id, key } => { - let object = garage.object_table.get(&bucket_id, &key).await?; - match object { - Some(o) => o.versions().iter().any(|x| { + let ref_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => garage + .object_table + .get(&bucket_id, &key) + .await? + .map(|o| { + o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted - }), - None => false, - } - } - VersionBacklink::MultipartUpload { upload_id } => { - let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; - match mpu { - Some(u) => !u.deleted.get(), - None => false, - } - } + }) + }) + .unwrap_or(false), + VersionBacklink::MultipartUpload { upload_id } => garage + .mpu_table + .get(&upload_id, &EmptyKey) + .await? + .map(|u| !u.deleted.get()) + .unwrap_or(false), }; - if !version_exists { + + if !ref_exists { info!("Repair versions: marking version as deleted: {:?}", version); garage .version_table @@ -206,27 +212,63 @@ impl TableRepair for RepairBlockRefs { &garage.block_ref_table } - async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result { + async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result { if !block_ref.deleted.get() { - let version = garage + let ref_exists = garage .version_table .get(&block_ref.version, &EmptyKey) - .await?; - // The version might not exist if it has been GC'ed - let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); + .await? + .map(|v| !v.deleted.get()) + .unwrap_or(false); + if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true.into(), - }) - .await?; + block_ref.deleted.set(); + garage.block_ref_table.insert(&block_ref).await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ---- + +struct RepairMpu; + +#[async_trait] +impl TableRepair for RepairMpu { + type T = MultipartUploadTable; + + fn table(garage: &Garage) -> &Table { + &garage.mpu_table + } + + async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result { + if !mpu.deleted.get() { + let ref_exists = garage + .object_table + .get(&mpu.bucket_id, &mpu.key) + .await? + .map(|o| { + o.versions() + .iter() + .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true))) + }) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair multipart uploads: marking mpu as deleted: {:?}", + mpu + ); + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; return Ok(true); } } -- cgit v1.2.3 From 412ab77b0815f165539fe41713c0155a9878672f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 19:44:01 +0200 Subject: comments and clippy lint fixes --- src/garage/repair/online.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/garage/repair') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9de6166d..abfaf9f9 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -170,7 +170,7 @@ impl TableRepair for RepairVersions { let ref_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => garage .object_table - .get(&bucket_id, &key) + .get(bucket_id, key) .await? .map(|o| { o.versions().iter().any(|x| { @@ -180,7 +180,7 @@ impl TableRepair for RepairVersions { .unwrap_or(false), VersionBacklink::MultipartUpload { upload_id } => garage .mpu_table - .get(&upload_id, &EmptyKey) + .get(upload_id, &EmptyKey) .await? .map(|u| !u.deleted.get()) .unwrap_or(false), -- cgit v1.2.3