diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/model/s3/lifecycle_worker.rs | 252 | ||||
-rw-r--r-- | src/model/s3/mod.rs | 2 |
2 files changed, 254 insertions, 0 deletions
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs new file mode 100644 index 00000000..049fa2a3 --- /dev/null +++ b/src/model/s3/lifecycle_worker.rs @@ -0,0 +1,252 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::prelude::*; +use std::time::{Duration, Instant}; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::error::{Error, OkOrMessage}; +use garage_util::persister::PersisterShared; +use garage_util::time::*; + +use garage_table::EmptyKey; + +use crate::bucket_table::*; +use crate::s3::object_table::*; + +use crate::garage::Garage; + +mod v090 { + use chrono::naive::NaiveDate; + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize, Default, Clone, Copy)] + pub struct LifecycleWorkerPersisted { + pub last_completed: Option<NaiveDate>, + } + + impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted { + const VERSION_MARKER: &'static [u8] = b"G09lwp"; + } +} + +pub use v090::*; + +pub struct LifecycleWorker { + garage: Arc<Garage>, + + state: State, + + persister: PersisterShared<LifecycleWorkerPersisted>, +} + +enum State { + Completed(NaiveDate), + Running { + date: NaiveDate, + pos: Vec<u8>, + counter: usize, + objects_expired: usize, + mpu_aborted: usize, + last_bucket: Option<Bucket>, + }, +} + +pub fn register_bg_vars( + persister: &PersisterShared<LifecycleWorkerPersisted>, + vars: &mut vars::BgVars, +) { + vars.register_ro(persister, "lifecycle-last-completed", |p| { + p.get_with(|x| { + x.last_completed + .map(|date| date.to_string()) + .unwrap_or("never".to_string()) + }) + }); +} + +impl LifecycleWorker { + pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self { + let today = today(); + let state = match persister.get_with(|x| x.last_completed) { + Some(d) if d >= today => State::Completed(d), + _ => State::Running { + date: today, + pos: vec![], + counter: 0, + objects_expired: 0, + mpu_aborted: 0, + last_bucket: None, + }, + }; + Self { + garage, + state, + persister, + } + } +} + +#[async_trait] +impl Worker for LifecycleWorker { + fn name(&self) -> String { + "object lifecycle worker".to_string() + } + + fn status(&self) -> WorkerStatus { + match &self.state { + State::Completed(d) => WorkerStatus { + freeform: vec![format!("Last completed: {}", d)], + ..Default::default() + }, + State::Running { + date, + counter, + objects_expired, + mpu_aborted, + .. + } => { + let n_objects = self + .garage + .object_table + .data + .store + .fast_len() + .unwrap_or(None); + let progress = match n_objects { + None => "...".to_string(), + Some(total) => format!( + "~{:.2}%", + 100. * std::cmp::min(*counter, total) as f32 / total as f32 + ), + }; + WorkerStatus { + progress: Some(progress), + freeform: vec![ + format!("Started: {}", date), + format!("Objects expired: {}", objects_expired), + format!("Multipart uploads aborted: { }", mpu_aborted), + ], + ..Default::default() + } + } + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + match &mut self.state { + State::Completed(_) => Ok(WorkerState::Idle), + State::Running { + date, + counter, + objects_expired, + mpu_aborted, + pos, + last_bucket, + } => { + let (object_bytes, next_pos) = match self + .garage + .object_table + .data + .store + .get_gt(&pos)? + { + None => { + info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted); + self.persister + .set_with(|x| x.last_completed = Some(*date))?; + self.state = State::Completed(*date); + return Ok(WorkerState::Idle); + } + Some((k, v)) => (v, k), + }; + + let object = self.garage.object_table.data.decode_entry(&object_bytes)?; + process_object( + &self.garage, + object, + objects_expired, + mpu_aborted, + last_bucket, + ) + .await?; + + *counter += 1; + *pos = next_pos; + + Ok(WorkerState::Busy) + } + } + } + + async fn wait_for_work(&mut self) -> WorkerState { + match &self.state { + State::Completed(d) => { + let now = now_msec(); + let next_start = midnight_ts(d.succ()); + if now < next_start { + tokio::time::sleep_until( + (Instant::now() + Duration::from_millis(next_start - now)).into(), + ) + .await; + } + self.state = State::Running { + date: today(), + pos: vec![], + counter: 0, + objects_expired: 0, + mpu_aborted: 0, + last_bucket: None, + }; + } + State::Running { .. } => (), + } + WorkerState::Busy + } +} + +async fn process_object( + garage: &Arc<Garage>, + object: Object, + objects_expired: &mut usize, + mpu_aborted: &mut usize, + last_bucket: &mut Option<Bucket>, +) -> Result<(), Error> { + let bucket = match last_bucket.take() { + Some(b) if b.id == object.bucket_id => b, + _ => garage + .bucket_table + .get(&EmptyKey, &object.bucket_id) + .await? + .ok_or_message("object in non-existent bucket")?, + }; + + let lifecycle_policy: &[LifecycleRule] = bucket + .state + .as_option() + .and_then(|s| s.lifecycle_config.get().as_deref()) + .unwrap_or_default(); + + for rule in lifecycle_policy.iter() { + todo!() + } + + *last_bucket = Some(bucket); + Ok(()) +} + +fn midnight_ts(date: NaiveDate) -> u64 { + date.and_hms(0, 0, 0).timestamp_millis() as u64 +} + +fn next_date(ts: u64) -> NaiveDate { + NaiveDateTime::from_timestamp_millis(ts as i64) + .expect("bad timestamp") + .date() + .succ() +} + +fn today() -> NaiveDate { + Utc::today().naive_utc() +} diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs index 36d67093..5c776fb0 100644 --- a/src/model/s3/mod.rs +++ b/src/model/s3/mod.rs @@ -2,3 +2,5 @@ pub mod block_ref_table; pub mod mpu_table; pub mod object_table; pub mod version_table; + +pub mod lifecycle_worker; |