aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-08-31 00:28:37 +0200
committerAlex Auvolat <alex@adnab.me>2023-08-31 00:28:37 +0200
commit1cfcc61de83b832a78c8f93aaaf935a29845cd8b (patch)
tree73c2e91437d17de6dc6364d62f83b15339e7fb8c /src/model
parentbe03a4610f4a6e3863e6113491e308bbcea9ca94 (diff)
downloadgarage-1cfcc61de83b832a78c8f93aaaf935a29845cd8b.tar.gz
garage-1cfcc61de83b832a78c8f93aaaf935a29845cd8b.zip
lifecycle worker: mitigate potential bugs + refactoring
Diffstat (limited to 'src/model')
-rw-r--r--src/model/s3/lifecycle_worker.rs51
1 files changed, 31 insertions, 20 deletions
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index d46d70f3..670ed9fe 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -197,16 +197,21 @@ impl Worker for LifecycleWorker {
async fn wait_for_work(&mut self) -> WorkerState {
match &self.state {
State::Completed(d) => {
- let now = now_msec();
- let next_start = midnight_ts(d.succ_opt().expect("no next day"));
- if now < next_start {
- tokio::time::sleep_until(
- (Instant::now() + Duration::from_millis(next_start - now)).into(),
- )
- .await;
+ let next_day = d.succ_opt().expect("no next day");
+ let next_start = midnight_ts(next_day);
+ loop {
+ let now = now_msec();
+ if now < next_start {
+ tokio::time::sleep_until(
+ (Instant::now() + Duration::from_millis(next_start - now)).into(),
+ )
+ .await;
+ } else {
+ break;
+ }
}
self.state = State::Running {
- date: today(),
+ date: std::cmp::max(next_day, today()),
pos: vec![],
counter: 0,
objects_expired: 0,
@@ -228,6 +233,14 @@ async fn process_object(
mpu_aborted: &mut usize,
last_bucket: &mut Option<Bucket>,
) -> Result<Skip, Error> {
+ if !object
+ .versions()
+ .iter()
+ .any(|x| x.is_data() || x.is_uploading(None))
+ {
+ return Ok(Skip::NextObject);
+ }
+
let bucket = match last_bucket.take() {
Some(b) if b.id == object.bucket_id => b,
_ => garage
@@ -276,7 +289,7 @@ async fn process_object(
if let Ok(exp_date) = parse_lifecycle_date(&exp_date) {
now_date >= exp_date
} else {
- warn!("Invalid expiraiton date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
+ warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
false
}
}
@@ -309,17 +322,15 @@ async fn process_object(
.iter()
.filter_map(|v| {
let version_date = next_date(v.timestamp);
- match &v.state {
- ObjectVersionState::Uploading { .. }
- if (now_date - version_date)
- >= chrono::Duration::days(*abort_mpu_days as i64) =>
- {
- Some(ObjectVersion {
- state: ObjectVersionState::Aborted,
- ..*v
- })
- }
- _ => None,
+ if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64)
+ && matches!(&v.state, ObjectVersionState::Uploading { .. })
+ {
+ Some(ObjectVersion {
+ state: ObjectVersionState::Aborted,
+ ..*v
+ })
+ } else {
+ None
}
})
.collect::<Vec<_>>();