diff options
author | Alex Auvolat <alex@adnab.me> | 2023-05-03 16:17:40 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-06-09 16:23:37 +0200 |
commit | 87be8eeb930f37e7ebc23037eecf7f79f173434a (patch) | |
tree | 116e606fa08963a700114e47bcab07ac01670401 | |
parent | 82e75c0e296c74c374f3d40feeb1aadcb58398f0 (diff) | |
download | garage-87be8eeb930f37e7ebc23037eecf7f79f173434a.tar.gz garage-87be8eeb930f37e7ebc23037eecf7f79f173434a.zip |
updaet block admin for new multipartupload models
-rw-r--r-- | src/garage/admin/block.rs | 113 | ||||
-rw-r--r-- | src/garage/admin/mod.rs | 2 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 3 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 43 |
4 files changed, 109 insertions, 52 deletions
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index e9e3ff96..2d84b5cf 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -34,6 +34,7 @@ impl AdminRpcHandler { .get_range(&hash, None, None, 10000, Default::default()) .await?; let mut versions = vec![]; + let mut uploads = vec![]; for br in block_refs { if let Some(v) = self .garage @@ -41,6 +42,11 @@ impl AdminRpcHandler { .get(&br.version, &EmptyKey) .await? { + if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { + if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + uploads.push(u); + } + } versions.push(Ok(v)); } else { versions.push(Err(br.version)); @@ -50,6 +56,7 @@ impl AdminRpcHandler { hash, refcount, versions, + uploads, }) } @@ -93,6 +100,7 @@ impl AdminRpcHandler { } let mut obj_dels = 0; + let mut mpu_dels = 0; let mut ver_dels = 0; for hash in blocks { @@ -105,56 +113,81 @@ impl AdminRpcHandler { .await?; for br in block_refs { - let version = match self + if let Some(version) = self .garage .version_table .get(&br.version, &EmptyKey) .await? { - Some(v) => v, - None => continue, - }; + self.handle_block_purge_version_backlink(&version, &mut obj_dels, &mut mpu_dels).await?; - if let Some(object) = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await? - { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == br.version { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - version.bucket_id, - version.key.clone(), - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete( - ObjectVersionData::DeleteMarker, - ), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - obj_dels += 1; - } - } - } + if !version.deleted.get() { + let deleted_version = + Version::new(version.uuid, version.backlink, true); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + } - if !version.deleted.get() { - let deleted_version = - Version::new(version.uuid, version.bucket_id, version.key.clone(), true); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } - } - } Ok(AdminRpc::Ok(format!( - "{} blocks were purged: {} object deletion markers added, {} versions marked deleted", + "Purged {} blocks, {} versions, {} objects, {} multipart uploads", blocks.len(), + ver_dels, obj_dels, - ver_dels + mpu_dels, ))) + } + + async fn handle_block_purge_version_backlink(&self, version: &Version, obj_dels: &mut usize, mpu_dels: &mut usize) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object{bucket_id, key} => { + (*bucket_id, key.clone(), version.uuid) + } + VersionBacklink::MultipartUpload{upload_id} => { + if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + self.garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = self + .garage + .object_table + .get(&bucket_id, &key) + .await? + { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete( + ObjectVersionData::DeleteMarker, + ), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) } + } diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 93f6dd08..07b0012d 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -27,6 +27,7 @@ use garage_model::garage::Garage; use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; +use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::version_table::Version; use crate::cli::*; @@ -66,6 +67,7 @@ pub enum AdminRpc { hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>, + uploads: Vec<MultipartUpload>, }, } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 905b14d3..fb77a927 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -215,8 +215,9 @@ pub async fn cmd_admin( hash, refcount, versions, + uploads, } => { - print_block_info(hash, refcount, versions); + print_block_info(hash, refcount, versions, uploads); } r => { error!("Unexpected response: {:?}", r); diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 2c6be2f4..22a3442d 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -12,8 +12,9 @@ use garage_block::manager::BlockResyncErrorInfo; use garage_model::bucket_table::*; use garage_model::key_table::*; +use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; -use garage_model::s3::version_table::Version; +use garage_model::s3::version_table::*; use crate::cli::structs::WorkerListOpt; @@ -385,29 +386,49 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { format_table(table); } -pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) { +pub fn print_block_info( + hash: Hash, + refcount: u64, + versions: Vec<Result<Version, Uuid>>, + uploads: Vec<MultipartUpload>, +) { println!("Block hash: {}", hex::encode(hash.as_slice())); println!("Refcount: {}", refcount); println!(); - let mut table = vec!["Version\tBucket\tKey\tDeleted".into()]; + let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; let mut nondeleted_count = 0; for v in versions.iter() { match v { Ok(ver) => { - table.push(format!( - "{:?}\t{:?}\t{}\t{:?}", - ver.uuid, - ver.bucket_id, - ver.key, - ver.deleted.get() - )); + match &ver.backlink { + VersionBacklink::Object { bucket_id, key } => { + table.push(format!( + "{:?}\t{:?}\t{}\t\t{:?}", + ver.uuid, + bucket_id, + key, + ver.deleted.get() + )); + } + VersionBacklink::MultipartUpload { upload_id } => { + let upload = uploads.iter().find(|x| x.upload_id == *upload_id); + table.push(format!( + "{:?}\t{:?}\t{}\t{:?}\t{:?}", + ver.uuid, + upload.map(|u| u.bucket_id).unwrap_or_default(), + upload.map(|u| u.key.as_str()).unwrap_or_default(), + upload_id, + ver.deleted.get() + )); + } + } if !ver.deleted.get() { nondeleted_count += 1; } } Err(vh) => { - table.push(format!("{:?}\t\t\tyes", vh)); + table.push(format!("{:?}\t\t\t\tyes", vh)); } } } |