diff options
author | Alex Auvolat <alex@adnab.me> | 2023-08-31 00:28:37 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-08-31 00:28:37 +0200 |
commit | 1cfcc61de83b832a78c8f93aaaf935a29845cd8b (patch) | |
tree | 73c2e91437d17de6dc6364d62f83b15339e7fb8c /src/model/s3 | |
parent | be03a4610f4a6e3863e6113491e308bbcea9ca94 (diff) | |
download | garage-1cfcc61de83b832a78c8f93aaaf935a29845cd8b.tar.gz garage-1cfcc61de83b832a78c8f93aaaf935a29845cd8b.zip |
lifecycle worker: mitigate potential bugs + refactoring
Diffstat (limited to 'src/model/s3')
-rw-r--r-- | src/model/s3/lifecycle_worker.rs | 51 |
1 files changed, 31 insertions, 20 deletions
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index d46d70f3..670ed9fe 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -197,16 +197,21 @@ impl Worker for LifecycleWorker { async fn wait_for_work(&mut self) -> WorkerState { match &self.state { State::Completed(d) => { - let now = now_msec(); - let next_start = midnight_ts(d.succ_opt().expect("no next day")); - if now < next_start { - tokio::time::sleep_until( - (Instant::now() + Duration::from_millis(next_start - now)).into(), - ) - .await; + let next_day = d.succ_opt().expect("no next day"); + let next_start = midnight_ts(next_day); + loop { + let now = now_msec(); + if now < next_start { + tokio::time::sleep_until( + (Instant::now() + Duration::from_millis(next_start - now)).into(), + ) + .await; + } else { + break; + } } self.state = State::Running { - date: today(), + date: std::cmp::max(next_day, today()), pos: vec![], counter: 0, objects_expired: 0, @@ -228,6 +233,14 @@ async fn process_object( mpu_aborted: &mut usize, last_bucket: &mut Option<Bucket>, ) -> Result<Skip, Error> { + if !object + .versions() + .iter() + .any(|x| x.is_data() || x.is_uploading(None)) + { + return Ok(Skip::NextObject); + } + let bucket = match last_bucket.take() { Some(b) if b.id == object.bucket_id => b, _ => garage @@ -276,7 +289,7 @@ async fn process_object( if let Ok(exp_date) = parse_lifecycle_date(&exp_date) { now_date >= exp_date } else { - warn!("Invalid expiraiton date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date); + warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date); false } } @@ -309,17 +322,15 @@ async fn process_object( .iter() .filter_map(|v| { let version_date = next_date(v.timestamp); - match &v.state { - ObjectVersionState::Uploading { .. } - if (now_date - version_date) - >= chrono::Duration::days(*abort_mpu_days as i64) => - { - Some(ObjectVersion { - state: ObjectVersionState::Aborted, - ..*v - }) - } - _ => None, + if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64) + && matches!(&v.state, ObjectVersionState::Uploading { .. }) + { + Some(ObjectVersion { + state: ObjectVersionState::Aborted, + ..*v + }) + } else { + None } }) .collect::<Vec<_>>(); |