aboutsummaryrefslogtreecommitdiff
path: root/src/model/s3
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/s3')
-rw-r--r--src/model/s3/lifecycle_worker.rs252
-rw-r--r--src/model/s3/mod.rs2
2 files changed, 254 insertions, 0 deletions
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
new file mode 100644
index 00000000..049fa2a3
--- /dev/null
+++ b/src/model/s3/lifecycle_worker.rs
@@ -0,0 +1,252 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use chrono::prelude::*;
+use std::time::{Duration, Instant};
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::{Error, OkOrMessage};
+use garage_util::persister::PersisterShared;
+use garage_util::time::*;
+
+use garage_table::EmptyKey;
+
+use crate::bucket_table::*;
+use crate::s3::object_table::*;
+
+use crate::garage::Garage;
+
+mod v090 {
+ use chrono::naive::NaiveDate;
+ use serde::{Deserialize, Serialize};
+
+ #[derive(Serialize, Deserialize, Default, Clone, Copy)]
+ pub struct LifecycleWorkerPersisted {
+ pub last_completed: Option<NaiveDate>,
+ }
+
+ impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted {
+ const VERSION_MARKER: &'static [u8] = b"G09lwp";
+ }
+}
+
+pub use v090::*;
+
+pub struct LifecycleWorker {
+ garage: Arc<Garage>,
+
+ state: State,
+
+ persister: PersisterShared<LifecycleWorkerPersisted>,
+}
+
+enum State {
+ Completed(NaiveDate),
+ Running {
+ date: NaiveDate,
+ pos: Vec<u8>,
+ counter: usize,
+ objects_expired: usize,
+ mpu_aborted: usize,
+ last_bucket: Option<Bucket>,
+ },
+}
+
+pub fn register_bg_vars(
+ persister: &PersisterShared<LifecycleWorkerPersisted>,
+ vars: &mut vars::BgVars,
+) {
+ vars.register_ro(persister, "lifecycle-last-completed", |p| {
+ p.get_with(|x| {
+ x.last_completed
+ .map(|date| date.to_string())
+ .unwrap_or("never".to_string())
+ })
+ });
+}
+
+impl LifecycleWorker {
+ pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
+ let today = today();
+ let state = match persister.get_with(|x| x.last_completed) {
+ Some(d) if d >= today => State::Completed(d),
+ _ => State::Running {
+ date: today,
+ pos: vec![],
+ counter: 0,
+ objects_expired: 0,
+ mpu_aborted: 0,
+ last_bucket: None,
+ },
+ };
+ Self {
+ garage,
+ state,
+ persister,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for LifecycleWorker {
+ fn name(&self) -> String {
+ "object lifecycle worker".to_string()
+ }
+
+ fn status(&self) -> WorkerStatus {
+ match &self.state {
+ State::Completed(d) => WorkerStatus {
+ freeform: vec![format!("Last completed: {}", d)],
+ ..Default::default()
+ },
+ State::Running {
+ date,
+ counter,
+ objects_expired,
+ mpu_aborted,
+ ..
+ } => {
+ let n_objects = self
+ .garage
+ .object_table
+ .data
+ .store
+ .fast_len()
+ .unwrap_or(None);
+ let progress = match n_objects {
+ None => "...".to_string(),
+ Some(total) => format!(
+ "~{:.2}%",
+ 100. * std::cmp::min(*counter, total) as f32 / total as f32
+ ),
+ };
+ WorkerStatus {
+ progress: Some(progress),
+ freeform: vec![
+ format!("Started: {}", date),
+ format!("Objects expired: {}", objects_expired),
+ format!("Multipart uploads aborted: { }", mpu_aborted),
+ ],
+ ..Default::default()
+ }
+ }
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match &mut self.state {
+ State::Completed(_) => Ok(WorkerState::Idle),
+ State::Running {
+ date,
+ counter,
+ objects_expired,
+ mpu_aborted,
+ pos,
+ last_bucket,
+ } => {
+ let (object_bytes, next_pos) = match self
+ .garage
+ .object_table
+ .data
+ .store
+ .get_gt(&pos)?
+ {
+ None => {
+ info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
+ self.persister
+ .set_with(|x| x.last_completed = Some(*date))?;
+ self.state = State::Completed(*date);
+ return Ok(WorkerState::Idle);
+ }
+ Some((k, v)) => (v, k),
+ };
+
+ let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
+ process_object(
+ &self.garage,
+ object,
+ objects_expired,
+ mpu_aborted,
+ last_bucket,
+ )
+ .await?;
+
+ *counter += 1;
+ *pos = next_pos;
+
+ Ok(WorkerState::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ match &self.state {
+ State::Completed(d) => {
+ let now = now_msec();
+ let next_start = midnight_ts(d.succ());
+ if now < next_start {
+ tokio::time::sleep_until(
+ (Instant::now() + Duration::from_millis(next_start - now)).into(),
+ )
+ .await;
+ }
+ self.state = State::Running {
+ date: today(),
+ pos: vec![],
+ counter: 0,
+ objects_expired: 0,
+ mpu_aborted: 0,
+ last_bucket: None,
+ };
+ }
+ State::Running { .. } => (),
+ }
+ WorkerState::Busy
+ }
+}
+
+async fn process_object(
+ garage: &Arc<Garage>,
+ object: Object,
+ objects_expired: &mut usize,
+ mpu_aborted: &mut usize,
+ last_bucket: &mut Option<Bucket>,
+) -> Result<(), Error> {
+ let bucket = match last_bucket.take() {
+ Some(b) if b.id == object.bucket_id => b,
+ _ => garage
+ .bucket_table
+ .get(&EmptyKey, &object.bucket_id)
+ .await?
+ .ok_or_message("object in non-existent bucket")?,
+ };
+
+ let lifecycle_policy: &[LifecycleRule] = bucket
+ .state
+ .as_option()
+ .and_then(|s| s.lifecycle_config.get().as_deref())
+ .unwrap_or_default();
+
+ for rule in lifecycle_policy.iter() {
+ todo!()
+ }
+
+ *last_bucket = Some(bucket);
+ Ok(())
+}
+
+fn midnight_ts(date: NaiveDate) -> u64 {
+ date.and_hms(0, 0, 0).timestamp_millis() as u64
+}
+
+fn next_date(ts: u64) -> NaiveDate {
+ NaiveDateTime::from_timestamp_millis(ts as i64)
+ .expect("bad timestamp")
+ .date()
+ .succ()
+}
+
+fn today() -> NaiveDate {
+ Utc::today().naive_utc()
+}
diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs
index 36d67093..5c776fb0 100644
--- a/src/model/s3/mod.rs
+++ b/src/model/s3/mod.rs
@@ -2,3 +2,5 @@ pub mod block_ref_table;
pub mod mpu_table;
pub mod object_table;
pub mod version_table;
+
+pub mod lifecycle_worker;