use std::sync::Arc; use async_trait::async_trait; use chrono::prelude::*; use std::time::{Duration, Instant}; use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_table::EmptyKey; use crate::bucket_table::*; use crate::s3::object_table::*; use crate::garage::Garage; mod v090 { use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Default, Clone)] pub struct LifecycleWorkerPersisted { pub last_completed: Option<String>, } impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted { const VERSION_MARKER: &'static [u8] = b"G09lwp"; } } pub use v090::*; pub struct LifecycleWorker { garage: Arc<Garage>, state: State, persister: PersisterShared<LifecycleWorkerPersisted>, } enum State { Completed(NaiveDate), Running { date: NaiveDate, pos: Vec<u8>, counter: usize, objects_expired: usize, mpu_aborted: usize, last_bucket: Option<Bucket>, }, } #[derive(Clone, Copy, Eq, PartialEq)] enum Skip { SkipBucket, NextObject, } pub fn register_bg_vars( persister: &PersisterShared<LifecycleWorkerPersisted>, vars: &mut vars::BgVars, ) { vars.register_ro(persister, "lifecycle-last-completed", |p| { p.get_with(|x| x.last_completed.clone().unwrap_or("never".to_string())) }); } impl LifecycleWorker { pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self { let today = today(); let last_completed = persister.get_with(|x| { x.last_completed .as_deref() .and_then(|x| x.parse::<NaiveDate>().ok()) }); let state = match last_completed { Some(d) if d >= today => State::Completed(d), _ => State::start(today), }; Self { garage, state, persister, } } } impl State { fn start(date: NaiveDate) -> Self { info!("Starting lifecycle worker for {}", date); State::Running { date, pos: vec![], counter: 0, objects_expired: 0, mpu_aborted: 0, last_bucket: None, } } } #[async_trait] impl Worker for LifecycleWorker { fn name(&self) -> String { "object lifecycle worker".to_string() } fn status(&self) -> WorkerStatus { match &self.state { State::Completed(d) => WorkerStatus { freeform: vec![format!("Last completed: {}", d)], ..Default::default() }, State::Running { date, counter, objects_expired, mpu_aborted, .. } => { let n_objects = self .garage .object_table .data .store .fast_len() .unwrap_or(None); let progress = match n_objects { None => "...".to_string(), Some(total) => format!( "~{:.2}%", 100. * std::cmp::min(*counter, total) as f32 / total as f32 ), }; WorkerStatus { progress: Some(progress), freeform: vec![ format!("Started: {}", date), format!("Objects expired: {}", objects_expired), format!("Multipart uploads aborted: { }", mpu_aborted), ], ..Default::default() } } } } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { match &mut self.state { State::Completed(_) => Ok(WorkerState::Idle), State::Running { date, counter, objects_expired, mpu_aborted, pos, last_bucket, } => { // Process a batch of 100 items before yielding to bg task scheduler for _ in 0..100 { let (object_bytes, next_pos) = match self .garage .object_table .data .store .get_gt(&pos)? { None => { info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted); self.persister .set_with(|x| x.last_completed = Some(date.to_string()))?; self.state = State::Completed(*date); return Ok(WorkerState::Idle); } Some((k, v)) => (v, k), }; let object = self.garage.object_table.data.decode_entry(&object_bytes)?; let skip = process_object( &self.garage, *date, &object, objects_expired, mpu_aborted, last_bucket, ) .await?; *counter += 1; if skip == Skip::SkipBucket { let bucket_id_len = object.bucket_id.as_slice().len(); assert_eq!( next_pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()) ); let last_bucket_pos = [&next_pos[..bucket_id_len], &[0xFFu8][..]].concat(); *pos = std::cmp::max(next_pos, last_bucket_pos); } else { *pos = next_pos; } } Ok(WorkerState::Busy) } } } async fn wait_for_work(&mut self) -> WorkerState { match &self.state { State::Completed(d) => { let next_day = d.succ_opt().expect("no next day"); let next_start = midnight_ts(next_day); loop { let now = now_msec(); if now < next_start { tokio::time::sleep_until( (Instant::now() + Duration::from_millis(next_start - now)).into(), ) .await; } else { break; } } self.state = State::start(std::cmp::max(next_day, today())); } State::Running { .. } => (), } WorkerState::Busy } } async fn process_object( garage: &Arc<Garage>, now_date: NaiveDate, object: &Object, objects_expired: &mut usize, mpu_aborted: &mut usize, last_bucket: &mut Option<Bucket>, ) -> Result<Skip, Error> { if !object .versions() .iter() .any(|x| x.is_data() || x.is_uploading(None)) { return Ok(Skip::NextObject); } let bucket = match last_bucket.take() { Some(b) if b.id == object.bucket_id => b, _ => { match garage .bucket_table .get(&EmptyKey, &object.bucket_id) .await? { Some(b) => b, None => { warn!( "Lifecycle worker: object in non-existent bucket {:?}", object.bucket_id ); return Ok(Skip::SkipBucket); } } } }; let lifecycle_policy: &[LifecycleRule] = bucket .state .as_option() .and_then(|s| s.lifecycle_config.get().as_deref()) .unwrap_or_default(); if lifecycle_policy.iter().all(|x| !x.enabled) { return Ok(Skip::SkipBucket); } let db = garage.object_table.data.store.db(); for rule in lifecycle_policy.iter() { if !rule.enabled { continue; } if let Some(pfx) = &rule.filter.prefix { if !object.key.starts_with(pfx) { continue; } } if let Some(expire) = &rule.expiration { if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) { let version_date = next_date(current_version.timestamp); let current_version_data = match ¤t_version.state { ObjectVersionState::Complete(c) => c, _ => unreachable!(), }; let size_match = check_size_filter(current_version_data, &rule.filter); let date_match = match expire { LifecycleExpiration::AfterDays(n_days) => { (now_date - version_date) >= chrono::Duration::days(*n_days as i64) } LifecycleExpiration::AtDate(exp_date) => { if let Ok(exp_date) = parse_lifecycle_date(exp_date) { now_date >= exp_date } else { warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date); false } } }; if size_match && date_match { // Delete expired version let deleted_object = Object::new( object.bucket_id, object.key.clone(), vec![ObjectVersion { uuid: gen_uuid(), timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1), state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), }], ); info!( "Lifecycle: expiring 1 object in bucket {:?}", object.bucket_id ); db.transaction(|tx| garage.object_table.queue_insert(tx, &deleted_object))?; *objects_expired += 1; } } } if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days { let aborted_versions = object .versions() .iter() .filter_map(|v| { let version_date = next_date(v.timestamp); if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64) && matches!(&v.state, ObjectVersionState::Uploading { .. }) { Some(ObjectVersion { state: ObjectVersionState::Aborted, ..*v }) } else { None } }) .collect::<Vec<_>>(); if !aborted_versions.is_empty() { // Insert aborted mpu info let n_aborted = aborted_versions.len(); info!( "Lifecycle: aborting {} incomplete upload(s) in bucket {:?}", n_aborted, object.bucket_id ); let aborted_object = Object::new(object.bucket_id, object.key.clone(), aborted_versions); db.transaction(|tx| garage.object_table.queue_insert(tx, &aborted_object))?; *mpu_aborted += n_aborted; } } } *last_bucket = Some(bucket); Ok(Skip::NextObject) } fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool { let size = match version_data { ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size, _ => unreachable!(), }; if let Some(size_gt) = filter.size_gt { if !(size > size_gt) { return false; } } if let Some(size_lt) = filter.size_lt { if !(size < size_lt) { return false; } } true } fn midnight_ts(date: NaiveDate) -> u64 { date.and_hms_opt(0, 0, 0) .expect("midnight does not exist") .timestamp_millis() as u64 } fn next_date(ts: u64) -> NaiveDate { NaiveDateTime::from_timestamp_millis(ts as i64) .expect("bad timestamp") .date() .succ_opt() .expect("no next day") } fn today() -> NaiveDate { Utc::now().naive_utc().date() }