From 2996dc875fc378ec3597bfa3bdb8ba8951e1865c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 14:28:48 +0200 Subject: lifecycle worker: implement main functionality --- src/model/bucket_table.rs | 4 +- src/model/s3/lifecycle_worker.rs | 102 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 99 insertions(+), 7 deletions(-) (limited to 'src/model') diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 306a58ab..e9d574c5 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -95,9 +95,9 @@ mod v08 { /// If Some(x), object key has to start with prefix x pub prefix: Option, /// If Some(x), object size has to be more than x - pub size_gt: Option, + pub size_gt: Option, /// If Some(x), object size has to be less than x - pub size_lt: Option, + pub size_lt: Option, } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 049fa2a3..069f44a0 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -6,6 +6,7 @@ use std::time::{Duration, Instant}; use tokio::sync::watch; use garage_util::background::*; +use garage_util::data::*; use garage_util::error::{Error, OkOrMessage}; use garage_util::persister::PersisterShared; use garage_util::time::*; @@ -165,6 +166,7 @@ impl Worker for LifecycleWorker { let object = self.garage.object_table.data.decode_entry(&object_bytes)?; process_object( &self.garage, + *date, object, objects_expired, mpu_aborted, @@ -184,7 +186,7 @@ impl Worker for LifecycleWorker { match &self.state { State::Completed(d) => { let now = now_msec(); - let next_start = midnight_ts(d.succ()); + let next_start = midnight_ts(d.succ_opt().expect("no next day")); if now < next_start { tokio::time::sleep_until( (Instant::now() + Duration::from_millis(next_start - now)).into(), @@ -208,6 +210,7 @@ impl Worker for LifecycleWorker { async fn process_object( garage: &Arc, + now_date: NaiveDate, object: Object, objects_expired: &mut usize, mpu_aborted: &mut usize, @@ -229,24 +232,113 @@ async fn process_object( .unwrap_or_default(); for rule in lifecycle_policy.iter() { - todo!() + if let Some(pfx) = &rule.filter.prefix { + if !object.key.starts_with(pfx) { + continue; + } + } + + if let Some(expire) = &rule.expiration { + if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) { + let version_date = next_date(current_version.timestamp); + + let current_version_data = match ¤t_version.state { + ObjectVersionState::Complete(c) => c, + _ => unreachable!(), + }; + + let size_match = check_size_filter(current_version_data, &rule.filter); + let date_match = match expire { + LifecycleExpiration::AfterDays(n_days) => { + (now_date - version_date) >= chrono::Duration::days(*n_days as i64) + } + LifecycleExpiration::AtDate(exp_date) => now_date >= *exp_date, + }; + + if size_match && date_match { + // Delete expired version + let deleted_object = Object::new( + object.bucket_id, + object.key.clone(), + vec![ObjectVersion { + uuid: gen_uuid(), + timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1), + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + garage.object_table.insert(&deleted_object).await?; + *objects_expired += 1; + } + } + } + + if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days { + let aborted_versions = object + .versions() + .iter() + .filter_map(|v| { + let version_date = next_date(v.timestamp); + match &v.state { + ObjectVersionState::Uploading { .. } + if (now_date - version_date) + >= chrono::Duration::days(*abort_mpu_days as i64) => + { + Some(ObjectVersion { + state: ObjectVersionState::Aborted, + ..*v + }) + } + _ => None, + } + }) + .collect::>(); + if !aborted_versions.is_empty() { + // Insert aborted mpu info + let n_aborted = aborted_versions.len(); + let aborted_object = + Object::new(object.bucket_id, object.key.clone(), aborted_versions); + garage.object_table.insert(&aborted_object).await?; + *mpu_aborted += n_aborted; + } + } } *last_bucket = Some(bucket); Ok(()) } +fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool { + let size = match version_data { + ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size, + _ => unreachable!(), + }; + if let Some(size_gt) = filter.size_gt { + if !(size > size_gt) { + return false; + } + } + if let Some(size_lt) = filter.size_lt { + if !(size < size_lt) { + return false; + } + } + return true; +} + fn midnight_ts(date: NaiveDate) -> u64 { - date.and_hms(0, 0, 0).timestamp_millis() as u64 + date.and_hms_opt(0, 0, 0) + .expect("midnight does not exist") + .timestamp_millis() as u64 } fn next_date(ts: u64) -> NaiveDate { NaiveDateTime::from_timestamp_millis(ts as i64) .expect("bad timestamp") .date() - .succ() + .succ_opt() + .expect("no next day") } fn today() -> NaiveDate { - Utc::today().naive_utc() + Utc::now().naive_utc().date() } -- cgit v1.2.3