diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 1 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 57 | ||||
-rw-r--r-- | src/model/garage.rs | 26 | ||||
-rw-r--r-- | src/model/migrate.rs | 1 | ||||
-rw-r--r-- | src/model/s3/lifecycle_worker.rs | 413 | ||||
-rw-r--r-- | src/model/s3/mod.rs | 2 |
6 files changed, 494 insertions, 6 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 69f7eea4..3794cc59 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -23,6 +23,7 @@ garage_util.workspace = true async-trait = "0.1.7" arc-swap = "1.0" blake2 = "0.10" +chrono = "0.4" err-derive = "0.3" hex = "0.4" base64 = "0.21" diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index ac163736..4c48a76f 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -48,6 +48,9 @@ mod v08 { pub website_config: crdt::Lww<Option<WebsiteConfig>>, /// CORS rules pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>, + /// Lifecycle configuration + #[serde(default)] + pub lifecycle_config: crdt::Lww<Option<Vec<LifecycleRule>>>, /// Bucket quotas #[serde(default)] pub quotas: crdt::Lww<BucketQuotas>, @@ -69,6 +72,42 @@ mod v08 { pub expose_headers: Vec<String>, } + /// Lifecycle configuration rule + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct LifecycleRule { + /// The ID of the rule + pub id: Option<String>, + /// Whether the rule is active + pub enabled: bool, + /// The filter to check whether rule applies to a given object + pub filter: LifecycleFilter, + /// Number of days after which incomplete multipart uploads are aborted + pub abort_incomplete_mpu_days: Option<usize>, + /// Expiration policy for stored objects + pub expiration: Option<LifecycleExpiration>, + } + + /// A lifecycle filter is a set of conditions that must all be true. + /// For each condition, if it is None, it is not verified (always true), + /// and if it is Some(x), then it is verified for value x + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Default)] + pub struct LifecycleFilter { + /// If Some(x), object key has to start with prefix x + pub prefix: Option<String>, + /// If Some(x), object size has to be more than x + pub size_gt: Option<u64>, + /// If Some(x), object size has to be less than x + pub size_lt: Option<u64>, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum LifecycleExpiration { + /// Objects expire x days after they were created + AfterDays(usize), + /// Objects expire at date x (must be in yyyy-mm-dd format) + AtDate(String), + } + #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct BucketQuotas { /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) @@ -88,7 +127,7 @@ impl AutoCrdt for BucketQuotas { impl BucketParams { /// Create an empty BucketParams with no authorized keys and no website accesss - pub fn new() -> Self { + fn new() -> Self { BucketParams { creation_date: now_msec(), authorized_keys: crdt::Map::new(), @@ -96,6 +135,7 @@ impl BucketParams { local_aliases: crdt::LwwMap::new(), website_config: crdt::Lww::new(None), cors_config: crdt::Lww::new(None), + lifecycle_config: crdt::Lww::new(None), quotas: crdt::Lww::new(BucketQuotas::default()), } } @@ -111,10 +151,25 @@ impl Crdt for BucketParams { self.website_config.merge(&o.website_config); self.cors_config.merge(&o.cors_config); + self.lifecycle_config.merge(&o.lifecycle_config); self.quotas.merge(&o.quotas); } } +pub fn parse_lifecycle_date(date: &str) -> Result<chrono::NaiveDate, &'static str> { + use chrono::prelude::*; + + if let Ok(datetime) = NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%SZ") { + if datetime.time() == NaiveTime::MIN { + Ok(datetime.date()) + } else { + Err("date must be at midnight") + } + } else { + NaiveDate::parse_from_str(date, "%Y-%m-%d").map_err(|_| "date has invalid format") + } +} + impl Default for Bucket { fn default() -> Self { Self::new() diff --git a/src/model/garage.rs b/src/model/garage.rs index db2475ed..981430fb 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -7,6 +7,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::config::*; use garage_util::error::*; +use garage_util::persister::PersisterShared; use garage_rpc::replication_mode::ReplicationMode; use garage_rpc::system::System; @@ -17,6 +18,7 @@ use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::s3::block_ref_table::*; +use crate::s3::lifecycle_worker; use crate::s3::mpu_table::*; use crate::s3::object_table::*; use crate::s3::version_table::*; @@ -67,6 +69,9 @@ pub struct Garage { /// Table containing S3 block references (not blocks themselves) pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, + /// Persister for lifecycle worker info + pub lifecycle_persister: PersisterShared<lifecycle_worker::LifecycleWorkerPersisted>, + #[cfg(feature = "k2v")] pub k2v: GarageK2V, } @@ -199,6 +204,9 @@ impl Garage { let replication_mode = ReplicationMode::parse(&config.replication_mode) .ok_or_message("Invalid replication_mode in config file.")?; + info!("Initialize background variable system..."); + let mut bg_vars = vars::BgVars::new(); + info!("Initialize membership management system..."); let system = System::new(network_key, replication_mode, &config)?; @@ -230,6 +238,7 @@ impl Garage { data_rep_param, system.clone(), ); + block_manager.register_bg_vars(&mut bg_vars); // ---- admin tables ---- info!("Initialize bucket_table..."); @@ -296,14 +305,15 @@ impl Garage { &db, ); + info!("Load lifecycle worker state..."); + let lifecycle_persister = + PersisterShared::new(&system.metadata_dir, "lifecycle_worker_state"); + lifecycle_worker::register_bg_vars(&lifecycle_persister, &mut bg_vars); + // ---- K2V ---- #[cfg(feature = "k2v")] let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); - // Initialize bg vars - let mut bg_vars = vars::BgVars::new(); - block_manager.register_bg_vars(&mut bg_vars); - // -- done -- Ok(Arc::new(Self { config, @@ -321,12 +331,13 @@ impl Garage { mpu_counter_table, version_table, block_ref_table, + lifecycle_persister, #[cfg(feature = "k2v")] k2v, })) } - pub fn spawn_workers(&self, bg: &BackgroundRunner) { + pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { self.block_manager.spawn_workers(bg); self.bucket_table.spawn_workers(bg); @@ -340,6 +351,11 @@ impl Garage { self.version_table.spawn_workers(bg); self.block_ref_table.spawn_workers(bg); + bg.spawn_worker(lifecycle_worker::LifecycleWorker::new( + self.clone(), + self.lifecycle_persister.clone(), + )); + #[cfg(feature = "k2v")] self.k2v.spawn_workers(bg); } diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 6b4c3eed..4c74b43b 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -78,6 +78,7 @@ impl Migrate { local_aliases: LwwMap::new(), website_config: Lww::new(website), cors_config: Lww::new(None), + lifecycle_config: Lww::new(None), quotas: Lww::new(Default::default()), }), }) diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs new file mode 100644 index 00000000..4734742d --- /dev/null +++ b/src/model/s3/lifecycle_worker.rs @@ -0,0 +1,413 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::prelude::*; +use std::time::{Duration, Instant}; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::data::*; +use garage_util::error::Error; +use garage_util::persister::PersisterShared; +use garage_util::time::*; + +use garage_table::EmptyKey; + +use crate::bucket_table::*; +use crate::s3::object_table::*; + +use crate::garage::Garage; + +mod v090 { + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize, Default, Clone)] + pub struct LifecycleWorkerPersisted { + pub last_completed: Option<String>, + } + + impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted { + const VERSION_MARKER: &'static [u8] = b"G09lwp"; + } +} + +pub use v090::*; + +pub struct LifecycleWorker { + garage: Arc<Garage>, + + state: State, + + persister: PersisterShared<LifecycleWorkerPersisted>, +} + +enum State { + Completed(NaiveDate), + Running { + date: NaiveDate, + pos: Vec<u8>, + counter: usize, + objects_expired: usize, + mpu_aborted: usize, + last_bucket: Option<Bucket>, + }, +} + +#[derive(Clone, Copy, Eq, PartialEq)] +enum Skip { + SkipBucket, + NextObject, +} + +pub fn register_bg_vars( + persister: &PersisterShared<LifecycleWorkerPersisted>, + vars: &mut vars::BgVars, +) { + vars.register_ro(persister, "lifecycle-last-completed", |p| { + p.get_with(|x| x.last_completed.clone().unwrap_or("never".to_string())) + }); +} + +impl LifecycleWorker { + pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self { + let today = today(); + let last_completed = persister.get_with(|x| { + x.last_completed + .as_deref() + .and_then(|x| x.parse::<NaiveDate>().ok()) + }); + let state = match last_completed { + Some(d) if d >= today => State::Completed(d), + _ => State::start(today), + }; + Self { + garage, + state, + persister, + } + } +} + +impl State { + fn start(date: NaiveDate) -> Self { + info!("Starting lifecycle worker for {}", date); + State::Running { + date, + pos: vec![], + counter: 0, + objects_expired: 0, + mpu_aborted: 0, + last_bucket: None, + } + } +} + +#[async_trait] +impl Worker for LifecycleWorker { + fn name(&self) -> String { + "object lifecycle worker".to_string() + } + + fn status(&self) -> WorkerStatus { + match &self.state { + State::Completed(d) => WorkerStatus { + freeform: vec![format!("Last completed: {}", d)], + ..Default::default() + }, + State::Running { + date, + counter, + objects_expired, + mpu_aborted, + .. + } => { + let n_objects = self + .garage + .object_table + .data + .store + .fast_len() + .unwrap_or(None); + let progress = match n_objects { + None => "...".to_string(), + Some(total) => format!( + "~{:.2}%", + 100. * std::cmp::min(*counter, total) as f32 / total as f32 + ), + }; + WorkerStatus { + progress: Some(progress), + freeform: vec![ + format!("Started: {}", date), + format!("Objects expired: {}", objects_expired), + format!("Multipart uploads aborted: { }", mpu_aborted), + ], + ..Default::default() + } + } + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + match &mut self.state { + State::Completed(_) => Ok(WorkerState::Idle), + State::Running { + date, + counter, + objects_expired, + mpu_aborted, + pos, + last_bucket, + } => { + // Process a batch of 100 items before yielding to bg task scheduler + for _ in 0..100 { + let (object_bytes, next_pos) = match self + .garage + .object_table + .data + .store + .get_gt(&pos)? + { + None => { + info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted); + self.persister + .set_with(|x| x.last_completed = Some(date.to_string()))?; + self.state = State::Completed(*date); + return Ok(WorkerState::Idle); + } + Some((k, v)) => (v, k), + }; + + let object = self.garage.object_table.data.decode_entry(&object_bytes)?; + let skip = process_object( + &self.garage, + *date, + &object, + objects_expired, + mpu_aborted, + last_bucket, + ) + .await?; + + *counter += 1; + if skip == Skip::SkipBucket { + let bucket_id_len = object.bucket_id.as_slice().len(); + assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice())); + *pos = std::cmp::max( + next_pos, + [&pos[..bucket_id_len], &[0xFFu8][..]].concat(), + ); + } else { + *pos = next_pos; + } + } + + Ok(WorkerState::Busy) + } + } + } + + async fn wait_for_work(&mut self) -> WorkerState { + match &self.state { + State::Completed(d) => { + let next_day = d.succ_opt().expect("no next day"); + let next_start = midnight_ts(next_day); + loop { + let now = now_msec(); + if now < next_start { + tokio::time::sleep_until( + (Instant::now() + Duration::from_millis(next_start - now)).into(), + ) + .await; + } else { + break; + } + } + self.state = State::start(std::cmp::max(next_day, today())); + } + State::Running { .. } => (), + } + WorkerState::Busy + } +} + +async fn process_object( + garage: &Arc<Garage>, + now_date: NaiveDate, + object: &Object, + objects_expired: &mut usize, + mpu_aborted: &mut usize, + last_bucket: &mut Option<Bucket>, +) -> Result<Skip, Error> { + if !object + .versions() + .iter() + .any(|x| x.is_data() || x.is_uploading(None)) + { + return Ok(Skip::NextObject); + } + + let bucket = match last_bucket.take() { + Some(b) if b.id == object.bucket_id => b, + _ => { + match garage + .bucket_table + .get(&EmptyKey, &object.bucket_id) + .await? + { + Some(b) => b, + None => { + warn!( + "Lifecycle worker: object in non-existent bucket {:?}", + object.bucket_id + ); + return Ok(Skip::SkipBucket); + } + } + } + }; + + let lifecycle_policy: &[LifecycleRule] = bucket + .state + .as_option() + .and_then(|s| s.lifecycle_config.get().as_deref()) + .unwrap_or_default(); + + if lifecycle_policy.iter().all(|x| !x.enabled) { + return Ok(Skip::SkipBucket); + } + + let db = garage.object_table.data.store.db(); + + for rule in lifecycle_policy.iter() { + if !rule.enabled { + continue; + } + + if let Some(pfx) = &rule.filter.prefix { + if !object.key.starts_with(pfx) { + continue; + } + } + + if let Some(expire) = &rule.expiration { + if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) { + let version_date = next_date(current_version.timestamp); + + let current_version_data = match ¤t_version.state { + ObjectVersionState::Complete(c) => c, + _ => unreachable!(), + }; + + let size_match = check_size_filter(current_version_data, &rule.filter); + let date_match = match expire { + LifecycleExpiration::AfterDays(n_days) => { + (now_date - version_date) >= chrono::Duration::days(*n_days as i64) + } + LifecycleExpiration::AtDate(exp_date) => { + if let Ok(exp_date) = parse_lifecycle_date(exp_date) { + now_date >= exp_date + } else { + warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date); + false + } + } + }; + + if size_match && date_match { + // Delete expired version + let deleted_object = Object::new( + object.bucket_id, + object.key.clone(), + vec![ObjectVersion { + uuid: gen_uuid(), + timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1), + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + info!( + "Lifecycle: expiring 1 object in bucket {:?}", + object.bucket_id + ); + db.transaction(|mut tx| { + garage.object_table.queue_insert(&mut tx, &deleted_object) + })?; + *objects_expired += 1; + } + } + } + + if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days { + let aborted_versions = object + .versions() + .iter() + .filter_map(|v| { + let version_date = next_date(v.timestamp); + if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64) + && matches!(&v.state, ObjectVersionState::Uploading { .. }) + { + Some(ObjectVersion { + state: ObjectVersionState::Aborted, + ..*v + }) + } else { + None + } + }) + .collect::<Vec<_>>(); + if !aborted_versions.is_empty() { + // Insert aborted mpu info + let n_aborted = aborted_versions.len(); + info!( + "Lifecycle: aborting {} incomplete upload(s) in bucket {:?}", + n_aborted, object.bucket_id + ); + let aborted_object = + Object::new(object.bucket_id, object.key.clone(), aborted_versions); + db.transaction(|mut tx| { + garage.object_table.queue_insert(&mut tx, &aborted_object) + })?; + *mpu_aborted += n_aborted; + } + } + } + + *last_bucket = Some(bucket); + Ok(Skip::NextObject) +} + +fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool { + let size = match version_data { + ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size, + _ => unreachable!(), + }; + if let Some(size_gt) = filter.size_gt { + if !(size > size_gt) { + return false; + } + } + if let Some(size_lt) = filter.size_lt { + if !(size < size_lt) { + return false; + } + } + true +} + +fn midnight_ts(date: NaiveDate) -> u64 { + date.and_hms_opt(0, 0, 0) + .expect("midnight does not exist") + .timestamp_millis() as u64 +} + +fn next_date(ts: u64) -> NaiveDate { + NaiveDateTime::from_timestamp_millis(ts as i64) + .expect("bad timestamp") + .date() + .succ_opt() + .expect("no next day") +} + +fn today() -> NaiveDate { + Utc::now().naive_utc().date() +} diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs index 36d67093..5c776fb0 100644 --- a/src/model/s3/mod.rs +++ b/src/model/s3/mod.rs @@ -2,3 +2,5 @@ pub mod block_ref_table; pub mod mpu_table; pub mod object_table; pub mod version_table; + +pub mod lifecycle_worker; |