aboutsummaryrefslogtreecommitdiff
path: root/src/model/helper
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-11-04 15:51:16 +0000
committerAlex <alex@adnab.me>2022-11-04 15:51:16 +0000
commit0d279918b7681d9d71cde85c90d1da75026fd7bf (patch)
treeb96e5830c8a40d25988e0e496a52eba4c71454c5 /src/model/helper
parent043246c575d1ae85c7f375ef577b8fef6940a6d5 (diff)
parente03d9062f7f21dd0493dd82a7dcf82f2cd035943 (diff)
downloadgarage-0d279918b7681d9d71cde85c90d1da75026fd7bf.tar.gz
garage-0d279918b7681d9d71cde85c90d1da75026fd7bf.zip
Merge pull request 'Improvements to CLI' (#410) from cleanup-uploads-command into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/410
Diffstat (limited to 'src/model/helper')
-rw-r--r--src/model/helper/bucket.rs69
1 files changed, 68 insertions, 1 deletions
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 130ba5be..4a488d7f 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
@@ -12,7 +14,7 @@ use crate::helper::error::*;
use crate::helper::key::KeyHelper;
use crate::key_table::*;
use crate::permission::BucketKeyPerm;
-use crate::s3::object_table::ObjectFilter;
+use crate::s3::object_table::*;
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
@@ -472,4 +474,69 @@ impl<'a> BucketHelper<'a> {
Ok(true)
}
+
+ // ----
+
+ /// Deletes all incomplete multipart uploads that are older than a certain time.
+ /// Returns the number of uploads aborted
+ pub async fn cleanup_incomplete_uploads(
+ &self,
+ bucket_id: &Uuid,
+ older_than: Duration,
+ ) -> Result<usize, Error> {
+ let older_than = now_msec() - older_than.as_millis() as u64;
+
+ let mut ret = 0usize;
+ let mut start = None;
+
+ loop {
+ let objects = self
+ .0
+ .object_table
+ .get_range(
+ bucket_id,
+ start,
+ Some(ObjectFilter::IsUploading),
+ 1000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ let abortions = objects
+ .iter()
+ .filter_map(|object| {
+ let aborted_versions = object
+ .versions()
+ .iter()
+ .filter(|v| v.is_uploading() && v.timestamp < older_than)
+ .map(|v| ObjectVersion {
+ state: ObjectVersionState::Aborted,
+ uuid: v.uuid,
+ timestamp: v.timestamp,
+ })
+ .collect::<Vec<_>>();
+ if !aborted_versions.is_empty() {
+ Some(Object::new(
+ object.bucket_id,
+ object.key.clone(),
+ aborted_versions,
+ ))
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+
+ ret += abortions.len();
+ self.0.object_table.insert_many(abortions).await?;
+
+ if objects.len() < 1000 {
+ break;
+ } else {
+ start = Some(objects.last().unwrap().key.clone());
+ }
+ }
+
+ Ok(ret)
+ }
}