From a2e0e34db57b326ad5c9e7c9218fb9e29900e705 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 12:41:11 +0200 Subject: lifecycle: skeleton for lifecycle worker --- src/model/s3/lifecycle_worker.rs | 252 +++++++++++++++++++++++++++++++++++++++ src/model/s3/mod.rs | 2 + 2 files changed, 254 insertions(+) create mode 100644 src/model/s3/lifecycle_worker.rs (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs new file mode 100644 index 00000000..049fa2a3 --- /dev/null +++ b/src/model/s3/lifecycle_worker.rs @@ -0,0 +1,252 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::prelude::*; +use std::time::{Duration, Instant}; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::error::{Error, OkOrMessage}; +use garage_util::persister::PersisterShared; +use garage_util::time::*; + +use garage_table::EmptyKey; + +use crate::bucket_table::*; +use crate::s3::object_table::*; + +use crate::garage::Garage; + +mod v090 { + use chrono::naive::NaiveDate; + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize, Default, Clone, Copy)] + pub struct LifecycleWorkerPersisted { + pub last_completed: Option<NaiveDate>, + } + + impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted { + const VERSION_MARKER: &'static [u8] = b"G09lwp"; + } +} + +pub use v090::*; + +pub struct LifecycleWorker { + garage: Arc<Garage>, + + state: State, + + persister: PersisterShared<LifecycleWorkerPersisted>, +} + +enum State { + Completed(NaiveDate), + Running { + date: NaiveDate, + pos: Vec<u8>, + counter: usize, + objects_expired: usize, + mpu_aborted: usize, + last_bucket: Option<Bucket>, + }, +} + +pub fn register_bg_vars( + persister: &PersisterShared<LifecycleWorkerPersisted>, + vars: &mut vars::BgVars, +) { + vars.register_ro(persister, "lifecycle-last-completed", |p| { + p.get_with(|x| { + x.last_completed + .map(|date| date.to_string()) + .unwrap_or("never".to_string()) + }) + }); +} + +impl LifecycleWorker { + pub fn new(garage: Arc<Garage>, 
persister: PersisterShared<LifecycleWorkerPersisted>) -> Self { + let today = today(); + let state = match persister.get_with(|x| x.last_completed) { + Some(d) if d >= today => State::Completed(d), + _ => State::Running { + date: today, + pos: vec![], + counter: 0, + objects_expired: 0, + mpu_aborted: 0, + last_bucket: None, + }, + }; + Self { + garage, + state, + persister, + } + } +} + +#[async_trait] +impl Worker for LifecycleWorker { + fn name(&self) -> String { + "object lifecycle worker".to_string() + } + + fn status(&self) -> WorkerStatus { + match &self.state { + State::Completed(d) => WorkerStatus { + freeform: vec![format!("Last completed: {}", d)], + ..Default::default() + }, + State::Running { + date, + counter, + objects_expired, + mpu_aborted, + .. + } => { + let n_objects = self + .garage + .object_table + .data + .store + .fast_len() + .unwrap_or(None); + let progress = match n_objects { + None => "...".to_string(), + Some(total) => format!( + "~{:.2}%", + 100. * std::cmp::min(*counter, total) as f32 / total as f32 + ), + }; + WorkerStatus { + progress: Some(progress), + freeform: vec![ + format!("Started: {}", date), + format!("Objects expired: {}", objects_expired), + format!("Multipart uploads aborted: { }", mpu_aborted), + ], + ..Default::default() + } + } + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + match &mut self.state { + State::Completed(_) => Ok(WorkerState::Idle), + State::Running { + date, + counter, + objects_expired, + mpu_aborted, + pos, + last_bucket, + } => { + let (object_bytes, next_pos) = match self + .garage + .object_table + .data + .store + .get_gt(&pos)? 
+ { + None => { + info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted); + self.persister + .set_with(|x| x.last_completed = Some(*date))?; + self.state = State::Completed(*date); + return Ok(WorkerState::Idle); + } + Some((k, v)) => (v, k), + }; + + let object = self.garage.object_table.data.decode_entry(&object_bytes)?; + process_object( + &self.garage, + object, + objects_expired, + mpu_aborted, + last_bucket, + ) + .await?; + + *counter += 1; + *pos = next_pos; + + Ok(WorkerState::Busy) + } + } + } + + async fn wait_for_work(&mut self) -> WorkerState { + match &self.state { + State::Completed(d) => { + let now = now_msec(); + let next_start = midnight_ts(d.succ()); + if now < next_start { + tokio::time::sleep_until( + (Instant::now() + Duration::from_millis(next_start - now)).into(), + ) + .await; + } + self.state = State::Running { + date: today(), + pos: vec![], + counter: 0, + objects_expired: 0, + mpu_aborted: 0, + last_bucket: None, + }; + } + State::Running { .. } => (), + } + WorkerState::Busy + } +} + +async fn process_object( + garage: &Arc<Garage>, + object: Object, + objects_expired: &mut usize, + mpu_aborted: &mut usize, + last_bucket: &mut Option<Bucket>, +) -> Result<(), Error> { + let bucket = match last_bucket.take() { + Some(b) if b.id == object.bucket_id => b, + _ => garage + .bucket_table + .get(&EmptyKey, &object.bucket_id) + .await? 
+ .ok_or_message("object in non-existent bucket")?, + }; + + let lifecycle_policy: &[LifecycleRule] = bucket + .state + .as_option() + .and_then(|s| s.lifecycle_config.get().as_deref()) + .unwrap_or_default(); + + for rule in lifecycle_policy.iter() { + todo!() + } + + *last_bucket = Some(bucket); + Ok(()) +} + +fn midnight_ts(date: NaiveDate) -> u64 { + date.and_hms(0, 0, 0).timestamp_millis() as u64 +} + +fn next_date(ts: u64) -> NaiveDate { + NaiveDateTime::from_timestamp_millis(ts as i64) + .expect("bad timestamp") + .date() + .succ() +} + +fn today() -> NaiveDate { + Utc::today().naive_utc() +} diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs index 36d67093..5c776fb0 100644 --- a/src/model/s3/mod.rs +++ b/src/model/s3/mod.rs @@ -2,3 +2,5 @@ pub mod block_ref_table; pub mod mpu_table; pub mod object_table; pub mod version_table; + +pub mod lifecycle_worker; -- cgit v1.2.3 From 2996dc875fc378ec3597bfa3bdb8ba8951e1865c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 14:28:48 +0200 Subject: lifecycle worker: implement main functionality --- src/model/s3/lifecycle_worker.rs | 102 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 97 insertions(+), 5 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 049fa2a3..069f44a0 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -6,6 +6,7 @@ use std::time::{Duration, Instant}; use tokio::sync::watch; use garage_util::background::*; +use garage_util::data::*; use garage_util::error::{Error, OkOrMessage}; use garage_util::persister::PersisterShared; use garage_util::time::*; @@ -165,6 +166,7 @@ impl Worker for LifecycleWorker { let object = self.garage.object_table.data.decode_entry(&object_bytes)?; process_object( &self.garage, + *date, object, objects_expired, mpu_aborted, @@ -184,7 +186,7 @@ impl Worker for LifecycleWorker { match &self.state { State::Completed(d) => { let now = 
now_msec(); - let next_start = midnight_ts(d.succ()); + let next_start = midnight_ts(d.succ_opt().expect("no next day")); if now < next_start { tokio::time::sleep_until( (Instant::now() + Duration::from_millis(next_start - now)).into(), @@ -208,6 +210,7 @@ impl Worker for LifecycleWorker { async fn process_object( garage: &Arc<Garage>, + now_date: NaiveDate, object: Object, objects_expired: &mut usize, mpu_aborted: &mut usize, @@ -229,24 +232,113 @@ async fn process_object( .unwrap_or_default(); for rule in lifecycle_policy.iter() { - todo!() + if let Some(pfx) = &rule.filter.prefix { + if !object.key.starts_with(pfx) { + continue; + } + } + + if let Some(expire) = &rule.expiration { + if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) { + let version_date = next_date(current_version.timestamp); + + let current_version_data = match &current_version.state { + ObjectVersionState::Complete(c) => c, + _ => unreachable!(), + }; + + let size_match = check_size_filter(current_version_data, &rule.filter); + let date_match = match expire { + LifecycleExpiration::AfterDays(n_days) => { + (now_date - version_date) >= chrono::Duration::days(*n_days as i64) + } + LifecycleExpiration::AtDate(exp_date) => now_date >= *exp_date, + }; + + if size_match && date_match { + // Delete expired version + let deleted_object = Object::new( + object.bucket_id, + object.key.clone(), + vec![ObjectVersion { + uuid: gen_uuid(), + timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1), + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + garage.object_table.insert(&deleted_object).await?; + *objects_expired += 1; + } + } + } + + if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days { + let aborted_versions = object + .versions() + .iter() + .filter_map(|v| { + let version_date = next_date(v.timestamp); + match &v.state { + ObjectVersionState::Uploading { .. 
} + if (now_date - version_date) + >= chrono::Duration::days(*abort_mpu_days as i64) => + { + Some(ObjectVersion { + state: ObjectVersionState::Aborted, + ..*v + }) + } + _ => None, + } + }) + .collect::<Vec<_>>(); + if !aborted_versions.is_empty() { + // Insert aborted mpu info + let n_aborted = aborted_versions.len(); + let aborted_object = + Object::new(object.bucket_id, object.key.clone(), aborted_versions); + garage.object_table.insert(&aborted_object).await?; + *mpu_aborted += n_aborted; + } + } } *last_bucket = Some(bucket); Ok(()) } +fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool { + let size = match version_data { + ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size, + _ => unreachable!(), + }; + if let Some(size_gt) = filter.size_gt { + if !(size > size_gt) { + return false; + } + } + if let Some(size_lt) = filter.size_lt { + if !(size < size_lt) { + return false; + } + } + return true; +} + fn midnight_ts(date: NaiveDate) -> u64 { - date.and_hms(0, 0, 0).timestamp_millis() as u64 + date.and_hms_opt(0, 0, 0) + .expect("midnight does not exist") + .timestamp_millis() as u64 } fn next_date(ts: u64) -> NaiveDate { NaiveDateTime::from_timestamp_millis(ts as i64) .expect("bad timestamp") .date() - .succ() + .succ_opt() + .expect("no next day") } fn today() -> NaiveDate { - Utc::today().naive_utc() + Utc::now().naive_utc().date() } -- cgit v1.2.3 From da8b224e241edad8cfe25f0b0256ebb0d60fa8dd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 14:38:19 +0200 Subject: lifecycle worker: skip entire bucket when no lifecycle config is set --- src/model/s3/lifecycle_worker.rs | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 069f44a0..1981e0fd 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -54,6 
+54,12 @@ enum State { }, } +#[derive(Clone, Copy, Eq, PartialEq)] +enum Skip { + SkipBucket, + NextObject, +} + pub fn register_bg_vars( persister: &PersisterShared<LifecycleWorkerPersisted>, vars: &mut vars::BgVars, @@ -164,10 +170,10 @@ impl Worker for LifecycleWorker { }; let object = self.garage.object_table.data.decode_entry(&object_bytes)?; - process_object( + let skip = process_object( &self.garage, *date, - object, + &object, objects_expired, mpu_aborted, last_bucket, @@ -175,7 +181,13 @@ impl Worker for LifecycleWorker { .await?; *counter += 1; - *pos = next_pos; + if skip == Skip::SkipBucket { + let bucket_id_len = object.bucket_id.as_slice().len(); + assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice())); + *pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat(); + } else { + *pos = next_pos; + } Ok(WorkerState::Busy) } @@ -211,11 +223,11 @@ impl Worker for LifecycleWorker { async fn process_object( garage: &Arc<Garage>, now_date: NaiveDate, - object: Object, + object: &Object, objects_expired: &mut usize, mpu_aborted: &mut usize, last_bucket: &mut Option<Bucket>, -) -> Result<(), Error> { +) -> Result<Skip, Error> { let bucket = match last_bucket.take() { Some(b) if b.id == object.bucket_id => b, _ => garage @@ -231,6 +243,10 @@ async fn process_object( .and_then(|s| s.lifecycle_config.get().as_deref()) .unwrap_or_default(); + if lifecycle_policy.is_empty() { + return Ok(Skip::SkipBucket); + } + for rule in lifecycle_policy.iter() { if let Some(pfx) = &rule.filter.prefix { if !object.key.starts_with(pfx) { @@ -304,7 +320,7 @@ async fn process_object( } *last_bucket = Some(bucket); - Ok(()) + Ok(Skip::NextObject) } fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool { -- cgit v1.2.3 From 7200954318a1b248b4194ee9273bcd2502b50d58 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 14:54:52 +0200 Subject: lifecycle worker: add logging --- src/model/s3/lifecycle_worker.rs | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'src/model/s3') 
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 1981e0fd..02e296e7 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -282,6 +282,10 @@ async fn process_object( state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), }], ); + info!( + "Lifecycle: expiring 1 object in bucket {:?}", + object.bucket_id + ); garage.object_table.insert(&deleted_object).await?; *objects_expired += 1; } @@ -311,6 +315,10 @@ async fn process_object( if !aborted_versions.is_empty() { // Insert aborted mpu info let n_aborted = aborted_versions.len(); + info!( + "Lifecycle: aborting {} incomplete upload(s) in bucket {:?}", + n_aborted, object.bucket_id + ); let aborted_object = Object::new(object.bucket_id, object.key.clone(), aborted_versions); garage.object_table.insert(&aborted_object).await?; -- cgit v1.2.3 From 75ccc5a95c76f31235fcaab8a2c1795693733a4b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 20:02:07 +0200 Subject: lifecycle config: store date as given, try to debug --- src/model/s3/lifecycle_worker.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 02e296e7..5641b093 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -268,7 +268,14 @@ async fn process_object( LifecycleExpiration::AfterDays(n_days) => { (now_date - version_date) >= chrono::Duration::days(*n_days as i64) } - LifecycleExpiration::AtDate(exp_date) => now_date >= *exp_date, + LifecycleExpiration::AtDate(exp_date) => { + if let Ok(exp_date) = parse_lifecycle_date(&exp_date) { + now_date >= exp_date + } else { + warn!("Invalid expiraiton date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date); + false + } + } }; if size_match && date_match { -- cgit v1.2.3 From 01c327a07a6045055fef6f923848fe6046e937c4 Mon Sep 17 00:00:00 2001 From: Alex 
Auvolat Date: Wed, 30 Aug 2023 23:46:15 +0200 Subject: lifecycle worker: avoid building chrono's serde feature --- src/model/s3/lifecycle_worker.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 5641b093..02374bf0 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -19,12 +19,11 @@ use crate::s3::object_table::*; use crate::garage::Garage; mod v090 { - use chrono::naive::NaiveDate; use serde::{Deserialize, Serialize}; - #[derive(Serialize, Deserialize, Default, Clone, Copy)] + #[derive(Serialize, Deserialize, Default, Clone)] pub struct LifecycleWorkerPersisted { - pub last_completed: Option<NaiveDate>, + pub last_completed: Option<String>, } impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted { @@ -65,18 +64,19 @@ pub fn register_bg_vars( vars: &mut vars::BgVars, ) { vars.register_ro(persister, "lifecycle-last-completed", |p| { - p.get_with(|x| { - x.last_completed - .map(|date| date.to_string()) - .unwrap_or("never".to_string()) - }) + p.get_with(|x| x.last_completed.clone().unwrap_or("never".to_string())) }); } impl LifecycleWorker { pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self { let today = today(); - let state = match persister.get_with(|x| x.last_completed) { + let last_completed = persister.get_with(|x| { + x.last_completed + .as_deref() + .and_then(|x| x.parse::<NaiveDate>().ok()) + }); + let state = match last_completed { Some(d) if d >= today => State::Completed(d), _ => State::Running { date: today, @@ -162,7 +162,7 @@ impl Worker for LifecycleWorker { None => { info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted); self.persister - .set_with(|x| x.last_completed = Some(*date))?; + .set_with(|x| x.last_completed = Some(date.to_string()))?; self.state = State::Completed(*date); return Ok(WorkerState::Idle); } -- 
cgit v1.2.3 From b2f679675e3390bea6c6b3b9fb3632d0ed414a75 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 23:52:09 +0200 Subject: lifecycle worker: take into account disabled rules --- src/model/s3/lifecycle_worker.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 02374bf0..d46d70f3 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -243,11 +243,15 @@ async fn process_object( .and_then(|s| s.lifecycle_config.get().as_deref()) .unwrap_or_default(); - if lifecycle_policy.is_empty() { + if lifecycle_policy.iter().all(|x| !x.enabled) { return Ok(Skip::SkipBucket); } for rule in lifecycle_policy.iter() { + if !rule.enabled { + continue; + } + if let Some(pfx) = &rule.filter.prefix { if !object.key.starts_with(pfx) { continue; -- cgit v1.2.3 From 1cfcc61de83b832a78c8f93aaaf935a29845cd8b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 31 Aug 2023 00:28:37 +0200 Subject: lifecycle worker: mitigate potential bugs + refactoring --- src/model/s3/lifecycle_worker.rs | 51 ++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 20 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index d46d70f3..670ed9fe 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -197,16 +197,21 @@ impl Worker for LifecycleWorker { async fn wait_for_work(&mut self) -> WorkerState { match &self.state { State::Completed(d) => { - let now = now_msec(); - let next_start = midnight_ts(d.succ_opt().expect("no next day")); - if now < next_start { - tokio::time::sleep_until( - (Instant::now() + Duration::from_millis(next_start - now)).into(), - ) - .await; + let next_day = d.succ_opt().expect("no next day"); + let next_start = midnight_ts(next_day); + loop { + let now = now_msec(); + if now < 
next_start { + tokio::time::sleep_until( + (Instant::now() + Duration::from_millis(next_start - now)).into(), + ) + .await; + } else { + break; + } } self.state = State::Running { - date: today(), + date: std::cmp::max(next_day, today()), pos: vec![], counter: 0, objects_expired: 0, @@ -228,6 +233,14 @@ async fn process_object( mpu_aborted: &mut usize, last_bucket: &mut Option<Bucket>, ) -> Result<Skip, Error> { + if !object + .versions() + .iter() + .any(|x| x.is_data() || x.is_uploading(None)) + { + return Ok(Skip::NextObject); + } + let bucket = match last_bucket.take() { Some(b) if b.id == object.bucket_id => b, _ => garage @@ -276,7 +289,7 @@ async fn process_object( if let Ok(exp_date) = parse_lifecycle_date(&exp_date) { now_date >= exp_date } else { - warn!("Invalid expiraiton date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date); + warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date); false } } @@ -309,17 +322,15 @@ async fn process_object( .iter() .filter_map(|v| { let version_date = next_date(v.timestamp); - match &v.state { - ObjectVersionState::Uploading { .. } - if (now_date - version_date) - >= chrono::Duration::days(*abort_mpu_days as i64) => - { - Some(ObjectVersion { - state: ObjectVersionState::Aborted, - ..*v - }) - } - _ => None, + if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64) + && matches!(&v.state, ObjectVersionState::Uploading { .. 
}) + { + Some(ObjectVersion { + state: ObjectVersionState::Aborted, + ..*v + }) + } else { + None } }) .collect::<Vec<_>>(); -- cgit v1.2.3 From adbf5925de733484998c3a788c4ec7e8cda2cec4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 31 Aug 2023 11:19:26 +0200 Subject: lifecycle worker: use queue_insert and process objects in batches --- src/model/s3/lifecycle_worker.rs | 81 ++++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 36 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 670ed9fe..f99cc935 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -152,41 +152,44 @@ impl Worker for LifecycleWorker { pos, last_bucket, } => { - let (object_bytes, next_pos) = match self - .garage - .object_table - .data - .store - .get_gt(&pos)? - { - None => { - info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted); - self.persister - .set_with(|x| x.last_completed = Some(date.to_string()))?; - self.state = State::Completed(*date); - return Ok(WorkerState::Idle); + // Process a batch of 100 items before yielding to bg task scheduler + for _ in 0..100 { + let (object_bytes, next_pos) = match self + .garage + .object_table + .data + .store + .get_gt(&pos)? 
+ { + None => { + info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted); + self.persister + .set_with(|x| x.last_completed = Some(date.to_string()))?; + self.state = State::Completed(*date); + return Ok(WorkerState::Idle); + } + Some((k, v)) => (v, k), + }; + + let object = self.garage.object_table.data.decode_entry(&object_bytes)?; + let skip = process_object( + &self.garage, + *date, + &object, + objects_expired, + mpu_aborted, + last_bucket, + ) + .await?; + + *counter += 1; + if skip == Skip::SkipBucket { + let bucket_id_len = object.bucket_id.as_slice().len(); + assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice())); + *pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat(); + } else { + *pos = next_pos; } - Some((k, v)) => (v, k), - }; - - let object = self.garage.object_table.data.decode_entry(&object_bytes)?; - let skip = process_object( - &self.garage, - *date, - &object, - objects_expired, - mpu_aborted, - last_bucket, - ) - .await?; - - *counter += 1; - if skip == Skip::SkipBucket { - let bucket_id_len = object.bucket_id.as_slice().len(); - assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice())); - *pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat(); - } else { - *pos = next_pos; } Ok(WorkerState::Busy) @@ -260,6 +263,8 @@ async fn process_object( return Ok(Skip::SkipBucket); } + let db = garage.object_table.data.store.db(); + for rule in lifecycle_policy.iter() { if !rule.enabled { continue; @@ -310,7 +315,9 @@ async fn process_object( "Lifecycle: expiring 1 object in bucket {:?}", object.bucket_id ); - garage.object_table.insert(&deleted_object).await?; + db.transaction(|mut tx| { + garage.object_table.queue_insert(&mut tx, &deleted_object) + })?; *objects_expired += 1; } } @@ -343,7 +350,9 @@ async fn process_object( ); let aborted_object = Object::new(object.bucket_id, object.key.clone(), aborted_versions); - 
garage.object_table.insert(&aborted_object).await?; + db.transaction(|mut tx| { + garage.object_table.queue_insert(&mut tx, &aborted_object) + })?; *mpu_aborted += n_aborted; } } -- cgit v1.2.3 From a00a52633f7846c3683da65a07266a03f88b0f74 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 31 Aug 2023 11:25:14 +0200 Subject: lifecycle worker: add log message when starting --- src/model/s3/lifecycle_worker.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index f99cc935..53c84a17 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -78,14 +78,7 @@ impl LifecycleWorker { }); let state = match last_completed { Some(d) if d >= today => State::Completed(d), - _ => State::Running { - date: today, - pos: vec![], - counter: 0, - objects_expired: 0, - mpu_aborted: 0, - last_bucket: None, - }, + _ => State::start(today), }; Self { garage, @@ -95,6 +88,20 @@ impl LifecycleWorker { } } +impl State { + fn start(date: NaiveDate) -> Self { + info!("Starting lifecycle worker for {}", date); + State::Running { + date, + pos: vec![], + counter: 0, + objects_expired: 0, + mpu_aborted: 0, + last_bucket: None, + } + } +} + #[async_trait] impl Worker for LifecycleWorker { fn name(&self) -> String { @@ -213,14 +220,7 @@ impl Worker for LifecycleWorker { break; } } - self.state = State::Running { - date: std::cmp::max(next_day, today()), - pos: vec![], - counter: 0, - objects_expired: 0, - mpu_aborted: 0, - last_bucket: None, - }; + self.state = State::start(std::cmp::max(next_day, today())); } State::Running { .. 
} => (), } -- cgit v1.2.3 From f579d6d9b42ef03d639cc7356b2fa15265074120 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 31 Aug 2023 11:29:54 +0200 Subject: lifecycle worker: fix potential inifinite loop --- src/model/s3/lifecycle_worker.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 53c84a17..0747ffb8 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -193,7 +193,10 @@ impl Worker for LifecycleWorker { if skip == Skip::SkipBucket { let bucket_id_len = object.bucket_id.as_slice().len(); assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice())); - *pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat(); + *pos = std::cmp::max( + next_pos, + [&pos[..bucket_id_len], &[0xFFu8][..]].concat(), + ); } else { *pos = next_pos; } -- cgit v1.2.3 From 1cdc321e28ccfbbe425365f3a03a526c3f456e3f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 31 Aug 2023 11:36:30 +0200 Subject: lifecycle worker: don't get stuck on non-existent bucket --- src/model/s3/lifecycle_worker.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 0747ffb8..ed762413 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -249,11 +249,22 @@ async fn process_object( let bucket = match last_bucket.take() { Some(b) if b.id == object.bucket_id => b, - _ => garage - .bucket_table - .get(&EmptyKey, &object.bucket_id) - .await? - .ok_or_message("object in non-existent bucket")?, + _ => { + match garage + .bucket_table + .get(&EmptyKey, &object.bucket_id) + .await? 
+ { + Some(b) => b, + None => { + warn!( + "Lifecycle worker: object in non-existent bucket {:?}", + object.bucket_id + ); + return Ok(Skip::SkipBucket); + } + } + } }; let lifecycle_policy: &[LifecycleRule] = bucket -- cgit v1.2.3 From 8e0c020bb95a05ea657fa75cf19f8e125d9c602d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 31 Aug 2023 11:45:19 +0200 Subject: lifecycle worker: correct small clippy lints --- src/model/s3/lifecycle_worker.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index ed762413..4734742d 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -7,7 +7,7 @@ use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; -use garage_util::error::{Error, OkOrMessage}; +use garage_util::error::Error; use garage_util::persister::PersisterShared; use garage_util::time::*; @@ -305,7 +305,7 @@ async fn process_object( (now_date - version_date) >= chrono::Duration::days(*n_days as i64) } LifecycleExpiration::AtDate(exp_date) => { - if let Ok(exp_date) = parse_lifecycle_date(&exp_date) { + if let Ok(exp_date) = parse_lifecycle_date(exp_date) { now_date >= exp_date } else { warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date); @@ -391,7 +391,7 @@ fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) return false; } } - return true; + true } fn midnight_ts(date: NaiveDate) -> u64 { -- cgit v1.2.3