aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-05-03 16:17:40 +0200
committerAlex Auvolat <alex@adnab.me>2023-06-09 16:23:37 +0200
commit87be8eeb930f37e7ebc23037eecf7f79f173434a (patch)
tree116e606fa08963a700114e47bcab07ac01670401 /src/garage/admin/block.rs
parent82e75c0e296c74c374f3d40feeb1aadcb58398f0 (diff)
downloadgarage-87be8eeb930f37e7ebc23037eecf7f79f173434a.tar.gz
garage-87be8eeb930f37e7ebc23037eecf7f79f173434a.zip
updaet block admin for new multipartupload models
Diffstat (limited to 'src/garage/admin/block.rs')
-rw-r--r--src/garage/admin/block.rs113
1 files changed, 73 insertions, 40 deletions
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
index e9e3ff96..2d84b5cf 100644
--- a/src/garage/admin/block.rs
+++ b/src/garage/admin/block.rs
@@ -34,6 +34,7 @@ impl AdminRpcHandler {
.get_range(&hash, None, None, 10000, Default::default())
.await?;
let mut versions = vec![];
+ let mut uploads = vec![];
for br in block_refs {
if let Some(v) = self
.garage
@@ -41,6 +42,11 @@ impl AdminRpcHandler {
.get(&br.version, &EmptyKey)
.await?
{
+ if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
+ if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ uploads.push(u);
+ }
+ }
versions.push(Ok(v));
} else {
versions.push(Err(br.version));
@@ -50,6 +56,7 @@ impl AdminRpcHandler {
hash,
refcount,
versions,
+ uploads,
})
}
@@ -93,6 +100,7 @@ impl AdminRpcHandler {
}
let mut obj_dels = 0;
+ let mut mpu_dels = 0;
let mut ver_dels = 0;
for hash in blocks {
@@ -105,56 +113,81 @@ impl AdminRpcHandler {
.await?;
for br in block_refs {
- let version = match self
+ if let Some(version) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
- Some(v) => v,
- None => continue,
- };
+ self.handle_block_purge_version_backlink(&version, &mut obj_dels, &mut mpu_dels).await?;
- if let Some(object) = self
- .garage
- .object_table
- .get(&version.bucket_id, &version.key)
- .await?
- {
- let ov = object.versions().iter().rev().find(|v| v.is_complete());
- if let Some(ov) = ov {
- if ov.uuid == br.version {
- let del_uuid = gen_uuid();
- let deleted_object = Object::new(
- version.bucket_id,
- version.key.clone(),
- vec![ObjectVersion {
- uuid: del_uuid,
- timestamp: ov.timestamp + 1,
- state: ObjectVersionState::Complete(
- ObjectVersionData::DeleteMarker,
- ),
- }],
- );
- self.garage.object_table.insert(&deleted_object).await?;
- obj_dels += 1;
- }
- }
- }
+ if !version.deleted.get() {
+ let deleted_version =
+ Version::new(version.uuid, version.backlink, true);
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ }
- if !version.deleted.get() {
- let deleted_version =
- Version::new(version.uuid, version.bucket_id, version.key.clone(), true);
- self.garage.version_table.insert(&deleted_version).await?;
- ver_dels += 1;
- }
- }
- }
Ok(AdminRpc::Ok(format!(
- "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ "Purged {} blocks, {} versions, {} objects, {} multipart uploads",
blocks.len(),
+ ver_dels,
obj_dels,
- ver_dels
+ mpu_dels,
)))
+ }
+
+ async fn handle_block_purge_version_backlink(&self, version: &Version, obj_dels: &mut usize, mpu_dels: &mut usize) -> Result<(), Error> {
+ let (bucket_id, key, ov_id) = match &version.backlink {
+ VersionBacklink::Object{bucket_id, key} => {
+ (*bucket_id, key.clone(), version.uuid)
+ }
+ VersionBacklink::MultipartUpload{upload_id} => {
+ if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? {
+ if !mpu.deleted.get() {
+ mpu.parts.clear();
+ mpu.deleted.set();
+ self.garage.mpu_table.insert(&mpu).await?;
+ *mpu_dels += 1;
+ }
+ (mpu.bucket_id, mpu.key.clone(), *upload_id)
+ } else {
+ return Ok(());
+ }
+ }
+ };
+
+ if let Some(object) = self
+ .garage
+ .object_table
+ .get(&bucket_id, &key)
+ .await?
+ {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == ov_id {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ bucket_id,
+ key,
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(
+ ObjectVersionData::DeleteMarker,
+ ),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ *obj_dels += 1;
+ }
+ }
+ }
+
+ Ok(())
}
+
}