aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/model/s3/lifecycle_worker.rs81
1 files changed, 45 insertions, 36 deletions
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 670ed9fe..f99cc935 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -152,41 +152,44 @@ impl Worker for LifecycleWorker {
pos,
last_bucket,
} => {
- let (object_bytes, next_pos) = match self
- .garage
- .object_table
- .data
- .store
- .get_gt(&pos)?
- {
- None => {
- info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
- self.persister
- .set_with(|x| x.last_completed = Some(date.to_string()))?;
- self.state = State::Completed(*date);
- return Ok(WorkerState::Idle);
+ // Process a batch of 100 items before yielding to bg task scheduler
+ for _ in 0..100 {
+ let (object_bytes, next_pos) = match self
+ .garage
+ .object_table
+ .data
+ .store
+ .get_gt(&pos)?
+ {
+ None => {
+ info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
+ self.persister
+ .set_with(|x| x.last_completed = Some(date.to_string()))?;
+ self.state = State::Completed(*date);
+ return Ok(WorkerState::Idle);
+ }
+ Some((k, v)) => (v, k),
+ };
+
+ let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
+ let skip = process_object(
+ &self.garage,
+ *date,
+ &object,
+ objects_expired,
+ mpu_aborted,
+ last_bucket,
+ )
+ .await?;
+
+ *counter += 1;
+ if skip == Skip::SkipBucket {
+ let bucket_id_len = object.bucket_id.as_slice().len();
+ assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
+ *pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
+ } else {
+ *pos = next_pos;
}
- Some((k, v)) => (v, k),
- };
-
- let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
- let skip = process_object(
- &self.garage,
- *date,
- &object,
- objects_expired,
- mpu_aborted,
- last_bucket,
- )
- .await?;
-
- *counter += 1;
- if skip == Skip::SkipBucket {
- let bucket_id_len = object.bucket_id.as_slice().len();
- assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
- *pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
- } else {
- *pos = next_pos;
}
Ok(WorkerState::Busy)
@@ -260,6 +263,8 @@ async fn process_object(
return Ok(Skip::SkipBucket);
}
+ let db = garage.object_table.data.store.db();
+
for rule in lifecycle_policy.iter() {
if !rule.enabled {
continue;
@@ -310,7 +315,9 @@ async fn process_object(
"Lifecycle: expiring 1 object in bucket {:?}",
object.bucket_id
);
- garage.object_table.insert(&deleted_object).await?;
+ db.transaction(|mut tx| {
+ garage.object_table.queue_insert(&mut tx, &deleted_object)
+ })?;
*objects_expired += 1;
}
}
@@ -343,7 +350,9 @@ async fn process_object(
);
let aborted_object =
Object::new(object.bucket_id, object.key.clone(), aborted_versions);
- garage.object_table.insert(&aborted_object).await?;
+ db.transaction(|mut tx| {
+ garage.object_table.queue_insert(&mut tx, &aborted_object)
+ })?;
*mpu_aborted += n_aborted;
}
}