diff options
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 7 | ||||
-rw-r--r-- | src/garage/admin.rs | 77 | ||||
-rw-r--r-- | src/garage/cli/layout.rs | 47 | ||||
-rw-r--r-- | src/garage/main.rs | 2 | ||||
-rw-r--r-- | src/garage/server.rs | 36 | ||||
-rw-r--r-- | src/garage/tracing_setup.rs | 37 |
6 files changed, 90 insertions, 116 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 3b69d7bc..902f67f8 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -27,10 +27,8 @@ garage_rpc = { version = "0.7.0", path = "../rpc" } garage_table = { version = "0.7.0", path = "../table" } garage_util = { version = "0.7.0", path = "../util" } garage_web = { version = "0.7.0", path = "../web" } -garage_admin = { version = "0.7.0", path = "../admin" } bytes = "1.0" -git-version = "0.3.4" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } pretty_env_logger = "0.4" @@ -54,6 +52,11 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi #netapp = { version = "0.4", path = "../../../netapp" } netapp = "0.4" +opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } +opentelemetry-prometheus = "0.10" +opentelemetry-otlp = "0.10" +prometheus = "0.13" + [dev-dependencies] aws-sdk-s3 = "0.8" chrono = "0.4" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index af0c3f22..bc1f494a 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -22,7 +22,6 @@ use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; use garage_model::permission::*; -use garage_model::s3::object_table::ObjectFilter; use crate::cli::*; use crate::repair::Repair; @@ -213,18 +212,7 @@ impl AdminRpcHandler { } // Check bucket is empty - let objects = self - .garage - .object_table - .get_range( - &bucket_id, - None, - Some(ObjectFilter::IsData), - 10, - EnumerationOrder::Forward, - ) - .await?; - if !objects.is_empty() { + if !helper.is_bucket_empty(bucket_id).await? { return Err(Error::BadRequest(format!( "Bucket {} is not empty", query.name @@ -261,6 +249,7 @@ impl AdminRpcHandler { async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> { let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); let bucket_id = helper .resolve_global_bucket_name(&query.existing_bucket) @@ -268,7 +257,7 @@ impl AdminRpcHandler { .ok_or_bad_request("Bucket not found")?; if let Some(key_pattern) = &query.local { - let key = helper.get_existing_matching_key(key_pattern).await?; + let key = key_helper.get_existing_matching_key(key_pattern).await?; helper .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name) @@ -290,9 +279,10 @@ impl AdminRpcHandler { async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> { let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); if let Some(key_pattern) = &query.local { - let key = helper.get_existing_matching_key(key_pattern).await?; + let key = key_helper.get_existing_matching_key(key_pattern).await?; let bucket_id = key .state @@ -331,12 +321,15 @@ impl AdminRpcHandler { async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> { let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); let bucket_id = helper .resolve_global_bucket_name(&query.bucket) .await? .ok_or_bad_request("Bucket not found")?; - let key = helper.get_existing_matching_key(&query.key_pattern).await?; + let key = key_helper + .get_existing_matching_key(&query.key_pattern) + .await?; let allow_read = query.read || key.allow_read(&bucket_id); let allow_write = query.write || key.allow_write(&bucket_id); @@ -363,12 +356,15 @@ impl AdminRpcHandler { async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> { let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); let bucket_id = helper .resolve_global_bucket_name(&query.bucket) .await? .ok_or_bad_request("Bucket not found")?; - let key = helper.get_existing_matching_key(&query.key_pattern).await?; + let key = key_helper + .get_existing_matching_key(&query.key_pattern) + .await?; let allow_read = !query.read && key.allow_read(&bucket_id); let allow_write = !query.write && key.allow_write(&bucket_id); @@ -469,7 +465,7 @@ impl AdminRpcHandler { async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> { let key = self .garage - .bucket_helper() + .key_helper() .get_existing_matching_key(&query.key_pattern) .await?; self.key_info_result(key).await @@ -484,7 +480,7 @@ impl AdminRpcHandler { async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> { let mut key = self .garage - .bucket_helper() + .key_helper() .get_existing_matching_key(&query.key_pattern) .await?; key.params_mut() @@ -496,9 +492,11 @@ impl AdminRpcHandler { } async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> { - let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); - let mut key = helper.get_existing_matching_key(&query.key_pattern).await?; + let mut key = key_helper + .get_existing_matching_key(&query.key_pattern) + .await?; if !query.yes { return Err(Error::BadRequest( @@ -506,32 +504,7 @@ impl AdminRpcHandler { )); } - let state = key.state.as_option_mut().unwrap(); - - // --- done checking, now commit --- - // (the step at unset_local_bucket_alias will fail if a bucket - // does not have another alias, the deletion will be - // interrupted in the middle if that happens) - - // 1. Delete local aliases - for (alias, _, to) in state.local_aliases.items().iter() { - if let Some(bucket_id) = to { - helper - .unset_local_bucket_alias(*bucket_id, &key.key_id, alias) - .await?; - } - } - - // 2. Remove permissions on all authorized buckets - for (ab_id, _auth) in state.authorized_buckets.items().iter() { - helper - .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS) - .await?; - } - - // 3. Actually delete key - key.state = Deletable::delete(); - self.garage.key_table.insert(&key).await?; + key_helper.delete_key(&mut key).await?; Ok(AdminRpc::Ok(format!( "Key {} was deleted successfully.", @@ -542,7 +515,7 @@ impl AdminRpcHandler { async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> { let mut key = self .garage - .bucket_helper() + .key_helper() .get_existing_matching_key(&query.key_pattern) .await?; if query.create_bucket { @@ -555,7 +528,7 @@ impl AdminRpcHandler { async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> { let mut key = self .garage - .bucket_helper() + .key_helper() .get_existing_matching_key(&query.key_pattern) .await?; if query.create_bucket { @@ -696,11 +669,7 @@ impl AdminRpcHandler { writeln!( &mut ret, "\nGarage version: {}", - option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( - prefix = "git:", - cargo_prefix = "cargo:", - fallback = "unknown" - )) + self.garage.system.garage_version(), ) .unwrap(); diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 0247c32b..db0af57c 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -1,5 +1,4 @@ use garage_util::crdt::Crdt; -use garage_util::data::*; use garage_util::error::*; use garage_util::formater::format_table; @@ -212,31 +211,9 @@ pub async fn cmd_apply_layout( rpc_host: NodeID, apply_opt: ApplyLayoutOpt, ) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match apply_opt.version { - None => { - println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); - println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); - return Err(Error::Message("--version flag is missing".into())); - } - Some(v) => { - if v != layout.version + 1 { - return Err(Error::Message("Invalid value of --version flag".into())); - } - } - } - - layout.roles.merge(&layout.staging); - - if !layout.calculate_partition_assignation() { - return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); - } + let layout = fetch_layout(rpc_cli, rpc_host).await?; - layout.staging.clear(); - layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); - - layout.version += 1; + let layout = layout.apply_staged_changes(apply_opt.version)?; send_layout(rpc_cli, rpc_host, layout).await?; @@ -251,25 +228,9 @@ pub async fn cmd_revert_layout( rpc_host: NodeID, revert_opt: RevertLayoutOpt, ) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match revert_opt.version { - None => { - println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); - println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); - return Err(Error::Message("--version flag is missing".into())); - } - Some(v) => { - if v != layout.version + 1 { - return Err(Error::Message("Invalid value of --version flag".into())); - } - } - } - - layout.staging.clear(); - layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); + let layout = fetch_layout(rpc_cli, rpc_host).await?; - layout.version += 1; + let layout = layout.revert_staged_changes(revert_opt.version)?; send_layout(rpc_cli, rpc_host, layout).await?; diff --git a/src/garage/main.rs b/src/garage/main.rs index e898e680..bd09b6ea 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -8,6 +8,7 @@ mod admin; mod cli; mod repair; mod server; +mod tracing_setup; use std::net::SocketAddr; use std::path::PathBuf; @@ -141,6 +142,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await { Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))), Err(HelperError::BadRequest(b)) => Err(Error::Message(b)), + Err(e) => Err(Error::Message(format!("{}", e))), Ok(x) => Ok(x), } } diff --git a/src/garage/server.rs b/src/garage/server.rs index 24bb25b3..b58ad286 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -6,8 +6,7 @@ use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; -use garage_admin::metrics::*; -use garage_admin::tracing_setup::*; +use garage_api::admin::api_server::AdminApiServer; use garage_api::s3::api_server::S3ApiServer; use garage_model::garage::Garage; use garage_web::run_web_server; @@ -16,6 +15,7 @@ use garage_web::run_web_server; use garage_api::k2v::api_server::K2VApiServer; use crate::admin::*; +use crate::tracing_setup::*; async fn wait_from(mut chan: watch::Receiver<bool>) { while !*chan.borrow() { @@ -39,9 +39,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .open() .expect("Unable to open sled DB"); - info!("Initialize admin web server and metric backend..."); - let admin_server_init = AdminServer::init(); - info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); @@ -54,6 +51,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { init_tracing(&export_to, garage.system.id)?; } + info!("Initialize Admin API server and metrics collector..."); + let admin_server = AdminApiServer::new(garage.clone()); + let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); @@ -80,39 +80,41 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { wait_from(watch_cancel.clone()), )); - let admin_server = if let Some(admin_bind_addr) = config.admin.api_bind_addr { - info!("Configure and run admin web server..."); - Some(tokio::spawn( - admin_server_init.run(admin_bind_addr, wait_from(watch_cancel.clone())), - )) - } else { - None - }; + info!("Launching Admin API server..."); + let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone()))); // Stuff runs // When a cancel signal is sent, stuff stops if let Err(e) = s3_api_server.await? { warn!("S3 API server exited with error: {}", e); + } else { + info!("S3 API server exited without error."); } #[cfg(feature = "k2v")] if let Err(e) = k2v_api_server.await? { warn!("K2V API server exited with error: {}", e); + } else { + info!("K2V API server exited without error."); } if let Err(e) = web_server.await? { warn!("Web server exited with error: {}", e); + } else { + info!("Web server exited without error."); } - if let Some(a) = admin_server { - if let Err(e) = a.await? { - warn!("Admin web server exited with error: {}", e); - } + if let Err(e) = admin_server.await? { + warn!("Admin web server exited with error: {}", e); + } else { + info!("Admin API server exited without error."); } // Remove RPC handlers for system to break reference cycles garage.system.netapp.drop_all_handlers(); + opentelemetry::global::shutdown_tracer_provider(); // Await for netapp RPC system to end run_system.await?; + info!("Netapp exited"); // Drop all references so that stuff can terminate properly drop(garage); diff --git a/src/garage/tracing_setup.rs b/src/garage/tracing_setup.rs new file mode 100644 index 00000000..55fc4094 --- /dev/null +++ b/src/garage/tracing_setup.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use opentelemetry::sdk::{ + trace::{self, IdGenerator, Sampler}, + Resource, +}; +use opentelemetry::KeyValue; +use opentelemetry_otlp::WithExportConfig; + +use garage_util::data::*; +use garage_util::error::*; + +pub fn init_tracing(export_to: &str, node_id: Uuid) -> Result<(), Error> { + let node_id = hex::encode(&node_id.as_slice()[..8]); + + opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(export_to) + .with_timeout(Duration::from_secs(3)), + ) + .with_trace_config( + trace::config() + .with_id_generator(IdGenerator::default()) + .with_sampler(Sampler::AlwaysOn) + .with_resource(Resource::new(vec![ + KeyValue::new("service.name", "garage"), + KeyValue::new("service.instance.id", node_id), + ])), + ) + .install_batch(opentelemetry::runtime::Tokio) + .ok_or_message("Unable to initialize tracing")?; + + Ok(()) +} |