aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml7
-rw-r--r--src/garage/admin.rs77
-rw-r--r--src/garage/cli/layout.rs47
-rw-r--r--src/garage/main.rs2
-rw-r--r--src/garage/server.rs36
-rw-r--r--src/garage/tracing_setup.rs37
6 files changed, 90 insertions, 116 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 3b69d7bc..902f67f8 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -27,10 +27,8 @@ garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
-garage_admin = { version = "0.7.0", path = "../admin" }
bytes = "1.0"
-git-version = "0.3.4"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
@@ -54,6 +52,11 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"
+opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
+opentelemetry-prometheus = "0.10"
+opentelemetry-otlp = "0.10"
+prometheus = "0.13"
+
[dev-dependencies]
aws-sdk-s3 = "0.8"
chrono = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index af0c3f22..bc1f494a 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -22,7 +22,6 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::permission::*;
-use garage_model::s3::object_table::ObjectFilter;
use crate::cli::*;
use crate::repair::Repair;
@@ -213,18 +212,7 @@ impl AdminRpcHandler {
}
// Check bucket is empty
- let objects = self
- .garage
- .object_table
- .get_range(
- &bucket_id,
- None,
- Some(ObjectFilter::IsData),
- 10,
- EnumerationOrder::Forward,
- )
- .await?;
- if !objects.is_empty() {
+ if !helper.is_bucket_empty(bucket_id).await? {
return Err(Error::BadRequest(format!(
"Bucket {} is not empty",
query.name
@@ -261,6 +249,7 @@ impl AdminRpcHandler {
async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
let bucket_id = helper
.resolve_global_bucket_name(&query.existing_bucket)
@@ -268,7 +257,7 @@ impl AdminRpcHandler {
.ok_or_bad_request("Bucket not found")?;
if let Some(key_pattern) = &query.local {
- let key = helper.get_existing_matching_key(key_pattern).await?;
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
helper
.set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
@@ -290,9 +279,10 @@ impl AdminRpcHandler {
async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
if let Some(key_pattern) = &query.local {
- let key = helper.get_existing_matching_key(key_pattern).await?;
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
let bucket_id = key
.state
@@ -331,12 +321,15 @@ impl AdminRpcHandler {
async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
let bucket_id = helper
.resolve_global_bucket_name(&query.bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
- let key = helper.get_existing_matching_key(&query.key_pattern).await?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
let allow_read = query.read || key.allow_read(&bucket_id);
let allow_write = query.write || key.allow_write(&bucket_id);
@@ -363,12 +356,15 @@ impl AdminRpcHandler {
async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
let bucket_id = helper
.resolve_global_bucket_name(&query.bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
- let key = helper.get_existing_matching_key(&query.key_pattern).await?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
let allow_read = !query.read && key.allow_read(&bucket_id);
let allow_write = !query.write && key.allow_write(&bucket_id);
@@ -469,7 +465,7 @@ impl AdminRpcHandler {
async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
let key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
self.key_info_result(key).await
@@ -484,7 +480,7 @@ impl AdminRpcHandler {
async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
key.params_mut()
@@ -496,9 +492,11 @@ impl AdminRpcHandler {
}
async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
- let mut key = helper.get_existing_matching_key(&query.key_pattern).await?;
+ let mut key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
if !query.yes {
return Err(Error::BadRequest(
@@ -506,32 +504,7 @@ impl AdminRpcHandler {
));
}
- let state = key.state.as_option_mut().unwrap();
-
- // --- done checking, now commit ---
- // (the step at unset_local_bucket_alias will fail if a bucket
- // does not have another alias, the deletion will be
- // interrupted in the middle if that happens)
-
- // 1. Delete local aliases
- for (alias, _, to) in state.local_aliases.items().iter() {
- if let Some(bucket_id) = to {
- helper
- .unset_local_bucket_alias(*bucket_id, &key.key_id, alias)
- .await?;
- }
- }
-
- // 2. Remove permissions on all authorized buckets
- for (ab_id, _auth) in state.authorized_buckets.items().iter() {
- helper
- .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS)
- .await?;
- }
-
- // 3. Actually delete key
- key.state = Deletable::delete();
- self.garage.key_table.insert(&key).await?;
+ key_helper.delete_key(&mut key).await?;
Ok(AdminRpc::Ok(format!(
"Key {} was deleted successfully.",
@@ -542,7 +515,7 @@ impl AdminRpcHandler {
async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
if query.create_bucket {
@@ -555,7 +528,7 @@ impl AdminRpcHandler {
async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
if query.create_bucket {
@@ -696,11 +669,7 @@ impl AdminRpcHandler {
writeln!(
&mut ret,
"\nGarage version: {}",
- option_env!("GIT_VERSION").unwrap_or(git_version::git_version!(
- prefix = "git:",
- cargo_prefix = "cargo:",
- fallback = "unknown"
- ))
+ self.garage.system.garage_version(),
)
.unwrap();
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 0247c32b..db0af57c 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,5 +1,4 @@
use garage_util::crdt::Crdt;
-use garage_util::data::*;
use garage_util::error::*;
use garage_util::formater::format_table;
@@ -212,31 +211,9 @@ pub async fn cmd_apply_layout(
rpc_host: NodeID,
apply_opt: ApplyLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match apply_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.roles.merge(&layout.staging);
-
- if !layout.calculate_partition_assignation() {
- return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
- }
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
-
- layout.version += 1;
+ let layout = layout.apply_staged_changes(apply_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -251,25 +228,9 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match revert_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.version += 1;
+ let layout = layout.revert_staged_changes(revert_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
diff --git a/src/garage/main.rs b/src/garage/main.rs
index e898e680..bd09b6ea 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -8,6 +8,7 @@ mod admin;
mod cli;
mod repair;
mod server;
+mod tracing_setup;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -141,6 +142,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await {
Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))),
Err(HelperError::BadRequest(b)) => Err(Error::Message(b)),
+ Err(e) => Err(Error::Message(format!("{}", e))),
Ok(x) => Ok(x),
}
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 24bb25b3..b58ad286 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -6,8 +6,7 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
-use garage_admin::metrics::*;
-use garage_admin::tracing_setup::*;
+use garage_api::admin::api_server::AdminApiServer;
use garage_api::s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
use garage_web::run_web_server;
@@ -16,6 +15,7 @@ use garage_web::run_web_server;
use garage_api::k2v::api_server::K2VApiServer;
use crate::admin::*;
+use crate::tracing_setup::*;
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
@@ -39,9 +39,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.open()
.expect("Unable to open sled DB");
- info!("Initialize admin web server and metric backend...");
- let admin_server_init = AdminServer::init();
-
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
@@ -54,6 +51,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
init_tracing(&export_to, garage.system.id)?;
}
+ info!("Initialize Admin API server and metrics collector...");
+ let admin_server = AdminApiServer::new(garage.clone());
+
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
@@ -80,39 +80,41 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
wait_from(watch_cancel.clone()),
));
- let admin_server = if let Some(admin_bind_addr) = config.admin.api_bind_addr {
- info!("Configure and run admin web server...");
- Some(tokio::spawn(
- admin_server_init.run(admin_bind_addr, wait_from(watch_cancel.clone())),
- ))
- } else {
- None
- };
+ info!("Launching Admin API server...");
+ let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone())));
// Stuff runs
// When a cancel signal is sent, stuff stops
if let Err(e) = s3_api_server.await? {
warn!("S3 API server exited with error: {}", e);
+ } else {
+ info!("S3 API server exited without error.");
}
#[cfg(feature = "k2v")]
if let Err(e) = k2v_api_server.await? {
warn!("K2V API server exited with error: {}", e);
+ } else {
+ info!("K2V API server exited without error.");
}
if let Err(e) = web_server.await? {
warn!("Web server exited with error: {}", e);
+ } else {
+ info!("Web server exited without error.");
}
- if let Some(a) = admin_server {
- if let Err(e) = a.await? {
- warn!("Admin web server exited with error: {}", e);
- }
+ if let Err(e) = admin_server.await? {
+ warn!("Admin web server exited with error: {}", e);
+ } else {
+ info!("Admin API server exited without error.");
}
// Remove RPC handlers for system to break reference cycles
garage.system.netapp.drop_all_handlers();
+ opentelemetry::global::shutdown_tracer_provider();
// Await for netapp RPC system to end
run_system.await?;
+ info!("Netapp exited");
// Drop all references so that stuff can terminate properly
drop(garage);
diff --git a/src/garage/tracing_setup.rs b/src/garage/tracing_setup.rs
new file mode 100644
index 00000000..55fc4094
--- /dev/null
+++ b/src/garage/tracing_setup.rs
@@ -0,0 +1,37 @@
+use std::time::Duration;
+
+use opentelemetry::sdk::{
+ trace::{self, IdGenerator, Sampler},
+ Resource,
+};
+use opentelemetry::KeyValue;
+use opentelemetry_otlp::WithExportConfig;
+
+use garage_util::data::*;
+use garage_util::error::*;
+
+pub fn init_tracing(export_to: &str, node_id: Uuid) -> Result<(), Error> {
+ let node_id = hex::encode(&node_id.as_slice()[..8]);
+
+ opentelemetry_otlp::new_pipeline()
+ .tracing()
+ .with_exporter(
+ opentelemetry_otlp::new_exporter()
+ .tonic()
+ .with_endpoint(export_to)
+ .with_timeout(Duration::from_secs(3)),
+ )
+ .with_trace_config(
+ trace::config()
+ .with_id_generator(IdGenerator::default())
+ .with_sampler(Sampler::AlwaysOn)
+ .with_resource(Resource::new(vec![
+ KeyValue::new("service.name", "garage"),
+ KeyValue::new("service.instance.id", node_id),
+ ])),
+ )
+ .install_batch(opentelemetry::runtime::Tokio)
+ .ok_or_message("Unable to initialize tracing")?;
+
+ Ok(())
+}