aboutsummaryrefslogtreecommitdiff
path: root/src/model/s3/lifecycle_worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/s3/lifecycle_worker.rs')
-rw-r--r--src/model/s3/lifecycle_worker.rs28
1 files changed, 22 insertions, 6 deletions
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 069f44a0..1981e0fd 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -54,6 +54,12 @@ enum State {
},
}
+#[derive(Clone, Copy, Eq, PartialEq)]
+enum Skip {
+ SkipBucket,
+ NextObject,
+}
+
pub fn register_bg_vars(
persister: &PersisterShared<LifecycleWorkerPersisted>,
vars: &mut vars::BgVars,
@@ -164,10 +170,10 @@ impl Worker for LifecycleWorker {
};
let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
- process_object(
+ let skip = process_object(
&self.garage,
*date,
- object,
+ &object,
objects_expired,
mpu_aborted,
last_bucket,
@@ -175,7 +181,13 @@ impl Worker for LifecycleWorker {
.await?;
*counter += 1;
- *pos = next_pos;
+ if skip == Skip::SkipBucket {
+ let bucket_id_len = object.bucket_id.as_slice().len();
+ assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
+ *pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
+ } else {
+ *pos = next_pos;
+ }
Ok(WorkerState::Busy)
}
@@ -211,11 +223,11 @@ impl Worker for LifecycleWorker {
async fn process_object(
garage: &Arc<Garage>,
now_date: NaiveDate,
- object: Object,
+ object: &Object,
objects_expired: &mut usize,
mpu_aborted: &mut usize,
last_bucket: &mut Option<Bucket>,
-) -> Result<(), Error> {
+) -> Result<Skip, Error> {
let bucket = match last_bucket.take() {
Some(b) if b.id == object.bucket_id => b,
_ => garage
@@ -231,6 +243,10 @@ async fn process_object(
.and_then(|s| s.lifecycle_config.get().as_deref())
.unwrap_or_default();
+ if lifecycle_policy.is_empty() {
+ return Ok(Skip::SkipBucket);
+ }
+
for rule in lifecycle_policy.iter() {
if let Some(pfx) = &rule.filter.prefix {
if !object.key.starts_with(pfx) {
@@ -304,7 +320,7 @@ async fn process_object(
}
*last_bucket = Some(bucket);
- Ok(())
+ Ok(Skip::NextObject)
}
fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool {