-rw-r--r--  src/model/s3/lifecycle_worker.rs | 81
1 file changed, 45 insertions(+), 36 deletions(-)
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 670ed9fe..f99cc935 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -152,41 +152,44 @@ impl Worker for LifecycleWorker {
 				pos,
 				last_bucket,
 			} => {
-				let (object_bytes, next_pos) = match self
-					.garage
-					.object_table
-					.data
-					.store
-					.get_gt(&pos)?
-				{
-					None => {
-						info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
-						self.persister
-							.set_with(|x| x.last_completed = Some(date.to_string()))?;
-						self.state = State::Completed(*date);
-						return Ok(WorkerState::Idle);
+				// Process a batch of 100 items before yielding to bg task scheduler
+				for _ in 0..100 {
+					let (object_bytes, next_pos) = match self
+						.garage
+						.object_table
+						.data
+						.store
+						.get_gt(&pos)?
+					{
+						None => {
+							info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
+							self.persister
+								.set_with(|x| x.last_completed = Some(date.to_string()))?;
+							self.state = State::Completed(*date);
+							return Ok(WorkerState::Idle);
+						}
+						Some((k, v)) => (v, k),
+					};
+
+					let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
+					let skip = process_object(
+						&self.garage,
+						*date,
+						&object,
+						objects_expired,
+						mpu_aborted,
+						last_bucket,
+					)
+					.await?;
+
+					*counter += 1;
+					if skip == Skip::SkipBucket {
+						let bucket_id_len = object.bucket_id.as_slice().len();
+						assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
+						*pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
+					} else {
+						*pos = next_pos;
 					}
-					Some((k, v)) => (v, k),
-				};
-
-				let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
-				let skip = process_object(
-					&self.garage,
-					*date,
-					&object,
-					objects_expired,
-					mpu_aborted,
-					last_bucket,
-				)
-				.await?;
-
-				*counter += 1;
-				if skip == Skip::SkipBucket {
-					let bucket_id_len = object.bucket_id.as_slice().len();
-					assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
-					*pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
-				} else {
-					*pos = next_pos;
 				}
 
 				Ok(WorkerState::Busy)
@@ -260,6 +263,8 @@ async fn process_object(
 		return Ok(Skip::SkipBucket);
 	}
 
+	let db = garage.object_table.data.store.db();
+
 	for rule in lifecycle_policy.iter() {
 		if !rule.enabled {
 			continue;
@@ -310,7 +315,9 @@ async fn process_object(
 						"Lifecycle: expiring 1 object in bucket {:?}",
 						object.bucket_id
 					);
-					garage.object_table.insert(&deleted_object).await?;
+					db.transaction(|mut tx| {
+						garage.object_table.queue_insert(&mut tx, &deleted_object)
+					})?;
 					*objects_expired += 1;
 				}
 			}
@@ -343,7 +350,9 @@ async fn process_object(
 				);
 				let aborted_object =
 					Object::new(object.bucket_id, object.key.clone(), aborted_versions);
-				garage.object_table.insert(&aborted_object).await?;
+				db.transaction(|mut tx| {
+					garage.object_table.queue_insert(&mut tx, &aborted_object)
+				})?;
 				*mpu_aborted += n_aborted;
 			}
 		}
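
The diff makes two changes to the lifecycle worker: the table scan now handles a bounded batch of up to 100 entries per call before returning to the background task scheduler, and the per-object table writes are queued inside a synchronous db.transaction via queue_insert instead of being individually awaited through insert(...).await. Below is a minimal, self-contained sketch of the batching half of that pattern only. It is not Garage's real API: KvStore, work_step, process_entry, and the local WorkerState enum are illustrative stand-ins, and the scan-with-a-cursor shape is simply modeled on the get_gt loop shown above.

// Minimal sketch (hypothetical types, not Garage's real API): an ordered
// key/value store scanned with a strictly-greater-than cursor, processed in
// bounded batches per scheduler tick.
use std::collections::BTreeMap;
use std::ops::Bound;

#[derive(Debug, PartialEq)]
enum WorkerState {
    Busy, // more entries may remain; call work_step again soon
    Idle, // the whole table has been scanned
}

// Stand-in for the object table's underlying ordered store.
struct KvStore {
    data: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl KvStore {
    // First entry whose key is strictly greater than `pos` (cursor semantics
    // comparable to the get_gt call in the diff).
    fn get_gt(&self, pos: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
        self.data
            .range::<[u8], _>((Bound::Excluded(pos), Bound::Unbounded))
            .next()
            .map(|(k, v)| (k.clone(), v.clone()))
    }
}

fn process_entry(_key: &[u8], _value: &[u8]) {
    // Placeholder for per-object work (expiry check, MPU abort, ...).
}

// One scheduler tick: handle at most `batch_size` entries, then yield so other
// background workers get a turn -- the shape of the `for _ in 0..100` loop.
fn work_step(store: &KvStore, pos: &mut Vec<u8>, batch_size: usize) -> WorkerState {
    for _ in 0..batch_size {
        let (key, value) = match store.get_gt(pos.as_slice()) {
            None => return WorkerState::Idle, // scan complete
            Some(kv) => kv,
        };
        process_entry(&key, &value);
        *pos = key; // advance the cursor past the entry just handled
    }
    WorkerState::Busy // batch used up, but the scan is not finished
}

fn main() {
    let store = KvStore {
        data: (0u8..10).map(|i| (vec![i], vec![i])).collect(),
    };
    let mut pos: Vec<u8> = Vec::new();
    // Drive the worker until the scan reports Idle.
    while work_step(&store, &mut pos, 4) == WorkerState::Busy {}
    assert_eq!(pos, vec![9]);
}

The second change is visible directly in the last two hunks: each write now goes through db.transaction(|mut tx| ... queue_insert(&mut tx, ...)), so the expired object or aborted multipart upload is recorded in a single synchronous transaction rather than through an awaited insert call.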