aboutsummaryrefslogtreecommitdiff
path: root/src/model/helper/bucket.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-11-07 12:20:59 +0100
committerAlex Auvolat <alex@adnab.me>2022-11-07 12:20:59 +0100
commit28d7a49f6365fadaffaa903cc10434c1ed28d564 (patch)
tree8da5b3213b7ff199af80e64af29a7a1395b9d02d /src/model/helper/bucket.rs
parent3039bb5d431532f0ec907eab5e00f94acc4a3472 (diff)
parent66f2daa0259538c64508b37cec89d76a74a71a02 (diff)
downloadgarage-28d7a49f6365fadaffaa903cc10434c1ed28d564.tar.gz
garage-28d7a49f6365fadaffaa903cc10434c1ed28d564.zip
Merge branch 'main' into optimal-layout
Diffstat (limited to 'src/model/helper/bucket.rs')
-rw-r--r--src/model/helper/bucket.rs69
1 files changed, 68 insertions, 1 deletions
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 130ba5be..4a488d7f 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
@@ -12,7 +14,7 @@ use crate::helper::error::*;
use crate::helper::key::KeyHelper;
use crate::key_table::*;
use crate::permission::BucketKeyPerm;
-use crate::s3::object_table::ObjectFilter;
+use crate::s3::object_table::*;
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
@@ -472,4 +474,69 @@ impl<'a> BucketHelper<'a> {
Ok(true)
}
+
+ // ----
+
+ /// Deletes all incomplete multipart uploads that are older than a certain time.
+ /// Returns the number of uploads aborted
+ pub async fn cleanup_incomplete_uploads(
+ &self,
+ bucket_id: &Uuid,
+ older_than: Duration,
+ ) -> Result<usize, Error> {
+ let older_than = now_msec() - older_than.as_millis() as u64;
+
+ let mut ret = 0usize;
+ let mut start = None;
+
+ loop {
+ let objects = self
+ .0
+ .object_table
+ .get_range(
+ bucket_id,
+ start,
+ Some(ObjectFilter::IsUploading),
+ 1000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ let abortions = objects
+ .iter()
+ .filter_map(|object| {
+ let aborted_versions = object
+ .versions()
+ .iter()
+ .filter(|v| v.is_uploading() && v.timestamp < older_than)
+ .map(|v| ObjectVersion {
+ state: ObjectVersionState::Aborted,
+ uuid: v.uuid,
+ timestamp: v.timestamp,
+ })
+ .collect::<Vec<_>>();
+ if !aborted_versions.is_empty() {
+ Some(Object::new(
+ object.bucket_id,
+ object.key.clone(),
+ aborted_versions,
+ ))
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+
+ ret += abortions.len();
+ self.0.object_table.insert_many(abortions).await?;
+
+ if objects.len() < 1000 {
+ break;
+ } else {
+ start = Some(objects.last().unwrap().key.clone());
+ }
+ }
+
+ Ok(ret)
+ }
}