diff options
-rw-r--r-- | Cargo.lock | 70 | ||||
-rw-r--r-- | Cargo.nix | 105 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 1 | ||||
-rw-r--r-- | src/garage/admin.rs | 39 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 15 | ||||
-rw-r--r-- | src/model/helper/bucket.rs | 69 |
6 files changed, 297 insertions, 2 deletions
@@ -1046,6 +1046,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry-prometheus", + "parse_duration", "prometheus", "rand 0.8.5", "rmp-serde", @@ -2113,6 +2114,41 @@ dependencies = [ ] [[package]] +name = "num" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" +dependencies = [ + "autocfg 1.1.0", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95" +dependencies = [ + "autocfg 1.1.0", + "num-traits", +] + +[[package]] name = "num-integer" version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2123,6 +2159,29 @@ dependencies = [ ] [[package]] +name = "num-iter" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +dependencies = [ + "autocfg 1.1.0", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef" +dependencies = [ + "autocfg 1.1.0", + "num-bigint", + "num-integer", + "num-traits", +] + +[[package]] name = "num-traits" version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2304,6 +2363,17 @@ dependencies = [ ] [[package]] +name = "parse_duration" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d" +dependencies = [ + "lazy_static", + "num", + "regex", +] + +[[package]] name = "pem" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -32,7 +32,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "45b04ec226341a714068633cf7a3a6b421aa70ac3f78c72931d43ab8301b3c7b"; + nixifiedLockHash = "05d37ca8de8610a5fdcd202dc216af648542d780536ddcbb2492ad0c513267ef"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -1496,6 +1496,7 @@ in opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" then "opentelemetry_otlp" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-otlp."0.10.0" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage/opentelemetry-prometheus" then "opentelemetry_prometheus" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-prometheus."0.10.0" { inherit profileName; }).out; + parse_duration = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".parse_duration."2.1.1" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage/prometheus" then "prometheus" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".prometheus."0.13.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; @@ -2954,6 +2955,59 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".num."0.2.1" = overridableMkRustCrate (profileName: rec { + name = "num"; + version = "0.2.1"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"; }; + features = builtins.concatLists [ + [ "default" ] + [ "num-bigint" ] + [ "std" ] + ]; + dependencies = { + num_bigint = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-bigint."0.2.6" { inherit profileName; }).out; + num_complex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-complex."0.2.4" { inherit profileName; }).out; + num_integer = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-integer."0.1.44" { inherit profileName; }).out; + num_iter = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-iter."0.1.43" { inherit profileName; }).out; + num_rational = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-rational."0.2.4" { inherit profileName; }).out; + num_traits = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-traits."0.2.14" { inherit profileName; }).out; + }; + }); + + "registry+https://github.com/rust-lang/crates.io-index".num-bigint."0.2.6" = overridableMkRustCrate (profileName: rec { + name = "num-bigint"; + version = "0.2.6"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"; }; + features = builtins.concatLists [ + [ "std" ] + ]; + dependencies = { + num_integer = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-integer."0.1.44" { inherit profileName; }).out; + num_traits = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-traits."0.2.14" { inherit profileName; }).out; + }; + buildDependencies = { + autocfg = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."1.1.0" { profileName = "__noProfile"; }).out; + }; + }); + + "registry+https://github.com/rust-lang/crates.io-index".num-complex."0.2.4" = overridableMkRustCrate (profileName: rec { + name = "num-complex"; + version = "0.2.4"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95"; }; + features = builtins.concatLists [ + [ "std" ] + ]; + dependencies = { + num_traits = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-traits."0.2.14" { inherit profileName; }).out; + }; + buildDependencies = { + autocfg = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."1.1.0" { profileName = "__noProfile"; }).out; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".num-integer."0.1.44" = overridableMkRustCrate (profileName: rec { name = "num-integer"; version = "0.1.44"; @@ -2971,6 +3025,43 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".num-iter."0.1.43" = overridableMkRustCrate (profileName: rec { + name = "num-iter"; + version = "0.1.43"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252"; }; + features = builtins.concatLists [ + [ "std" ] + ]; + dependencies = { + num_integer = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-integer."0.1.44" { inherit profileName; }).out; + num_traits = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-traits."0.2.14" { inherit profileName; }).out; + }; + buildDependencies = { + autocfg = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."1.1.0" { profileName = "__noProfile"; }).out; + }; + }); + + "registry+https://github.com/rust-lang/crates.io-index".num-rational."0.2.4" = overridableMkRustCrate (profileName: rec { + name = "num-rational"; + version = "0.2.4"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"; }; + features = builtins.concatLists [ + [ "bigint" ] + [ "num-bigint" ] + [ "std" ] + ]; + dependencies = { + num_bigint = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-bigint."0.2.6" { inherit profileName; }).out; + num_integer = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-integer."0.1.44" { inherit profileName; }).out; + num_traits = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num-traits."0.2.14" { inherit profileName; }).out; + }; + buildDependencies = { + autocfg = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."1.1.0" { profileName = "__noProfile"; }).out; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".num-traits."0.2.14" = overridableMkRustCrate (profileName: rec { name = "num-traits"; version = "0.2.14"; @@ -3220,6 +3311,18 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".parse_duration."2.1.1" = overridableMkRustCrate (profileName: rec { + name = "parse_duration"; + version = "2.1.1"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d"; }; + dependencies = { + lazy_static = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; }).out; + num = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".num."0.2.1" { inherit profileName; }).out; + regex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".regex."1.5.5" { inherit profileName; }).out; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".pem."1.1.0" = overridableMkRustCrate (profileName: rec { name = "pem"; version = "1.1.0"; diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index cbc0dc61..35caf2c1 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -33,6 +33,7 @@ garage_web = { version = "0.8.0", path = "../web" } bytes = "1.0" bytesize = "1.1" timeago = "0.3" +parse_duration = "2.1" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 802a8261..c21286e5 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -85,6 +85,9 @@ impl AdminRpcHandler { BucketOperation::Deny(query) => self.handle_bucket_deny(query).await, BucketOperation::Website(query) => self.handle_bucket_website(query).await, BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await, + BucketOperation::CleanupIncompleteUploads(query) => { + self.handle_bucket_cleanup_incomplete_uploads(query).await + } } } @@ -512,6 +515,42 @@ impl AdminRpcHandler { ))) } + async fn handle_bucket_cleanup_incomplete_uploads( + &self, + query: &CleanupIncompleteUploadsOpt, + ) -> Result<AdminRpc, Error> { + let mut bucket_ids = vec![]; + for b in query.buckets.iter() { + bucket_ids.push( + self.garage + .bucket_helper() + .resolve_global_bucket_name(b) + .await? + .ok_or_bad_request("Bucket not found")?, + ); + } + + let duration = parse_duration::parse::parse(&query.older_than) + .ok_or_bad_request("Invalid duration")?; + + let mut ret = String::new(); + for bucket in bucket_ids { + let count = self + .garage + .bucket_helper() + .cleanup_incomplete_uploads(&bucket, duration) + .await?; + writeln!( + &mut ret, + "Bucket {:?}: {} incomplete uploads aborted", + bucket, count + ) + .unwrap(); + } + + Ok(AdminRpc::Ok(ret)) + } + async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> { match cmd { KeyOperation::List => self.handle_list_keys().await, diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 06548e89..cb085813 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -189,6 +189,10 @@ pub enum BucketOperation { /// Set the quotas for this bucket #[structopt(name = "set-quotas", version = garage_version())] SetQuotas(SetQuotasOpt), + + /// Clean up (abort) old incomplete multipart uploads + #[structopt(name = "cleanup-incomplete-uploads", version = garage_version())] + CleanupIncompleteUploads(CleanupIncompleteUploadsOpt), } #[derive(Serialize, Deserialize, StructOpt, Debug)] @@ -291,6 +295,17 @@ pub struct SetQuotasOpt { } #[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct CleanupIncompleteUploadsOpt { + /// Abort multipart uploads older than this value + #[structopt(long = "older-than", default_value = "1d")] + pub older_than: String, + + /// Name of bucket(s) to clean up + #[structopt(required = true)] + pub buckets: Vec<String>, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] pub enum KeyOperation { /// List keys #[structopt(name = "list", version = garage_version())] diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 130ba5be..4a488d7f 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::{Error as GarageError, OkOrMessage}; @@ -12,7 +14,7 @@ use crate::helper::error::*; use crate::helper::key::KeyHelper; use crate::key_table::*; use crate::permission::BucketKeyPerm; -use crate::s3::object_table::ObjectFilter; +use crate::s3::object_table::*; pub struct BucketHelper<'a>(pub(crate) &'a Garage); @@ -472,4 +474,69 @@ impl<'a> BucketHelper<'a> { Ok(true) } + + // ---- + + /// Deletes all incomplete multipart uploads that are older than a certain time. + /// Returns the number of uploads aborted + pub async fn cleanup_incomplete_uploads( + &self, + bucket_id: &Uuid, + older_than: Duration, + ) -> Result<usize, Error> { + let older_than = now_msec() - older_than.as_millis() as u64; + + let mut ret = 0usize; + let mut start = None; + + loop { + let objects = self + .0 + .object_table + .get_range( + bucket_id, + start, + Some(ObjectFilter::IsUploading), + 1000, + EnumerationOrder::Forward, + ) + .await?; + + let abortions = objects + .iter() + .filter_map(|object| { + let aborted_versions = object + .versions() + .iter() + .filter(|v| v.is_uploading() && v.timestamp < older_than) + .map(|v| ObjectVersion { + state: ObjectVersionState::Aborted, + uuid: v.uuid, + timestamp: v.timestamp, + }) + .collect::<Vec<_>>(); + if !aborted_versions.is_empty() { + Some(Object::new( + object.bucket_id, + object.key.clone(), + aborted_versions, + )) + } else { + None + } + }) + .collect::<Vec<_>>(); + + ret += abortions.len(); + self.0.object_table.insert_many(abortions).await?; + + if objects.len() < 1000 { + break; + } else { + start = Some(objects.last().unwrap().key.clone()); + } + } + + Ok(ret) + } } |