aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-08-30 14:28:48 +0200
committerAlex Auvolat <alex@adnab.me>2023-08-30 14:29:03 +0200
commit2996dc875fc378ec3597bfa3bdb8ba8951e1865c (patch)
tree16c5ab74ee56de7e944691abbeb291976bcd9db2
parenta2e0e34db57b326ad5c9e7c9218fb9e29900e705 (diff)
downloadgarage-2996dc875fc378ec3597bfa3bdb8ba8951e1865c.tar.gz
garage-2996dc875fc378ec3597bfa3bdb8ba8951e1865c.zip
lifecycle worker: implement main functionality
-rw-r--r--src/api/s3/lifecycle.rs4
-rw-r--r--src/model/bucket_table.rs4
-rw-r--r--src/model/s3/lifecycle_worker.rs102
3 files changed, 101 insertions, 9 deletions
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index 278cf26d..2d621eac 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -239,8 +239,8 @@ impl Filter {
fn internal_into_garage_lifecycle_filter(self) -> GarageLifecycleFilter {
GarageLifecycleFilter {
prefix: self.prefix.map(|x| x.0),
- size_gt: self.size_gt.map(|x| x.0 as usize),
- size_lt: self.size_lt.map(|x| x.0 as usize),
+ size_gt: self.size_gt.map(|x| x.0 as u64),
+ size_lt: self.size_lt.map(|x| x.0 as u64),
}
}
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 306a58ab..e9d574c5 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -95,9 +95,9 @@ mod v08 {
/// If Some(x), object key has to start with prefix x
pub prefix: Option<String>,
/// If Some(x), object size has to be more than x
- pub size_gt: Option<usize>,
+ pub size_gt: Option<u64>,
/// If Some(x), object size has to be less than x
- pub size_lt: Option<usize>,
+ pub size_lt: Option<u64>,
}
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 049fa2a3..069f44a0 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -6,6 +6,7 @@ use std::time::{Duration, Instant};
use tokio::sync::watch;
use garage_util::background::*;
+use garage_util::data::*;
use garage_util::error::{Error, OkOrMessage};
use garage_util::persister::PersisterShared;
use garage_util::time::*;
@@ -165,6 +166,7 @@ impl Worker for LifecycleWorker {
let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
process_object(
&self.garage,
+ *date,
object,
objects_expired,
mpu_aborted,
@@ -184,7 +186,7 @@ impl Worker for LifecycleWorker {
match &self.state {
State::Completed(d) => {
let now = now_msec();
- let next_start = midnight_ts(d.succ());
+ let next_start = midnight_ts(d.succ_opt().expect("no next day"));
if now < next_start {
tokio::time::sleep_until(
(Instant::now() + Duration::from_millis(next_start - now)).into(),
@@ -208,6 +210,7 @@ impl Worker for LifecycleWorker {
async fn process_object(
garage: &Arc<Garage>,
+ now_date: NaiveDate,
object: Object,
objects_expired: &mut usize,
mpu_aborted: &mut usize,
@@ -229,24 +232,113 @@ async fn process_object(
.unwrap_or_default();
for rule in lifecycle_policy.iter() {
- todo!()
+ if let Some(pfx) = &rule.filter.prefix {
+ if !object.key.starts_with(pfx) {
+ continue;
+ }
+ }
+
+ if let Some(expire) = &rule.expiration {
+ if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) {
+ let version_date = next_date(current_version.timestamp);
+
+ let current_version_data = match &current_version.state {
+ ObjectVersionState::Complete(c) => c,
+ _ => unreachable!(),
+ };
+
+ let size_match = check_size_filter(current_version_data, &rule.filter);
+ let date_match = match expire {
+ LifecycleExpiration::AfterDays(n_days) => {
+ (now_date - version_date) >= chrono::Duration::days(*n_days as i64)
+ }
+ LifecycleExpiration::AtDate(exp_date) => now_date >= *exp_date,
+ };
+
+ if size_match && date_match {
+ // Delete expired version
+ let deleted_object = Object::new(
+ object.bucket_id,
+ object.key.clone(),
+ vec![ObjectVersion {
+ uuid: gen_uuid(),
+ timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1),
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
+ }],
+ );
+ garage.object_table.insert(&deleted_object).await?;
+ *objects_expired += 1;
+ }
+ }
+ }
+
+ if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days {
+ let aborted_versions = object
+ .versions()
+ .iter()
+ .filter_map(|v| {
+ let version_date = next_date(v.timestamp);
+ match &v.state {
+ ObjectVersionState::Uploading { .. }
+ if (now_date - version_date)
+ >= chrono::Duration::days(*abort_mpu_days as i64) =>
+ {
+ Some(ObjectVersion {
+ state: ObjectVersionState::Aborted,
+ ..*v
+ })
+ }
+ _ => None,
+ }
+ })
+ .collect::<Vec<_>>();
+ if !aborted_versions.is_empty() {
+ // Insert aborted mpu info
+ let n_aborted = aborted_versions.len();
+ let aborted_object =
+ Object::new(object.bucket_id, object.key.clone(), aborted_versions);
+ garage.object_table.insert(&aborted_object).await?;
+ *mpu_aborted += n_aborted;
+ }
+ }
}
*last_bucket = Some(bucket);
Ok(())
}
+fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool {
+ let size = match version_data {
+ ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size,
+ _ => unreachable!(),
+ };
+ if let Some(size_gt) = filter.size_gt {
+ if !(size > size_gt) {
+ return false;
+ }
+ }
+ if let Some(size_lt) = filter.size_lt {
+ if !(size < size_lt) {
+ return false;
+ }
+ }
+ return true;
+}
+
fn midnight_ts(date: NaiveDate) -> u64 {
- date.and_hms(0, 0, 0).timestamp_millis() as u64
+ date.and_hms_opt(0, 0, 0)
+ .expect("midnight does not exist")
+ .timestamp_millis() as u64
}
fn next_date(ts: u64) -> NaiveDate {
NaiveDateTime::from_timestamp_millis(ts as i64)
.expect("bad timestamp")
.date()
- .succ()
+ .succ_opt()
+ .expect("no next day")
}
fn today() -> NaiveDate {
- Utc::today().naive_utc()
+ Utc::now().naive_utc().date()
}