From 87be8eeb930f37e7ebc23037eecf7f79f173434a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 16:17:40 +0200 Subject: update block admin for new multipartupload models --- src/garage/admin/block.rs | 113 ++++++++++++++++++++++++++++++---------------- src/garage/admin/mod.rs | 2 + src/garage/cli/cmd.rs | 3 +- src/garage/cli/util.rs | 43 +++++++++++++----- 4 files changed, 109 insertions(+), 52 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index e9e3ff96..2d84b5cf 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -34,6 +34,7 @@ impl AdminRpcHandler { .get_range(&hash, None, None, 10000, Default::default()) .await?; let mut versions = vec![]; + let mut uploads = vec![]; for br in block_refs { if let Some(v) = self .garage @@ -41,6 +42,11 @@ impl AdminRpcHandler { .get(&br.version, &EmptyKey) .await? { + if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { + if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + uploads.push(u); + } + } versions.push(Ok(v)); } else { versions.push(Err(br.version)); @@ -50,6 +56,7 @@ impl AdminRpcHandler { hash, refcount, versions, + uploads, }) } @@ -93,6 +100,7 @@ impl AdminRpcHandler { } let mut obj_dels = 0; + let mut mpu_dels = 0; let mut ver_dels = 0; for hash in blocks { @@ -105,56 +113,81 @@ impl AdminRpcHandler { .await?; for br in block_refs { - let version = match self + if let Some(version) = self .garage .version_table .get(&br.version, &EmptyKey) .await? { - Some(v) => v, - None => continue, - }; + self.handle_block_purge_version_backlink(&version, &mut obj_dels, &mut mpu_dels).await?; - if let Some(object) = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await? 
- { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == br.version { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - version.bucket_id, - version.key.clone(), - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete( - ObjectVersionData::DeleteMarker, - ), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - obj_dels += 1; - } - } - } + if !version.deleted.get() { + let deleted_version = + Version::new(version.uuid, version.backlink, true); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + } - if !version.deleted.get() { - let deleted_version = - Version::new(version.uuid, version.bucket_id, version.key.clone(), true); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } - } - } Ok(AdminRpc::Ok(format!( - "{} blocks were purged: {} object deletion markers added, {} versions marked deleted", + "Purged {} blocks, {} versions, {} objects, {} multipart uploads", blocks.len(), + ver_dels, obj_dels, - ver_dels + mpu_dels, ))) + } + + async fn handle_block_purge_version_backlink(&self, version: &Version, obj_dels: &mut usize, mpu_dels: &mut usize) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object{bucket_id, key} => { + (*bucket_id, key.clone(), version.uuid) + } + VersionBacklink::MultipartUpload{upload_id} => { + if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + self.garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = self + .garage + .object_table + .get(&bucket_id, &key) + .await? 
+ { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete( + ObjectVersionData::DeleteMarker, + ), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) } + } diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 93f6dd08..07b0012d 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -27,6 +27,7 @@ use garage_model::garage::Garage; use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; +use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::version_table::Version; use crate::cli::*; @@ -66,6 +67,7 @@ pub enum AdminRpc { hash: Hash, refcount: u64, versions: Vec>, + uploads: Vec, }, } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 905b14d3..fb77a927 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -215,8 +215,9 @@ pub async fn cmd_admin( hash, refcount, versions, + uploads, } => { - print_block_info(hash, refcount, versions); + print_block_info(hash, refcount, versions, uploads); } r => { error!("Unexpected response: {:?}", r); diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 2c6be2f4..22a3442d 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -12,8 +12,9 @@ use garage_block::manager::BlockResyncErrorInfo; use garage_model::bucket_table::*; use garage_model::key_table::*; +use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; -use garage_model::s3::version_table::Version; +use garage_model::s3::version_table::*; use crate::cli::structs::WorkerListOpt; @@ -385,29 +386,49 @@ pub fn 
print_block_error_list(el: Vec) { format_table(table); } -pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec>) { +pub fn print_block_info( + hash: Hash, + refcount: u64, + versions: Vec>, + uploads: Vec, +) { println!("Block hash: {}", hex::encode(hash.as_slice())); println!("Refcount: {}", refcount); println!(); - let mut table = vec!["Version\tBucket\tKey\tDeleted".into()]; + let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; let mut nondeleted_count = 0; for v in versions.iter() { match v { Ok(ver) => { - table.push(format!( - "{:?}\t{:?}\t{}\t{:?}", - ver.uuid, - ver.bucket_id, - ver.key, - ver.deleted.get() - )); + match &ver.backlink { + VersionBacklink::Object { bucket_id, key } => { + table.push(format!( + "{:?}\t{:?}\t{}\t\t{:?}", + ver.uuid, + bucket_id, + key, + ver.deleted.get() + )); + } + VersionBacklink::MultipartUpload { upload_id } => { + let upload = uploads.iter().find(|x| x.upload_id == *upload_id); + table.push(format!( + "{:?}\t{:?}\t{}\t{:?}\t{:?}", + ver.uuid, + upload.map(|u| u.bucket_id).unwrap_or_default(), + upload.map(|u| u.key.as_str()).unwrap_or_default(), + upload_id, + ver.deleted.get() + )); + } + } if !ver.deleted.get() { nondeleted_count += 1; } } Err(vh) => { - table.push(format!("{:?}\t\t\tyes", vh)); + table.push(format!("{:?}\t\t\t\tyes", vh)); } } } -- cgit v1.2.3 From bb176ebcb87ea77b07480b696c9e87be92136c70 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 16:43:36 +0200 Subject: cargo fmt --- src/garage/admin/block.rs | 123 +++++++++++++++++++++++----------------------- 1 file changed, 61 insertions(+), 62 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index 2d84b5cf..6c2649e0 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -119,75 +119,74 @@ impl AdminRpcHandler { .get(&br.version, &EmptyKey) .await? 
{ - self.handle_block_purge_version_backlink(&version, &mut obj_dels, &mut mpu_dels).await?; - - if !version.deleted.get() { - let deleted_version = - Version::new(version.uuid, version.backlink, true); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } - } - } - } + self.handle_block_purge_version_backlink( + &version, + &mut obj_dels, + &mut mpu_dels, + ) + .await?; + + if !version.deleted.get() { + let deleted_version = Version::new(version.uuid, version.backlink, true); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + } Ok(AdminRpc::Ok(format!( "Purged {} blocks, {} versions, {} objects, {} multipart uploads", blocks.len(), ver_dels, obj_dels, - mpu_dels, + mpu_dels, ))) - } - - async fn handle_block_purge_version_backlink(&self, version: &Version, obj_dels: &mut usize, mpu_dels: &mut usize) -> Result<(), Error> { - let (bucket_id, key, ov_id) = match &version.backlink { - VersionBacklink::Object{bucket_id, key} => { - (*bucket_id, key.clone(), version.uuid) - } - VersionBacklink::MultipartUpload{upload_id} => { - if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { - if !mpu.deleted.get() { - mpu.parts.clear(); - mpu.deleted.set(); - self.garage.mpu_table.insert(&mpu).await?; - *mpu_dels += 1; - } - (mpu.bucket_id, mpu.key.clone(), *upload_id) - } else { - return Ok(()); - } - } - }; - - if let Some(object) = self - .garage - .object_table - .get(&bucket_id, &key) - .await? 
- { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == ov_id { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - bucket_id, - key, - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete( - ObjectVersionData::DeleteMarker, - ), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - *obj_dels += 1; - } - } - } - - Ok(()) } + async fn handle_block_purge_version_backlink( + &self, + version: &Version, + obj_dels: &mut usize, + mpu_dels: &mut usize, + ) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + self.garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? 
{ + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) + } } -- cgit v1.2.3 From 75a0e013725f984077a6d0fe85138afee82cebcc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 19:21:35 +0200 Subject: fix online repair --- src/garage/repair/online.rs | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) (limited to 'src/garage') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 0e14ed51..9b6b3cad 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -107,28 +107,29 @@ impl Worker for RepairVersionsWorker { let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; if !version.deleted.get() { - let object = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await?; - let version_exists = match object { - Some(o) => o - .versions() - .iter() - .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), - None => false, + let version_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => { + let object = self.garage.object_table.get(&bucket_id, &key).await?; + match object { + Some(o) => o.versions().iter().any(|x| { + x.uuid == version.uuid && x.state != ObjectVersionState::Aborted + }), + None => false, + } + } + VersionBacklink::MultipartUpload { upload_id } => { + let mpu = self.garage.mpu_table.get(&upload_id, &EmptyKey).await?; + match mpu { + Some(u) => !u.deleted.get(), + None => false, + } + } }; if !version_exists { info!("Repair versions: marking version as deleted: 
{:?}", version); self.garage .version_table - .insert(&Version::new( - version.uuid, - version.bucket_id, - version.key, - true, - )) + .insert(&Version::new(version.uuid, version.backlink, true)) .await?; } } -- cgit v1.2.3 From 8644376ac2dd8015e9212c19f30df63811426e1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:09:52 +0200 Subject: fix test; simplify code --- src/garage/tests/s3/multipart.rs | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) (limited to 'src/garage') diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs index 895a2993..ee1373cc 100644 --- a/src/garage/tests/s3/multipart.rs +++ b/src/garage/tests/s3/multipart.rs @@ -65,7 +65,8 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 1); - let fp = ps.iter().find(|x| x.part_number == 2).unwrap(); + assert_eq!(ps[0].part_number, 2); + let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), @@ -100,13 +101,24 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 2); - let fp = ps.iter().find(|x| x.part_number == 1).unwrap(); + + assert_eq!(ps[0].part_number, 1); + let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), "\"3c484266f9315485694556e6c693bfa2\"" ); assert_eq!(fp.size, SZ_5MB as i64); + + assert_eq!(ps[1].part_number, 2); + let fp = &ps[1]; + assert!(fp.last_modified.is_some()); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + "\"3366bb9dcf710d6801b5926467d02e19\"" + ); + assert_eq!(fp.size, SZ_5MB as i64); } { @@ -123,12 +135,19 @@ async fn test_uploadlistpart() { .unwrap(); assert!(r.part_number_marker.is_none()); - assert!(r.next_part_number_marker.is_some()); + assert_eq!(r.next_part_number_marker.as_deref(), Some("1")); assert_eq!(r.max_parts, 1_i32); assert!(r.is_truncated); assert_eq!(r.key.unwrap(), "a"); assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str()); 
- assert_eq!(r.parts.unwrap().len(), 1); + let parts = r.parts.unwrap(); + assert_eq!(parts.len(), 1); + let fp = &parts[0]; + assert_eq!(fp.part_number, 1); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + "\"3c484266f9315485694556e6c693bfa2\"" + ); let r2 = ctx .client @@ -147,10 +166,18 @@ async fn test_uploadlistpart() { r.next_part_number_marker.as_ref().unwrap() ); assert_eq!(r2.max_parts, 1_i32); - assert!(r2.is_truncated); assert_eq!(r2.key.unwrap(), "a"); assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str()); - assert_eq!(r2.parts.unwrap().len(), 1); + let parts = r2.parts.unwrap(); + assert_eq!(parts.len(), 1); + let fp = &parts[0]; + assert_eq!(fp.part_number, 2); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + "\"3366bb9dcf710d6801b5926467d02e19\"" + ); + //assert!(r2.is_truncated); // WHY? (this was the test before) + assert!(!r2.is_truncated); } let cmp = CompletedMultipartUpload::builder() -- cgit v1.2.3 From 058518c22b701d5d2dc3e838518d88ce9a4cc875 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:36:48 +0200 Subject: refactor repair workers with a trait --- src/garage/repair/online.rs | 149 ++++++++++++++++++++++++-------------------- 1 file changed, 81 insertions(+), 68 deletions(-) (limited to 'src/garage') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9b6b3cad..6d8a91fe 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -5,11 +5,15 @@ use async_trait::async_trait; use tokio::sync::watch; use garage_block::repair::ScrubWorkerCommand; + use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; + +use garage_table::replication::*; use garage_table::*; + use garage_util::background::*; use garage_util::error::Error; use garage_util::migrate::Migrate; @@ -32,11 +36,11 @@ pub async fn launch_online_repair( } RepairWhat::Versions => { info!("Repairing the versions table"); - 
bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); @@ -67,49 +71,100 @@ pub async fn launch_online_repair( // ---- -struct RepairVersionsWorker { +#[async_trait] +trait TableRepair: Send + Sync + 'static { + type T: TableSchema; + + fn table(garage: &Garage) -> &Table; + + async fn process( + &mut self, + garage: &Garage, + entry: <::T as TableSchema>::E, + ) -> Result; +} + +struct TableRepairWorker { garage: Arc, pos: Vec, counter: usize, + repairs: usize, + inner: T, } -impl RepairVersionsWorker { - fn new(garage: Arc) -> Self { +impl TableRepairWorker { + fn new(garage: Arc, inner: R) -> Self { Self { garage, + inner, pos: vec![], counter: 0, + repairs: 0, } } } #[async_trait] -impl Worker for RepairVersionsWorker { +impl Worker for TableRepairWorker { fn name(&self) -> String { - "Version repair worker".into() + format!("{} repair worker", R::T::TABLE_NAME) } fn status(&self) -> WorkerStatus { WorkerStatus { - progress: Some(self.counter.to_string()), + progress: Some(format!("{} ({})", self.counter, self.repairs)), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? 
{ Some((k, v)) => (v, k), None => { - info!("repair_versions: finished, done {}", self.counter); + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); return Ok(WorkerState::Done); } }; - let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; + let entry = ::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; + } + + self.counter += 1; + self.pos = next_pos; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + +// ---- + +struct RepairVersions; + +#[async_trait] +impl TableRepair for RepairVersions { + type T = VersionTable; + + fn table(garage: &Garage) -> &Table { + &garage.version_table + } + + async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { let version_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => { - let object = self.garage.object_table.get(&bucket_id, &key).await?; + let object = garage.object_table.get(&bucket_id, &key).await?; match object { Some(o) => o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted @@ -118,7 +173,7 @@ impl Worker for RepairVersionsWorker { } } VersionBacklink::MultipartUpload { upload_id } => { - let mpu = self.garage.mpu_table.get(&upload_id, &EmptyKey).await?; + let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; match mpu { Some(u) => !u.deleted.get(), None => false, @@ -127,69 +182,33 @@ impl Worker for RepairVersionsWorker { }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); - self.garage + garage .version_table .insert(&Version::new(version.uuid, version.backlink, true)) .await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; - - Ok(WorkerState::Busy) - } - - async fn wait_for_work(&mut self) -> WorkerState 
{ - unreachable!() + Ok(false) } } // ---- -struct RepairBlockrefsWorker { - garage: Arc, - pos: Vec, - counter: usize, -} - -impl RepairBlockrefsWorker { - fn new(garage: Arc) -> Self { - Self { - garage, - pos: vec![], - counter: 0, - } - } -} +struct RepairBlockRefs; #[async_trait] -impl Worker for RepairBlockrefsWorker { - fn name(&self) -> String { - "Block refs repair worker".into() - } +impl TableRepair for RepairBlockRefs { + type T = BlockRefTable; - fn status(&self) -> WorkerStatus { - WorkerStatus { - progress: Some(self.counter.to_string()), - ..Default::default() - } + fn table(garage: &Garage) -> &Table { + &garage.block_ref_table } - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = - match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => (v, k), - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); - } - }; - - let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; + async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result { if !block_ref.deleted.get() { - let version = self - .garage + let version = garage .version_table .get(&block_ref.version, &EmptyKey) .await?; @@ -200,7 +219,7 @@ impl Worker for RepairBlockrefsWorker { "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - self.garage + garage .block_ref_table .insert(&BlockRef { block: block_ref.block, @@ -208,16 +227,10 @@ impl Worker for RepairBlockrefsWorker { deleted: true.into(), }) .await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; - - Ok(WorkerState::Busy) - } - - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + Ok(false) } } -- cgit v1.2.3 From 4ea53dc75930d813b84b79c3427b194b6e664ce7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:45:44 +0200 Subject: Add multipart upload repair --- src/garage/cli/structs.rs | 3 ++ 
src/garage/repair/online.rs | 102 +++++++++++++++++++++++++++++++------------- 2 files changed, 75 insertions(+), 30 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 986592ae..6444d374 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -452,6 +452,9 @@ pub enum RepairWhat { /// Only redo the propagation of object deletions to the version table (slow) #[structopt(name = "versions", version = garage_version())] Versions, + /// Only redo the propagation of object deletions to the multipart upload table (slow) + #[structopt(name = "mpu", version = garage_version())] + MultipartUploads, /// Only redo the propagation of version deletions to the block ref table (extremely slow) #[structopt(name = "block_refs", version = garage_version())] BlockRefs, diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 6d8a91fe..9de6166d 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -8,6 +8,7 @@ use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; @@ -38,6 +39,10 @@ pub async fn launch_online_repair( info!("Repairing the versions table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } + RepairWhat::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); + } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); @@ -162,25 +167,26 @@ impl TableRepair for RepairVersions { async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { - let version_exists = match &version.backlink { - VersionBacklink::Object { bucket_id, key } => { - 
let object = garage.object_table.get(&bucket_id, &key).await?; - match object { - Some(o) => o.versions().iter().any(|x| { + let ref_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => garage + .object_table + .get(&bucket_id, &key) + .await? + .map(|o| { + o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted - }), - None => false, - } - } - VersionBacklink::MultipartUpload { upload_id } => { - let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; - match mpu { - Some(u) => !u.deleted.get(), - None => false, - } - } + }) + }) + .unwrap_or(false), + VersionBacklink::MultipartUpload { upload_id } => garage + .mpu_table + .get(&upload_id, &EmptyKey) + .await? + .map(|u| !u.deleted.get()) + .unwrap_or(false), }; - if !version_exists { + + if !ref_exists { info!("Repair versions: marking version as deleted: {:?}", version); garage .version_table @@ -206,27 +212,63 @@ impl TableRepair for RepairBlockRefs { &garage.block_ref_table } - async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result { + async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result { if !block_ref.deleted.get() { - let version = garage + let ref_exists = garage .version_table .get(&block_ref.version, &EmptyKey) - .await?; - // The version might not exist if it has been GC'ed - let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); + .await? 
+ .map(|v| !v.deleted.get()) + .unwrap_or(false); + if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true.into(), - }) - .await?; + block_ref.deleted.set(); + garage.block_ref_table.insert(&block_ref).await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ---- + +struct RepairMpu; + +#[async_trait] +impl TableRepair for RepairMpu { + type T = MultipartUploadTable; + + fn table(garage: &Garage) -> &Table { + &garage.mpu_table + } + + async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result { + if !mpu.deleted.get() { + let ref_exists = garage + .object_table + .get(&mpu.bucket_id, &mpu.key) + .await? + .map(|o| { + o.versions() + .iter() + .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true))) + }) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair multipart uploads: marking mpu as deleted: {:?}", + mpu + ); + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; return Ok(true); } } -- cgit v1.2.3 From 511e07ecd489fa72040171fe908323873a57ac19 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 11:49:23 +0200 Subject: fix mpu counter (add missing workers) and report info at appropriate places --- src/garage/admin/bucket.rs | 10 ++++++++++ src/garage/admin/mod.rs | 1 + src/garage/cli/cmd.rs | 3 ++- src/garage/cli/structs.rs | 12 ++++++------ src/garage/cli/util.rs | 25 +++++++++++++++++-------- 5 files changed, 36 insertions(+), 15 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs index 11bb8730..0781cb8b 100644 --- a/src/garage/admin/bucket.rs +++ b/src/garage/admin/bucket.rs @@ -73,6 +73,15 @@ impl AdminRpcHandler { .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) .unwrap_or_default(); + let mpu_counters = self + .garage + .mpu_counter_table + 
.table + .get(&bucket_id, &EmptyKey) + .await? + .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .unwrap_or_default(); + let mut relevant_keys = HashMap::new(); for (k, _) in bucket .state @@ -112,6 +121,7 @@ impl AdminRpcHandler { bucket, relevant_keys, counters, + mpu_counters, }) } diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 07b0012d..33c21eba 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -53,6 +53,7 @@ pub enum AdminRpc { bucket: Bucket, relevant_keys: HashMap, counters: HashMap, + mpu_counters: HashMap, }, KeyList(Vec<(String, String)>), KeyInfo(Key, HashMap), diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index fb77a927..045f050c 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -190,8 +190,9 @@ pub async fn cmd_admin( bucket, relevant_keys, counters, + mpu_counters, } => { - print_bucket_info(&bucket, &relevant_keys, &counters); + print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters); } AdminRpc::KeyList(kl) => { print_key_list(kl); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 6444d374..5dc99a0d 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -443,22 +443,22 @@ pub struct RepairOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum RepairWhat { - /// Only do a full sync of metadata tables + /// Do a full sync of metadata tables #[structopt(name = "tables", version = garage_version())] Tables, - /// Only repair (resync/rebalance) the set of stored blocks + /// Repair (resync/rebalance) the set of stored blocks #[structopt(name = "blocks", version = garage_version())] Blocks, - /// Only redo the propagation of object deletions to the version table (slow) + /// Repropagate object deletions to the version table #[structopt(name = "versions", version = garage_version())] Versions, - /// Only redo the propagation of object deletions to the multipart upload table (slow) + 
/// Repropagate object deletions to the multipart upload table #[structopt(name = "mpu", version = garage_version())] MultipartUploads, - /// Only redo the propagation of version deletions to the block ref table (extremely slow) + /// Repropagate version deletions to the block ref table #[structopt(name = "block_refs", version = garage_version())] BlockRefs, - /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) + /// Verify integrity of all blocks on disc #[structopt(name = "scrub", version = garage_version())] Scrub { #[structopt(subcommand)] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 22a3442d..d87f9eab 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -12,8 +12,8 @@ use garage_block::manager::BlockResyncErrorInfo; use garage_model::bucket_table::*; use garage_model::key_table::*; -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; +use garage_model::s3::mpu_table::{self, MultipartUpload}; +use garage_model::s3::object_table; use garage_model::s3::version_table::*; use crate::cli::structs::WorkerListOpt; @@ -136,6 +136,7 @@ pub fn print_bucket_info( bucket: &Bucket, relevant_keys: &HashMap, counters: &HashMap, + mpu_counters: &HashMap, ) { let key_name = |k| { relevant_keys @@ -149,7 +150,7 @@ pub fn print_bucket_info( Deletable::Deleted => println!("Bucket is deleted."), Deletable::Present(p) => { let size = - bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64); + bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64); println!( "\nSize: {} ({})", size.to_string_as(true), @@ -157,14 +158,22 @@ pub fn print_bucket_info( ); println!( "Objects: {}", - counters.get(OBJECTS).cloned().unwrap_or_default() + *counters.get(object_table::OBJECTS).unwrap_or(&0) + ); + println!( + "Unfinished uploads (multipart and non-multipart): {}", + 
*counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0) ); println!( "Unfinished multipart uploads: {}", - counters - .get(UNFINISHED_UPLOADS) - .cloned() - .unwrap_or_default() + *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0) + ); + let mpu_size = + bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64); + println!( + "Size of unfinished multipart uploads: {} ({})", + mpu_size.to_string_as(true), + mpu_size.to_string_as(false), ); println!("\nWebsite access: {}", p.website_config.get().is_some()); -- cgit v1.2.3 From 412ab77b0815f165539fe41713c0155a9878672f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 19:44:01 +0200 Subject: comments and clippy lint fixes --- src/garage/admin/block.rs | 2 +- src/garage/repair/online.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index 6c2649e0..c4a45738 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -153,7 +153,7 @@ impl AdminRpcHandler { let (bucket_id, key, ov_id) = match &version.backlink { VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), VersionBacklink::MultipartUpload { upload_id } => { - if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { + if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { if !mpu.deleted.get() { mpu.parts.clear(); mpu.deleted.set(); diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9de6166d..abfaf9f9 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -170,7 +170,7 @@ impl TableRepair for RepairVersions { let ref_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => garage .object_table - .get(&bucket_id, &key) + .get(bucket_id, key) .await? 
.map(|o| { o.versions().iter().any(|x| { @@ -180,7 +180,7 @@ impl TableRepair for RepairVersions { .unwrap_or(false), VersionBacklink::MultipartUpload { upload_id } => garage .mpu_table - .get(&upload_id, &EmptyKey) + .get(upload_id, &EmptyKey) .await? .map(|u| !u.deleted.get()) .unwrap_or(false), -- cgit v1.2.3 From c14d3735e5514c395a691a2ab4bb93aef57035e2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 9 May 2023 13:02:39 +0200 Subject: Add test for multipart uploads and fix part renumbering --- src/garage/tests/s3/multipart.rs | 192 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 188 insertions(+), 4 deletions(-) (limited to 'src/garage') diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs index ee1373cc..8ae6b66e 100644 --- a/src/garage/tests/s3/multipart.rs +++ b/src/garage/tests/s3/multipart.rs @@ -5,6 +5,190 @@ use aws_sdk_s3::types::ByteStream; const SZ_5MB: usize = 5 * 1024 * 1024; const SZ_10MB: usize = 10 * 1024 * 1024; +#[tokio::test] +async fn test_multipart_upload() { + let ctx = common::context(); + let bucket = ctx.create_bucket("testmpu"); + + let u1 = vec![0x11; SZ_5MB]; + let u2 = vec![0x22; SZ_5MB]; + let u3 = vec![0x33; SZ_5MB]; + let u4 = vec![0x44; SZ_5MB]; + let u5 = vec![0x55; SZ_5MB]; + + let up = ctx + .client + .create_multipart_upload() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + assert!(up.upload_id.is_some()); + + let uid = up.upload_id.as_ref().unwrap(); + + let p3 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(3) + .body(ByteStream::from(u3.clone())) + .send() + .await + .unwrap(); + + let _p1 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(1) + .body(ByteStream::from(u1)) + .send() + .await + .unwrap(); + + let _p4 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(4) + .body(ByteStream::from(u4)) + .send() + .await + 
.unwrap(); + + let p1bis = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(1) + .body(ByteStream::from(u2.clone())) + .send() + .await + .unwrap(); + + let p6 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(6) + .body(ByteStream::from(u5.clone())) + .send() + .await + .unwrap(); + + { + let r = ctx + .client + .list_parts() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .send() + .await + .unwrap(); + assert_eq!(r.parts.unwrap().len(), 4); + } + + let cmp = CompletedMultipartUpload::builder() + .parts( + CompletedPart::builder() + .part_number(1) + .e_tag(p1bis.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(3) + .e_tag(p3.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(6) + .e_tag(p6.e_tag.unwrap()) + .build(), + ) + .build(); + + ctx.client + .complete_multipart_upload() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .multipart_upload(cmp) + .send() + .await + .unwrap(); + + // The multipart upload must not appear anymore + assert!(ctx + .client + .list_parts() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .send() + .await + .is_err()); + + { + // The object must appear as a regular object + let r = ctx + .client + .head_object() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + + assert_eq!(r.content_length, (SZ_5MB * 3) as i64); + } + + { + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + + assert_bytes_eq!(o.body, &[&u2[..], &u3[..], &u5[..]].concat()); + } + + { + for (part_number, data) in [(1, &u2), (2, &u3), (3, &u5)] { + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key("a") + .part_number(part_number) + .send() + .await + .unwrap(); + + eprintln!("get_object with part_number = {}", part_number); + assert_eq!(o.content_length, SZ_5MB as i64); + assert_bytes_eq!(o.body, data); + } + } +} + 
#[tokio::test] async fn test_uploadlistpart() { let ctx = common::context(); @@ -112,13 +296,13 @@ async fn test_uploadlistpart() { assert_eq!(fp.size, SZ_5MB as i64); assert_eq!(ps[1].part_number, 2); - let fp = &ps[1]; - assert!(fp.last_modified.is_some()); + let sp = &ps[1]; + assert!(sp.last_modified.is_some()); assert_eq!( - fp.e_tag.as_ref().unwrap(), + sp.e_tag.as_ref().unwrap(), "\"3366bb9dcf710d6801b5926467d02e19\"" ); - assert_eq!(fp.size, SZ_5MB as i64); + assert_eq!(sp.size, SZ_5MB as i64); } { -- cgit v1.2.3