aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/bucket_table.rs57
-rw-r--r--src/model/garage.rs89
-rw-r--r--src/model/helper/bucket.rs10
-rw-r--r--src/model/key_table.rs14
-rw-r--r--src/model/migrate.rs1
-rw-r--r--src/model/s3/lifecycle_worker.rs414
-rw-r--r--src/model/s3/mod.rs3
-rw-r--r--src/model/s3/mpu_table.rs254
-rw-r--r--src/model/s3/object_table.rs172
-rw-r--r--src/model/s3/version_table.rs95
11 files changed, 1060 insertions, 52 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 1d3600a6..bb3e2b11 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -23,6 +23,7 @@ garage_util.workspace = true
async-trait = "0.1.7"
arc-swap = "1.0"
blake2 = "0.10"
+chrono = "0.4"
err-derive = "0.3"
hex = "0.4"
base64 = "0.21"
@@ -41,7 +42,7 @@ opentelemetry = "0.17"
netapp = "0.5"
[features]
-default = [ "sled" ]
+default = [ "sled", "lmdb", "sqlite" ]
k2v = [ "garage_util/k2v" ]
lmdb = [ "garage_db/lmdb" ]
sled = [ "garage_db/sled" ]
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index ac163736..4c48a76f 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -48,6 +48,9 @@ mod v08 {
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
/// CORS rules
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
+ /// Lifecycle configuration
+ #[serde(default)]
+ pub lifecycle_config: crdt::Lww<Option<Vec<LifecycleRule>>>,
/// Bucket quotas
#[serde(default)]
pub quotas: crdt::Lww<BucketQuotas>,
@@ -69,6 +72,42 @@ mod v08 {
pub expose_headers: Vec<String>,
}
+ /// Lifecycle configuration rule
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct LifecycleRule {
+ /// The ID of the rule
+ pub id: Option<String>,
+ /// Whether the rule is active
+ pub enabled: bool,
+ /// The filter to check whether rule applies to a given object
+ pub filter: LifecycleFilter,
+ /// Number of days after which incomplete multipart uploads are aborted
+ pub abort_incomplete_mpu_days: Option<usize>,
+ /// Expiration policy for stored objects
+ pub expiration: Option<LifecycleExpiration>,
+ }
+
+ /// A lifecycle filter is a set of conditions that must all be true.
+ /// For each condition, if it is None, it is not verified (always true),
+ /// and if it is Some(x), then it is verified for value x
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Default)]
+ pub struct LifecycleFilter {
+ /// If Some(x), object key has to start with prefix x
+ pub prefix: Option<String>,
+ /// If Some(x), object size has to be more than x
+ pub size_gt: Option<u64>,
+ /// If Some(x), object size has to be less than x
+ pub size_lt: Option<u64>,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum LifecycleExpiration {
+ /// Objects expire x days after they were created
+ AfterDays(usize),
+ /// Objects expire at date x (must be in yyyy-mm-dd format)
+ AtDate(String),
+ }
+
#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct BucketQuotas {
/// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
@@ -88,7 +127,7 @@ impl AutoCrdt for BucketQuotas {
impl BucketParams {
/// Create an empty BucketParams with no authorized keys and no website accesss
- pub fn new() -> Self {
+ fn new() -> Self {
BucketParams {
creation_date: now_msec(),
authorized_keys: crdt::Map::new(),
@@ -96,6 +135,7 @@ impl BucketParams {
local_aliases: crdt::LwwMap::new(),
website_config: crdt::Lww::new(None),
cors_config: crdt::Lww::new(None),
+ lifecycle_config: crdt::Lww::new(None),
quotas: crdt::Lww::new(BucketQuotas::default()),
}
}
@@ -111,10 +151,25 @@ impl Crdt for BucketParams {
self.website_config.merge(&o.website_config);
self.cors_config.merge(&o.cors_config);
+ self.lifecycle_config.merge(&o.lifecycle_config);
self.quotas.merge(&o.quotas);
}
}
+pub fn parse_lifecycle_date(date: &str) -> Result<chrono::NaiveDate, &'static str> {
+ use chrono::prelude::*;
+
+ if let Ok(datetime) = NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%SZ") {
+ if datetime.time() == NaiveTime::MIN {
+ Ok(datetime.date())
+ } else {
+ Err("date must be at midnight")
+ }
+ } else {
+ NaiveDate::parse_from_str(date, "%Y-%m-%d").map_err(|_| "date has invalid format")
+ }
+}
+
impl Default for Bucket {
fn default() -> Self {
Self::new()
diff --git a/src/model/garage.rs b/src/model/garage.rs
index a432aa7a..8c9a3af3 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -7,6 +7,7 @@ use garage_db as db;
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::*;
+use garage_util::persister::PersisterShared;
use garage_rpc::replication_mode::ReplicationMode;
use garage_rpc::system::System;
@@ -17,6 +18,8 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::s3::block_ref_table::*;
+use crate::s3::lifecycle_worker;
+use crate::s3::mpu_table::*;
use crate::s3::object_table::*;
use crate::s3::version_table::*;
@@ -57,11 +60,18 @@ pub struct Garage {
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
/// Counting table containing object counters
pub object_counter_table: Arc<IndexCounter<Object>>,
+ /// Table containing S3 multipart uploads
+ pub mpu_table: Arc<Table<MultipartUploadTable, TableShardedReplication>>,
+ /// Counting table containing multipart object counters
+ pub mpu_counter_table: Arc<IndexCounter<MultipartUpload>>,
/// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
/// Table containing S3 block references (not blocks themselves)
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+ /// Persister for lifecycle worker info
+ pub lifecycle_persister: PersisterShared<lifecycle_worker::LifecycleWorkerPersisted>,
+
#[cfg(feature = "k2v")]
pub k2v: GarageK2V,
}
@@ -82,8 +92,22 @@ impl Garage {
// Create meta dir and data dir if they don't exist already
std::fs::create_dir_all(&config.metadata_dir)
.ok_or_message("Unable to create Garage metadata directory")?;
- std::fs::create_dir_all(&config.data_dir)
- .ok_or_message("Unable to create Garage data directory")?;
+ match &config.data_dir {
+ DataDirEnum::Single(data_dir) => {
+ std::fs::create_dir_all(data_dir).ok_or_message(format!(
+ "Unable to create Garage data directory: {}",
+ data_dir.to_string_lossy()
+ ))?;
+ }
+ DataDirEnum::Multiple(data_dirs) => {
+ for dir in data_dirs {
+ std::fs::create_dir_all(&dir.path).ok_or_message(format!(
+ "Unable to create Garage data directory: {}",
+ dir.path.to_string_lossy()
+ ))?;
+ }
+ }
+ }
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
@@ -91,6 +115,11 @@ impl Garage {
// ---- Sled DB ----
#[cfg(feature = "sled")]
"sled" => {
+ if config.metadata_fsync {
+ return Err(Error::Message(format!(
+ "`metadata_fsync = true` is not supported with the Sled database engine"
+ )));
+ }
db_path.push("db");
info!("Opening Sled database at: {}", db_path.display());
let db = db::sled_adapter::sled::Config::default()
@@ -109,6 +138,15 @@ impl Garage {
db_path.push("db.sqlite");
info!("Opening Sqlite database at: {}", db_path.display());
let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+ .and_then(|db| {
+ db.pragma_update(None, "journal_mode", &"WAL")?;
+ if config.metadata_fsync {
+ db.pragma_update(None, "synchronous", &"NORMAL")?;
+ } else {
+ db.pragma_update(None, "synchronous", &"OFF")?;
+ }
+ Ok(db)
+ })
.ok_or_message("Unable to open sqlite DB")?;
db::sqlite_adapter::SqliteDb::init(db)
}
@@ -136,8 +174,10 @@ impl Garage {
env_builder.max_readers(500);
env_builder.map_size(map_size);
unsafe {
- env_builder.flag(heed::flags::Flags::MdbNoSync);
env_builder.flag(heed::flags::Flags::MdbNoMetaSync);
+ if !config.metadata_fsync {
+ env_builder.flag(heed::flags::Flags::MdbNoSync);
+ }
}
let db = match env_builder.open(&db_path) {
Err(heed::Error::Io(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
@@ -182,6 +222,9 @@ impl Garage {
let replication_mode = ReplicationMode::parse(&config.replication_mode)
.ok_or_message("Invalid replication_mode in config file.")?;
+ info!("Initialize background variable system...");
+ let mut bg_vars = vars::BgVars::new();
+
info!("Initialize membership management system...");
let system = System::new(network_key, replication_mode, &config)?;
@@ -208,10 +251,12 @@ impl Garage {
let block_manager = BlockManager::new(
&db,
config.data_dir.clone(),
+ config.data_fsync,
config.compression_level,
data_rep_param,
system.clone(),
- );
+ )?;
+ block_manager.register_bg_vars(&mut bg_vars);
// ---- admin tables ----
info!("Initialize bucket_table...");
@@ -248,6 +293,20 @@ impl Garage {
&db,
);
+ info!("Initialize multipart upload counter table...");
+ let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
+
+ info!("Initialize multipart upload table...");
+ let mpu_table = Table::new(
+ MultipartUploadTable {
+ version_table: version_table.clone(),
+ mpu_counter_table: mpu_counter_table.clone(),
+ },
+ meta_rep_param.clone(),
+ system.clone(),
+ &db,
+ );
+
info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
@@ -256,6 +315,7 @@ impl Garage {
let object_table = Table::new(
ObjectTable {
version_table: version_table.clone(),
+ mpu_table: mpu_table.clone(),
object_counter_table: object_counter_table.clone(),
},
meta_rep_param.clone(),
@@ -263,14 +323,15 @@ impl Garage {
&db,
);
+ info!("Load lifecycle worker state...");
+ let lifecycle_persister =
+ PersisterShared::new(&system.metadata_dir, "lifecycle_worker_state");
+ lifecycle_worker::register_bg_vars(&lifecycle_persister, &mut bg_vars);
+
// ---- K2V ----
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
- // Initialize bg vars
- let mut bg_vars = vars::BgVars::new();
- block_manager.register_bg_vars(&mut bg_vars);
-
// -- done --
Ok(Arc::new(Self {
config,
@@ -284,14 +345,17 @@ impl Garage {
key_table,
object_table,
object_counter_table,
+ mpu_table,
+ mpu_counter_table,
version_table,
block_ref_table,
+ lifecycle_persister,
#[cfg(feature = "k2v")]
k2v,
}))
}
- pub fn spawn_workers(&self, bg: &BackgroundRunner) {
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
self.block_manager.spawn_workers(bg);
self.bucket_table.spawn_workers(bg);
@@ -300,9 +364,16 @@ impl Garage {
self.object_table.spawn_workers(bg);
self.object_counter_table.spawn_workers(bg);
+ self.mpu_table.spawn_workers(bg);
+ self.mpu_counter_table.spawn_workers(bg);
self.version_table.spawn_workers(bg);
self.block_ref_table.spawn_workers(bg);
+ bg.spawn_worker(lifecycle_worker::LifecycleWorker::new(
+ self.clone(),
+ self.lifecycle_persister.clone(),
+ ));
+
#[cfg(feature = "k2v")]
self.k2v.spawn_workers(bg);
}
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 4a488d7f..576d03f3 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -478,7 +478,9 @@ impl<'a> BucketHelper<'a> {
// ----
/// Deletes all incomplete multipart uploads that are older than a certain time.
- /// Returns the number of uploads aborted
+ /// Returns the number of uploads aborted.
+ /// This will also include non-multipart uploads, which may be lingering
+ /// after a node crash
pub async fn cleanup_incomplete_uploads(
&self,
bucket_id: &Uuid,
@@ -496,7 +498,9 @@ impl<'a> BucketHelper<'a> {
.get_range(
bucket_id,
start,
- Some(ObjectFilter::IsUploading),
+ Some(ObjectFilter::IsUploading {
+ check_multipart: None,
+ }),
1000,
EnumerationOrder::Forward,
)
@@ -508,7 +512,7 @@ impl<'a> BucketHelper<'a> {
let aborted_versions = object
.versions()
.iter()
- .filter(|v| v.is_uploading() && v.timestamp < older_than)
+ .filter(|v| v.is_uploading(None) && v.timestamp < older_than)
.map(|v| ObjectVersion {
state: ObjectVersionState::Aborted,
uuid: v.uuid,
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index bb5334a3..a9762f1b 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -149,11 +149,19 @@ impl Key {
}
/// Import a key from it's parts
- pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
- Self {
+ pub fn import(key_id: &str, secret_key: &str, name: &str) -> Result<Self, &'static str> {
+ if key_id.len() != 26 || &key_id[..2] != "GK" || hex::decode(&key_id[2..]).is_err() {
+ return Err("The specified key ID is not a valid Garage key ID (starts with `GK`, followed by 12 hex-encoded bytes)");
+ }
+
+ if secret_key.len() != 64 || hex::decode(&secret_key).is_err() {
+ return Err("The specified secret key is not a valid Garage secret key (composed of 32 hex-encoded bytes)");
+ }
+
+ Ok(Self {
key_id: key_id.to_string(),
state: crdt::Deletable::present(KeyParams::new(secret_key, name)),
- }
+ })
}
/// Create a new Key which can me merged to mark an existing key deleted
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 6b4c3eed..4c74b43b 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -78,6 +78,7 @@ impl Migrate {
local_aliases: LwwMap::new(),
website_config: Lww::new(website),
cors_config: Lww::new(None),
+ lifecycle_config: Lww::new(None),
quotas: Lww::new(Default::default()),
}),
})
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
new file mode 100644
index 00000000..42e661eb
--- /dev/null
+++ b/src/model/s3/lifecycle_worker.rs
@@ -0,0 +1,414 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use chrono::prelude::*;
+use std::time::{Duration, Instant};
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::Error;
+use garage_util::persister::PersisterShared;
+use garage_util::time::*;
+
+use garage_table::EmptyKey;
+
+use crate::bucket_table::*;
+use crate::s3::object_table::*;
+
+use crate::garage::Garage;
+
+mod v090 {
+ use serde::{Deserialize, Serialize};
+
+ #[derive(Serialize, Deserialize, Default, Clone)]
+ pub struct LifecycleWorkerPersisted {
+ pub last_completed: Option<String>,
+ }
+
+ impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted {
+ const VERSION_MARKER: &'static [u8] = b"G09lwp";
+ }
+}
+
+pub use v090::*;
+
+pub struct LifecycleWorker {
+ garage: Arc<Garage>,
+
+ state: State,
+
+ persister: PersisterShared<LifecycleWorkerPersisted>,
+}
+
+enum State {
+ Completed(NaiveDate),
+ Running {
+ date: NaiveDate,
+ pos: Vec<u8>,
+ counter: usize,
+ objects_expired: usize,
+ mpu_aborted: usize,
+ last_bucket: Option<Bucket>,
+ },
+}
+
+#[derive(Clone, Copy, Eq, PartialEq)]
+enum Skip {
+ SkipBucket,
+ NextObject,
+}
+
+pub fn register_bg_vars(
+ persister: &PersisterShared<LifecycleWorkerPersisted>,
+ vars: &mut vars::BgVars,
+) {
+ vars.register_ro(persister, "lifecycle-last-completed", |p| {
+ p.get_with(|x| x.last_completed.clone().unwrap_or("never".to_string()))
+ });
+}
+
+impl LifecycleWorker {
+ pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
+ let today = today();
+ let last_completed = persister.get_with(|x| {
+ x.last_completed
+ .as_deref()
+ .and_then(|x| x.parse::<NaiveDate>().ok())
+ });
+ let state = match last_completed {
+ Some(d) if d >= today => State::Completed(d),
+ _ => State::start(today),
+ };
+ Self {
+ garage,
+ state,
+ persister,
+ }
+ }
+}
+
+impl State {
+ fn start(date: NaiveDate) -> Self {
+ info!("Starting lifecycle worker for {}", date);
+ State::Running {
+ date,
+ pos: vec![],
+ counter: 0,
+ objects_expired: 0,
+ mpu_aborted: 0,
+ last_bucket: None,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for LifecycleWorker {
+ fn name(&self) -> String {
+ "object lifecycle worker".to_string()
+ }
+
+ fn status(&self) -> WorkerStatus {
+ match &self.state {
+ State::Completed(d) => WorkerStatus {
+ freeform: vec![format!("Last completed: {}", d)],
+ ..Default::default()
+ },
+ State::Running {
+ date,
+ counter,
+ objects_expired,
+ mpu_aborted,
+ ..
+ } => {
+ let n_objects = self
+ .garage
+ .object_table
+ .data
+ .store
+ .fast_len()
+ .unwrap_or(None);
+ let progress = match n_objects {
+ None => "...".to_string(),
+ Some(total) => format!(
+ "~{:.2}%",
+ 100. * std::cmp::min(*counter, total) as f32 / total as f32
+ ),
+ };
+ WorkerStatus {
+ progress: Some(progress),
+ freeform: vec![
+ format!("Started: {}", date),
+ format!("Objects expired: {}", objects_expired),
+ format!("Multipart uploads aborted: { }", mpu_aborted),
+ ],
+ ..Default::default()
+ }
+ }
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match &mut self.state {
+ State::Completed(_) => Ok(WorkerState::Idle),
+ State::Running {
+ date,
+ counter,
+ objects_expired,
+ mpu_aborted,
+ pos,
+ last_bucket,
+ } => {
+ // Process a batch of 100 items before yielding to bg task scheduler
+ for _ in 0..100 {
+ let (object_bytes, next_pos) = match self
+ .garage
+ .object_table
+ .data
+ .store
+ .get_gt(&pos)?
+ {
+ None => {
+ info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
+ self.persister
+ .set_with(|x| x.last_completed = Some(date.to_string()))?;
+ self.state = State::Completed(*date);
+ return Ok(WorkerState::Idle);
+ }
+ Some((k, v)) => (v, k),
+ };
+
+ let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
+ let skip = process_object(
+ &self.garage,
+ *date,
+ &object,
+ objects_expired,
+ mpu_aborted,
+ last_bucket,
+ )
+ .await?;
+
+ *counter += 1;
+ if skip == Skip::SkipBucket {
+ let bucket_id_len = object.bucket_id.as_slice().len();
+ assert_eq!(
+ next_pos.get(..bucket_id_len),
+ Some(object.bucket_id.as_slice())
+ );
+ let last_bucket_pos = [&next_pos[..bucket_id_len], &[0xFFu8][..]].concat();
+ *pos = std::cmp::max(next_pos, last_bucket_pos);
+ } else {
+ *pos = next_pos;
+ }
+ }
+
+ Ok(WorkerState::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ match &self.state {
+ State::Completed(d) => {
+ let next_day = d.succ_opt().expect("no next day");
+ let next_start = midnight_ts(next_day);
+ loop {
+ let now = now_msec();
+ if now < next_start {
+ tokio::time::sleep_until(
+ (Instant::now() + Duration::from_millis(next_start - now)).into(),
+ )
+ .await;
+ } else {
+ break;
+ }
+ }
+ self.state = State::start(std::cmp::max(next_day, today()));
+ }
+ State::Running { .. } => (),
+ }
+ WorkerState::Busy
+ }
+}
+
+async fn process_object(
+ garage: &Arc<Garage>,
+ now_date: NaiveDate,
+ object: &Object,
+ objects_expired: &mut usize,
+ mpu_aborted: &mut usize,
+ last_bucket: &mut Option<Bucket>,
+) -> Result<Skip, Error> {
+ if !object
+ .versions()
+ .iter()
+ .any(|x| x.is_data() || x.is_uploading(None))
+ {
+ return Ok(Skip::NextObject);
+ }
+
+ let bucket = match last_bucket.take() {
+ Some(b) if b.id == object.bucket_id => b,
+ _ => {
+ match garage
+ .bucket_table
+ .get(&EmptyKey, &object.bucket_id)
+ .await?
+ {
+ Some(b) => b,
+ None => {
+ warn!(
+ "Lifecycle worker: object in non-existent bucket {:?}",
+ object.bucket_id
+ );
+ return Ok(Skip::SkipBucket);
+ }
+ }
+ }
+ };
+
+ let lifecycle_policy: &[LifecycleRule] = bucket
+ .state
+ .as_option()
+ .and_then(|s| s.lifecycle_config.get().as_deref())
+ .unwrap_or_default();
+
+ if lifecycle_policy.iter().all(|x| !x.enabled) {
+ return Ok(Skip::SkipBucket);
+ }
+
+ let db = garage.object_table.data.store.db();
+
+ for rule in lifecycle_policy.iter() {
+ if !rule.enabled {
+ continue;
+ }
+
+ if let Some(pfx) = &rule.filter.prefix {
+ if !object.key.starts_with(pfx) {
+ continue;
+ }
+ }
+
+ if let Some(expire) = &rule.expiration {
+ if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) {
+ let version_date = next_date(current_version.timestamp);
+
+ let current_version_data = match &current_version.state {
+ ObjectVersionState::Complete(c) => c,
+ _ => unreachable!(),
+ };
+
+ let size_match = check_size_filter(current_version_data, &rule.filter);
+ let date_match = match expire {
+ LifecycleExpiration::AfterDays(n_days) => {
+ (now_date - version_date) >= chrono::Duration::days(*n_days as i64)
+ }
+ LifecycleExpiration::AtDate(exp_date) => {
+ if let Ok(exp_date) = parse_lifecycle_date(exp_date) {
+ now_date >= exp_date
+ } else {
+ warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
+ false
+ }
+ }
+ };
+
+ if size_match && date_match {
+ // Delete expired version
+ let deleted_object = Object::new(
+ object.bucket_id,
+ object.key.clone(),
+ vec![ObjectVersion {
+ uuid: gen_uuid(),
+ timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1),
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
+ }],
+ );
+ info!(
+ "Lifecycle: expiring 1 object in bucket {:?}",
+ object.bucket_id
+ );
+ db.transaction(|mut tx| {
+ garage.object_table.queue_insert(&mut tx, &deleted_object)
+ })?;
+ *objects_expired += 1;
+ }
+ }
+ }
+
+ if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days {
+ let aborted_versions = object
+ .versions()
+ .iter()
+ .filter_map(|v| {
+ let version_date = next_date(v.timestamp);
+ if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64)
+ && matches!(&v.state, ObjectVersionState::Uploading { .. })
+ {
+ Some(ObjectVersion {
+ state: ObjectVersionState::Aborted,
+ ..*v
+ })
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+ if !aborted_versions.is_empty() {
+ // Insert aborted mpu info
+ let n_aborted = aborted_versions.len();
+ info!(
+ "Lifecycle: aborting {} incomplete upload(s) in bucket {:?}",
+ n_aborted, object.bucket_id
+ );
+ let aborted_object =
+ Object::new(object.bucket_id, object.key.clone(), aborted_versions);
+ db.transaction(|mut tx| {
+ garage.object_table.queue_insert(&mut tx, &aborted_object)
+ })?;
+ *mpu_aborted += n_aborted;
+ }
+ }
+ }
+
+ *last_bucket = Some(bucket);
+ Ok(Skip::NextObject)
+}
+
+fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool {
+ let size = match version_data {
+ ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size,
+ _ => unreachable!(),
+ };
+ if let Some(size_gt) = filter.size_gt {
+ if !(size > size_gt) {
+ return false;
+ }
+ }
+ if let Some(size_lt) = filter.size_lt {
+ if !(size < size_lt) {
+ return false;
+ }
+ }
+ true
+}
+
+fn midnight_ts(date: NaiveDate) -> u64 {
+ date.and_hms_opt(0, 0, 0)
+ .expect("midnight does not exist")
+ .timestamp_millis() as u64
+}
+
+fn next_date(ts: u64) -> NaiveDate {
+ NaiveDateTime::from_timestamp_millis(ts as i64)
+ .expect("bad timestamp")
+ .date()
+ .succ_opt()
+ .expect("no next day")
+}
+
+fn today() -> NaiveDate {
+ Utc::now().naive_utc().date()
+}
diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs
index 4e94337d..5c776fb0 100644
--- a/src/model/s3/mod.rs
+++ b/src/model/s3/mod.rs
@@ -1,3 +1,6 @@
pub mod block_ref_table;
+pub mod mpu_table;
pub mod object_table;
pub mod version_table;
+
+pub mod lifecycle_worker;
diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs
new file mode 100644
index 00000000..238cbf11
--- /dev/null
+++ b/src/model/s3/mpu_table.rs
@@ -0,0 +1,254 @@
+use std::sync::Arc;
+
+use garage_db as db;
+
+use garage_util::crdt::Crdt;
+use garage_util::data::*;
+use garage_util::time::*;
+
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+use crate::index_counter::*;
+use crate::s3::version_table::*;
+
+pub const UPLOADS: &str = "uploads";
+pub const PARTS: &str = "parts";
+pub const BYTES: &str = "bytes";
+
+mod v09 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ /// A part of a multipart upload
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct MultipartUpload {
+ /// Partition key = Upload id = UUID of the object version
+ pub upload_id: Uuid,
+
+ /// The timestamp at which the multipart upload was created
+ pub timestamp: u64,
+ /// Is this multipart upload deleted
+ /// The MultipartUpload is marked as deleted as soon as the
+ /// multipart upload is either completed or aborted
+ pub deleted: crdt::Bool,
+ /// List of uploaded parts, key = (part number, timestamp)
+ /// In case of retries, all versions for each part are kept
+ /// Everything is cleaned up only once the MultipartUpload is marked deleted
+ pub parts: crdt::Map<MpuPartKey, MpuPart>,
+
+ // Back link to bucket+key so that we can find the object this mpu
+ // belongs to and check whether it is still valid
+ /// Bucket in which the related object is stored
+ pub bucket_id: Uuid,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct MpuPartKey {
+ /// Number of the part
+ pub part_number: u64,
+ /// Timestamp of part upload
+ pub timestamp: u64,
+ }
+
+ /// The version of an uploaded part
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct MpuPart {
+ /// Links to a Version in VersionTable
+ pub version: Uuid,
+ /// ETag of the content of this part (known only once done uploading)
+ pub etag: Option<String>,
+ /// Size of this part (known only once done uploading)
+ pub size: Option<u64>,
+ }
+
+ impl garage_util::migrate::InitialFormat for MultipartUpload {
+ const VERSION_MARKER: &'static [u8] = b"G09s3mpu";
+ }
+}
+
+pub use v09::*;
+
+impl Ord for MpuPartKey {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.part_number
+ .cmp(&other.part_number)
+ .then(self.timestamp.cmp(&other.timestamp))
+ }
+}
+
+impl PartialOrd for MpuPartKey {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl MultipartUpload {
+ pub fn new(
+ upload_id: Uuid,
+ timestamp: u64,
+ bucket_id: Uuid,
+ key: String,
+ deleted: bool,
+ ) -> Self {
+ Self {
+ upload_id,
+ timestamp,
+ deleted: crdt::Bool::new(deleted),
+ parts: crdt::Map::new(),
+ bucket_id,
+ key,
+ }
+ }
+
+ pub fn next_timestamp(&self, part_number: u64) -> u64 {
+ std::cmp::max(
+ now_msec(),
+ 1 + self
+ .parts
+ .items()
+ .iter()
+ .filter(|(x, _)| x.part_number == part_number)
+ .map(|(x, _)| x.timestamp)
+ .max()
+ .unwrap_or(0),
+ )
+ }
+}
+
+impl Entry<Uuid, EmptyKey> for MultipartUpload {
+ fn partition_key(&self) -> &Uuid {
+ &self.upload_id
+ }
+ fn sort_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+ fn is_tombstone(&self) -> bool {
+ self.deleted.get()
+ }
+}
+
+impl Crdt for MultipartUpload {
+ fn merge(&mut self, other: &Self) {
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
+ self.parts.clear();
+ } else {
+ self.parts.merge(&other.parts);
+ }
+ }
+}
+
+impl Crdt for MpuPart {
+ fn merge(&mut self, other: &Self) {
+ self.etag = match (self.etag.take(), &other.etag) {
+ (None, Some(_)) => other.etag.clone(),
+ (Some(x), Some(y)) if x < *y => other.etag.clone(),
+ (x, _) => x,
+ };
+ self.size = match (self.size, other.size) {
+ (None, Some(_)) => other.size,
+ (Some(x), Some(y)) if x < y => other.size,
+ (x, _) => x,
+ };
+ }
+}
+
+pub struct MultipartUploadTable {
+ pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ pub mpu_counter_table: Arc<IndexCounter<MultipartUpload>>,
+}
+
+impl TableSchema for MultipartUploadTable {
+ const TABLE_NAME: &'static str = "multipart_upload";
+
+ type P = Uuid;
+ type S = EmptyKey;
+ type E = MultipartUpload;
+ type Filter = DeletedFilter;
+
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ // 1. Count
+ let counter_res = self.mpu_counter_table.count(tx, old, new);
+ if let Err(e) = db::unabort(counter_res)? {
+ error!(
+ "Unable to update multipart object part counter: {}. Index values will be wrong!",
+ e
+ );
+ }
+
+ // 2. Propagate deletions to version table
+ if let (Some(old_mpu), Some(new_mpu)) = (old, new) {
+ if new_mpu.deleted.get() && !old_mpu.deleted.get() {
+ let deleted_versions = old_mpu.parts.items().iter().map(|(_k, p)| {
+ Version::new(
+ p.version,
+ VersionBacklink::MultipartUpload {
+ upload_id: old_mpu.upload_id,
+ },
+ true,
+ )
+ });
+ for version in deleted_versions {
+ let res = self.version_table.queue_insert(tx, &version);
+ if let Err(e) = db::unabort(res)? {
+ error!("Unable to enqueue version deletion propagation: {}. A repair will be needed.", e);
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ filter.apply(entry.is_tombstone())
+ }
+}
+
+impl CountedItem for MultipartUpload {
+ const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_counter";
+
+ // Partition key = bucket id
+ type CP = Uuid;
+ // Sort key = nothing
+ type CS = EmptyKey;
+
+ fn counter_partition_key(&self) -> &Uuid {
+ &self.bucket_id
+ }
+ fn counter_sort_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let uploads = if self.deleted.get() { 0 } else { 1 };
+ let mut parts = self
+ .parts
+ .items()
+ .iter()
+ .map(|(k, _)| k.part_number)
+ .collect::<Vec<_>>();
+ parts.dedup();
+ let bytes = self
+ .parts
+ .items()
+ .iter()
+ .map(|(_, p)| p.size.unwrap_or(0))
+ .sum::<u64>();
+ vec![
+ (UPLOADS, uploads),
+ (PARTS, parts.len() as i64),
+ (BYTES, bytes as i64),
+ ]
+ }
+}
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 518acc95..ebea04bd 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -10,6 +10,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::index_counter::*;
+use crate::s3::mpu_table::*;
use crate::s3::version_table::*;
pub const OBJECTS: &str = "objects";
@@ -130,7 +131,86 @@ mod v08 {
}
}
-pub use v08::*;
+mod v09 {
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v08;
+
+ pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta};
+
+ /// An object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket_id: Uuid,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ pub(super) versions: Vec<ObjectVersion>,
+ }
+
+ /// Informations about a version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersion {
+ /// Id of the version
+ pub uuid: Uuid,
+ /// Timestamp of when the object was created
+ pub timestamp: u64,
+ /// State of the version
+ pub state: ObjectVersionState,
+ }
+
+ /// State of an object version
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionState {
+ /// The version is being received
+ Uploading {
+ /// Indicates whether this is a multipart upload
+ multipart: bool,
+ /// Headers to be included in the final object
+ headers: ObjectVersionHeaders,
+ },
+ /// The version is fully received
+ Complete(ObjectVersionData),
+ /// The version uploaded containded errors or the upload was explicitly aborted
+ Aborted,
+ }
+
+ impl garage_util::migrate::Migrate for Object {
+ const VERSION_MARKER: &'static [u8] = b"G09s3o";
+
+ type Previous = v08::Object;
+
+ fn migrate(old: v08::Object) -> Object {
+ let versions = old
+ .versions
+ .into_iter()
+ .map(|x| ObjectVersion {
+ uuid: x.uuid,
+ timestamp: x.timestamp,
+ state: match x.state {
+ v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading {
+ multipart: false,
+ headers: h,
+ },
+ v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d),
+ v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
+ },
+ })
+ .collect();
+ Object {
+ bucket_id: old.bucket_id,
+ key: old.key,
+ versions,
+ }
+ }
+ }
+}
+
+pub use v09::*;
impl Object {
/// Initialize an Object struct from parts
@@ -180,11 +260,11 @@ impl Crdt for ObjectVersionState {
Complete(a) => {
a.merge(b);
}
- Uploading(_) => {
+ Uploading { .. } => {
*self = Complete(b.clone());
}
},
- Uploading(_) => {}
+ Uploading { .. } => {}
}
}
}
@@ -199,8 +279,17 @@ impl ObjectVersion {
}
/// Is the object version currently being uploaded
- pub fn is_uploading(&self) -> bool {
- matches!(self.state, ObjectVersionState::Uploading(_))
+ ///
+ /// matches only multipart uploads if check_multipart is Some(true)
+ /// matches only non-multipart uploads if check_multipart is Some(false)
+ /// matches both if check_multipart is None
+ pub fn is_uploading(&self, check_multipart: Option<bool>) -> bool {
+ match &self.state {
+ ObjectVersionState::Uploading { multipart, .. } => {
+ check_multipart.map(|x| x == *multipart).unwrap_or(true)
+ }
+ _ => false,
+ }
}
/// Is the object version completely received
@@ -267,13 +356,20 @@ impl Crdt for Object {
pub struct ObjectTable {
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ pub mpu_table: Arc<Table<MultipartUploadTable, TableShardedReplication>>,
pub object_counter_table: Arc<IndexCounter<Object>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ObjectFilter {
+ /// Is the object version available (received and not a tombstone)
IsData,
- IsUploading,
+ /// Is the object version currently being uploaded
+ ///
+ /// matches only multipart uploads if check_multipart is Some(true)
+ /// matches only non-multipart uploads if check_multipart is Some(false)
+ /// matches both if check_multipart is None
+ IsUploading { check_multipart: Option<bool> },
}
impl TableSchema for ObjectTable {
@@ -301,21 +397,28 @@ impl TableSchema for ObjectTable {
// 2. Enqueue propagation deletions to version table
if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of old versions
for v in old_v.versions.iter() {
- let newly_deleted = match new_v
+ let new_v_id = new_v
.versions
- .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
- {
+ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()));
+
+ // Propagate deletion of old versions to the Version table
+ let delete_version = match new_v_id {
Err(_) => true,
Ok(i) => {
new_v.versions[i].state == ObjectVersionState::Aborted
&& v.state != ObjectVersionState::Aborted
}
};
- if newly_deleted {
- let deleted_version =
- Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
+ if delete_version {
+ let deleted_version = Version::new(
+ v.uuid,
+ VersionBacklink::Object {
+ bucket_id: old_v.bucket_id,
+ key: old_v.key.clone(),
+ },
+ true,
+ );
let res = self.version_table.queue_insert(tx, &deleted_version);
if let Err(e) = db::unabort(res)? {
error!(
@@ -324,6 +427,39 @@ impl TableSchema for ObjectTable {
);
}
}
+
+ // After abortion or completion of multipart uploads, delete MPU table entry
+ if matches!(
+ v.state,
+ ObjectVersionState::Uploading {
+ multipart: true,
+ ..
+ }
+ ) {
+ let delete_mpu = match new_v_id {
+ Err(_) => true,
+ Ok(i) => !matches!(
+ new_v.versions[i].state,
+ ObjectVersionState::Uploading { .. }
+ ),
+ };
+ if delete_mpu {
+ let deleted_mpu = MultipartUpload::new(
+ v.uuid,
+ v.timestamp,
+ old_v.bucket_id,
+ old_v.key.clone(),
+ true,
+ );
+ let res = self.mpu_table.queue_insert(tx, &deleted_mpu);
+ if let Err(e) = db::unabort(res)? {
+ error!(
+ "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.",
+ e
+ );
+ }
+ }
+ }
}
}
@@ -333,7 +469,10 @@ impl TableSchema for ObjectTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
match filter {
ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()),
- ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
+ ObjectFilter::IsUploading { check_multipart } => entry
+ .versions
+ .iter()
+ .any(|v| v.is_uploading(*check_multipart)),
}
}
}
@@ -360,10 +499,7 @@ impl CountedItem for Object {
} else {
0
};
- let n_unfinished_uploads = versions
- .iter()
- .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
- .count();
+ let n_unfinished_uploads = versions.iter().filter(|v| v.is_uploading(None)).count();
let n_bytes = versions
.iter()
.map(|v| match &v.state {
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 6edc83f4..5c032f9f 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use garage_db as db;
use garage_util::data::*;
+use garage_util::error::*;
use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
@@ -66,6 +67,8 @@ mod v08 {
use super::v05;
+ pub use v05::{VersionBlock, VersionBlockKey};
+
/// A version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
@@ -90,8 +93,6 @@ mod v08 {
pub key: String,
}
- pub use v05::{VersionBlock, VersionBlockKey};
-
impl garage_util::migrate::Migrate for Version {
type Previous = v05::Version;
@@ -110,32 +111,94 @@ mod v08 {
}
}
-pub use v08::*;
+pub(crate) mod v09 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v08;
+
+ pub use v08::{VersionBlock, VersionBlockKey};
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+
+ // Back link to owner of this version (either an object or a multipart
+ // upload), used to find whether it has been deleted and this version
+ // should in turn be deleted (see versions repair procedure)
+ pub backlink: VersionBacklink,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum VersionBacklink {
+ Object {
+ /// Bucket in which the related object is stored
+ bucket_id: Uuid,
+ /// Key in which the related object is stored
+ key: String,
+ },
+ MultipartUpload {
+ upload_id: Uuid,
+ },
+ }
+
+ impl garage_util::migrate::Migrate for Version {
+ const VERSION_MARKER: &'static [u8] = b"G09s3v";
+
+ type Previous = v08::Version;
+
+ fn migrate(old: v08::Version) -> Version {
+ Version {
+ uuid: old.uuid,
+ deleted: old.deleted,
+ blocks: old.blocks,
+ backlink: VersionBacklink::Object {
+ bucket_id: old.bucket_id,
+ key: old.key,
+ },
+ }
+ }
+ }
+}
+
+pub use v09::*;
impl Version {
- pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
+ pub fn new(uuid: Uuid, backlink: VersionBacklink, deleted: bool) -> Self {
Self {
uuid,
deleted: deleted.into(),
blocks: crdt::Map::new(),
- parts_etags: crdt::Map::new(),
- bucket_id,
- key,
+ backlink,
}
}
pub fn has_part_number(&self, part_number: u64) -> bool {
- let case1 = self
- .parts_etags
+ self.blocks
.items()
- .binary_search_by(|(k, _)| k.cmp(&part_number))
- .is_ok();
- let case2 = self
+ .binary_search_by(|(k, _)| k.part_number.cmp(&part_number))
+ .is_ok()
+ }
+
+ pub fn n_parts(&self) -> Result<u64, Error> {
+ Ok(self
.blocks
.items()
- .binary_search_by(|(k, _)| k.part_number.cmp(&part_number))
- .is_ok();
- case1 || case2
+ .last()
+ .ok_or_message("version has no parts")?
+ .0
+ .part_number)
}
}
@@ -175,10 +238,8 @@ impl Crdt for Version {
if self.deleted.get() {
self.blocks.clear();
- self.parts_etags.clear();
} else {
self.blocks.merge(&other.blocks);
- self.parts_etags.merge(&other.parts_etags);
}
}
}