diff options
author | Alex Auvolat <alex@adnab.me> | 2023-05-04 10:45:44 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-06-09 16:23:37 +0200 |
commit | 4ea53dc75930d813b84b79c3427b194b6e664ce7 (patch) | |
tree | 7bdd1e4b13c8dcb13fec64d1a7fb319386d29645 /src/garage/repair/online.rs | |
parent | 058518c22b701d5d2dc3e838518d88ce9a4cc875 (diff) | |
download | garage-4ea53dc75930d813b84b79c3427b194b6e664ce7.tar.gz garage-4ea53dc75930d813b84b79c3427b194b6e664ce7.zip |
Add multipart upload repair
Diffstat (limited to 'src/garage/repair/online.rs')
-rw-r--r-- | src/garage/repair/online.rs | 102 |
1 files changed, 72 insertions, 30 deletions
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 6d8a91fe..9de6166d 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -8,6 +8,7 @@ use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; @@ -38,6 +39,10 @@ pub async fn launch_online_repair( info!("Repairing the versions table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } + RepairWhat::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); + } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); @@ -162,25 +167,26 @@ impl TableRepair for RepairVersions { async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> { if !version.deleted.get() { - let version_exists = match &version.backlink { - VersionBacklink::Object { bucket_id, key } => { - let object = garage.object_table.get(&bucket_id, &key).await?; - match object { - Some(o) => o.versions().iter().any(|x| { + let ref_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => garage + .object_table + .get(&bucket_id, &key) + .await? + .map(|o| { + o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted - }), - None => false, - } - } - VersionBacklink::MultipartUpload { upload_id } => { - let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; - match mpu { - Some(u) => !u.deleted.get(), - None => false, - } - } + }) + }) + .unwrap_or(false), + VersionBacklink::MultipartUpload { upload_id } => garage + .mpu_table + .get(&upload_id, &EmptyKey) + .await? + .map(|u| !u.deleted.get()) + .unwrap_or(false), }; - if !version_exists { + + if !ref_exists { info!("Repair versions: marking version as deleted: {:?}", version); garage .version_table @@ -206,27 +212,63 @@ impl TableRepair for RepairBlockRefs { &garage.block_ref_table } - async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result<bool, Error> { + async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> { if !block_ref.deleted.get() { - let version = garage + let ref_exists = garage .version_table .get(&block_ref.version, &EmptyKey) - .await?; - // The version might not exist if it has been GC'ed - let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); + .await? + .map(|v| !v.deleted.get()) + .unwrap_or(false); + if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true.into(), - }) - .await?; + block_ref.deleted.set(); + garage.block_ref_table.insert(&block_ref).await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ---- + +struct RepairMpu; + +#[async_trait] +impl TableRepair for RepairMpu { + type T = MultipartUploadTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.mpu_table + } + + async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> { + if !mpu.deleted.get() { + let ref_exists = garage + .object_table + .get(&mpu.bucket_id, &mpu.key) + .await? + .map(|o| { + o.versions() + .iter() + .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true))) + }) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair multipart uploads: marking mpu as deleted: {:?}", + mpu + ); + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; return Ok(true); } } |