aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-09-04 09:45:10 +0000
committerAlex <alex@adnab.me>2023-09-04 09:45:10 +0000
commit3f461d889143c5f6edf64ff9649647d944a2ab17 (patch)
treea2351a3eceaf4ab94dbae783f3f7e5855cc5b747
parent2e90e1c124ea298de5e613de5a672f7c90ab6704 (diff)
parent8e0c020bb95a05ea657fa75cf19f8e125d9c602d (diff)
downloadgarage-3f461d889143c5f6edf64ff9649647d944a2ab17.tar.gz
garage-3f461d889143c5f6edf64ff9649647d944a2ab17.zip
Merge pull request 'object lifecycles (fix #309)' (#620) from bucket-lifecycle into next
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/620
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.nix3
-rw-r--r--doc/book/reference-manual/s3-compatibility.md32
-rw-r--r--src/api/s3/api_server.rs16
-rw-r--r--src/api/s3/cors.rs22
-rw-r--r--src/api/s3/lifecycle.rs401
-rw-r--r--src/api/s3/mod.rs1
-rw-r--r--src/api/s3/website.rs22
-rw-r--r--src/model/Cargo.toml1
-rw-r--r--src/model/bucket_table.rs57
-rw-r--r--src/model/garage.rs26
-rw-r--r--src/model/migrate.rs1
-rw-r--r--src/model/s3/lifecycle_worker.rs413
-rw-r--r--src/model/s3/mod.rs2
14 files changed, 945 insertions, 53 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 3472190b..79b35191 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1340,6 +1340,7 @@ dependencies = [
"async-trait",
"base64 0.21.3",
"blake2",
+ "chrono",
"err-derive",
"futures",
"futures-util",
diff --git a/Cargo.nix b/Cargo.nix
index d044c649..dc30c355 100644
--- a/Cargo.nix
+++ b/Cargo.nix
@@ -33,7 +33,7 @@ args@{
ignoreLockHash,
}:
let
- nixifiedLockHash = "d4392b23d407f7ebc20d7f5db7583847e362665c1abb09f1c1d3305205e5996d";
+ nixifiedLockHash = "f5b86f9d75664ba528a26ae71f07a38e9c72c78fe331420b9b639e2a099d4dad";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash
@@ -1911,6 +1911,7 @@ in
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.73" { profileName = "__noProfile"; }).out;
base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.3" { inherit profileName; }).out;
blake2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".blake2."0.10.6" { inherit profileName; }).out;
+ chrono = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.26" { inherit profileName; }).out;
err_derive = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }).out;
futures = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.28" { inherit profileName; }).out;
futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.28" { inherit profileName; }).out;
diff --git a/doc/book/reference-manual/s3-compatibility.md b/doc/book/reference-manual/s3-compatibility.md
index 15b29bd1..1bcfd123 100644
--- a/doc/book/reference-manual/s3-compatibility.md
+++ b/doc/book/reference-manual/s3-compatibility.md
@@ -75,16 +75,13 @@ but these endpoints are documented in [Red Hat Ceph Storage - Chapter 2. Ceph Ob
| Endpoint | Garage | [Openstack Swift](https://docs.openstack.org/swift/latest/s3_compat.html) | [Ceph Object Gateway](https://docs.ceph.com/en/latest/radosgw/s3/) | [Riak CS](https://docs.riak.com/riak/cs/2.1.1/references/apis/storage/s3/index.html) | [OpenIO](https://docs.openio.io/latest/source/arch-design/s3_compliancy.html) |
|------------------------------|----------------------------------|-----------------|---------------|---------|-----|
-| [AbortMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
-| [CompleteMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html) | ✅ Implemented (see details below) | ✅ | ✅ | ✅ | ✅ |
-| [CreateMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html) | ✅ Implemented | ✅| ✅ | ✅ | ✅ |
-| [ListMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
-| [ListParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
-| [UploadPart](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html) | ✅ Implemented (see details below) | ✅ | ✅| ✅ | ✅ |
-| [UploadPartCopy](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
-
-Our implementation of Multipart Upload is currently a bit more restrictive than Amazon's one in some edge cases.
-For more information, please refer to our [issue tracker](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/204).
+| [AbortMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
+| [CompleteMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
+| [CreateMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html) | ✅ Implemented | ✅| ✅ | ✅ | ✅ |
+| [ListMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
+| [ListParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
+| [UploadPart](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html) | ✅ Implemented | ✅ | ✅| ✅ | ✅ |
+| [UploadPartCopy](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
### Website endpoints
@@ -127,15 +124,22 @@ If you need this feature, please [share your use case in our dedicated issue](ht
| Endpoint | Garage | [Openstack Swift](https://docs.openstack.org/swift/latest/s3_compat.html) | [Ceph Object Gateway](https://docs.ceph.com/en/latest/radosgw/s3/) | [Riak CS](https://docs.riak.com/riak/cs/2.1.1/references/apis/storage/s3/index.html) | [OpenIO](https://docs.openio.io/latest/source/arch-design/s3_compliancy.html) |
|------------------------------|----------------------------------|-----------------|---------------|---------|-----|
-| [DeleteBucketLifecycle](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html) | ❌ Missing | ❌| ✅| ❌| ✅|
-| [GetBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html) | ❌ Missing | ❌| ✅ | ❌| ✅|
-| [PutBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html) | ❌ Missing | ❌| ✅ | ❌| ✅|
+| [DeleteBucketLifecycle](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html) | ✅ Implemented | ❌| ✅| ❌| ✅|
+| [GetBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html) | ✅ Implemented | ❌| ✅ | ❌| ✅|
+| [PutBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html) | ⚠ Partially implemented (see below) | ❌| ✅ | ❌| ✅|
| [GetBucketVersioning](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html) | ❌ Stub (see below) | ✅| ✅ | ❌| ✅|
| [ListObjectVersions](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) | ❌ Missing | ❌| ✅ | ❌| ✅|
| [PutBucketVersioning](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html) | ❌ Missing | ❌| ✅| ❌| ✅|
+**PutBucketLifecycleConfiguration:** The only actions supported are
+`AbortIncompleteMultipartUpload` and `Expiration` (without the
+`ExpiredObjectDeleteMarker` field). All other operations are dependent on
+either bucket versionning or storage classes which Garage currently does not
+implement. The deprecated `Prefix` member directly in the the `Rule`
+structure/XML tag is not supported, specified prefixes must be inside the
+`Filter` structure/XML tag.
-**GetBucketVersioning:** Stub implementation (Garage does not yet support versionning so this always returns "versionning not enabled").
+**GetBucketVersioning:** Stub implementation which always returns "versionning not enabled", since Garage does not yet support bucket versionning.
### Replication endpoints
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 5e793082..3f995d34 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -26,6 +26,7 @@ use crate::s3::copy::*;
use crate::s3::cors::*;
use crate::s3::delete::*;
use crate::s3::get::*;
+use crate::s3::lifecycle::*;
use crate::s3::list::*;
use crate::s3::multipart::*;
use crate::s3::post_object::handle_post_object;
@@ -354,14 +355,21 @@ impl ApiHandler for S3ApiServer {
}
Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
Endpoint::PutBucketWebsite {} => {
- handle_put_website(garage, bucket_id, req, content_sha256).await
+ handle_put_website(garage, bucket.clone(), req, content_sha256).await
}
- Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await,
+ Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket.clone()).await,
Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
Endpoint::PutBucketCors {} => {
- handle_put_cors(garage, bucket_id, req, content_sha256).await
+ handle_put_cors(garage, bucket.clone(), req, content_sha256).await
+ }
+ Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket.clone()).await,
+ Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await,
+ Endpoint::PutBucketLifecycleConfiguration {} => {
+ handle_put_lifecycle(garage, bucket.clone(), req, content_sha256).await
+ }
+ Endpoint::DeleteBucketLifecycle {} => {
+ handle_delete_lifecycle(garage, bucket.clone()).await
}
- Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await,
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index c7273464..49097ad1 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -44,14 +44,11 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
pub async fn handle_delete_cors(
garage: Arc<Garage>,
- bucket_id: Uuid,
+ mut bucket: Bucket,
) -> Result<Response<Body>, Error> {
- let mut bucket = garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
-
- let param = bucket.params_mut().unwrap();
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
param.cors_config.update(None);
garage.bucket_table.insert(&bucket).await?;
@@ -63,7 +60,7 @@ pub async fn handle_delete_cors(
pub async fn handle_put_cors(
garage: Arc<Garage>,
- bucket_id: Uuid,
+ mut bucket: Bucket,
req: Request<Body>,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
@@ -73,12 +70,9 @@ pub async fn handle_put_cors(
verify_signed_content(content_sha256, &body[..])?;
}
- let mut bucket = garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
-
- let param = bucket.params_mut().unwrap();
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
new file mode 100644
index 00000000..1e7d6755
--- /dev/null
+++ b/src/api/s3/lifecycle.rs
@@ -0,0 +1,401 @@
+use quick_xml::de::from_reader;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use serde::{Deserialize, Serialize};
+
+use crate::s3::error::*;
+use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+use crate::signature::verify_signed_content;
+
+use garage_model::bucket_table::{
+ parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
+ LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
+};
+use garage_model::garage::Garage;
+use garage_util::data::*;
+
+pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Error> {
+ let param = bucket
+ .params()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
+
+ if let Some(lifecycle) = param.lifecycle_config.get() {
+ let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle);
+ let xml = to_xml_with_header(&wc)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, "application/xml")
+ .body(Body::from(xml))?)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+ }
+}
+
+pub async fn handle_delete_lifecycle(
+ garage: Arc<Garage>,
+ mut bucket: Bucket,
+) -> Result<Response<Body>, Error> {
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
+
+ param.lifecycle_config.update(None);
+ garage.bucket_table.insert(&bucket).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_put_lifecycle(
+ garage: Arc<Garage>,
+ mut bucket: Bucket,
+ req: Request<Body>,
+ content_sha256: Option<Hash>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
+
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
+
+ let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
+ let config = conf
+ .validate_into_garage_lifecycle_config()
+ .ok_or_bad_request("Invalid lifecycle configuration")?;
+
+ param.lifecycle_config.update(Some(config));
+ garage.bucket_table.insert(&bucket).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+// ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ----
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct LifecycleConfiguration {
+ #[serde(serialize_with = "xmlns_tag", skip_deserializing)]
+ pub xmlns: (),
+ #[serde(rename = "Rule")]
+ pub lifecycle_rules: Vec<LifecycleRule>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct LifecycleRule {
+ #[serde(rename = "ID")]
+ pub id: Option<Value>,
+ #[serde(rename = "Status")]
+ pub status: Value,
+ #[serde(rename = "Filter", default)]
+ pub filter: Option<Filter>,
+ #[serde(rename = "Expiration", default)]
+ pub expiration: Option<Expiration>,
+ #[serde(rename = "AbortIncompleteMultipartUpload", default)]
+ pub abort_incomplete_mpu: Option<AbortIncompleteMpu>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Default)]
+pub struct Filter {
+ #[serde(rename = "And")]
+ pub and: Option<Box<Filter>>,
+ #[serde(rename = "Prefix")]
+ pub prefix: Option<Value>,
+ #[serde(rename = "ObjectSizeGreaterThan")]
+ pub size_gt: Option<IntValue>,
+ #[serde(rename = "ObjectSizeLessThan")]
+ pub size_lt: Option<IntValue>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Expiration {
+ #[serde(rename = "Days")]
+ pub days: Option<IntValue>,
+ #[serde(rename = "Date")]
+ pub at_date: Option<Value>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct AbortIncompleteMpu {
+ #[serde(rename = "DaysAfterInitiation")]
+ pub days: IntValue,
+}
+
+impl LifecycleConfiguration {
+ pub fn validate_into_garage_lifecycle_config(
+ self,
+ ) -> Result<Vec<GarageLifecycleRule>, &'static str> {
+ let mut ret = vec![];
+ for rule in self.lifecycle_rules {
+ ret.push(rule.validate_into_garage_lifecycle_rule()?);
+ }
+ Ok(ret)
+ }
+
+ pub fn from_garage_lifecycle_config(config: &[GarageLifecycleRule]) -> Self {
+ Self {
+ xmlns: (),
+ lifecycle_rules: config
+ .iter()
+ .map(LifecycleRule::from_garage_lifecycle_rule)
+ .collect(),
+ }
+ }
+}
+
+impl LifecycleRule {
+ pub fn validate_into_garage_lifecycle_rule(self) -> Result<GarageLifecycleRule, &'static str> {
+ let enabled = match self.status.0.as_str() {
+ "Enabled" => true,
+ "Disabled" => false,
+ _ => return Err("invalid value for <Status>"),
+ };
+
+ let filter = self
+ .filter
+ .map(Filter::validate_into_garage_lifecycle_filter)
+ .transpose()?
+ .unwrap_or_default();
+
+ let abort_incomplete_mpu_days = self.abort_incomplete_mpu.map(|x| x.days.0 as usize);
+
+ let expiration = self
+ .expiration
+ .map(Expiration::validate_into_garage_lifecycle_expiration)
+ .transpose()?;
+
+ Ok(GarageLifecycleRule {
+ id: self.id.map(|x| x.0),
+ enabled,
+ filter,
+ abort_incomplete_mpu_days,
+ expiration,
+ })
+ }
+
+ pub fn from_garage_lifecycle_rule(rule: &GarageLifecycleRule) -> Self {
+ Self {
+ id: rule.id.as_deref().map(Value::from),
+ status: if rule.enabled {
+ Value::from("Enabled")
+ } else {
+ Value::from("Disabled")
+ },
+ filter: Filter::from_garage_lifecycle_filter(&rule.filter),
+ abort_incomplete_mpu: rule
+ .abort_incomplete_mpu_days
+ .map(|days| AbortIncompleteMpu {
+ days: IntValue(days as i64),
+ }),
+ expiration: rule
+ .expiration
+ .as_ref()
+ .map(Expiration::from_garage_lifecycle_expiration),
+ }
+ }
+}
+
+impl Filter {
+ pub fn count(&self) -> i32 {
+ fn count<T>(x: &Option<T>) -> i32 {
+ x.as_ref().map(|_| 1).unwrap_or(0)
+ }
+ count(&self.prefix) + count(&self.size_gt) + count(&self.size_lt)
+ }
+
+ pub fn validate_into_garage_lifecycle_filter(
+ self,
+ ) -> Result<GarageLifecycleFilter, &'static str> {
+ if self.count() > 0 && self.and.is_some() {
+ Err("Filter tag cannot contain both <And> and another condition")
+ } else if let Some(and) = self.and {
+ if and.and.is_some() {
+ return Err("Nested <And> tags");
+ }
+ Ok(and.internal_into_garage_lifecycle_filter())
+ } else if self.count() > 1 {
+ Err("Multiple Filter conditions must be wrapped in an <And> tag")
+ } else {
+ Ok(self.internal_into_garage_lifecycle_filter())
+ }
+ }
+
+ fn internal_into_garage_lifecycle_filter(self) -> GarageLifecycleFilter {
+ GarageLifecycleFilter {
+ prefix: self.prefix.map(|x| x.0),
+ size_gt: self.size_gt.map(|x| x.0 as u64),
+ size_lt: self.size_lt.map(|x| x.0 as u64),
+ }
+ }
+
+ pub fn from_garage_lifecycle_filter(rule: &GarageLifecycleFilter) -> Option<Self> {
+ let filter = Filter {
+ and: None,
+ prefix: rule.prefix.as_deref().map(Value::from),
+ size_gt: rule.size_gt.map(|x| IntValue(x as i64)),
+ size_lt: rule.size_lt.map(|x| IntValue(x as i64)),
+ };
+ match filter.count() {
+ 0 => None,
+ 1 => Some(filter),
+ _ => Some(Filter {
+ and: Some(Box::new(filter)),
+ ..Default::default()
+ }),
+ }
+ }
+}
+
+impl Expiration {
+ pub fn validate_into_garage_lifecycle_expiration(
+ self,
+ ) -> Result<GarageLifecycleExpiration, &'static str> {
+ match (self.days, self.at_date) {
+ (Some(_), Some(_)) => Err("cannot have both <Days> and <Date> in <Expiration>"),
+ (None, None) => Err("<Expiration> must contain either <Days> or <Date>"),
+ (Some(days), None) => Ok(GarageLifecycleExpiration::AfterDays(days.0 as usize)),
+ (None, Some(date)) => {
+ parse_lifecycle_date(&date.0)?;
+ Ok(GarageLifecycleExpiration::AtDate(date.0))
+ }
+ }
+ }
+
+ pub fn from_garage_lifecycle_expiration(exp: &GarageLifecycleExpiration) -> Self {
+ match exp {
+ GarageLifecycleExpiration::AfterDays(days) => Expiration {
+ days: Some(IntValue(*days as i64)),
+ at_date: None,
+ },
+ GarageLifecycleExpiration::AtDate(date) => Expiration {
+ days: None,
+ at_date: Some(Value(date.to_string())),
+ },
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use quick_xml::de::from_str;
+
+ #[test]
+ fn test_deserialize_lifecycle_config() -> Result<(), Error> {
+ let message = r#"<?xml version="1.0" encoding="UTF-8"?>
+<LifecycleConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <Rule>
+ <ID>id1</ID>
+ <Status>Enabled</Status>
+ <Filter>
+ <Prefix>documents/</Prefix>
+ </Filter>
+ <AbortIncompleteMultipartUpload>
+ <DaysAfterInitiation>7</DaysAfterInitiation>
+ </AbortIncompleteMultipartUpload>
+ </Rule>
+ <Rule>
+ <ID>id2</ID>
+ <Status>Enabled</Status>
+ <Filter>
+ <And>
+ <Prefix>logs/</Prefix>
+ <ObjectSizeGreaterThan>1000000</ObjectSizeGreaterThan>
+ </And>
+ </Filter>
+ <Expiration>
+ <Days>365</Days>
+ </Expiration>
+ </Rule>
+</LifecycleConfiguration>"#;
+ let conf: LifecycleConfiguration = from_str(message).unwrap();
+ let ref_value = LifecycleConfiguration {
+ xmlns: (),
+ lifecycle_rules: vec![
+ LifecycleRule {
+ id: Some("id1".into()),
+ status: "Enabled".into(),
+ filter: Some(Filter {
+ prefix: Some("documents/".into()),
+ ..Default::default()
+ }),
+ expiration: None,
+ abort_incomplete_mpu: Some(AbortIncompleteMpu { days: IntValue(7) }),
+ },
+ LifecycleRule {
+ id: Some("id2".into()),
+ status: "Enabled".into(),
+ filter: Some(Filter {
+ and: Some(Box::new(Filter {
+ prefix: Some("logs/".into()),
+ size_gt: Some(IntValue(1000000)),
+ ..Default::default()
+ })),
+ ..Default::default()
+ }),
+ expiration: Some(Expiration {
+ days: Some(IntValue(365)),
+ at_date: None,
+ }),
+ abort_incomplete_mpu: None,
+ },
+ ],
+ };
+ assert_eq! {
+ ref_value,
+ conf
+ };
+
+ let message2 = to_xml_with_header(&ref_value)?;
+
+ let cleanup = |c: &str| c.replace(char::is_whitespace, "");
+ assert_eq!(cleanup(message), cleanup(&message2));
+
+ // Check validation
+ let validated = ref_value
+ .validate_into_garage_lifecycle_config()
+ .ok_or_bad_request("invalid xml config")?;
+
+ let ref_config = vec![
+ GarageLifecycleRule {
+ id: Some("id1".into()),
+ enabled: true,
+ filter: GarageLifecycleFilter {
+ prefix: Some("documents/".into()),
+ ..Default::default()
+ },
+ expiration: None,
+ abort_incomplete_mpu_days: Some(7),
+ },
+ GarageLifecycleRule {
+ id: Some("id2".into()),
+ enabled: true,
+ filter: GarageLifecycleFilter {
+ prefix: Some("logs/".into()),
+ size_gt: Some(1000000),
+ ..Default::default()
+ },
+ expiration: Some(GarageLifecycleExpiration::AfterDays(365)),
+ abort_incomplete_mpu_days: None,
+ },
+ ];
+ assert_eq!(validated, ref_config);
+
+ let message3 = to_xml_with_header(&LifecycleConfiguration::from_garage_lifecycle_config(
+ &validated,
+ ))?;
+ assert_eq!(cleanup(message), cleanup(&message3));
+
+ Ok(())
+ }
+}
diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs
index b5237bf7..cbdb94ab 100644
--- a/src/api/s3/mod.rs
+++ b/src/api/s3/mod.rs
@@ -6,6 +6,7 @@ mod copy;
pub mod cors;
mod delete;
pub mod get;
+mod lifecycle;
mod list;
mod multipart;
mod post_object;
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index 77738971..7f2ab925 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -43,14 +43,11 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error
pub async fn handle_delete_website(
garage: Arc<Garage>,
- bucket_id: Uuid,
+ mut bucket: Bucket,
) -> Result<Response<Body>, Error> {
- let mut bucket = garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
-
- let param = bucket.params_mut().unwrap();
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
param.website_config.update(None);
garage.bucket_table.insert(&bucket).await?;
@@ -62,7 +59,7 @@ pub async fn handle_delete_website(
pub async fn handle_put_website(
garage: Arc<Garage>,
- bucket_id: Uuid,
+ mut bucket: Bucket,
req: Request<Body>,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
@@ -72,12 +69,9 @@ pub async fn handle_put_website(
verify_signed_content(content_sha256, &body[..])?;
}
- let mut bucket = garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
-
- let param = bucket.params_mut().unwrap();
+ let param = bucket
+ .params_mut()
+ .ok_or_internal_error("Bucket should not be deleted at this point")?;
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 69f7eea4..3794cc59 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -23,6 +23,7 @@ garage_util.workspace = true
async-trait = "0.1.7"
arc-swap = "1.0"
blake2 = "0.10"
+chrono = "0.4"
err-derive = "0.3"
hex = "0.4"
base64 = "0.21"
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index ac163736..4c48a76f 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -48,6 +48,9 @@ mod v08 {
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
/// CORS rules
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
+ /// Lifecycle configuration
+ #[serde(default)]
+ pub lifecycle_config: crdt::Lww<Option<Vec<LifecycleRule>>>,
/// Bucket quotas
#[serde(default)]
pub quotas: crdt::Lww<BucketQuotas>,
@@ -69,6 +72,42 @@ mod v08 {
pub expose_headers: Vec<String>,
}
+ /// Lifecycle configuration rule
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct LifecycleRule {
+ /// The ID of the rule
+ pub id: Option<String>,
+ /// Whether the rule is active
+ pub enabled: bool,
+ /// The filter to check whether rule applies to a given object
+ pub filter: LifecycleFilter,
+ /// Number of days after which incomplete multipart uploads are aborted
+ pub abort_incomplete_mpu_days: Option<usize>,
+ /// Expiration policy for stored objects
+ pub expiration: Option<LifecycleExpiration>,
+ }
+
+ /// A lifecycle filter is a set of conditions that must all be true.
+ /// For each condition, if it is None, it is not verified (always true),
+ /// and if it is Some(x), then it is verified for value x
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Default)]
+ pub struct LifecycleFilter {
+ /// If Some(x), object key has to start with prefix x
+ pub prefix: Option<String>,
+ /// If Some(x), object size has to be more than x
+ pub size_gt: Option<u64>,
+ /// If Some(x), object size has to be less than x
+ pub size_lt: Option<u64>,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum LifecycleExpiration {
+ /// Objects expire x days after they were created
+ AfterDays(usize),
+ /// Objects expire at date x (must be in yyyy-mm-dd format)
+ AtDate(String),
+ }
+
#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct BucketQuotas {
/// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
@@ -88,7 +127,7 @@ impl AutoCrdt for BucketQuotas {
impl BucketParams {
/// Create an empty BucketParams with no authorized keys and no website accesss
- pub fn new() -> Self {
+ fn new() -> Self {
BucketParams {
creation_date: now_msec(),
authorized_keys: crdt::Map::new(),
@@ -96,6 +135,7 @@ impl BucketParams {
local_aliases: crdt::LwwMap::new(),
website_config: crdt::Lww::new(None),
cors_config: crdt::Lww::new(None),
+ lifecycle_config: crdt::Lww::new(None),
quotas: crdt::Lww::new(BucketQuotas::default()),
}
}
@@ -111,10 +151,25 @@ impl Crdt for BucketParams {
self.website_config.merge(&o.website_config);
self.cors_config.merge(&o.cors_config);
+ self.lifecycle_config.merge(&o.lifecycle_config);
self.quotas.merge(&o.quotas);
}
}
+pub fn parse_lifecycle_date(date: &str) -> Result<chrono::NaiveDate, &'static str> {
+ use chrono::prelude::*;
+
+ if let Ok(datetime) = NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%SZ") {
+ if datetime.time() == NaiveTime::MIN {
+ Ok(datetime.date())
+ } else {
+ Err("date must be at midnight")
+ }
+ } else {
+ NaiveDate::parse_from_str(date, "%Y-%m-%d").map_err(|_| "date has invalid format")
+ }
+}
+
impl Default for Bucket {
fn default() -> Self {
Self::new()
diff --git a/src/model/garage.rs b/src/model/garage.rs
index db2475ed..981430fb 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -7,6 +7,7 @@ use garage_db as db;
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::*;
+use garage_util::persister::PersisterShared;
use garage_rpc::replication_mode::ReplicationMode;
use garage_rpc::system::System;
@@ -17,6 +18,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::s3::block_ref_table::*;
+use crate::s3::lifecycle_worker;
use crate::s3::mpu_table::*;
use crate::s3::object_table::*;
use crate::s3::version_table::*;
@@ -67,6 +69,9 @@ pub struct Garage {
/// Table containing S3 block references (not blocks themselves)
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+ /// Persister for lifecycle worker info
+ pub lifecycle_persister: PersisterShared<lifecycle_worker::LifecycleWorkerPersisted>,
+
#[cfg(feature = "k2v")]
pub k2v: GarageK2V,
}
@@ -199,6 +204,9 @@ impl Garage {
let replication_mode = ReplicationMode::parse(&config.replication_mode)
.ok_or_message("Invalid replication_mode in config file.")?;
+ info!("Initialize background variable system...");
+ let mut bg_vars = vars::BgVars::new();
+
info!("Initialize membership management system...");
let system = System::new(network_key, replication_mode, &config)?;
@@ -230,6 +238,7 @@ impl Garage {
data_rep_param,
system.clone(),
);
+ block_manager.register_bg_vars(&mut bg_vars);
// ---- admin tables ----
info!("Initialize bucket_table...");
@@ -296,14 +305,15 @@ impl Garage {
&db,
);
+ info!("Load lifecycle worker state...");
+ let lifecycle_persister =
+ PersisterShared::new(&system.metadata_dir, "lifecycle_worker_state");
+ lifecycle_worker::register_bg_vars(&lifecycle_persister, &mut bg_vars);
+
// ---- K2V ----
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
- // Initialize bg vars
- let mut bg_vars = vars::BgVars::new();
- block_manager.register_bg_vars(&mut bg_vars);
-
// -- done --
Ok(Arc::new(Self {
config,
@@ -321,12 +331,13 @@ impl Garage {
mpu_counter_table,
version_table,
block_ref_table,
+ lifecycle_persister,
#[cfg(feature = "k2v")]
k2v,
}))
}
- pub fn spawn_workers(&self, bg: &BackgroundRunner) {
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
self.block_manager.spawn_workers(bg);
self.bucket_table.spawn_workers(bg);
@@ -340,6 +351,11 @@ impl Garage {
self.version_table.spawn_workers(bg);
self.block_ref_table.spawn_workers(bg);
+ bg.spawn_worker(lifecycle_worker::LifecycleWorker::new(
+ self.clone(),
+ self.lifecycle_persister.clone(),
+ ));
+
#[cfg(feature = "k2v")]
self.k2v.spawn_workers(bg);
}
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 6b4c3eed..4c74b43b 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -78,6 +78,7 @@ impl Migrate {
local_aliases: LwwMap::new(),
website_config: Lww::new(website),
cors_config: Lww::new(None),
+ lifecycle_config: Lww::new(None),
quotas: Lww::new(Default::default()),
}),
})
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
new file mode 100644
index 00000000..4734742d
--- /dev/null
+++ b/src/model/s3/lifecycle_worker.rs
@@ -0,0 +1,413 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use chrono::prelude::*;
+use std::time::{Duration, Instant};
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::Error;
+use garage_util::persister::PersisterShared;
+use garage_util::time::*;
+
+use garage_table::EmptyKey;
+
+use crate::bucket_table::*;
+use crate::s3::object_table::*;
+
+use crate::garage::Garage;
+
+mod v090 {
+ use serde::{Deserialize, Serialize};
+
+ #[derive(Serialize, Deserialize, Default, Clone)]
+ pub struct LifecycleWorkerPersisted {
+ pub last_completed: Option<String>,
+ }
+
+ impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted {
+ const VERSION_MARKER: &'static [u8] = b"G09lwp";
+ }
+}
+
+pub use v090::*;
+
+pub struct LifecycleWorker {
+ garage: Arc<Garage>,
+
+ state: State,
+
+ persister: PersisterShared<LifecycleWorkerPersisted>,
+}
+
+enum State {
+ Completed(NaiveDate),
+ Running {
+ date: NaiveDate,
+ pos: Vec<u8>,
+ counter: usize,
+ objects_expired: usize,
+ mpu_aborted: usize,
+ last_bucket: Option<Bucket>,
+ },
+}
+
+#[derive(Clone, Copy, Eq, PartialEq)]
+enum Skip {
+ SkipBucket,
+ NextObject,
+}
+
+pub fn register_bg_vars(
+ persister: &PersisterShared<LifecycleWorkerPersisted>,
+ vars: &mut vars::BgVars,
+) {
+ vars.register_ro(persister, "lifecycle-last-completed", |p| {
+ p.get_with(|x| x.last_completed.clone().unwrap_or("never".to_string()))
+ });
+}
+
+impl LifecycleWorker {
+ pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
+ let today = today();
+ let last_completed = persister.get_with(|x| {
+ x.last_completed
+ .as_deref()
+ .and_then(|x| x.parse::<NaiveDate>().ok())
+ });
+ let state = match last_completed {
+ Some(d) if d >= today => State::Completed(d),
+ _ => State::start(today),
+ };
+ Self {
+ garage,
+ state,
+ persister,
+ }
+ }
+}
+
+impl State {
+ fn start(date: NaiveDate) -> Self {
+ info!("Starting lifecycle worker for {}", date);
+ State::Running {
+ date,
+ pos: vec![],
+ counter: 0,
+ objects_expired: 0,
+ mpu_aborted: 0,
+ last_bucket: None,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for LifecycleWorker {
+ fn name(&self) -> String {
+ "object lifecycle worker".to_string()
+ }
+
+ fn status(&self) -> WorkerStatus {
+ match &self.state {
+ State::Completed(d) => WorkerStatus {
+ freeform: vec![format!("Last completed: {}", d)],
+ ..Default::default()
+ },
+ State::Running {
+ date,
+ counter,
+ objects_expired,
+ mpu_aborted,
+ ..
+ } => {
+ let n_objects = self
+ .garage
+ .object_table
+ .data
+ .store
+ .fast_len()
+ .unwrap_or(None);
+ let progress = match n_objects {
+ None => "...".to_string(),
+ Some(total) => format!(
+ "~{:.2}%",
+ 100. * std::cmp::min(*counter, total) as f32 / total as f32
+ ),
+ };
+ WorkerStatus {
+ progress: Some(progress),
+ freeform: vec![
+ format!("Started: {}", date),
+ format!("Objects expired: {}", objects_expired),
+ format!("Multipart uploads aborted: { }", mpu_aborted),
+ ],
+ ..Default::default()
+ }
+ }
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match &mut self.state {
+ State::Completed(_) => Ok(WorkerState::Idle),
+ State::Running {
+ date,
+ counter,
+ objects_expired,
+ mpu_aborted,
+ pos,
+ last_bucket,
+ } => {
+ // Process a batch of 100 items before yielding to bg task scheduler
+ for _ in 0..100 {
+ let (object_bytes, next_pos) = match self
+ .garage
+ .object_table
+ .data
+ .store
+ .get_gt(&pos)?
+ {
+ None => {
+ info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
+ self.persister
+ .set_with(|x| x.last_completed = Some(date.to_string()))?;
+ self.state = State::Completed(*date);
+ return Ok(WorkerState::Idle);
+ }
+ Some((k, v)) => (v, k),
+ };
+
+ let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
+ let skip = process_object(
+ &self.garage,
+ *date,
+ &object,
+ objects_expired,
+ mpu_aborted,
+ last_bucket,
+ )
+ .await?;
+
+ *counter += 1;
+ if skip == Skip::SkipBucket {
+ let bucket_id_len = object.bucket_id.as_slice().len();
+ assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
+ *pos = std::cmp::max(
+ next_pos,
+ [&pos[..bucket_id_len], &[0xFFu8][..]].concat(),
+ );
+ } else {
+ *pos = next_pos;
+ }
+ }
+
+ Ok(WorkerState::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ match &self.state {
+ State::Completed(d) => {
+ let next_day = d.succ_opt().expect("no next day");
+ let next_start = midnight_ts(next_day);
+ loop {
+ let now = now_msec();
+ if now < next_start {
+ tokio::time::sleep_until(
+ (Instant::now() + Duration::from_millis(next_start - now)).into(),
+ )
+ .await;
+ } else {
+ break;
+ }
+ }
+ self.state = State::start(std::cmp::max(next_day, today()));
+ }
+ State::Running { .. } => (),
+ }
+ WorkerState::Busy
+ }
+}
+
+async fn process_object(
+ garage: &Arc<Garage>,
+ now_date: NaiveDate,
+ object: &Object,
+ objects_expired: &mut usize,
+ mpu_aborted: &mut usize,
+ last_bucket: &mut Option<Bucket>,
+) -> Result<Skip, Error> {
+ if !object
+ .versions()
+ .iter()
+ .any(|x| x.is_data() || x.is_uploading(None))
+ {
+ return Ok(Skip::NextObject);
+ }
+
+ let bucket = match last_bucket.take() {
+ Some(b) if b.id == object.bucket_id => b,
+ _ => {
+ match garage
+ .bucket_table
+ .get(&EmptyKey, &object.bucket_id)
+ .await?
+ {
+ Some(b) => b,
+ None => {
+ warn!(
+ "Lifecycle worker: object in non-existent bucket {:?}",
+ object.bucket_id
+ );
+ return Ok(Skip::SkipBucket);
+ }
+ }
+ }
+ };
+
+ let lifecycle_policy: &[LifecycleRule] = bucket
+ .state
+ .as_option()
+ .and_then(|s| s.lifecycle_config.get().as_deref())
+ .unwrap_or_default();
+
+ if lifecycle_policy.iter().all(|x| !x.enabled) {
+ return Ok(Skip::SkipBucket);
+ }
+
+ let db = garage.object_table.data.store.db();
+
+ for rule in lifecycle_policy.iter() {
+ if !rule.enabled {
+ continue;
+ }
+
+ if let Some(pfx) = &rule.filter.prefix {
+ if !object.key.starts_with(pfx) {
+ continue;
+ }
+ }
+
+ if let Some(expire) = &rule.expiration {
+ if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) {
+ let version_date = next_date(current_version.timestamp);
+
+ let current_version_data = match &current_version.state {
+ ObjectVersionState::Complete(c) => c,
+ _ => unreachable!(),
+ };
+
+ let size_match = check_size_filter(current_version_data, &rule.filter);
+ let date_match = match expire {
+ LifecycleExpiration::AfterDays(n_days) => {
+ (now_date - version_date) >= chrono::Duration::days(*n_days as i64)
+ }
+ LifecycleExpiration::AtDate(exp_date) => {
+ if let Ok(exp_date) = parse_lifecycle_date(exp_date) {
+ now_date >= exp_date
+ } else {
+ warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
+ false
+ }
+ }
+ };
+
+ if size_match && date_match {
+ // Delete expired version
+ let deleted_object = Object::new(
+ object.bucket_id,
+ object.key.clone(),
+ vec![ObjectVersion {
+ uuid: gen_uuid(),
+ timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1),
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
+ }],
+ );
+ info!(
+ "Lifecycle: expiring 1 object in bucket {:?}",
+ object.bucket_id
+ );
+ db.transaction(|mut tx| {
+ garage.object_table.queue_insert(&mut tx, &deleted_object)
+ })?;
+ *objects_expired += 1;
+ }
+ }
+ }
+
+ if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days {
+ let aborted_versions = object
+ .versions()
+ .iter()
+ .filter_map(|v| {
+ let version_date = next_date(v.timestamp);
+ if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64)
+ && matches!(&v.state, ObjectVersionState::Uploading { .. })
+ {
+ Some(ObjectVersion {
+ state: ObjectVersionState::Aborted,
+ ..*v
+ })
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+ if !aborted_versions.is_empty() {
+ // Insert aborted mpu info
+ let n_aborted = aborted_versions.len();
+ info!(
+ "Lifecycle: aborting {} incomplete upload(s) in bucket {:?}",
+ n_aborted, object.bucket_id
+ );
+ let aborted_object =
+ Object::new(object.bucket_id, object.key.clone(), aborted_versions);
+ db.transaction(|mut tx| {
+ garage.object_table.queue_insert(&mut tx, &aborted_object)
+ })?;
+ *mpu_aborted += n_aborted;
+ }
+ }
+ }
+
+ *last_bucket = Some(bucket);
+ Ok(Skip::NextObject)
+}
+
+fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool {
+ let size = match version_data {
+ ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size,
+ _ => unreachable!(),
+ };
+ if let Some(size_gt) = filter.size_gt {
+ if !(size > size_gt) {
+ return false;
+ }
+ }
+ if let Some(size_lt) = filter.size_lt {
+ if !(size < size_lt) {
+ return false;
+ }
+ }
+ true
+}
+
+fn midnight_ts(date: NaiveDate) -> u64 {
+ date.and_hms_opt(0, 0, 0)
+ .expect("midnight does not exist")
+ .timestamp_millis() as u64
+}
+
+fn next_date(ts: u64) -> NaiveDate {
+ NaiveDateTime::from_timestamp_millis(ts as i64)
+ .expect("bad timestamp")
+ .date()
+ .succ_opt()
+ .expect("no next day")
+}
+
+fn today() -> NaiveDate {
+ Utc::now().naive_utc().date()
+}
diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs
index 36d67093..5c776fb0 100644
--- a/src/model/s3/mod.rs
+++ b/src/model/s3/mod.rs
@@ -2,3 +2,5 @@ pub mod block_ref_table;
pub mod mpu_table;
pub mod object_table;
pub mod version_table;
+
+pub mod lifecycle_worker;