diff options
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 2 | ||||
-rw-r--r-- | src/garage/admin.rs | 39 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 15 | ||||
-rw-r--r-- | src/garage/main.rs | 56 |
4 files changed, 96 insertions, 16 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index cbc0dc61..69852db7 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -30,9 +30,11 @@ garage_table = { version = "0.8.0", path = "../table" } garage_util = { version = "0.8.0", path = "../util" } garage_web = { version = "0.8.0", path = "../web" } +backtrace = "0.3" bytes = "1.0" bytesize = "1.1" timeago = "0.3" +parse_duration = "2.1" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 802a8261..e973cfe7 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -85,6 +85,9 @@ impl AdminRpcHandler { BucketOperation::Deny(query) => self.handle_bucket_deny(query).await, BucketOperation::Website(query) => self.handle_bucket_website(query).await, BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await, + BucketOperation::CleanupIncompleteUploads(query) => { + self.handle_bucket_cleanup_incomplete_uploads(query).await + } } } @@ -512,6 +515,42 @@ impl AdminRpcHandler { ))) } + async fn handle_bucket_cleanup_incomplete_uploads( + &self, + query: &CleanupIncompleteUploadsOpt, + ) -> Result<AdminRpc, Error> { + let mut bucket_ids = vec![]; + for b in query.buckets.iter() { + bucket_ids.push( + self.garage + .bucket_helper() + .resolve_global_bucket_name(b) + .await? + .ok_or_bad_request(format!("Bucket not found: {}", b))?, + ); + } + + let duration = parse_duration::parse::parse(&query.older_than) + .ok_or_bad_request("Invalid duration passed for --older-than parameter")?; + + let mut ret = String::new(); + for bucket in bucket_ids { + let count = self + .garage + .bucket_helper() + .cleanup_incomplete_uploads(&bucket, duration) + .await?; + writeln!( + &mut ret, + "Bucket {:?}: {} incomplete uploads aborted", + bucket, count + ) + .unwrap(); + } + + Ok(AdminRpc::Ok(ret)) + } + async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> { match cmd { KeyOperation::List => self.handle_list_keys().await, diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 06548e89..cb085813 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -189,6 +189,10 @@ pub enum BucketOperation { /// Set the quotas for this bucket #[structopt(name = "set-quotas", version = garage_version())] SetQuotas(SetQuotasOpt), + + /// Clean up (abort) old incomplete multipart uploads + #[structopt(name = "cleanup-incomplete-uploads", version = garage_version())] + CleanupIncompleteUploads(CleanupIncompleteUploadsOpt), } #[derive(Serialize, Deserialize, StructOpt, Debug)] @@ -291,6 +295,17 @@ pub struct SetQuotasOpt { } #[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct CleanupIncompleteUploadsOpt { + /// Abort multipart uploads older than this value + #[structopt(long = "older-than", default_value = "1d")] + pub older_than: String, + + /// Name of bucket(s) to clean up + #[structopt(required = true)] + pub buckets: Vec<String>, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] pub enum KeyOperation { /// List keys #[structopt(name = "list", version = garage_version())] diff --git a/src/garage/main.rs b/src/garage/main.rs index 5b2a85c0..edda734b 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -65,21 +65,6 @@ struct Opt { #[tokio::main] async fn main() { - if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "netapp=info,garage=info") - } - tracing_subscriber::fmt() - .with_writer(std::io::stderr) - .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env()) - .init(); - sodiumoxide::init().expect("Unable to init sodiumoxide"); - - // Abort on panic (same behavior as in Go) - std::panic::set_hook(Box::new(|panic_info| { - error!("{}", panic_info.to_string()); - std::process::abort(); - })); - // Initialize version and features info let features = &[ #[cfg(feature = "k2v")] @@ -108,12 +93,51 @@ async fn main() { } garage_util::version::init_features(features); - // Parse arguments let version = format!( "{} [features: {}]", garage_util::version::garage_version(), features.join(", ") ); + + // Initialize panic handler that aborts on panic and shows a nice message. + // By default, Tokio continues runing normally when a task panics. We want + // to avoid this behavior in Garage as this would risk putting the process in an + // unknown/uncontrollable state. We prefer to exit the process and restart it + // from scratch, so that it boots back into a fresh, known state. + let panic_version_info = version.clone(); + std::panic::set_hook(Box::new(move |panic_info| { + eprintln!("======== PANIC (internal Garage error) ========"); + eprintln!("{}", panic_info); + eprintln!(); + eprintln!("Panics are internal errors that Garage is unable to handle on its own."); + eprintln!("They can be caused by bugs in Garage's code, or by corrupted data in"); + eprintln!("the node's storage. If you feel that this error is likely to be a bug"); + eprintln!("in Garage, please report it on our issue tracker a the following address:"); + eprintln!(); + eprintln!(" https://git.deuxfleurs.fr/Deuxfleurs/garage/issues"); + eprintln!(); + eprintln!("Please include the last log messages and the the full backtrace below in"); + eprintln!("your bug report, as well as any relevant information on the context in"); + eprintln!("which Garage was running when this error occurred."); + eprintln!(); + eprintln!("GARAGE VERSION: {}", panic_version_info); + eprintln!(); + eprintln!("BACKTRACE:"); + eprintln!("{:?}", backtrace::Backtrace::new()); + std::process::abort(); + })); + + // Initialize logging as well as other libraries used in Garage + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "netapp=info,garage=info") + } + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env()) + .init(); + sodiumoxide::init().expect("Unable to init sodiumoxide"); + + // Parse arguments and dispatch command line let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches()); let res = match opt.cmd { |