From 19639705e6242861bd43a123456793e2d8f2c69a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 17 May 2023 14:30:53 +0200 Subject: Mark sled as deprecated, make lmdb default, and improve sqlite and lmdb defaults --- src/model/Cargo.toml | 2 +- src/model/garage.rs | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 6dc954d4..c4291c32 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -41,7 +41,7 @@ opentelemetry = "0.17" netapp = "0.5" [features] -default = [ "sled" ] +default = [ "sled", "lmdb", "sqlite" ] k2v = [ "garage_util/k2v" ] lmdb = [ "garage_db/lmdb" ] sled = [ "garage_db/sled" ] diff --git a/src/model/garage.rs b/src/model/garage.rs index 3daa1b33..0fbcf334 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -109,6 +109,11 @@ impl Garage { db_path.push("db.sqlite"); info!("Opening Sqlite database at: {}", db_path.display()); let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) + .and_then(|db| { + db.pragma_update(None, "journal_mode", &"WAL")?; + db.pragma_update(None, "synchronous", &"NORMAL")?; + Ok(db) + }) .ok_or_message("Unable to open sqlite DB")?; db::sqlite_adapter::SqliteDb::init(db) } @@ -133,7 +138,6 @@ impl Garage { env_builder.max_readers(500); env_builder.map_size(map_size); unsafe { - env_builder.flag(heed::flags::Flags::MdbNoSync); env_builder.flag(heed::flags::Flags::MdbNoMetaSync); } let db = match env_builder.open(&db_path) { -- cgit v1.2.3 From e7e164a280dfc1c4adf9d6da6f3b2a9674eca4bd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 9 Jun 2023 16:23:21 +0200 Subject: Make fsync an option for meta and data --- src/model/garage.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index 0fbcf334..9b7121db 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -91,6 +91,11 @@ impl Garage { // ---- Sled DB ---- #[cfg(feature = "sled")] "sled" => { + if config.metadata_fsync { + return Err(Error::Message(format!( + "`metadata_fsync = true` is not supported with the Sled database engine" + ))); + } db_path.push("db"); info!("Opening Sled database at: {}", db_path.display()); let db = db::sled_adapter::sled::Config::default() @@ -111,7 +116,11 @@ impl Garage { let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) .and_then(|db| { db.pragma_update(None, "journal_mode", &"WAL")?; - db.pragma_update(None, "synchronous", &"NORMAL")?; + if config.metadata_fsync { + db.pragma_update(None, "synchronous", &"NORMAL")?; + } else { + db.pragma_update(None, "synchronous", &"OFF")?; + } Ok(db) }) .ok_or_message("Unable to open sqlite DB")?; @@ -139,6 +148,9 @@ impl Garage { env_builder.map_size(map_size); unsafe { env_builder.flag(heed::flags::Flags::MdbNoMetaSync); + if !config.metadata_fsync { + env_builder.flag(heed::flags::Flags::MdbNoSync); + } } let db = match env_builder.open(&db_path) { Err(heed::Error::Io(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => { @@ -208,6 +220,7 @@ impl Garage { let block_manager = BlockManager::new( &db, config.data_dir.clone(), + config.data_fsync, config.compression_level, data_rep_param, system.clone(), -- cgit v1.2.3 From 38d6ac429506f9f488ac522581b12fa530442a59 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 27 Apr 2023 17:57:54 +0200 Subject: New multipart upload table layout --- src/model/garage.rs | 22 ++++ src/model/helper/bucket.rs | 6 +- src/model/s3/mod.rs | 1 + 
src/model/s3/mpu_table.rs | 231 ++++++++++++++++++++++++++++++++++++++++++ src/model/s3/object_table.rs | 143 +++++++++++++++++++++++--- src/model/s3/version_table.rs | 85 ++++++++++++---- 6 files changed, 455 insertions(+), 33 deletions(-) create mode 100644 src/model/s3/mpu_table.rs (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index 9b7121db..31da1715 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -17,6 +17,7 @@ use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::s3::block_ref_table::*; +use crate::s3::mpu_table::*; use crate::s3::object_table::*; use crate::s3::version_table::*; @@ -57,6 +58,10 @@ pub struct Garage { pub object_table: Arc>, /// Counting table containing object counters pub object_counter_table: Arc>, + /// Table containing S3 multipart uploads + pub mpu_table: Arc>, + /// Counting table containing multipart object counters + pub mpu_counter_table: Arc>, /// Table containing S3 object versions pub version_table: Arc>, /// Table containing S3 block references (not blocks themselves) @@ -261,6 +266,20 @@ impl Garage { &db, ); + info!("Initialize multipart upload counter table..."); + let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + + info!("Initialize multipart upload table..."); + let mpu_table = Table::new( + MultipartUploadTable { + version_table: version_table.clone(), + mpu_counter_table: mpu_counter_table.clone(), + }, + meta_rep_param.clone(), + system.clone(), + &db, + ); + info!("Initialize object counter table..."); let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); @@ -269,6 +288,7 @@ impl Garage { let object_table = Table::new( ObjectTable { version_table: version_table.clone(), + mpu_table: mpu_table.clone(), object_counter_table: object_counter_table.clone(), }, meta_rep_param.clone(), @@ -297,6 +317,8 @@ impl Garage { key_table, object_table, object_counter_table, + mpu_table, + mpu_counter_table, version_table, block_ref_table, #[cfg(feature = "k2v")] diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 4a488d7f..ff711d29 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -496,7 +496,9 @@ impl<'a> BucketHelper<'a> { .get_range( bucket_id, start, - Some(ObjectFilter::IsUploading), + Some(ObjectFilter::IsUploading { + check_multipart: None, + }), 1000, EnumerationOrder::Forward, ) @@ -508,7 +510,7 @@ impl<'a> BucketHelper<'a> { let aborted_versions = object .versions() .iter() - .filter(|v| v.is_uploading() && v.timestamp < older_than) + .filter(|v| v.is_uploading(None) && v.timestamp < older_than) .map(|v| ObjectVersion { state: ObjectVersionState::Aborted, uuid: v.uuid, diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs index 4e94337d..36d67093 100644 --- a/src/model/s3/mod.rs +++ b/src/model/s3/mod.rs @@ -1,3 +1,4 @@ pub mod block_ref_table; +pub mod mpu_table; pub mod object_table; pub mod version_table; diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs new file mode 100644 index 00000000..dc5b5a82 --- /dev/null +++ b/src/model/s3/mpu_table.rs @@ -0,0 +1,231 @@ +use std::sync::Arc; + +use garage_db as db; + +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::index_counter::*; +use crate::s3::version_table::*; + +pub const UPLOADS: &str = "uploads"; +pub const PARTS: &str = "parts"; +pub const BYTES: &str = "bytes"; + +mod v09 { 
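+	// First on-disk format for this table, introduced with Garage v0.9
+	// (hence the `G09s3mpu` version marker below). Later revisions of the
+	// format would be added as sibling modules and chained to this one
+	// through garage_util::migrate, as is done for the object and version
+	// tables further down in this series.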
+ use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + pub use crate::s3::version_table::v09::VersionBlock; + + /// A part of a multipart upload + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct MultipartUpload { + /// Partition key = Upload id = UUID of the object version + pub upload_id: Uuid, + + /// Is this multipart upload deleted + pub deleted: crdt::Bool, + /// List of uploaded parts, key = (part number, timestamp) + /// In case of retries, all versions for each part are kept + /// Everything is cleaned up only once the multipart upload is completed or + /// aborted + pub parts: crdt::Map, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket_id: Uuid, + /// Key in which the related object is stored + pub key: String, + } + + #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct MpuPartKey { + /// Number of the part + pub part_number: u64, + /// Timestamp of part upload + pub timestamp: u64, + } + + /// The version of an uploaded part + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct MpuPart { + /// Links to a Version in VersionTable + pub version: Uuid, + /// ETag of the content of this part (known only once done uploading) + pub etag: Option, + /// Size of this part (known only once done uploading) + pub size: Option, + } + + impl garage_util::migrate::InitialFormat for MultipartUpload { + const VERSION_MARKER: &'static [u8] = b"G09s3mpu"; + } +} + +pub use v09::*; + +impl Ord for MpuPartKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.timestamp.cmp(&other.timestamp)) + } +} + +impl PartialOrd for MpuPartKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl MultipartUpload { + pub fn new(upload_id: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + Self { + upload_id, + deleted: crdt::Bool::new(deleted), + parts: crdt::Map::new(), + bucket_id, + key, + } + } +} + +impl Entry for MultipartUpload { + fn partition_key(&self) -> &Uuid { + &self.upload_id + } + fn sort_key(&self) -> &EmptyKey { + &EmptyKey + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +impl Crdt for MultipartUpload { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.parts.clear(); + } else { + self.parts.merge(&other.parts); + } + } +} + +impl Crdt for MpuPart { + fn merge(&mut self, other: &Self) { + self.etag = match (self.etag.take(), &other.etag) { + (None, Some(_)) => other.etag.clone(), + (Some(x), Some(y)) if x < *y => other.etag.clone(), + (x, _) => x, + }; + self.size = match (self.size, other.size) { + (None, Some(_)) => other.size, + (Some(x), Some(y)) if x < y => other.size, + (x, _) => x, + }; + } +} + +pub struct MultipartUploadTable { + pub version_table: Arc>, + pub mpu_counter_table: Arc>, +} + +impl TableSchema for MultipartUploadTable { + const TABLE_NAME: &'static str = "multipart_upload"; + + type P = Uuid; + type S = EmptyKey; + type E = MultipartUpload; + type Filter = DeletedFilter; + + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.mpu_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? 
{
+			error!(
+				"Unable to update multipart object part counter: {}. Index values will be wrong!",
+				e
+			);
+		}
+
+		// 2. Propagate deletions to version table
+		if let (Some(old_mpu), Some(new_mpu)) = (old, new) {
+			if new_mpu.deleted.get() && !old_mpu.deleted.get() {
+				let deleted_versions = old_mpu.parts.items().iter().map(|(_k, p)| {
+					Version::new(
+						p.version,
+						VersionBacklink::MultipartUpload {
+							upload_id: old_mpu.upload_id,
+						},
+						true,
+					)
+				});
+				for version in deleted_versions {
+					let res = self.version_table.queue_insert(tx, &version);
+					if let Err(e) = db::unabort(res)? {
+						error!("Unable to enqueue version deletion propagation: {}. A repair will be needed.", e);
+					}
+				}
+			}
+		}
+
+		Ok(())
+	}
+
+	fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+		filter.apply(entry.is_tombstone())
+	}
+}
+
+impl CountedItem for MultipartUpload {
+	const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_part_counter";
+
+	// Partition key = bucket id
+	type CP = Uuid;
+	// Sort key = nothing
+	type CS = EmptyKey;
+
+	fn counter_partition_key(&self) -> &Uuid {
+		&self.bucket_id
+	}
+	fn counter_sort_key(&self) -> &EmptyKey {
+		&EmptyKey
+	}
+
+	fn counts(&self) -> Vec<(&'static str, i64)> {
+		let uploads = if self.deleted.get() { 0 } else { 1 };
+		let mut parts = self
+			.parts
+			.items()
+			.iter()
+			.map(|(k, _)| k.part_number)
+			.collect::<Vec<_>>();
+		parts.dedup();
+		let bytes = self
+			.parts
+			.items()
+			.iter()
+			.map(|(_, p)| p.size.unwrap_or(0))
+			.sum::<u64>();
+		vec![
+			(UPLOADS, uploads),
+			(PARTS, parts.len() as i64),
+			(BYTES, bytes as i64),
+		]
+	}
+}
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 518acc95..a69069b2 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -10,6 +10,7 @@ use garage_table::replication::TableShardedReplication;
 use garage_table::*;
 
 use crate::index_counter::*;
+use crate::s3::mpu_table::*;
 use crate::s3::version_table::*;
 
 pub const OBJECTS: &str = "objects";
@@ -130,7 +131,86 @@ mod v08 {
 	}
 }
 
-pub use v08::*;
+mod v09 {
+	use garage_util::data::Uuid;
+	use serde::{Deserialize, Serialize};
+
+	use super::v08;
+
+	pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta};
+
+	/// An object
+	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+	pub struct Object {
+		/// The bucket in which the object is stored, used as partition key
+		pub bucket_id: Uuid,
+
+		/// The key at which the object is stored in its bucket, used as sorting key
+		pub key: String,
+
+		/// The list of currently stored versions of the object
+		pub(super) versions: Vec<ObjectVersion>,
+	}
+
+	/// Information about a version of an object
+	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+	pub struct ObjectVersion {
+		/// Id of the version
+		pub uuid: Uuid,
+		/// Timestamp of when the object was created
+		pub timestamp: u64,
+		/// State of the version
+		pub state: ObjectVersionState,
+	}
+
+	/// State of an object version
+	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+	pub enum ObjectVersionState {
+		/// The version is being received
+		Uploading {
+			/// Indicates whether this is a multipart upload
+			multipart: bool,
+			/// Headers to be included in the final object
+			headers: ObjectVersionHeaders,
+		},
+		/// The version is fully received
+		Complete(ObjectVersionData),
+		/// The uploaded version contained errors or the upload was explicitly aborted
+		Aborted,
+	}
+
+	impl garage_util::migrate::Migrate for Object {
+		const VERSION_MARKER: &'static [u8] = b"G09s3o";
+
+		type Previous =
v08::Object; + + fn migrate(old: v08::Object) -> Object { + let versions = old + .versions + .into_iter() + .map(|x| ObjectVersion { + uuid: x.uuid, + timestamp: x.timestamp, + state: match x.state { + v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading { + multipart: false, + headers: h, + }, + v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d), + v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + }, + }) + .collect(); + Object { + bucket_id: old.bucket_id, + key: old.key, + versions, + } + } + } +} + +pub use v09::*; impl Object { /// Initialize an Object struct from parts @@ -180,11 +260,11 @@ impl Crdt for ObjectVersionState { Complete(a) => { a.merge(b); } - Uploading(_) => { + Uploading { .. } => { *self = Complete(b.clone()); } }, - Uploading(_) => {} + Uploading { .. } => {} } } } @@ -199,8 +279,17 @@ impl ObjectVersion { } /// Is the object version currently being uploaded - pub fn is_uploading(&self) -> bool { - matches!(self.state, ObjectVersionState::Uploading(_)) + /// + /// matches only multipart uploads if check_multipart is Some(true) + /// matches only non-multipart uploads if check_multipart is Some(false) + /// matches both if check_multipart is None + pub fn is_uploading(&self, check_multipart: Option) -> bool { + match &self.state { + ObjectVersionState::Uploading { multipart, .. } => { + check_multipart.map(|x| x == *multipart).unwrap_or(true) + } + _ => false, + } } /// Is the object version completely received @@ -267,13 +356,20 @@ impl Crdt for Object { pub struct ObjectTable { pub version_table: Arc>, + pub mpu_table: Arc>, pub object_counter_table: Arc>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub enum ObjectFilter { + /// Is the object version available (received and not a tombstone) IsData, - IsUploading, + /// Is the object version currently being uploaded + /// + /// matches only multipart uploads if check_multipart is Some(true) + /// matches only non-multipart uploads if check_multipart is Some(false) + /// matches both if check_multipart is None + IsUploading { check_multipart: Option }, } impl TableSchema for ObjectTable { @@ -314,8 +410,29 @@ impl TableSchema for ObjectTable { } }; if newly_deleted { - let deleted_version = - Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + if let ObjectVersionState::Uploading { + multipart: true, .. + } = &v.state + { + let deleted_mpu = + MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + let res = self.mpu_table.queue_insert(tx, &deleted_mpu); + if let Err(e) = db::unabort(res)? { + error!( + "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.", + e + ); + } + } + + let deleted_version = Version::new( + v.uuid, + VersionBacklink::Object { + bucket_id: old_v.bucket_id, + key: old_v.key.clone(), + }, + true, + ); let res = self.version_table.queue_insert(tx, &deleted_version); if let Err(e) = db::unabort(res)? 
{ error!( @@ -333,7 +450,10 @@ impl TableSchema for ObjectTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { match filter { ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()), - ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), + ObjectFilter::IsUploading { check_multipart } => entry + .versions + .iter() + .any(|v| v.is_uploading(*check_multipart)), } } } @@ -360,10 +480,7 @@ impl CountedItem for Object { } else { 0 }; - let n_unfinished_uploads = versions - .iter() - .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) - .count(); + let n_unfinished_uploads = versions.iter().filter(|v| v.is_uploading(None)).count(); let n_bytes = versions .iter() .map(|v| match &v.state { diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 6edc83f4..6cf1cc75 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -66,6 +66,8 @@ mod v08 { use super::v05; + pub use v05::{VersionBlock, VersionBlockKey}; + /// A version of an object #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { @@ -90,8 +92,6 @@ mod v08 { pub key: String, } - pub use v05::{VersionBlock, VersionBlockKey}; - impl garage_util::migrate::Migrate for Version { type Previous = v05::Version; @@ -110,32 +110,83 @@ mod v08 { } } -pub use v08::*; +pub(crate) mod v09 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v08; + + pub use v08::{VersionBlock, VersionBlockKey}; + + /// A version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + pub backlink: VersionBacklink, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum VersionBacklink { + Object { + /// Bucket in which the related object is stored + bucket_id: Uuid, + /// Key in which the related object is stored + key: String, + }, + MultipartUpload { + upload_id: Uuid, + }, + } + + impl garage_util::migrate::Migrate for Version { + const VERSION_MARKER: &'static [u8] = b"G09s3v"; + + type Previous = v08::Version; + + fn migrate(old: v08::Version) -> Version { + Version { + uuid: old.uuid, + deleted: old.deleted, + blocks: old.blocks, + backlink: VersionBacklink::Object { + bucket_id: old.bucket_id, + key: old.key, + }, + } + } + } +} + +pub use v09::*; impl Version { - pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + pub fn new(uuid: Uuid, backlink: VersionBacklink, deleted: bool) -> Self { Self { uuid, deleted: deleted.into(), blocks: crdt::Map::new(), - parts_etags: crdt::Map::new(), - bucket_id, - key, + backlink, } } pub fn has_part_number(&self, part_number: u64) -> bool { - let case1 = self - .parts_etags - .items() - .binary_search_by(|(k, _)| k.cmp(&part_number)) - .is_ok(); - let case2 = self - .blocks + self.blocks .items() .binary_search_by(|(k, _)| k.part_number.cmp(&part_number)) - .is_ok(); - case1 || case2 + .is_ok() } } @@ -175,10 +226,8 @@ impl Crdt for Version { if 
self.deleted.get() {
 			self.blocks.clear();
-			self.parts_etags.clear();
 		} else {
 			self.blocks.merge(&other.blocks);
-			self.parts_etags.merge(&other.parts_etags);
 		}
 	}
 }
-- 
cgit v1.2.3


From 82e75c0e296c74c374f3d40feeb1aadcb58398f0 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 3 May 2023 12:02:59 +0200
Subject: Adapt S3 API code to use new multipart upload models

- Create and PutPart
- CompleteMultipartUpload
- UploadPartCopy
- ListParts
---
 src/model/s3/mpu_table.rs     | 15 +++++++++++++++
 src/model/s3/version_table.rs | 11 +++++++++++
 2 files changed, 26 insertions(+)

(limited to 'src/model')

diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs
index dc5b5a82..7148be51 100644
--- a/src/model/s3/mpu_table.rs
+++ b/src/model/s3/mpu_table.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use garage_db as db;
 
 use garage_util::data::*;
+use garage_util::time::*;
 
 use garage_table::crdt::*;
 use garage_table::replication::TableShardedReplication;
 use garage_table::*;
@@ -94,6 +95,20 @@ impl MultipartUpload {
 			key,
 		}
 	}
+
+	pub fn next_timestamp(&self, part_number: u64) -> u64 {
+		std::cmp::max(
+			now_msec(),
+			1 + self
+				.parts
+				.items()
+				.iter()
+				.filter(|(x, _)| x.part_number == part_number)
+				.map(|(x, _)| x.timestamp)
+				.max()
+				.unwrap_or(0),
+		)
+	}
 }
 
 impl Entry<Uuid, EmptyKey> for MultipartUpload {
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 6cf1cc75..dcf4110a 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use garage_db as db;
 
 use garage_util::data::*;
+use garage_util::error::*;
 
 use garage_table::crdt::*;
 use garage_table::replication::TableShardedReplication;
@@ -188,6 +189,16 @@ impl Version {
 			.binary_search_by(|(k, _)| k.part_number.cmp(&part_number))
 			.is_ok()
 	}
+
+	pub fn n_parts(&self) -> Result<u64, Error> {
+		Ok(self
+			.blocks
+			.items()
+			.last()
+			.ok_or_message("version has no parts")?
+			.0
+			.part_number)
+	}
 }
 
 impl Ord for VersionBlockKey {
-- 
cgit v1.2.3


From 511e07ecd489fa72040171fe908323873a57ac19 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 4 May 2023 11:49:23 +0200
Subject: fix mpu counter (add missing workers) and report info at appropriate places
---
 src/model/garage.rs        | 2 ++
 src/model/helper/bucket.rs | 4 +++-
 src/model/s3/mpu_table.rs  | 2 +-
 3 files changed, 6 insertions(+), 2 deletions(-)

(limited to 'src/model')

diff --git a/src/model/garage.rs b/src/model/garage.rs
index 31da1715..db2475ed 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -335,6 +335,8 @@ impl Garage {
 
 		self.object_table.spawn_workers(bg);
 		self.object_counter_table.spawn_workers(bg);
+		self.mpu_table.spawn_workers(bg);
+		self.mpu_counter_table.spawn_workers(bg);
 		self.version_table.spawn_workers(bg);
 		self.block_ref_table.spawn_workers(bg);
 
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index ff711d29..576d03f3 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -478,7 +478,9 @@ impl<'a> BucketHelper<'a> {
 	// ----
 
 	/// Deletes all incomplete multipart uploads that are older than a certain time.
-	/// Returns the number of uploads aborted
+	/// Returns the number of uploads aborted.
+ /// This will also include non-multipart uploads, which may be lingering + /// after a node crash pub async fn cleanup_incomplete_uploads( &self, bucket_id: &Uuid, diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index 7148be51..4764e8da 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -208,7 +208,7 @@ impl TableSchema for MultipartUploadTable { } impl CountedItem for MultipartUpload { - const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_part_counter"; + const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_counter"; // Partition key = bucket id type CP = Uuid; -- cgit v1.2.3 From 412ab77b0815f165539fe41713c0155a9878672f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 19:44:01 +0200 Subject: comments and clippy lint fixes --- src/model/s3/mpu_table.rs | 13 ++++++------- src/model/s3/version_table.rs | 5 +++-- 2 files changed, 9 insertions(+), 9 deletions(-) (limited to 'src/model') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index 4764e8da..63a4f1af 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use garage_db as db; +use garage_util::crdt::Crdt; use garage_util::data::*; use garage_util::time::*; -use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; @@ -21,8 +21,6 @@ mod v09 { use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; - pub use crate::s3::version_table::v09::VersionBlock; - /// A part of a multipart upload #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct MultipartUpload { @@ -30,15 +28,16 @@ mod v09 { pub upload_id: Uuid, /// Is this multipart upload deleted + /// The MultipartUpload is marked as deleted as soon as the + /// multipart upload is either completed or aborted pub deleted: crdt::Bool, /// List of uploaded parts, key = (part number, timestamp) /// In case of retries, all versions for each part are kept - /// Everything is cleaned up only once the multipart upload is completed or - /// aborted + /// Everything is cleaned up only once the MultipartUpload is marked deleted pub parts: crdt::Map, - // Back link to bucket+key so that we can figure if - // this was deleted later on + // Back link to bucket+key so that we can find the object this mpu + // belongs to and check whether it is still valid /// Bucket in which the related object is stored pub bucket_id: Uuid, /// Key in which the related object is stored diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index dcf4110a..5c032f9f 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -134,8 +134,9 @@ pub(crate) mod v09 { /// list of blocks of data composing the version pub blocks: crdt::Map, - // Back link to bucket+key so that we can figure if - // this was deleted later on + // Back link to owner of this version (either an object or a multipart + // upload), used to find whether it has been deleted and this version + // should in turn be deleted (see versions repair procedure) pub backlink: VersionBacklink, } -- cgit v1.2.3 From 3d477906d4ff418de10973db7bd3e940f2e47b2d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 9 Jun 2023 17:13:27 +0200 Subject: properly delete multipart uploads after completion --- src/model/s3/object_table.rs | 54 ++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 20 deletions(-) (limited to 'src/model') diff --git a/src/model/s3/object_table.rs 
b/src/model/s3/object_table.rs index a69069b2..db5ccf96 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -397,34 +397,20 @@ impl TableSchema for ObjectTable { // 2. Enqueue propagation deletions to version table if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions for v in old_v.versions.iter() { - let newly_deleted = match new_v + let new_v_id = new_v .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - { + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())); + + // Propagate deletion of old versions to the Version table + let delete_version = match new_v_id { Err(_) => true, Ok(i) => { new_v.versions[i].state == ObjectVersionState::Aborted && v.state != ObjectVersionState::Aborted } }; - if newly_deleted { - if let ObjectVersionState::Uploading { - multipart: true, .. - } = &v.state - { - let deleted_mpu = - MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); - let res = self.mpu_table.queue_insert(tx, &deleted_mpu); - if let Err(e) = db::unabort(res)? { - error!( - "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.", - e - ); - } - } - + if delete_version { let deleted_version = Version::new( v.uuid, VersionBacklink::Object { @@ -441,6 +427,34 @@ impl TableSchema for ObjectTable { ); } } + + // After abortion or completion of multipart uploads, delete MPU table entry + if matches!( + v.state, + ObjectVersionState::Uploading { + multipart: true, + .. + } + ) { + let delete_mpu = match new_v_id { + Err(_) => true, + Ok(i) => !matches!( + new_v.versions[i].state, + ObjectVersionState::Uploading { .. } + ), + }; + if delete_mpu { + let deleted_mpu = + MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + let res = self.mpu_table.queue_insert(tx, &deleted_mpu); + if let Err(e) = db::unabort(res)? { + error!( + "Unable to enqueue multipart upload deletion propagation: {}. 
A repair will be needed.", + e + ); + } + } + } } } -- cgit v1.2.3 From 942c1f1bfe138cbc4e49540cede852e4d462590e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Jun 2023 10:48:22 +0200 Subject: multipart uploads: save timestamp --- src/model/s3/mpu_table.rs | 11 ++++++++++- src/model/s3/object_table.rs | 9 +++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) (limited to 'src/model') diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index 63a4f1af..238cbf11 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -27,6 +27,8 @@ mod v09 { /// Partition key = Upload id = UUID of the object version pub upload_id: Uuid, + /// The timestamp at which the multipart upload was created + pub timestamp: u64, /// Is this multipart upload deleted /// The MultipartUpload is marked as deleted as soon as the /// multipart upload is either completed or aborted @@ -85,9 +87,16 @@ impl PartialOrd for MpuPartKey { } impl MultipartUpload { - pub fn new(upload_id: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + pub fn new( + upload_id: Uuid, + timestamp: u64, + bucket_id: Uuid, + key: String, + deleted: bool, + ) -> Self { Self { upload_id, + timestamp, deleted: crdt::Bool::new(deleted), parts: crdt::Map::new(), bucket_id, diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index db5ccf96..ebea04bd 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -444,8 +444,13 @@ impl TableSchema for ObjectTable { ), }; if delete_mpu { - let deleted_mpu = - MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + let deleted_mpu = MultipartUpload::new( + v.uuid, + v.timestamp, + old_v.bucket_id, + old_v.key.clone(), + true, + ); let res = self.mpu_table.queue_insert(tx, &deleted_mpu); if let Err(e) = db::unabort(res)? 
{
 					error!(
 						"Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.",
 						e
 					);
 				}
 			}
 		}
 	}
-- 
cgit v1.2.3


From 8ef42c9609bcefc642cc9739acb921dffba49b89 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 14 Jun 2023 17:13:41 +0200
Subject: admin docs: reformatting, key admin: add check
---
 src/model/key_table.rs | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

(limited to 'src/model')

diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index bb5334a3..a9762f1b 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -149,11 +149,19 @@ impl Key {
 	}
 
 	/// Import a key from its parts
-	pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
-		Self {
+	pub fn import(key_id: &str, secret_key: &str, name: &str) -> Result<Self, &'static str> {
+		if key_id.len() != 26 || &key_id[..2] != "GK" || hex::decode(&key_id[2..]).is_err() {
+			return Err("The specified key ID is not a valid Garage key ID (starts with `GK`, followed by 12 hex-encoded bytes)");
+		}
+
+		if secret_key.len() != 64 || hex::decode(&secret_key).is_err() {
+			return Err("The specified secret key is not a valid Garage secret key (composed of 32 hex-encoded bytes)");
+		}
+
+		Ok(Self {
 			key_id: key_id.to_string(),
 			state: crdt::Deletable::present(KeyParams::new(secret_key, name)),
-		}
+		})
 	}
 
 	/// Create a new Key which can be merged to mark an existing key deleted
-- 
cgit v1.2.3


From 0b83e0558e5e3fee5237edac0ae6d9ba304bb073 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 29 Aug 2023 16:44:27 +0200
Subject: bucket_table: data model for lifecycle configuration
---
 src/model/bucket_table.rs | 40 ++++++++++++++++++++++++++++++++++++++++
 src/model/migrate.rs      |  1 +
 2 files changed, 41 insertions(+)

(limited to 'src/model')

diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index ac163736..dc4e4509 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -48,6 +48,9 @@ mod v08 {
 		pub website_config: crdt::Lww<Option<WebsiteConfig>>,
 		/// CORS rules
 		pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
+		/// Lifecycle configuration
+		#[serde(default)]
+		pub lifecycle_config: crdt::Lww<Option<Vec<LifecycleRule>>>,
 		/// Bucket quotas
 		#[serde(default)]
 		pub quotas: crdt::Lww<BucketQuotas>,
@@ -69,6 +72,42 @@ mod v08 {
 		pub expose_headers: Vec<String>,
 	}
 
+	/// Lifecycle configuration rule
+	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+	pub struct LifecycleRule {
+		/// The ID of the rule
+		pub id: Option<String>,
+		/// Whether the rule is active
+		pub enabled: bool,
+		/// The filter to check whether the rule applies to a given object
+		pub filter: LifecycleFilter,
+		/// Number of days after which incomplete multipart uploads are aborted
+		pub abort_incomplete_mpu_days: Option<usize>,
+		/// Expiration policy for stored objects
+		pub expiration: Option<LifecycleExpiration>,
+	}
+
+	/// A lifecycle filter is a set of conditions that must all be true.
+ /// For each condition, if it is None, it is not verified (always true), + /// and if it is Some(x), then it is verified for value x + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct LifecycleFilter { + /// If Some(x), object key has to start with prefix x + pub prefix: Option, + /// If Some(x), object size has to be more than x + pub size_gt: Option, + /// If Some(x), object size has to be less than x + pub size_lt: Option, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum LifecycleExpiration { + /// Objects expire x days after they were created + AfterDays(usize), + /// Objects expire at date x (must be in yyyy-mm-dd format) + AtDate(String), + } + #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct BucketQuotas { /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) @@ -96,6 +135,7 @@ impl BucketParams { local_aliases: crdt::LwwMap::new(), website_config: crdt::Lww::new(None), cors_config: crdt::Lww::new(None), + lifecycle_config: crdt::Lww::new(None), quotas: crdt::Lww::new(BucketQuotas::default()), } } diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 6b4c3eed..4c74b43b 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -78,6 +78,7 @@ impl Migrate { local_aliases: LwwMap::new(), website_config: Lww::new(website), cors_config: Lww::new(None), + lifecycle_config: Lww::new(None), quotas: Lww::new(Default::default()), }), }) -- cgit v1.2.3 From abf011c2906d04200bb39d7bc82f7ed973215500 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 29 Aug 2023 18:22:03 +0200 Subject: lifecycle: implement validation into garage's internal data structure --- src/model/bucket_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index dc4e4509..fed20e05 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -90,7 +90,7 @@ mod v08 { /// A lifecycle filter is a set of conditions that must all be true. 
/// For each condition, if it is None, it is not verified (always true), /// and if it is Some(x), then it is verified for value x - #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Default)] pub struct LifecycleFilter { /// If Some(x), object key has to start with prefix x pub prefix: Option, -- cgit v1.2.3 From f7b409f1140addd508c626b1e80f0f8de52a5639 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 11:24:01 +0200 Subject: use a NaiveDate in data model, it serializes to string (iso 8601 format) --- src/model/Cargo.toml | 1 + src/model/bucket_table.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 69f7eea4..58d9fdb7 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -23,6 +23,7 @@ garage_util.workspace = true async-trait = "0.1.7" arc-swap = "1.0" blake2 = "0.10" +chrono = { version = "0.4", features = ["serde"] } err-derive = "0.3" hex = "0.4" base64 = "0.21" diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index fed20e05..306a58ab 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -105,7 +105,7 @@ mod v08 { /// Objects expire x days after they were created AfterDays(usize), /// Objects expire at date x (must be in yyyy-mm-dd format) - AtDate(String), + AtDate(chrono::naive::NaiveDate), } #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -- cgit v1.2.3 From a2e0e34db57b326ad5c9e7c9218fb9e29900e705 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 12:41:11 +0200 Subject: lifecycle: skeleton for lifecycle worker --- src/model/s3/lifecycle_worker.rs | 252 +++++++++++++++++++++++++++++++++++++++ src/model/s3/mod.rs | 2 + 2 files changed, 254 insertions(+) create mode 100644 src/model/s3/lifecycle_worker.rs (limited to 'src/model') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs new file mode 100644 index 00000000..049fa2a3 --- /dev/null +++ b/src/model/s3/lifecycle_worker.rs @@ -0,0 +1,252 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::prelude::*; +use std::time::{Duration, Instant}; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::error::{Error, OkOrMessage}; +use garage_util::persister::PersisterShared; +use garage_util::time::*; + +use garage_table::EmptyKey; + +use crate::bucket_table::*; +use crate::s3::object_table::*; + +use crate::garage::Garage; + +mod v090 { + use chrono::naive::NaiveDate; + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize, Default, Clone, Copy)] + pub struct LifecycleWorkerPersisted { + pub last_completed: Option, + } + + impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted { + const VERSION_MARKER: &'static [u8] = b"G09lwp"; + } +} + +pub use v090::*; + +pub struct LifecycleWorker { + garage: Arc, + + state: State, + + persister: PersisterShared, +} + +enum State { + Completed(NaiveDate), + Running { + date: NaiveDate, + pos: Vec, + counter: usize, + objects_expired: usize, + mpu_aborted: usize, + last_bucket: Option, + }, +} + +pub fn register_bg_vars( + persister: &PersisterShared, + vars: &mut vars::BgVars, +) { + vars.register_ro(persister, "lifecycle-last-completed", |p| { + p.get_with(|x| { + x.last_completed + .map(|date| date.to_string()) + .unwrap_or("never".to_string()) + }) + }); +} + +impl LifecycleWorker { + pub fn new(garage: 
Arc, persister: PersisterShared) -> Self { + let today = today(); + let state = match persister.get_with(|x| x.last_completed) { + Some(d) if d >= today => State::Completed(d), + _ => State::Running { + date: today, + pos: vec![], + counter: 0, + objects_expired: 0, + mpu_aborted: 0, + last_bucket: None, + }, + }; + Self { + garage, + state, + persister, + } + } +} + +#[async_trait] +impl Worker for LifecycleWorker { + fn name(&self) -> String { + "object lifecycle worker".to_string() + } + + fn status(&self) -> WorkerStatus { + match &self.state { + State::Completed(d) => WorkerStatus { + freeform: vec![format!("Last completed: {}", d)], + ..Default::default() + }, + State::Running { + date, + counter, + objects_expired, + mpu_aborted, + .. + } => { + let n_objects = self + .garage + .object_table + .data + .store + .fast_len() + .unwrap_or(None); + let progress = match n_objects { + None => "...".to_string(), + Some(total) => format!( + "~{:.2}%", + 100. * std::cmp::min(*counter, total) as f32 / total as f32 + ), + }; + WorkerStatus { + progress: Some(progress), + freeform: vec![ + format!("Started: {}", date), + format!("Objects expired: {}", objects_expired), + format!("Multipart uploads aborted: { }", mpu_aborted), + ], + ..Default::default() + } + } + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + match &mut self.state { + State::Completed(_) => Ok(WorkerState::Idle), + State::Running { + date, + counter, + objects_expired, + mpu_aborted, + pos, + last_bucket, + } => { + let (object_bytes, next_pos) = match self + .garage + .object_table + .data + .store + .get_gt(&pos)? + { + None => { + info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted); + self.persister + .set_with(|x| x.last_completed = Some(*date))?; + self.state = State::Completed(*date); + return Ok(WorkerState::Idle); + } + Some((k, v)) => (v, k), + }; + + let object = self.garage.object_table.data.decode_entry(&object_bytes)?; + process_object( + &self.garage, + object, + objects_expired, + mpu_aborted, + last_bucket, + ) + .await?; + + *counter += 1; + *pos = next_pos; + + Ok(WorkerState::Busy) + } + } + } + + async fn wait_for_work(&mut self) -> WorkerState { + match &self.state { + State::Completed(d) => { + let now = now_msec(); + let next_start = midnight_ts(d.succ()); + if now < next_start { + tokio::time::sleep_until( + (Instant::now() + Duration::from_millis(next_start - now)).into(), + ) + .await; + } + self.state = State::Running { + date: today(), + pos: vec![], + counter: 0, + objects_expired: 0, + mpu_aborted: 0, + last_bucket: None, + }; + } + State::Running { .. } => (), + } + WorkerState::Busy + } +} + +async fn process_object( + garage: &Arc, + object: Object, + objects_expired: &mut usize, + mpu_aborted: &mut usize, + last_bucket: &mut Option, +) -> Result<(), Error> { + let bucket = match last_bucket.take() { + Some(b) if b.id == object.bucket_id => b, + _ => garage + .bucket_table + .get(&EmptyKey, &object.bucket_id) + .await? 
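+				// (a stored object always references an existing bucket row;
+				// failing to find one here is treated as a hard error)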
+ .ok_or_message("object in non-existent bucket")?, + }; + + let lifecycle_policy: &[LifecycleRule] = bucket + .state + .as_option() + .and_then(|s| s.lifecycle_config.get().as_deref()) + .unwrap_or_default(); + + for rule in lifecycle_policy.iter() { + todo!() + } + + *last_bucket = Some(bucket); + Ok(()) +} + +fn midnight_ts(date: NaiveDate) -> u64 { + date.and_hms(0, 0, 0).timestamp_millis() as u64 +} + +fn next_date(ts: u64) -> NaiveDate { + NaiveDateTime::from_timestamp_millis(ts as i64) + .expect("bad timestamp") + .date() + .succ() +} + +fn today() -> NaiveDate { + Utc::today().naive_utc() +} diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs index 36d67093..5c776fb0 100644 --- a/src/model/s3/mod.rs +++ b/src/model/s3/mod.rs @@ -2,3 +2,5 @@ pub mod block_ref_table; pub mod mpu_table; pub mod object_table; pub mod version_table; + +pub mod lifecycle_worker; -- cgit v1.2.3 From 2996dc875fc378ec3597bfa3bdb8ba8951e1865c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 14:28:48 +0200 Subject: lifecycle worker: implement main functionality --- src/model/bucket_table.rs | 4 +- src/model/s3/lifecycle_worker.rs | 102 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 99 insertions(+), 7 deletions(-) (limited to 'src/model') diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 306a58ab..e9d574c5 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -95,9 +95,9 @@ mod v08 { /// If Some(x), object key has to start with prefix x pub prefix: Option, /// If Some(x), object size has to be more than x - pub size_gt: Option, + pub size_gt: Option, /// If Some(x), object size has to be less than x - pub size_lt: Option, + pub size_lt: Option, } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 049fa2a3..069f44a0 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -6,6 +6,7 @@ use std::time::{Duration, Instant}; use tokio::sync::watch; use garage_util::background::*; +use garage_util::data::*; use garage_util::error::{Error, OkOrMessage}; use garage_util::persister::PersisterShared; use garage_util::time::*; @@ -165,6 +166,7 @@ impl Worker for LifecycleWorker { let object = self.garage.object_table.data.decode_entry(&object_bytes)?; process_object( &self.garage, + *date, object, objects_expired, mpu_aborted, @@ -184,7 +186,7 @@ impl Worker for LifecycleWorker { match &self.state { State::Completed(d) => { let now = now_msec(); - let next_start = midnight_ts(d.succ()); + let next_start = midnight_ts(d.succ_opt().expect("no next day")); if now < next_start { tokio::time::sleep_until( (Instant::now() + Duration::from_millis(next_start - now)).into(), @@ -208,6 +210,7 @@ impl Worker for LifecycleWorker { async fn process_object( garage: &Arc, + now_date: NaiveDate, object: Object, objects_expired: &mut usize, mpu_aborted: &mut usize, @@ -229,24 +232,113 @@ async fn process_object( .unwrap_or_default(); for rule in lifecycle_policy.iter() { - todo!() + if let Some(pfx) = &rule.filter.prefix { + if !object.key.starts_with(pfx) { + continue; + } + } + + if let Some(expire) = &rule.expiration { + if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) { + let version_date = next_date(current_version.timestamp); + + let current_version_data = match ¤t_version.state { + ObjectVersionState::Complete(c) => c, + _ => unreachable!(), + }; + + let size_match = 
check_size_filter(current_version_data, &rule.filter); + let date_match = match expire { + LifecycleExpiration::AfterDays(n_days) => { + (now_date - version_date) >= chrono::Duration::days(*n_days as i64) + } + LifecycleExpiration::AtDate(exp_date) => now_date >= *exp_date, + }; + + if size_match && date_match { + // Delete expired version + let deleted_object = Object::new( + object.bucket_id, + object.key.clone(), + vec![ObjectVersion { + uuid: gen_uuid(), + timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1), + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + garage.object_table.insert(&deleted_object).await?; + *objects_expired += 1; + } + } + } + + if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days { + let aborted_versions = object + .versions() + .iter() + .filter_map(|v| { + let version_date = next_date(v.timestamp); + match &v.state { + ObjectVersionState::Uploading { .. } + if (now_date - version_date) + >= chrono::Duration::days(*abort_mpu_days as i64) => + { + Some(ObjectVersion { + state: ObjectVersionState::Aborted, + ..*v + }) + } + _ => None, + } + }) + .collect::>(); + if !aborted_versions.is_empty() { + // Insert aborted mpu info + let n_aborted = aborted_versions.len(); + let aborted_object = + Object::new(object.bucket_id, object.key.clone(), aborted_versions); + garage.object_table.insert(&aborted_object).await?; + *mpu_aborted += n_aborted; + } + } } *last_bucket = Some(bucket); Ok(()) } +fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool { + let size = match version_data { + ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size, + _ => unreachable!(), + }; + if let Some(size_gt) = filter.size_gt { + if !(size > size_gt) { + return false; + } + } + if let Some(size_lt) = filter.size_lt { + if !(size < size_lt) { + return false; + } + } + return true; +} + fn midnight_ts(date: NaiveDate) -> u64 { - date.and_hms(0, 0, 0).timestamp_millis() as u64 + date.and_hms_opt(0, 0, 0) + .expect("midnight does not exist") + .timestamp_millis() as u64 } fn next_date(ts: u64) -> NaiveDate { NaiveDateTime::from_timestamp_millis(ts as i64) .expect("bad timestamp") .date() - .succ() + .succ_opt() + .expect("no next day") } fn today() -> NaiveDate { - Utc::today().naive_utc() + Utc::now().naive_utc().date() } -- cgit v1.2.3 From da8b224e241edad8cfe25f0b0256ebb0d60fa8dd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 14:38:19 +0200 Subject: lifecycle worker: skip entire bucket when no lifecycle config is set --- src/model/s3/lifecycle_worker.rs | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) (limited to 'src/model') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 069f44a0..1981e0fd 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -54,6 +54,12 @@ enum State { }, } +#[derive(Clone, Copy, Eq, PartialEq)] +enum Skip { + SkipBucket, + NextObject, +} + pub fn register_bg_vars( persister: &PersisterShared, vars: &mut vars::BgVars, @@ -164,10 +170,10 @@ impl Worker for LifecycleWorker { }; let object = self.garage.object_table.data.decode_entry(&object_bytes)?; - process_object( + let skip = process_object( &self.garage, *date, - object, + &object, objects_expired, mpu_aborted, last_bucket, @@ -175,7 +181,13 @@ impl Worker for LifecycleWorker { .await?; *counter += 1; - *pos = next_pos; + if skip == Skip::SkipBucket { + let 
bucket_id_len = object.bucket_id.as_slice().len(); + assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice())); + *pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat(); + } else { + *pos = next_pos; + } Ok(WorkerState::Busy) } @@ -211,11 +223,11 @@ impl Worker for LifecycleWorker { async fn process_object( garage: &Arc, now_date: NaiveDate, - object: Object, + object: &Object, objects_expired: &mut usize, mpu_aborted: &mut usize, last_bucket: &mut Option, -) -> Result<(), Error> { +) -> Result { let bucket = match last_bucket.take() { Some(b) if b.id == object.bucket_id => b, _ => garage @@ -231,6 +243,10 @@ async fn process_object( .and_then(|s| s.lifecycle_config.get().as_deref()) .unwrap_or_default(); + if lifecycle_policy.is_empty() { + return Ok(Skip::SkipBucket); + } + for rule in lifecycle_policy.iter() { if let Some(pfx) = &rule.filter.prefix { if !object.key.starts_with(pfx) { @@ -304,7 +320,7 @@ async fn process_object( } *last_bucket = Some(bucket); - Ok(()) + Ok(Skip::NextObject) } fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool { -- cgit v1.2.3 From 0f1849e1ac882f5f88fe341549f0e7f01a1a7b70 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 14:51:08 +0200 Subject: lifecycle worker: launch with the rest of Garage --- src/model/garage.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index db2475ed..981430fb 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -7,6 +7,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::config::*; use garage_util::error::*; +use garage_util::persister::PersisterShared; use garage_rpc::replication_mode::ReplicationMode; use garage_rpc::system::System; @@ -17,6 +18,7 @@ use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::s3::block_ref_table::*; +use crate::s3::lifecycle_worker; use crate::s3::mpu_table::*; use crate::s3::object_table::*; use crate::s3::version_table::*; @@ -67,6 +69,9 @@ pub struct Garage { /// Table containing S3 block references (not blocks themselves) pub block_ref_table: Arc>, + /// Persister for lifecycle worker info + pub lifecycle_persister: PersisterShared, + #[cfg(feature = "k2v")] pub k2v: GarageK2V, } @@ -199,6 +204,9 @@ impl Garage { let replication_mode = ReplicationMode::parse(&config.replication_mode) .ok_or_message("Invalid replication_mode in config file.")?; + info!("Initialize background variable system..."); + let mut bg_vars = vars::BgVars::new(); + info!("Initialize membership management system..."); let system = System::new(network_key, replication_mode, &config)?; @@ -230,6 +238,7 @@ impl Garage { data_rep_param, system.clone(), ); + block_manager.register_bg_vars(&mut bg_vars); // ---- admin tables ---- info!("Initialize bucket_table..."); @@ -296,14 +305,15 @@ impl Garage { &db, ); + info!("Load lifecycle worker state..."); + let lifecycle_persister = + PersisterShared::new(&system.metadata_dir, "lifecycle_worker_state"); + lifecycle_worker::register_bg_vars(&lifecycle_persister, &mut bg_vars); + // ---- K2V ---- #[cfg(feature = "k2v")] let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); - // Initialize bg vars - let mut bg_vars = vars::BgVars::new(); - block_manager.register_bg_vars(&mut bg_vars); - // -- done -- Ok(Arc::new(Self { config, @@ -321,12 +331,13 @@ impl Garage { mpu_counter_table, version_table, block_ref_table, + 
lifecycle_persister, #[cfg(feature = "k2v")] k2v, })) } - pub fn spawn_workers(&self, bg: &BackgroundRunner) { + pub fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { self.block_manager.spawn_workers(bg); self.bucket_table.spawn_workers(bg); @@ -340,6 +351,11 @@ impl Garage { self.version_table.spawn_workers(bg); self.block_ref_table.spawn_workers(bg); + bg.spawn_worker(lifecycle_worker::LifecycleWorker::new( + self.clone(), + self.lifecycle_persister.clone(), + )); + #[cfg(feature = "k2v")] self.k2v.spawn_workers(bg); } -- cgit v1.2.3 From 7200954318a1b248b4194ee9273bcd2502b50d58 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 14:54:52 +0200 Subject: lifecycle worker: add logging --- src/model/s3/lifecycle_worker.rs | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'src/model') diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 1981e0fd..02e296e7 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -282,6 +282,10 @@ async fn process_object( state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), }], ); + info!( + "Lifecycle: expiring 1 object in bucket {:?}", + object.bucket_id + ); garage.object_table.insert(&deleted_object).await?; *objects_expired += 1; } @@ -311,6 +315,10 @@ async fn process_object( if !aborted_versions.is_empty() { // Insert aborted mpu info let n_aborted = aborted_versions.len(); + info!( + "Lifecycle: aborting {} incomplete upload(s) in bucket {:?}", + n_aborted, object.bucket_id + ); let aborted_object = Object::new(object.bucket_id, object.key.clone(), aborted_versions); garage.object_table.insert(&aborted_object).await?; -- cgit v1.2.3 From 75ccc5a95c76f31235fcaab8a2c1795693733a4b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 20:02:07 +0200 Subject: lifecycle config: store date as given, try to debug --- src/model/bucket_table.rs | 16 +++++++++++++++- src/model/s3/lifecycle_worker.rs | 9 ++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index e9d574c5..df2e9b4a 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -105,7 +105,7 @@ mod v08 { /// Objects expire x days after they were created AfterDays(usize), /// Objects expire at date x (must be in yyyy-mm-dd format) - AtDate(chrono::naive::NaiveDate), + AtDate(String), } #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] @@ -155,6 +155,20 @@ impl Crdt for BucketParams { } } +pub fn parse_lifecycle_date(date: &str) -> Result { + use chrono::prelude::*; + + if let Ok(datetime) = NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%SZ") { + if datetime.time() == NaiveTime::MIN { + Ok(datetime.date()) + } else { + Err("date must be at midnight") + } + } else { + NaiveDate::parse_from_str(date, "%Y-%m-%d").map_err(|_| "date has invalid format") + } +} + impl Default for Bucket { fn default() -> Self { Self::new() diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 02e296e7..5641b093 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -268,7 +268,14 @@ async fn process_object( LifecycleExpiration::AfterDays(n_days) => { (now_date - version_date) >= chrono::Duration::days(*n_days as i64) } - LifecycleExpiration::AtDate(exp_date) => now_date >= *exp_date, + LifecycleExpiration::AtDate(exp_date) => { + if let Ok(exp_date) = parse_lifecycle_date(&exp_date) { + 
now_date >= exp_date + } else { + warn!("Invalid expiraiton date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date); + false + } + } }; if size_match && date_match { -- cgit v1.2.3 From d2e94e36d64d4062ebe1fabac65ac1a6f265de17 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 20:05:53 +0200 Subject: lifecycle config: add missing line in merge() and remove tracing --- src/model/bucket_table.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/model') diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index df2e9b4a..0eefa0e5 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -151,6 +151,7 @@ impl Crdt for BucketParams { self.website_config.merge(&o.website_config); self.cors_config.merge(&o.cors_config); + self.lifecycle_config.merge(&o.lifecycle_config); self.quotas.merge(&o.quotas); } } -- cgit v1.2.3 From a1d57283c0b37baabfb624d3696cc6efbaa4a500 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 20:07:14 +0200 Subject: bucket_table: bucketparams::new doesn't need to be pub --- src/model/bucket_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 0eefa0e5..4c48a76f 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -127,7 +127,7 @@ impl AutoCrdt for BucketQuotas { impl BucketParams { /// Create an empty BucketParams with no authorized keys and no website accesss - pub fn new() -> Self { + fn new() -> Self { BucketParams { creation_date: now_msec(), authorized_keys: crdt::Map::new(), -- cgit v1.2.3 From 01c327a07a6045055fef6f923848fe6046e937c4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 23:46:15 +0200 Subject: lifecycle worker: avoid building chrono's serde feature --- src/model/Cargo.toml | 2 +- src/model/s3/lifecycle_worker.rs | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 58d9fdb7..3794cc59 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -23,7 +23,7 @@ garage_util.workspace = true async-trait = "0.1.7" arc-swap = "1.0" blake2 = "0.10" -chrono = { version = "0.4", features = ["serde"] } +chrono = "0.4" err-derive = "0.3" hex = "0.4" base64 = "0.21" diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 5641b093..02374bf0 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -19,12 +19,11 @@ use crate::s3::object_table::*; use crate::garage::Garage; mod v090 { - use chrono::naive::NaiveDate; use serde::{Deserialize, Serialize}; - #[derive(Serialize, Deserialize, Default, Clone, Copy)] + #[derive(Serialize, Deserialize, Default, Clone)] pub struct LifecycleWorkerPersisted { - pub last_completed: Option, + pub last_completed: Option, } impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted { @@ -65,18 +64,19 @@ pub fn register_bg_vars( vars: &mut vars::BgVars, ) { vars.register_ro(persister, "lifecycle-last-completed", |p| { - p.get_with(|x| { - x.last_completed - .map(|date| date.to_string()) - .unwrap_or("never".to_string()) - }) + p.get_with(|x| x.last_completed.clone().unwrap_or("never".to_string())) }); } impl LifecycleWorker { pub fn new(garage: Arc, persister: PersisterShared) -> Self { let today = today(); - let state = match persister.get_with(|x| x.last_completed) { + let last_completed = persister.get_with(|x| { + 
From b2f679675e3390bea6c6b3b9fb3632d0ed414a75 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 30 Aug 2023 23:52:09 +0200
Subject: lifecycle worker: take into account disabled rules

---
 src/model/s3/lifecycle_worker.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

(limited to 'src/model')

diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 02374bf0..d46d70f3 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -243,11 +243,15 @@ async fn process_object(
 		.and_then(|s| s.lifecycle_config.get().as_deref())
 		.unwrap_or_default();

-	if lifecycle_policy.is_empty() {
+	if lifecycle_policy.iter().all(|x| !x.enabled) {
 		return Ok(Skip::SkipBucket);
 	}

 	for rule in lifecycle_policy.iter() {
+		if !rule.enabled {
+			continue;
+		}
+
 		if let Some(pfx) = &rule.filter.prefix {
 			if !object.key.starts_with(pfx) {
 				continue;
-- cgit v1.2.3

From 1cfcc61de83b832a78c8f93aaaf935a29845cd8b Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 31 Aug 2023 00:28:37 +0200
Subject: lifecycle worker: mitigate potential bugs + refactoring

---
 src/model/s3/lifecycle_worker.rs | 51 ++++++++++++++++++++++++----------------
 1 file changed, 31 insertions(+), 20 deletions(-)

(limited to 'src/model')

diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index d46d70f3..670ed9fe 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -197,16 +197,21 @@ impl Worker for LifecycleWorker {
 	async fn wait_for_work(&mut self) -> WorkerState {
 		match &self.state {
 			State::Completed(d) => {
-				let now = now_msec();
-				let next_start = midnight_ts(d.succ_opt().expect("no next day"));
-				if now < next_start {
-					tokio::time::sleep_until(
-						(Instant::now() + Duration::from_millis(next_start - now)).into(),
-					)
-					.await;
+				let next_day = d.succ_opt().expect("no next day");
+				let next_start = midnight_ts(next_day);
+				loop {
+					let now = now_msec();
+					if now < next_start {
+						tokio::time::sleep_until(
+							(Instant::now() + Duration::from_millis(next_start - now)).into(),
+						)
+						.await;
+					} else {
+						break;
+					}
 				}
 				self.state = State::Running {
-					date: today(),
+					date: std::cmp::max(next_day, today()),
 					pos: vec![],
 					counter: 0,
 					objects_expired: 0,
@@ -228,6 +233,14 @@ async fn process_object(
 	mpu_aborted: &mut usize,
 	last_bucket: &mut Option<Bucket>,
) -> Result<Skip, Error> {
+	if !object
+		.versions()
+		.iter()
+		.any(|x| x.is_data() || x.is_uploading(None))
+	{
+		return Ok(Skip::NextObject);
+	}
+
 	let bucket = match last_bucket.take() {
 		Some(b) if b.id == object.bucket_id => b,
 		_ => garage
@@ -276,7 +289,7 @@ async fn process_object(
 					if let Ok(exp_date) = parse_lifecycle_date(&exp_date) {
 						now_date >= exp_date
 					} else {
-						warn!("Invalid expiraiton date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
+						warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
 						false
 					}
 				}
@@ -309,17 +322,15 @@ async fn process_object(
 		.iter()
 		.filter_map(|v| {
 			let version_date = next_date(v.timestamp);
-			match &v.state {
-				ObjectVersionState::Uploading { .. }
-					if (now_date - version_date)
-						>= chrono::Duration::days(*abort_mpu_days as i64) =>
-				{
-					Some(ObjectVersion {
-						state: ObjectVersionState::Aborted,
-						..*v
-					})
-				}
-				_ => None,
+			if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64)
+				&& matches!(&v.state, ObjectVersionState::Uploading { .. })
+			{
+				Some(ObjectVersion {
+					state: ObjectVersionState::Aborted,
+					..*v
+				})
+			} else {
+				None
 			}
 		})
 		.collect::<Vec<_>>();
-- cgit v1.2.3
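Aside: the rewritten `wait_for_work` sleeps inside a loop so the clock is re-checked after every wakeup; an early or spurious wakeup can no longer start the next run before `next_start`. A standalone sketch of that pattern, assuming tokio with the `time` feature; `now_msec` here is a stand-in for the garage_util helper:

```rust
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

fn now_msec() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

// Keep sleeping until the deadline has really passed, re-reading the clock
// each time around the loop instead of trusting a single sleep.
async fn sleep_until_msec(next_start: u64) {
    loop {
        let now = now_msec();
        if now < next_start {
            tokio::time::sleep_until(
                (Instant::now() + Duration::from_millis(next_start - now)).into(),
            )
            .await;
        } else {
            break;
        }
    }
}
```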
From adbf5925de733484998c3a788c4ec7e8cda2cec4 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 31 Aug 2023 11:19:26 +0200
Subject: lifecycle worker: use queue_insert and process objects in batches

---
 src/model/s3/lifecycle_worker.rs | 81 ++++++++++++++++++++++------------------
 1 file changed, 45 insertions(+), 36 deletions(-)

(limited to 'src/model')

diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 670ed9fe..f99cc935 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -152,41 +152,44 @@ impl Worker for LifecycleWorker {
 				pos,
 				last_bucket,
 			} => {
-				let (object_bytes, next_pos) = match self
-					.garage
-					.object_table
-					.data
-					.store
-					.get_gt(&pos)?
-				{
-					None => {
-						info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
-						self.persister
-							.set_with(|x| x.last_completed = Some(date.to_string()))?;
-						self.state = State::Completed(*date);
-						return Ok(WorkerState::Idle);
+				// Process a batch of 100 items before yielding to bg task scheduler
+				for _ in 0..100 {
+					let (object_bytes, next_pos) = match self
+						.garage
+						.object_table
+						.data
+						.store
+						.get_gt(&pos)?
+					{
+						None => {
+							info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
+							self.persister
+								.set_with(|x| x.last_completed = Some(date.to_string()))?;
+							self.state = State::Completed(*date);
+							return Ok(WorkerState::Idle);
+						}
+						Some((k, v)) => (v, k),
+					};
+
+					let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
+					let skip = process_object(
+						&self.garage,
+						*date,
+						&object,
+						objects_expired,
+						mpu_aborted,
+						last_bucket,
+					)
+					.await?;
+
+					*counter += 1;
+					if skip == Skip::SkipBucket {
+						let bucket_id_len = object.bucket_id.as_slice().len();
+						assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
+						*pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
+					} else {
+						*pos = next_pos;
 					}
-					Some((k, v)) => (v, k),
-				};
-
-				let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
-				let skip = process_object(
-					&self.garage,
-					*date,
-					&object,
-					objects_expired,
-					mpu_aborted,
-					last_bucket,
-				)
-				.await?;
-
-				*counter += 1;
-				if skip == Skip::SkipBucket {
-					let bucket_id_len = object.bucket_id.as_slice().len();
-					assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
-					*pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
-				} else {
-					*pos = next_pos;
 				}

 				Ok(WorkerState::Busy)
@@ -260,6 +263,8 @@ async fn process_object(
 		return Ok(Skip::SkipBucket);
 	}

+	let db = garage.object_table.data.store.db();
+
 	for rule in lifecycle_policy.iter() {
 		if !rule.enabled {
 			continue;
@@ -310,7 +315,9 @@ async fn process_object(
 				"Lifecycle: expiring 1 object in bucket {:?}",
 				object.bucket_id
 			);
-			garage.object_table.insert(&deleted_object).await?;
+			db.transaction(|mut tx| {
+				garage.object_table.queue_insert(&mut tx, &deleted_object)
+			})?;
 			*objects_expired += 1;
 		}
 	}
@@ -343,7 +350,9 @@ async fn process_object(
 		);
 		let aborted_object =
 			Object::new(object.bucket_id, object.key.clone(), aborted_versions);
-		garage.object_table.insert(&aborted_object).await?;
+		db.transaction(|mut tx| {
+			garage.object_table.queue_insert(&mut tx, &aborted_object)
+		})?;
 		*mpu_aborted += n_aborted;
 	}
 }
-- cgit v1.2.3
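Aside: the batching pattern above bounds how long the worker runs between yields to the scheduler while still resuming from an explicit cursor. A toy sketch over a `BTreeMap` standing in for the table store (`get_gt` here is a plain ordered-map lookup, not the real engine call):

```rust
use std::collections::BTreeMap;

// Smallest entry with a key strictly greater than `pos`, if any.
fn get_gt(store: &BTreeMap<Vec<u8>, Vec<u8>>, pos: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
    use std::ops::Bound::{Excluded, Unbounded};
    store
        .range::<[u8], _>((Excluded(pos), Unbounded))
        .next()
        .map(|(k, v)| (k.clone(), v.clone()))
}

// Process at most 100 entries, then return to the caller, exactly like the
// worker's `for _ in 0..100` loop. Returns false once iteration is finished.
fn scan_batch(store: &BTreeMap<Vec<u8>, Vec<u8>>, pos: &mut Vec<u8>) -> bool {
    for _ in 0..100 {
        match get_gt(store, &pos[..]) {
            None => return false, // reached the end of the keyspace
            Some((k, _v)) => {
                // ... process the entry here ...
                *pos = k; // advance the cursor past this key
            }
        }
    }
    true // more work remaining; caller re-invokes after yielding
}
```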
From a00a52633f7846c3683da65a07266a03f88b0f74 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 31 Aug 2023 11:25:14 +0200
Subject: lifecycle worker: add log message when starting

---
 src/model/s3/lifecycle_worker.rs | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

(limited to 'src/model')

diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index f99cc935..53c84a17 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -78,14 +78,7 @@ impl LifecycleWorker {
 		});
 		let state = match last_completed {
 			Some(d) if d >= today => State::Completed(d),
-			_ => State::Running {
-				date: today,
-				pos: vec![],
-				counter: 0,
-				objects_expired: 0,
-				mpu_aborted: 0,
-				last_bucket: None,
-			},
+			_ => State::start(today),
 		};
 		Self {
 			garage,
@@ -95,6 +88,20 @@ impl LifecycleWorker {
 	}
 }

+impl State {
+	fn start(date: NaiveDate) -> Self {
+		info!("Starting lifecycle worker for {}", date);
+		State::Running {
+			date,
+			pos: vec![],
+			counter: 0,
+			objects_expired: 0,
+			mpu_aborted: 0,
+			last_bucket: None,
+		}
+	}
+}
+
 #[async_trait]
 impl Worker for LifecycleWorker {
 	fn name(&self) -> String {
@@ -213,14 +220,7 @@ impl Worker for LifecycleWorker {
 						break;
 					}
 				}
-				self.state = State::Running {
-					date: std::cmp::max(next_day, today()),
-					pos: vec![],
-					counter: 0,
-					objects_expired: 0,
-					mpu_aborted: 0,
-					last_bucket: None,
-				};
+				self.state = State::start(std::cmp::max(next_day, today()));
 			}
 			State::Running { .. } => (),
 		}
-- cgit v1.2.3
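Aside: a small sketch of the named-constructor refactor above, with toy types; the single entry point keeps the two call sites identical and gives one place to log the start:

```rust
#[derive(Debug)]
enum State {
    Completed(String),
    Running { date: String, counter: usize },
}

impl State {
    // One constructor both logs and builds the fresh running state, so the
    // initializer list cannot drift between call sites.
    fn start(date: String) -> Self {
        println!("Starting lifecycle worker for {}", date);
        State::Running { date, counter: 0 }
    }
}

fn main() {
    let _s = State::start("2023-08-31".to_string());
}
```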
From f579d6d9b42ef03d639cc7356b2fa15265074120 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 31 Aug 2023 11:29:54 +0200
Subject: lifecycle worker: fix potential inifinite loop

---
 src/model/s3/lifecycle_worker.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

(limited to 'src/model')

diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 53c84a17..0747ffb8 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -193,7 +193,10 @@ impl Worker for LifecycleWorker {
 				if skip == Skip::SkipBucket {
 					let bucket_id_len = object.bucket_id.as_slice().len();
 					assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
-					*pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
+					*pos = std::cmp::max(
+						next_pos,
+						[&pos[..bucket_id_len], &[0xFFu8][..]].concat(),
+					);
 				} else {
 					*pos = next_pos;
 				}
-- cgit v1.2.3

From 1cdc321e28ccfbbe425365f3a03a526c3f456e3f Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 31 Aug 2023 11:36:30 +0200
Subject: lifecycle worker: don't get stuck on non-existent bucket

---
 src/model/s3/lifecycle_worker.rs | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

(limited to 'src/model')

diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 0747ffb8..ed762413 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -249,11 +249,22 @@ async fn process_object(

 	let bucket = match last_bucket.take() {
 		Some(b) if b.id == object.bucket_id => b,
-		_ => garage
-			.bucket_table
-			.get(&EmptyKey, &object.bucket_id)
-			.await?
-			.ok_or_message("object in non-existent bucket")?,
+		_ => {
+			match garage
+				.bucket_table
+				.get(&EmptyKey, &object.bucket_id)
+				.await?
+			{
+				Some(b) => b,
+				None => {
+					warn!(
+						"Lifecycle worker: object in non-existent bucket {:?}",
+						object.bucket_id
+					);
+					return Ok(Skip::SkipBucket);
+				}
+			}
+		}
 	};
-- cgit v1.2.3
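Aside: the `SkipBucket` cursor jump relies on all of a bucket's keys sharing the bucket id as a prefix, so seeking to `bucket_id ++ 0xFF` steps over every remaining key of that bucket at once. A sketch of that computation with plain byte vectors, mirroring the 0xFF sentinel and the `max()` guard from the fixes above:

```rust
// Compute the next cursor position after deciding to skip a whole bucket.
// Taking the max with the key we just read guards against ever moving the
// cursor backwards, which is what caused the potential infinite loop.
fn skip_bucket_pos(next_pos: &[u8], bucket_id: &[u8]) -> Vec<u8> {
    assert_eq!(next_pos.get(..bucket_id.len()), Some(bucket_id));
    let last_bucket_pos = [bucket_id, &[0xFFu8][..]].concat();
    std::cmp::max(next_pos.to_vec(), last_bucket_pos)
}

fn main() {
    let pos = skip_bucket_pos(b"bucketAkey1", b"bucketA");
    // Lexicographically past every "bucketA..." key.
    assert!(pos > b"bucketAkey1".to_vec());
}
```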
From 8e0c020bb95a05ea657fa75cf19f8e125d9c602d Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 31 Aug 2023 11:45:19 +0200
Subject: lifecycle worker: correct small clippy lints

---
 src/model/s3/lifecycle_worker.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

(limited to 'src/model')

diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index ed762413..4734742d 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -7,7 +7,7 @@ use tokio::sync::watch;

 use garage_util::background::*;
 use garage_util::data::*;
-use garage_util::error::{Error, OkOrMessage};
+use garage_util::error::Error;
 use garage_util::persister::PersisterShared;
 use garage_util::time::*;

@@ -305,7 +305,7 @@ async fn process_object(
 				(now_date - version_date) >= chrono::Duration::days(*n_days as i64)
 			}
 			LifecycleExpiration::AtDate(exp_date) => {
-				if let Ok(exp_date) = parse_lifecycle_date(&exp_date) {
+				if let Ok(exp_date) = parse_lifecycle_date(exp_date) {
 					now_date >= exp_date
 				} else {
 					warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
@@ -391,7 +391,7 @@ fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter)
 			return false;
 		}
 	}
-	return true;
+	true
 }

 fn midnight_ts(date: NaiveDate) -> u64 {
-- cgit v1.2.3

From 4b4f2000f45a83b4dad3f2a8fd8392a245a30286 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 6 Sep 2023 16:34:07 +0200
Subject: lifecycle: fix SkipBucket bug

---
 src/model/s3/lifecycle_worker.rs | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

(limited to 'src/model')

diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 4734742d..42e661eb 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -192,11 +192,12 @@ impl Worker for LifecycleWorker {
 				*counter += 1;
 				if skip == Skip::SkipBucket {
 					let bucket_id_len = object.bucket_id.as_slice().len();
-					assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
-					*pos = std::cmp::max(
-						next_pos,
-						[&pos[..bucket_id_len], &[0xFFu8][..]].concat(),
+					assert_eq!(
+						next_pos.get(..bucket_id_len),
+						Some(object.bucket_id.as_slice())
 					);
+					let last_bucket_pos = [&next_pos[..bucket_id_len], &[0xFFu8][..]].concat();
+					*pos = std::cmp::max(next_pos, last_bucket_pos);
 				} else {
 					*pos = next_pos;
 				}
-- cgit v1.2.3

From 71c0188055e25aa1c00d0226f0ca99ce323310a6 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 4 Sep 2023 14:49:49 +0200
Subject: block manager: skeleton for multi-hdd support

---
 src/model/garage.rs | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

(limited to 'src/model')

diff --git a/src/model/garage.rs b/src/model/garage.rs
index 981430fb..d6eebfb0 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -92,8 +92,22 @@ impl Garage {
 		// Create meta dir and data dir if they don't exist already
 		std::fs::create_dir_all(&config.metadata_dir)
 			.ok_or_message("Unable to create Garage metadata directory")?;
-		std::fs::create_dir_all(&config.data_dir)
-			.ok_or_message("Unable to create Garage data directory")?;
+		match &config.data_dir {
+			DataDirEnum::Single(data_dir) => {
+				std::fs::create_dir_all(data_dir).ok_or_message(format!(
+					"Unable to create Garage data directory: {}",
+					data_dir.to_string_lossy()
+				))?;
+			}
+			DataDirEnum::Multiple(data_dirs) => {
+				for dir in data_dirs {
+					std::fs::create_dir_all(&dir.path).ok_or_message(format!(
+						"Unable to create Garage data directory: {}",
+						dir.path.to_string_lossy()
+					))?;
+				}
+			}
+		}

 		info!("Opening database...");
 		let mut db_path = config.metadata_dir.clone();
-- cgit v1.2.3
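Aside: a hypothetical sketch of the two `data_dir` shapes the match above implies. The real `DataDirEnum` lives in garage_util's config types and likely carries more per-directory fields; the names here are inferred from the diff:

```rust
use std::path::PathBuf;

// Inferred shapes: either one data directory, or several, each with its own path.
enum DataDirEnum {
    Single(PathBuf),
    Multiple(Vec<DataDir>),
}

struct DataDir {
    path: PathBuf,
    // per-directory settings (e.g. a capacity) would plausibly go here
}

fn create_all(data_dir: &DataDirEnum) -> std::io::Result<()> {
    match data_dir {
        DataDirEnum::Single(dir) => std::fs::create_dir_all(dir)?,
        DataDirEnum::Multiple(dirs) => {
            for d in dirs {
                std::fs::create_dir_all(&d.path)?;
            }
        }
    }
    Ok(())
}
```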
From 93114a9747cefa441ebd274f206c09699d051b39 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 5 Sep 2023 15:39:21 +0200
Subject: block manager: refactoring

---
 src/model/garage.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'src/model')

diff --git a/src/model/garage.rs b/src/model/garage.rs
index d6eebfb0..721d5e3a 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -251,7 +251,7 @@ impl Garage {
 			config.compression_level,
 			data_rep_param,
 			system.clone(),
-		);
+		)?;
 		block_manager.register_bg_vars(&mut bg_vars);

 		// ---- admin tables ----
-- cgit v1.2.3

From f97168f80567f43e15cf236092703e6ae5d8dc2e Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 21 Sep 2023 15:32:25 +0200
Subject: garage_db: refactor transactions and add on_commit mechanism

---
 src/model/index_counter.rs       | 4 ++--
 src/model/s3/lifecycle_worker.rs | 8 ++------
 2 files changed, 4 insertions(+), 8 deletions(-)

(limited to 'src/model')

diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 35d6596d..a46c165f 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -294,7 +294,7 @@ impl IndexCounter {
 			let counter_entry = local_counter.into_counter_entry(self.this_node);
 			self.local_counter
 				.db()
-				.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
+				.transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;

 			next_start = Some(local_counter_k);
 		}
@@ -360,7 +360,7 @@ impl IndexCounter {
 			let counter_entry = local_counter.into_counter_entry(self.this_node);
 			self.local_counter
 				.db()
-				.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
+				.transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;

 			next_start = Some(counted_entry_k);
 		}
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 42e661eb..50d4283f 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -330,9 +330,7 @@ async fn process_object(
 				"Lifecycle: expiring 1 object in bucket {:?}",
 				object.bucket_id
 			);
-			db.transaction(|mut tx| {
-				garage.object_table.queue_insert(&mut tx, &deleted_object)
-			})?;
+			db.transaction(|tx| garage.object_table.queue_insert(tx, &deleted_object))?;
 			*objects_expired += 1;
 		}
 	}
@@ -365,9 +363,7 @@ async fn process_object(
 		);
 		let aborted_object =
 			Object::new(object.bucket_id, object.key.clone(), aborted_versions);
-		db.transaction(|mut tx| {
-			garage.object_table.queue_insert(&mut tx, &aborted_object)
-		})?;
+		db.transaction(|tx| garage.object_table.queue_insert(tx, &aborted_object))?;
 		*mpu_aborted += n_aborted;
 	}
 }
-- cgit v1.2.3

From 897cbf2c27e4477dde51eee5b73fa3267b4c7098 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 27 Sep 2023 13:12:41 +0200
Subject: actually update rmp-serde to 1.1.2 for both garage and netapp dependency (fix #629)

---
 src/model/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'src/model')

diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index bb3e2b11..7db7f15d 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -39,7 +39,7 @@ futures-util = "0.3"
 tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
 opentelemetry = "0.17"
-netapp = "0.5"
+netapp = "0.10"

 [features]
 default = [ "sled", "lmdb", "sqlite" ]
-- cgit v1.2.3
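Aside: a toy sketch of the calling-convention change from the garage_db refactor above. The closure now receives a mutable reference to the transaction directly instead of a by-value handle the caller re-borrows, so `|mut tx| f(&mut tx, ..)` becomes `|tx| f(tx, ..)`. The real `Transaction` and `queue_insert` are garage_db/garage_table types; the stand-ins here only illustrate the signatures:

```rust
struct Transaction;

// Run `f` inside a transaction and hand it `&mut Transaction` directly.
fn transaction<R>(f: impl FnOnce(&mut Transaction) -> R) -> R {
    let mut tx = Transaction;
    let res = f(&mut tx);
    // ... commit, then run any on_commit hooks registered during `f` ...
    res
}

fn queue_insert(_tx: &mut Transaction, _entry: &[u8]) {}

fn main() {
    // old style: db.transaction(|mut tx| queue_insert(&mut tx, b"entry"))
    transaction(|tx| queue_insert(tx, b"entry"));
}
```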
"rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } opentelemetry = "0.17" -netapp = "0.5" +netapp = "0.10" [features] default = [ "sled", "lmdb", "sqlite" ] -- cgit v1.2.3 From 952c9570c494468643353ee1ae9052b510353665 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Oct 2023 14:08:11 +0200 Subject: bump version to v0.9.0 --- src/model/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 7db7f15d..42b7ffdb 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_model" -version = "0.8.4" +version = "0.9.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" -- cgit v1.2.3