diff options
author | Alex Auvolat <alex@adnab.me> | 2022-11-04 11:55:59 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-11-04 11:55:59 +0100 |
commit | 5b18fd8201af98798f0a47a95dfec2f3ab9777f8 (patch) | |
tree | 35693b8d63103e23673dccd91100d8fb6963c6b9 /src | |
parent | 043246c575d1ae85c7f375ef577b8fef6940a6d5 (diff) | |
download | garage-5b18fd8201af98798f0a47a95dfec2f3ab9777f8.tar.gz garage-5b18fd8201af98798f0a47a95dfec2f3ab9777f8.zip |
Add garage bucket cleanup-incomplete-uploads commandcleanup-uploads-command
Diffstat (limited to 'src')
-rw-r--r-- | src/garage/Cargo.toml | 1 | ||||
-rw-r--r-- | src/garage/admin.rs | 39 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 15 | ||||
-rw-r--r-- | src/model/helper/bucket.rs | 69 |
4 files changed, 123 insertions, 1 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index cbc0dc61..35caf2c1 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -33,6 +33,7 @@ garage_web = { version = "0.8.0", path = "../web" } bytes = "1.0" bytesize = "1.1" timeago = "0.3" +parse_duration = "2.1" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 802a8261..c21286e5 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -85,6 +85,9 @@ impl AdminRpcHandler { BucketOperation::Deny(query) => self.handle_bucket_deny(query).await, BucketOperation::Website(query) => self.handle_bucket_website(query).await, BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await, + BucketOperation::CleanupIncompleteUploads(query) => { + self.handle_bucket_cleanup_incomplete_uploads(query).await + } } } @@ -512,6 +515,42 @@ impl AdminRpcHandler { ))) } + async fn handle_bucket_cleanup_incomplete_uploads( + &self, + query: &CleanupIncompleteUploadsOpt, + ) -> Result<AdminRpc, Error> { + let mut bucket_ids = vec![]; + for b in query.buckets.iter() { + bucket_ids.push( + self.garage + .bucket_helper() + .resolve_global_bucket_name(b) + .await? + .ok_or_bad_request("Bucket not found")?, + ); + } + + let duration = parse_duration::parse::parse(&query.older_than) + .ok_or_bad_request("Invalid duration")?; + + let mut ret = String::new(); + for bucket in bucket_ids { + let count = self + .garage + .bucket_helper() + .cleanup_incomplete_uploads(&bucket, duration) + .await?; + writeln!( + &mut ret, + "Bucket {:?}: {} incomplete uploads aborted", + bucket, count + ) + .unwrap(); + } + + Ok(AdminRpc::Ok(ret)) + } + async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> { match cmd { KeyOperation::List => self.handle_list_keys().await, diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 06548e89..cb085813 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -189,6 +189,10 @@ pub enum BucketOperation { /// Set the quotas for this bucket #[structopt(name = "set-quotas", version = garage_version())] SetQuotas(SetQuotasOpt), + + /// Clean up (abort) old incomplete multipart uploads + #[structopt(name = "cleanup-incomplete-uploads", version = garage_version())] + CleanupIncompleteUploads(CleanupIncompleteUploadsOpt), } #[derive(Serialize, Deserialize, StructOpt, Debug)] @@ -291,6 +295,17 @@ pub struct SetQuotasOpt { } #[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct CleanupIncompleteUploadsOpt { + /// Abort multipart uploads older than this value + #[structopt(long = "older-than", default_value = "1d")] + pub older_than: String, + + /// Name of bucket(s) to clean up + #[structopt(required = true)] + pub buckets: Vec<String>, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] pub enum KeyOperation { /// List keys #[structopt(name = "list", version = garage_version())] diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 130ba5be..4a488d7f 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::{Error as GarageError, OkOrMessage}; @@ -12,7 +14,7 @@ use crate::helper::error::*; use crate::helper::key::KeyHelper; use crate::key_table::*; use crate::permission::BucketKeyPerm; -use crate::s3::object_table::ObjectFilter; +use crate::s3::object_table::*; pub struct BucketHelper<'a>(pub(crate) &'a Garage); @@ -472,4 +474,69 @@ impl<'a> BucketHelper<'a> { Ok(true) } + + // ---- + + /// Deletes all incomplete multipart uploads that are older than a certain time. + /// Returns the number of uploads aborted + pub async fn cleanup_incomplete_uploads( + &self, + bucket_id: &Uuid, + older_than: Duration, + ) -> Result<usize, Error> { + let older_than = now_msec() - older_than.as_millis() as u64; + + let mut ret = 0usize; + let mut start = None; + + loop { + let objects = self + .0 + .object_table + .get_range( + bucket_id, + start, + Some(ObjectFilter::IsUploading), + 1000, + EnumerationOrder::Forward, + ) + .await?; + + let abortions = objects + .iter() + .filter_map(|object| { + let aborted_versions = object + .versions() + .iter() + .filter(|v| v.is_uploading() && v.timestamp < older_than) + .map(|v| ObjectVersion { + state: ObjectVersionState::Aborted, + uuid: v.uuid, + timestamp: v.timestamp, + }) + .collect::<Vec<_>>(); + if !aborted_versions.is_empty() { + Some(Object::new( + object.bucket_id, + object.key.clone(), + aborted_versions, + )) + } else { + None + } + }) + .collect::<Vec<_>>(); + + ret += abortions.len(); + self.0.object_table.insert_many(abortions).await?; + + if objects.len() < 1000 { + break; + } else { + start = Some(objects.last().unwrap().key.clone()); + } + } + + Ok(ret) + } } |