diff options
Diffstat (limited to 'src')
37 files changed, 952 insertions, 425 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 901cb959..cdfabcb8 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_api" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,11 +14,11 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_model = { version = "0.7.0", path = "../model" } -garage_table = { version = "0.7.0", path = "../table" } -garage_block = { version = "0.7.0", path = "../block" } -garage_util = { version = "0.7.0", path = "../util" } -garage_rpc = { version = "0.7.0", path = "../rpc" } +garage_model = { version = "0.8.0", path = "../model" } +garage_table = { version = "0.8.0", path = "../table" } +garage_block = { version = "0.8.0", path = "../block" } +garage_util = { version = "0.8.0", path = "../util" } +garage_rpc = { version = "0.8.0", path = "../rpc" } async-trait = "0.1.7" base64 = "0.13" @@ -54,9 +54,9 @@ quick-xml = { version = "0.21", features = [ "serialize" ] } url = "2.1" opentelemetry = "0.17" -opentelemetry-prometheus = "0.10" -opentelemetry-otlp = "0.10" -prometheus = "0.13" +opentelemetry-prometheus = { version = "0.10", optional = true } +prometheus = { version = "0.13", optional = true } [features] k2v = [ "garage_util/k2v", "garage_model/k2v" ] +metrics = [ "opentelemetry-prometheus", "prometheus" ] diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index c3b16715..fb0078cc 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -1,15 +1,17 @@ +use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; use futures::future::Future; -use http::header::{ - ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW, CONTENT_TYPE, -}; +use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; use hyper::{Body, Request, Response}; -use opentelemetry::trace::{SpanRef, Tracer}; +use opentelemetry::trace::SpanRef; + +#[cfg(feature = "metrics")] use opentelemetry_prometheus::PrometheusExporter; +#[cfg(feature = "metrics")] use prometheus::{Encoder, TextEncoder}; use garage_model::garage::Garage; @@ -25,6 +27,7 @@ use crate::admin::router::{Authorization, Endpoint}; pub struct AdminApiServer { garage: Arc<Garage>, + #[cfg(feature = "metrics")] exporter: PrometheusExporter, metrics_token: Option<String>, admin_token: Option<String>, @@ -32,7 +35,6 @@ pub struct AdminApiServer { impl AdminApiServer { pub fn new(garage: Arc<Garage>) -> Self { - let exporter = opentelemetry_prometheus::exporter().init(); let cfg = &garage.config.admin; let metrics_token = cfg .metrics_token @@ -44,21 +46,22 @@ impl AdminApiServer { .map(|tok| format!("Bearer {}", tok)); Self { garage, - exporter, + #[cfg(feature = "metrics")] + exporter: opentelemetry_prometheus::exporter().init(), metrics_token, admin_token, } } - pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> { - if let Some(bind_addr) = self.garage.config.admin.api_bind_addr { - let region = self.garage.config.s3_api.s3_region.clone(); - ApiServer::new(region, self) - .run_server(bind_addr, shutdown_signal) - .await - } else { - Ok(()) - } + pub async fn run( + self, + bind_addr: SocketAddr, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let region = self.garage.config.s3_api.s3_region.clone(); + ApiServer::new(region, self) + .run_server(bind_addr, shutdown_signal) + .await } fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> { @@ -71,22 +74,31 @@ impl AdminApiServer { } fn handle_metrics(&self) -> Result<Response<Body>, Error> { - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - - let tracer = opentelemetry::global::tracer("garage"); - let metric_families = tracer.in_span("admin/gather_metrics", |_| { - self.exporter.registry().gather() - }); - - encoder - .encode(&metric_families, &mut buffer) - .ok_or_internal_error("Could not serialize metrics")?; - - Ok(Response::builder() - .status(200) - .header(CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer))?) + #[cfg(feature = "metrics")] + { + use opentelemetry::trace::Tracer; + + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + + let tracer = opentelemetry::global::tracer("garage"); + let metric_families = tracer.in_span("admin/gather_metrics", |_| { + self.exporter.registry().gather() + }); + + encoder + .encode(&metric_families, &mut buffer) + .ok_or_internal_error("Could not serialize metrics")?; + + Ok(Response::builder() + .status(200) + .header(http::header::CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer))?) + } + #[cfg(not(feature = "metrics"))] + Err(Error::bad_request( + "Garage was built without the metrics feature".to_string(), + )) } } diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 4b7716a3..99c6e332 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -18,7 +18,8 @@ use crate::helpers::{json_ok_response, parse_json_body}; pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { let res = GetClusterStatusResponse { node: hex::encode(garage.system.id), - garage_version: garage.system.garage_version(), + garage_version: garage_util::version::garage_version(), + garage_features: garage_util::version::garage_features(), db_engine: garage.db.engine(), known_nodes: garage .system @@ -99,6 +100,7 @@ fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse { struct GetClusterStatusResponse { node: String, garage_version: &'static str, + garage_features: Option<&'static [&'static str]>, db_engine: String, known_nodes: HashMap<String, KnownNodeResp>, layout: GetClusterLayoutResponse, diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index eb0fbdd7..084867b5 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -36,20 +37,13 @@ pub(crate) struct K2VApiEndpoint { impl K2VApiServer { pub async fn run( garage: Arc<Garage>, + bind_addr: SocketAddr, + s3_region: String, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { - if let Some(cfg) = &garage.config.k2v_api { - let bind_addr = cfg.api_bind_addr; - - ApiServer::new( - garage.config.s3_api.s3_region.clone(), - K2VApiServer { garage }, - ) + ApiServer::new(s3_region, K2VApiServer { garage }) .run_server(bind_addr, shutdown_signal) .await - } else { - Ok(()) - } } } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 78dfeeac..27837297 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -43,16 +44,13 @@ pub(crate) struct S3ApiEndpoint { impl S3ApiServer { pub async fn run( garage: Arc<Garage>, + addr: SocketAddr, + s3_region: String, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { - let addr = garage.config.s3_api.api_bind_addr; - - ApiServer::new( - garage.config.s3_api.s3_region.clone(), - S3ApiServer { garage }, - ) - .run_server(addr, shutdown_signal) - .await + ApiServer::new(s3_region, S3ApiServer { garage }) + .run_server(addr, shutdown_signal) + .await } async fn handle_request_without_bucket( diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 3e6f7bc0..cd409001 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_block" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -15,9 +15,9 @@ path = "lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } -garage_rpc = { version = "0.7.0", path = "../rpc" } -garage_util = { version = "0.7.0", path = "../util" } -garage_table = { version = "0.7.0", path = "../table" } +garage_rpc = { version = "0.8.0", path = "../rpc" } +garage_util = { version = "0.8.0", path = "../util" } +garage_table = { version = "0.8.0", path = "../table" } opentelemetry = "0.17" @@ -39,3 +39,6 @@ futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio-util = { version = "0.6", features = ["io"] } + +[features] +system-libs = [ "zstd/pkg-config" ] diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index f697054b..62dda2ca 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -21,9 +21,9 @@ err-derive = "0.3" hexdump = "0.1" tracing = "0.1.30" -heed = "0.11" -rusqlite = { version = "0.27", features = ["bundled"] } -sled = "0.34" +heed = { version = "0.11", default-features = false, features = ["lmdb"], optional = true } +rusqlite = { version = "0.27", optional = true } +sled = { version = "0.34", optional = true } # cli deps clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } @@ -33,4 +33,7 @@ pretty_env_logger = { version = "0.4", optional = true } mktemp = "0.4" [features] +bundled-libs = [ "rusqlite/bundled" ] cli = ["clap", "pretty_env_logger"] +lmdb = [ "heed" ] +sqlite = [ "rusqlite" ] diff --git a/src/db/lib.rs b/src/db/lib.rs index f185114e..d96586be 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -1,8 +1,15 @@ #[macro_use] +#[cfg(feature = "sqlite")] extern crate tracing; +#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))] +compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite."); + +#[cfg(feature = "lmdb")] pub mod lmdb_adapter; +#[cfg(feature = "sled")] pub mod sled_adapter; +#[cfg(feature = "sqlite")] pub mod sqlite_adapter; pub mod counted_tree_hack; diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 31aa270d..dcb3b78e 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -22,13 +22,13 @@ path = "tests/lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } -garage_api = { version = "0.7.0", path = "../api" } -garage_block = { version = "0.7.0", path = "../block" } -garage_model = { version = "0.7.0", path = "../model" } -garage_rpc = { version = "0.7.0", path = "../rpc" } -garage_table = { version = "0.7.0", path = "../table" } -garage_util = { version = "0.7.0", path = "../util" } -garage_web = { version = "0.7.0", path = "../web" } +garage_api = { version = "0.8.0", path = "../api" } +garage_block = { version = "0.8.0", path = "../block" } +garage_model = { version = "0.8.0", path = "../model" } +garage_rpc = { version = "0.8.0", path = "../rpc" } +garage_table = { version = "0.8.0", path = "../table" } +garage_util = { version = "0.8.0", path = "../util" } +garage_web = { version = "0.8.0", path = "../web" } bytes = "1.0" bytesize = "1.1" @@ -54,9 +54,9 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } -opentelemetry-prometheus = "0.10" -opentelemetry-otlp = "0.10" -prometheus = "0.13" +opentelemetry-prometheus = { version = "0.10", optional = true } +opentelemetry-otlp = { version = "0.10", optional = true } +prometheus = { version = "0.13", optional = true } [dev-dependencies] aws-sdk-s3 = "0.8" @@ -73,5 +73,26 @@ base64 = "0.13" [features] -kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] +default = [ "bundled-libs", "metrics", "sled" ] + k2v = [ "garage_util/k2v", "garage_api/k2v" ] + +# Database engines, Sled is still our default even though we don't like it +sled = [ "garage_model/sled" ] +lmdb = [ "garage_model/lmdb" ] +sqlite = [ "garage_model/sqlite" ] + +# Automatic registration and discovery via Kubernetes API +kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] +# Prometheus exporter (/metrics endpoint). +metrics = [ "garage_api/metrics", "opentelemetry-prometheus", "prometheus" ] +# Exporter for the OpenTelemetry Collector. +telemetry-otlp = [ "opentelemetry-otlp" ] + +# NOTE: bundled-libs and system-libs should be treat as mutually exclusive; +# exactly one of them should be enabled. + +# Use bundled libsqlite instead of linking against system-provided. +bundled-libs = [ "garage_db/bundled-libs" ] +# Link against system-provided libsodium and libzstd. +system-libs = [ "garage_block/system-libs", "garage_rpc/system-libs", "sodiumoxide/use-pkg-config" ] diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 7ba9330c..802a8261 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -741,8 +741,11 @@ impl AdminRpcHandler { let mut ret = String::new(); writeln!( &mut ret, - "\nGarage version: {}", - self.garage.system.garage_version(), + "\nGarage version: {} [features: {}]", + garage_util::version::garage_version(), + garage_util::version::garage_features() + .map(|list| list.join(", ")) + .unwrap_or_else(|| "(unknown)".into()), ) .unwrap(); writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 0388cef5..06548e89 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -1,65 +1,65 @@ use serde::{Deserialize, Serialize}; - -use garage_util::version; use structopt::StructOpt; +use garage_util::version::garage_version; + #[derive(StructOpt, Debug)] pub enum Command { /// Run Garage server - #[structopt(name = "server", version = version::garage())] + #[structopt(name = "server", version = garage_version())] Server, /// Get network status - #[structopt(name = "status", version = version::garage())] + #[structopt(name = "status", version = garage_version())] Status, /// Operations on individual Garage nodes - #[structopt(name = "node", version = version::garage())] + #[structopt(name = "node", version = garage_version())] Node(NodeOperation), /// Operations on the assignation of node roles in the cluster layout - #[structopt(name = "layout", version = version::garage())] + #[structopt(name = "layout", version = garage_version())] Layout(LayoutOperation), /// Operations on buckets - #[structopt(name = "bucket", version = version::garage())] + #[structopt(name = "bucket", version = garage_version())] Bucket(BucketOperation), /// Operations on S3 access keys - #[structopt(name = "key", version = version::garage())] + #[structopt(name = "key", version = garage_version())] Key(KeyOperation), /// Run migrations from previous Garage version /// (DO NOT USE WITHOUT READING FULL DOCUMENTATION) - #[structopt(name = "migrate", version = version::garage())] + #[structopt(name = "migrate", version = garage_version())] Migrate(MigrateOpt), /// Start repair of node data on remote node - #[structopt(name = "repair", version = version::garage())] + #[structopt(name = "repair", version = garage_version())] Repair(RepairOpt), /// Offline reparation of node data (these repairs must be run offline /// directly on the server node) - #[structopt(name = "offline-repair", version = version::garage())] + #[structopt(name = "offline-repair", version = garage_version())] OfflineRepair(OfflineRepairOpt), /// Gather node statistics - #[structopt(name = "stats", version = version::garage())] + #[structopt(name = "stats", version = garage_version())] Stats(StatsOpt), /// Manage background workers - #[structopt(name = "worker", version = version::garage())] + #[structopt(name = "worker", version = garage_version())] Worker(WorkerOpt), } #[derive(StructOpt, Debug)] pub enum NodeOperation { /// Print identifier (public key) of this Garage node - #[structopt(name = "id", version = version::garage())] + #[structopt(name = "id", version = garage_version())] NodeId(NodeIdOpt), /// Connect to Garage node that is currently isolated from the system - #[structopt(name = "connect", version = version::garage())] + #[structopt(name = "connect", version = garage_version())] Connect(ConnectNodeOpt), } @@ -80,23 +80,23 @@ pub struct ConnectNodeOpt { #[derive(StructOpt, Debug)] pub enum LayoutOperation { /// Assign role to Garage node - #[structopt(name = "assign", version = version::garage())] + #[structopt(name = "assign", version = garage_version())] Assign(AssignRoleOpt), /// Remove role from Garage cluster node - #[structopt(name = "remove", version = version::garage())] + #[structopt(name = "remove", version = garage_version())] Remove(RemoveRoleOpt), /// Show roles currently assigned to nodes and changes staged for commit - #[structopt(name = "show", version = version::garage())] + #[structopt(name = "show", version = garage_version())] Show, /// Apply staged changes to cluster layout - #[structopt(name = "apply", version = version::garage())] + #[structopt(name = "apply", version = garage_version())] Apply(ApplyLayoutOpt), /// Revert staged changes to cluster layout - #[structopt(name = "revert", version = version::garage())] + #[structopt(name = "revert", version = garage_version())] Revert(RevertLayoutOpt), } @@ -151,43 +151,43 @@ pub struct RevertLayoutOpt { #[derive(Serialize, Deserialize, StructOpt, Debug)] pub enum BucketOperation { /// List buckets - #[structopt(name = "list", version = version::garage())] + #[structopt(name = "list", version = garage_version())] List, /// Get bucket info - #[structopt(name = "info", version = version::garage())] + #[structopt(name = "info", version = garage_version())] Info(BucketOpt), /// Create bucket - #[structopt(name = "create", version = version::garage())] + #[structopt(name = "create", version = garage_version())] Create(BucketOpt), /// Delete bucket - #[structopt(name = "delete", version = version::garage())] + #[structopt(name = "delete", version = garage_version())] Delete(DeleteBucketOpt), /// Alias bucket under new name - #[structopt(name = "alias", version = version::garage())] + #[structopt(name = "alias", version = garage_version())] Alias(AliasBucketOpt), /// Remove bucket alias - #[structopt(name = "unalias", version = version::garage())] + #[structopt(name = "unalias", version = garage_version())] Unalias(UnaliasBucketOpt), /// Allow key to read or write to bucket - #[structopt(name = "allow", version = version::garage())] + #[structopt(name = "allow", version = garage_version())] Allow(PermBucketOpt), /// Deny key from reading or writing to bucket - #[structopt(name = "deny", version = version::garage())] + #[structopt(name = "deny", version = garage_version())] Deny(PermBucketOpt), /// Expose as website or not - #[structopt(name = "website", version = version::garage())] + #[structopt(name = "website", version = garage_version())] Website(WebsiteOpt), /// Set the quotas for this bucket - #[structopt(name = "set-quotas", version = version::garage())] + #[structopt(name = "set-quotas", version = garage_version())] SetQuotas(SetQuotasOpt), } @@ -293,35 +293,35 @@ pub struct SetQuotasOpt { #[derive(Serialize, Deserialize, StructOpt, Debug)] pub enum KeyOperation { /// List keys - #[structopt(name = "list", version = version::garage())] + #[structopt(name = "list", version = garage_version())] List, /// Get key info - #[structopt(name = "info", version = version::garage())] + #[structopt(name = "info", version = garage_version())] Info(KeyOpt), /// Create new key - #[structopt(name = "new", version = version::garage())] + #[structopt(name = "new", version = garage_version())] New(KeyNewOpt), /// Rename key - #[structopt(name = "rename", version = version::garage())] + #[structopt(name = "rename", version = garage_version())] Rename(KeyRenameOpt), /// Delete key - #[structopt(name = "delete", version = version::garage())] + #[structopt(name = "delete", version = garage_version())] Delete(KeyDeleteOpt), /// Set permission flags for key - #[structopt(name = "allow", version = version::garage())] + #[structopt(name = "allow", version = garage_version())] Allow(KeyPermOpt), /// Unset permission flags for key - #[structopt(name = "deny", version = version::garage())] + #[structopt(name = "deny", version = garage_version())] Deny(KeyPermOpt), /// Import key - #[structopt(name = "import", version = version::garage())] + #[structopt(name = "import", version = garage_version())] Import(KeyImportOpt), } @@ -393,7 +393,7 @@ pub struct MigrateOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum MigrateWhat { /// Migrate buckets and permissions from v0.5.0 - #[structopt(name = "buckets050", version = version::garage())] + #[structopt(name = "buckets050", version = garage_version())] Buckets050, } @@ -414,19 +414,19 @@ pub struct RepairOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum RepairWhat { /// Only do a full sync of metadata tables - #[structopt(name = "tables", version = version::garage())] + #[structopt(name = "tables", version = garage_version())] Tables, /// Only repair (resync/rebalance) the set of stored blocks - #[structopt(name = "blocks", version = version::garage())] + #[structopt(name = "blocks", version = garage_version())] Blocks, /// Only redo the propagation of object deletions to the version table (slow) - #[structopt(name = "versions", version = version::garage())] + #[structopt(name = "versions", version = garage_version())] Versions, /// Only redo the propagation of version deletions to the block ref table (extremely slow) - #[structopt(name = "block_refs", version = version::garage())] + #[structopt(name = "block_refs", version = garage_version())] BlockRefs, /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) - #[structopt(name = "scrub", version = version::garage())] + #[structopt(name = "scrub", version = garage_version())] Scrub { #[structopt(subcommand)] cmd: ScrubCmd, @@ -436,19 +436,19 @@ pub enum RepairWhat { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum ScrubCmd { /// Start scrub - #[structopt(name = "start", version = version::garage())] + #[structopt(name = "start", version = garage_version())] Start, /// Pause scrub (it will resume automatically after 24 hours) - #[structopt(name = "pause", version = version::garage())] + #[structopt(name = "pause", version = garage_version())] Pause, /// Resume paused scrub - #[structopt(name = "resume", version = version::garage())] + #[structopt(name = "resume", version = garage_version())] Resume, /// Cancel scrub in progress - #[structopt(name = "cancel", version = version::garage())] + #[structopt(name = "cancel", version = garage_version())] Cancel, /// Set tranquility level for in-progress and future scrubs - #[structopt(name = "set-tranquility", version = version::garage())] + #[structopt(name = "set-tranquility", version = garage_version())] SetTranquility { #[structopt()] tranquility: u32, @@ -469,10 +469,10 @@ pub struct OfflineRepairOpt { pub enum OfflineRepairWhat { /// Repair K2V item counters #[cfg(feature = "k2v")] - #[structopt(name = "k2v_item_counters", version = version::garage())] + #[structopt(name = "k2v_item_counters", version = garage_version())] K2VItemCounters, /// Repair object counters - #[structopt(name = "object_counters", version = version::garage())] + #[structopt(name = "object_counters", version = garage_version())] ObjectCounters, } @@ -496,13 +496,13 @@ pub struct WorkerOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum WorkerCmd { /// List all workers on Garage node - #[structopt(name = "list", version = version::garage())] + #[structopt(name = "list", version = garage_version())] List { #[structopt(flatten)] opt: WorkerListOpt, }, /// Set worker parameter - #[structopt(name = "set", version = version::garage())] + #[structopt(name = "set", version = garage_version())] Set { #[structopt(subcommand)] opt: WorkerSetCmd, @@ -522,12 +522,12 @@ pub struct WorkerListOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum WorkerSetCmd { /// Set tranquility of scrub operations - #[structopt(name = "scrub-tranquility", version = version::garage())] + #[structopt(name = "scrub-tranquility", version = garage_version())] ScrubTranquility { tranquility: u32 }, /// Set number of concurrent block resync workers - #[structopt(name = "resync-n-workers", version = version::garage())] + #[structopt(name = "resync-n-workers", version = garage_version())] ResyncNWorkers { n_workers: usize }, /// Set tranquility of block resync operations - #[structopt(name = "resync-tranquility", version = version::garage())] + #[structopt(name = "resync-tranquility", version = garage_version())] ResyncTranquility { tranquility: u32 }, } diff --git a/src/garage/main.rs b/src/garage/main.rs index f6e694f3..0eca24ae 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -8,8 +8,15 @@ mod admin; mod cli; mod repair; mod server; +#[cfg(feature = "telemetry-otlp")] mod tracing_setup; +#[cfg(not(any(feature = "bundled-libs", feature = "system-libs")))] +compile_error!("Either bundled-libs or system-libs Cargo feature must be enabled"); + +#[cfg(all(feature = "bundled-libs", feature = "system-libs"))] +compile_error!("Only one of bundled-libs and system-libs Cargo features must be enabled"); + use std::net::SocketAddr; use std::path::PathBuf; @@ -22,7 +29,6 @@ use garage_util::error::*; use garage_rpc::system::*; use garage_rpc::*; -use garage_util::version; use garage_model::helper::error::Error as HelperError; @@ -30,7 +36,10 @@ use admin::*; use cli::*; #[derive(StructOpt, Debug)] -#[structopt(name = "garage", version = version::garage(), about = "S3-compatible object store for self-hosted geo-distributed deployments")] +#[structopt( + name = "garage", + about = "S3-compatible object store for self-hosted geo-distributed deployments" +)] struct Opt { /// Host to connect to for admin operations, in the format: /// <public-key>@<ip>:<port> @@ -71,7 +80,40 @@ async fn main() { std::process::abort(); })); - let opt = Opt::from_args(); + // Initialize version and features info + let features = &[ + #[cfg(feature = "k2v")] + "k2v", + #[cfg(feature = "sled")] + "sled", + #[cfg(feature = "lmdb")] + "lmdb", + #[cfg(feature = "sqlite")] + "sqlite", + #[cfg(feature = "kubernetes-discovery")] + "kubernetes-discovery", + #[cfg(feature = "metrics")] + "metrics", + #[cfg(feature = "telemetry-otlp")] + "telemetry-otlp", + #[cfg(feature = "bundled-libs")] + "bundled-libs", + #[cfg(feature = "system-libs")] + "system-libs", + ][..]; + if let Some(git_version) = option_env!("GIT_VERSION") { + garage_util::version::init_version(git_version); + } + garage_util::version::init_features(features); + + // Parse arguments + let version = format!( + "{} [features: {}]", + garage_util::version::garage_version(), + features.join(", ") + ); + let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches()); + let res = match opt.cmd { Command::Server => server::run_server(opt.config_file).await, Command::OfflineRepair(repair_opt) => { diff --git a/src/garage/server.rs b/src/garage/server.rs index 6321357a..aeef02a2 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -9,12 +9,13 @@ use garage_util::error::Error; use garage_api::admin::api_server::AdminApiServer; use garage_api::s3::api_server::S3ApiServer; use garage_model::garage::Garage; -use garage_web::run_web_server; +use garage_web::WebServer; #[cfg(feature = "k2v")] use garage_api::k2v::api_server::K2VApiServer; use crate::admin::*; +#[cfg(feature = "telemetry-otlp")] use crate::tracing_setup::*; async fn wait_from(mut chan: watch::Receiver<bool>) { @@ -29,6 +30,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Loading configuration..."); let config = read_config(config_file)?; + // ---- Initialize Garage internals ---- + info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); @@ -36,9 +39,14 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing Garage main data store..."); let garage = Garage::new(config.clone(), background)?; - info!("Initialize tracing..."); - if let Some(export_to) = config.admin.trace_sink { - init_tracing(&export_to, garage.system.id)?; + if config.admin.trace_sink.is_some() { + info!("Initialize tracing..."); + + #[cfg(feature = "telemetry-otlp")] + init_tracing(config.admin.trace_sink.as_ref().unwrap(), garage.system.id)?; + + #[cfg(not(feature = "telemetry-otlp"))] + error!("Garage was built without OTLP exporter, admin.trace_sink is ignored."); } info!("Initialize Admin API server and metrics collector..."); @@ -50,53 +58,78 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Create admin RPC handler..."); AdminRpcHandler::new(garage.clone()); - info!("Initializing S3 API server..."); - let s3_api_server = tokio::spawn(S3ApiServer::run( - garage.clone(), - wait_from(watch_cancel.clone()), - )); - - #[cfg(feature = "k2v")] - let k2v_api_server = { - info!("Initializing K2V API server..."); - tokio::spawn(K2VApiServer::run( - garage.clone(), - wait_from(watch_cancel.clone()), - )) - }; - - info!("Initializing web server..."); - let web_server = tokio::spawn(run_web_server( - garage.clone(), - wait_from(watch_cancel.clone()), - )); - - info!("Launching Admin API server..."); - let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone()))); + // ---- Launch public-facing API servers ---- + + let mut servers = vec![]; + + if let Some(s3_bind_addr) = &config.s3_api.api_bind_addr { + info!("Initializing S3 API server..."); + servers.push(( + "S3 API", + tokio::spawn(S3ApiServer::run( + garage.clone(), + *s3_bind_addr, + config.s3_api.s3_region.clone(), + wait_from(watch_cancel.clone()), + )), + )); + } - // Stuff runs + if config.k2v_api.is_some() { + #[cfg(feature = "k2v")] + { + info!("Initializing K2V API server..."); + servers.push(( + "K2V API", + tokio::spawn(K2VApiServer::run( + garage.clone(), + config.k2v_api.as_ref().unwrap().api_bind_addr, + config.s3_api.s3_region.clone(), + wait_from(watch_cancel.clone()), + )), + )); + } + #[cfg(not(feature = "k2v"))] + error!("K2V is not enabled in this build, cannot start K2V API server"); + } - // When a cancel signal is sent, stuff stops - if let Err(e) = s3_api_server.await? { - warn!("S3 API server exited with error: {}", e); - } else { - info!("S3 API server exited without error."); + if let Some(web_config) = &config.s3_web { + info!("Initializing web server..."); + servers.push(( + "Web", + tokio::spawn(WebServer::run( + garage.clone(), + web_config.bind_addr, + web_config.root_domain.clone(), + wait_from(watch_cancel.clone()), + )), + )); } - #[cfg(feature = "k2v")] - if let Err(e) = k2v_api_server.await? { - warn!("K2V API server exited with error: {}", e); - } else { - info!("K2V API server exited without error."); + + if let Some(admin_bind_addr) = &config.admin.api_bind_addr { + info!("Launching Admin API server..."); + servers.push(( + "Admin", + tokio::spawn(admin_server.run(*admin_bind_addr, wait_from(watch_cancel.clone()))), + )); } - if let Err(e) = web_server.await? { - warn!("Web server exited with error: {}", e); - } else { - info!("Web server exited without error."); + + #[cfg(not(feature = "metrics"))] + if config.admin.metrics_token.is_some() { + warn!("This Garage version is built without the metrics feature"); } - if let Err(e) = admin_server.await? { - warn!("Admin web server exited with error: {}", e); - } else { - info!("Admin API server exited without error."); + + // Stuff runs + + // When a cancel signal is sent, stuff stops + + // Collect stuff + for (desc, join_handle) in servers { + if let Err(e) = join_handle.await? { + error!("{} server exited with error: {}", desc, e); + } else { + info!("{} server exited without error.", desc); + } } // Remove RPC handlers for system to break reference cycles diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs index 24d794c3..87be1327 100644 --- a/src/garage/tests/lib.rs +++ b/src/garage/tests/lib.rs @@ -4,7 +4,7 @@ mod common; mod admin; mod bucket; +mod s3; + #[cfg(feature = "k2v")] mod k2v; - -mod s3; diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index 2f8a2679..0f0b76ae 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -22,7 +22,7 @@ tokio = "1.17.0" # cli deps clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } -garage_util = { version = "0.7.0", path = "../util", optional = true } +garage_util = { version = "0.8.0", path = "../util", optional = true } [features] diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 73011e0d..d6e2adfe 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_model" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -15,11 +15,10 @@ path = "lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } -garage_rpc = { version = "0.7.0", path = "../rpc" } -garage_table = { version = "0.7.0", path = "../table" } -garage_block = { version = "0.7.0", path = "../block" } -garage_util = { version = "0.7.0", path = "../util" } -garage_model_050 = { package = "garage_model", version = "0.5.1" } +garage_rpc = { version = "0.8.0", path = "../rpc" } +garage_table = { version = "0.8.0", path = "../table" } +garage_block = { version = "0.8.0", path = "../block" } +garage_util = { version = "0.8.0", path = "../util" } async-trait = "0.1.7" arc-swap = "1.0" @@ -45,3 +44,6 @@ netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch [features] k2v = [ "garage_util/k2v" ] +lmdb = [ "garage_db/lmdb" ] +sled = [ "garage_db/sled" ] +sqlite = [ "garage_db/sqlite" ] diff --git a/src/model/garage.rs b/src/model/garage.rs index c67f1fe0..66c359e7 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -80,6 +80,8 @@ impl Garage { let mut db_path = config.metadata_dir.clone(); std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); let db = match config.db_engine.as_str() { + // ---- Sled DB ---- + #[cfg(feature = "sled")] "sled" => { db_path.push("db"); info!("Opening Sled database at: {}", db_path.display()); @@ -91,6 +93,10 @@ impl Garage { .expect("Unable to open sled DB"); db::sled_adapter::SledDb::init(db) } + #[cfg(not(feature = "sled"))] + "sled" => return Err(Error::Message("sled db not available in this build".into())), + // ---- Sqlite DB ---- + #[cfg(feature = "sqlite")] "sqlite" | "sqlite3" | "rusqlite" => { db_path.push("db.sqlite"); info!("Opening Sqlite database at: {}", db_path.display()); @@ -98,6 +104,14 @@ impl Garage { .expect("Unable to open sqlite DB"); db::sqlite_adapter::SqliteDb::init(db) } + #[cfg(not(feature = "sqlite"))] + "sqlite" | "sqlite3" | "rusqlite" => { + return Err(Error::Message( + "sqlite db not available in this build".into(), + )) + } + // ---- LMDB DB ---- + #[cfg(feature = "lmdb")] "lmdb" | "heed" => { db_path.push("db.lmdb"); info!("Opening LMDB database at: {}", db_path.display()); @@ -116,10 +130,22 @@ impl Garage { let db = env_builder.open(&db_path).expect("Unable to open LMDB DB"); db::lmdb_adapter::LmdbDb::init(db) } + #[cfg(not(feature = "lmdb"))] + "lmdb" | "heed" => return Err(Error::Message("lmdb db not available in this build".into())), + // ---- Unavailable DB engine ---- e => { return Err(Error::Message(format!( - "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", - e + "Unsupported DB engine: {} (options: {})", + e, + vec![ + #[cfg(feature = "sled")] + "sled", + #[cfg(feature = "sqlite")] + "sqlite", + #[cfg(feature = "lmdb")] + "lmdb", + ] + .join(", ") ))); } }; diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 330e83f0..7288f6e4 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -6,7 +6,7 @@ use garage_util::data::*; use crate::permission::BucketKeyPerm; -use garage_model_050::key_table as old; +use crate::prev::v051::key_table as old; /// An api key #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] diff --git a/src/model/lib.rs b/src/model/lib.rs index 7c9d9270..4f20ea46 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -1,6 +1,9 @@ #[macro_use] extern crate tracing; +// For migration from previous versions +pub(crate) mod prev; + pub mod permission; pub mod index_counter; diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 5fc67069..cd6ad26a 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -5,7 +5,7 @@ use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; -use garage_model_050::bucket_table as old_bucket; +use crate::prev::v051::bucket_table as old_bucket; use crate::bucket_alias_table::*; use crate::bucket_table::*; diff --git a/src/model/prev/mod.rs b/src/model/prev/mod.rs new file mode 100644 index 00000000..68bb1502 --- /dev/null +++ b/src/model/prev/mod.rs @@ -0,0 +1 @@ +pub(crate) mod v051; diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs new file mode 100644 index 00000000..0c52b6ea --- /dev/null +++ b/src/model/prev/v051/bucket_table.rs @@ -0,0 +1,63 @@ +use serde::{Deserialize, Serialize}; + +use garage_table::crdt::Crdt; +use garage_table::*; + +use super::key_table::PermissionSet; + +/// A bucket is a collection of objects +/// +/// Its parameters are not directly accessible as: +/// - It must be possible to merge paramaters, hence the use of a LWW CRDT. +/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Bucket { + /// Name of the bucket + pub name: String, + /// State, and configuration if not deleted, of the bucket + pub state: crdt::Lww<BucketState>, +} + +/// State of a bucket +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum BucketState { + /// The bucket is deleted + Deleted, + /// The bucket exists + Present(BucketParams), +} + +impl Crdt for BucketState { + fn merge(&mut self, o: &Self) { + match o { + BucketState::Deleted => *self = BucketState::Deleted, + BucketState::Present(other_params) => { + if let BucketState::Present(params) = self { + params.merge(other_params); + } + } + } + } +} + +/// Configuration for a bucket +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct BucketParams { + /// Map of key with access to the bucket, and what kind of access they give + pub authorized_keys: crdt::LwwMap<String, PermissionSet>, + /// Is the bucket served as http + pub website: crdt::Lww<bool>, +} + +impl Crdt for BucketParams { + fn merge(&mut self, o: &Self) { + self.authorized_keys.merge(&o.authorized_keys); + self.website.merge(&o.website); + } +} + +impl Crdt for Bucket { + fn merge(&mut self, other: &Self) { + self.state.merge(&other.state); + } +} diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs new file mode 100644 index 00000000..fee24741 --- /dev/null +++ b/src/model/prev/v051/key_table.rs @@ -0,0 +1,50 @@ +use serde::{Deserialize, Serialize}; + +use garage_table::crdt::*; +use garage_table::*; + +/// An api key +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Key { + /// The id of the key (immutable), used as partition key + pub key_id: String, + + /// The secret_key associated + pub secret_key: String, + + /// Name for the key + pub name: crdt::Lww<String>, + + /// Is the key deleted + pub deleted: crdt::Bool, + + /// Buckets in which the key is authorized. Empty if `Key` is deleted + // CRDT interaction: deleted implies authorized_buckets is empty + pub authorized_buckets: crdt::LwwMap<String, PermissionSet>, +} + +/// Permission given to a key in a bucket +#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct PermissionSet { + /// The key can be used to read the bucket + pub allow_read: bool, + /// The key can be used to write in the bucket + pub allow_write: bool, +} + +impl AutoCrdt for PermissionSet { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for Key { + fn merge(&mut self, other: &Self) { + self.name.merge(&other.name); + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.authorized_buckets.clear(); + } else { + self.authorized_buckets.merge(&other.authorized_buckets); + } + } +} diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs new file mode 100644 index 00000000..7a954752 --- /dev/null +++ b/src/model/prev/v051/mod.rs @@ -0,0 +1,4 @@ +pub(crate) mod bucket_table; +pub(crate) mod key_table; +pub(crate) mod object_table; +pub(crate) mod version_table; diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs new file mode 100644 index 00000000..cb59b309 --- /dev/null +++ b/src/model/prev/v051/object_table.rs @@ -0,0 +1,149 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +use garage_util::data::*; + +use garage_table::crdt::*; + +/// An object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket: String, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + versions: Vec<ObjectVersion>, +} + +impl Object { + /// Get a list of currently stored versions of `Object` + pub fn versions(&self) -> &[ObjectVersion] { + &self.versions[..] + } +} + +/// Informations about a version of an object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, +} + +/// State of an object version +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, +} + +impl Crdt for ObjectVersionState { + fn merge(&mut self, other: &Self) { + use ObjectVersionState::*; + match other { + Aborted => { + *self = Aborted; + } + Complete(b) => match self { + Aborted => {} + Complete(a) => { + a.merge(b); + } + Uploading(_) => { + *self = Complete(b.clone()); + } + }, + Uploading(_) => {} + } + } +} + +/// Data stored in object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table + FirstBlock(ObjectVersionMeta, Hash), +} + +impl AutoCrdt for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +/// Metadata about the object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, +} + +/// Additional headers for an object +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap<String, String>, +} + +impl ObjectVersion { + fn cmp_key(&self) -> (u64, Uuid) { + (self.timestamp, self.uuid) + } + + /// Is the object version completely received + pub fn is_complete(&self) -> bool { + matches!(self.state, ObjectVersionState::Complete(_)) + } +} + +impl Crdt for Object { + fn merge(&mut self, other: &Self) { + // Merge versions from other into here + for other_v in other.versions.iter() { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { + Ok(i) => { + self.versions[i].state.merge(&other_v.state); + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). + let last_complete = self + .versions + .iter() + .enumerate() + .rev() + .find(|(_, v)| v.is_complete()) + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>(); + } + } +} diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs new file mode 100644 index 00000000..1e658f91 --- /dev/null +++ b/src/model/prev/v051/version_table.rs @@ -0,0 +1,79 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::*; + +/// A version of an object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map<u64, String>, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket: String, + /// Key in which the related object is stored + pub key: String, +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlockKey { + /// Number of the part + pub part_number: u64, + /// Offset of this sub-segment in its part + pub offset: u64, +} + +impl Ord for VersionBlockKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.offset.cmp(&other.offset)) + } +} + +impl PartialOrd for VersionBlockKey { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +/// Informations about a single block +#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlock { + /// Blake2 sum of the block + pub hash: Hash, + /// Size of the block + pub size: u64, +} + +impl AutoCrdt for VersionBlock { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for Version { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.blocks.clear(); + self.parts_etags.clear(); + } else { + self.blocks.merge(&other.blocks); + self.parts_etags.merge(&other.parts_etags); + } + } +} diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index a3914c36..a151f1b1 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -14,7 +14,7 @@ use garage_table::*; use crate::index_counter::*; use crate::s3::version_table::*; -use garage_model_050::object_table as old; +use crate::prev::v051::object_table as old; pub const OBJECTS: &str = "objects"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 881c245a..b545e66a 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -12,7 +12,7 @@ use garage_table::*; use crate::s3::block_ref_table::*; -use garage_model_050::version_table as old; +use crate::prev::v051::version_table as old; /// A version of an object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 1a2ce954..d7136401 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_rpc" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,7 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_util = { version = "0.7.0", path = "../util" } +garage_util = { version = "0.8.0", path = "../util" } arc-swap = "1.0" bytes = "1.0" @@ -53,3 +53,4 @@ hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } [features] kubernetes-discovery = [ "kube", "k8s-openapi", "openssl", "schemars" ] +system-libs = [ "sodiumoxide/use-pkg-config" ] diff --git a/src/rpc/system.rs b/src/rpc/system.rs index d7ef2140..c0e70c61 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -27,7 +27,6 @@ use garage_util::data::*; use garage_util::error::*; use garage_util::persister::Persister; use garage_util::time::*; -use garage_util::version; use crate::consul::*; #[cfg(feature = "kubernetes-discovery")] @@ -40,8 +39,10 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15); -/// Version tag used for version check upon Netapp connection -pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650007; // garage 0x0007 +/// Version tag used for version check upon Netapp connection. +/// Cluster nodes with different version tags are deemed +/// incompatible and will refuse to connect. +pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008 /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; @@ -320,10 +321,6 @@ impl System { // ---- Administrative operations (directly available and // also available through RPC) ---- - pub fn garage_version(&self) -> &'static str { - version::garage() - } - pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> { let node_status = self.node_status.read().unwrap(); let known_nodes = self diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6de37cda..ae52e8d7 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_table" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -15,8 +15,8 @@ path = "lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } -garage_rpc = { version = "0.7.0", path = "../rpc" } -garage_util = { version = "0.7.0", path = "../util" } +garage_rpc = { version = "0.8.0", path = "../rpc" } +garage_util = { version = "0.8.0", path = "../util" } opentelemetry = "0.17" diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 73d38fcf..c648e13b 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_util" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -16,17 +16,19 @@ path = "lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } +arc-swap = "1.0" async-trait = "0.1" blake2 = "0.9" bytes = "1.0" digest = "0.10" err-derive = "0.3" +git-version = "0.3.4" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" +lazy_static = "1.4" tracing = "0.1.30" rand = "0.8" sha2 = "0.10" -git-version = "0.3.4" chrono = "0.4" rmp-serde = "0.15" diff --git a/src/util/config.rs b/src/util/config.rs index a2bb8fb3..cccad101 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -77,11 +77,10 @@ pub struct Config { pub s3_api: S3ApiConfig, /// Configuration for K2V api - #[cfg(feature = "k2v")] pub k2v_api: Option<K2VApiConfig>, /// Configuration for serving files as normal web server - pub s3_web: WebConfig, + pub s3_web: Option<WebConfig>, /// Configuration for the admin API endpoint #[serde(default = "Default::default")] @@ -92,7 +91,7 @@ pub struct Config { #[derive(Deserialize, Debug, Clone)] pub struct S3ApiConfig { /// Address and port to bind for api serving - pub api_bind_addr: SocketAddr, + pub api_bind_addr: Option<SocketAddr>, /// S3 region to use pub s3_region: String, /// Suffix to remove from domain name to find bucket. If None, @@ -101,7 +100,6 @@ pub struct S3ApiConfig { } /// Configuration for K2V api -#[cfg(feature = "k2v")] #[derive(Deserialize, Debug, Clone)] pub struct K2VApiConfig { /// Address and port to bind for api serving diff --git a/src/util/version.rs b/src/util/version.rs index 8882d035..b515dccc 100644 --- a/src/util/version.rs +++ b/src/util/version.rs @@ -1,7 +1,28 @@ -pub fn garage() -> &'static str { - option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( +use std::sync::Arc; + +use arc_swap::{ArcSwap, ArcSwapOption}; + +lazy_static::lazy_static! { + static ref VERSION: ArcSwap<&'static str> = ArcSwap::new(Arc::new(git_version::git_version!( prefix = "git:", cargo_prefix = "cargo:", fallback = "unknown" - )) + ))); + static ref FEATURES: ArcSwapOption<&'static [&'static str]> = ArcSwapOption::new(None); +} + +pub fn garage_version() -> &'static str { + &VERSION.load() +} + +pub fn garage_features() -> Option<&'static [&'static str]> { + FEATURES.load().as_ref().map(|f| &f[..]) +} + +pub fn init_version(version: &'static str) { + VERSION.store(Arc::new(version)); +} + +pub fn init_features(features: &'static [&'static str]) { + FEATURES.store(Some(Arc::new(features))); } diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 59a1231d..7bf70c55 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_web" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"] edition = "2018" license = "AGPL-3.0" @@ -14,10 +14,10 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_api = { version = "0.7.0", path = "../api" } -garage_model = { version = "0.7.0", path = "../model" } -garage_util = { version = "0.7.0", path = "../util" } -garage_table = { version = "0.7.0", path = "../table" } +garage_api = { version = "0.8.0", path = "../api" } +garage_model = { version = "0.8.0", path = "../model" } +garage_util = { version = "0.8.0", path = "../util" } +garage_table = { version = "0.8.0", path = "../table" } err-derive = "0.3" tracing = "0.1.30" diff --git a/src/web/lib.rs b/src/web/lib.rs index 9b7c8573..7207c365 100644 --- a/src/web/lib.rs +++ b/src/web/lib.rs @@ -6,4 +6,4 @@ mod error; pub use error::Error; mod web_server; -pub use web_server::run_web_server; +pub use web_server::WebServer; diff --git a/src/web/web_server.rs b/src/web/web_server.rs index c30d8957..c2322073 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -57,90 +57,226 @@ impl WebMetrics { } } -/// Run a web server -pub async fn run_web_server( +pub struct WebServer { garage: Arc<Garage>, - shutdown_signal: impl Future<Output = ()>, -) -> Result<(), GarageError> { - let addr = &garage.config.s3_web.bind_addr; + metrics: Arc<WebMetrics>, + root_domain: String, +} - let metrics = Arc::new(WebMetrics::new()); +impl WebServer { + /// Run a web server + pub async fn run( + garage: Arc<Garage>, + addr: SocketAddr, + root_domain: String, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let metrics = Arc::new(WebMetrics::new()); + let web_server = Arc::new(WebServer { + garage, + metrics, + root_domain, + }); + + let service = make_service_fn(|conn: &AddrStream| { + let web_server = web_server.clone(); + + let client_addr = conn.remote_addr(); + async move { + Ok::<_, Error>(service_fn(move |req: Request<Body>| { + let web_server = web_server.clone(); + + web_server.handle_request(req, client_addr) + })) + } + }); - let service = make_service_fn(|conn: &AddrStream| { - let garage = garage.clone(); - let metrics = metrics.clone(); + let server = Server::bind(&addr).serve(service); + let graceful = server.with_graceful_shutdown(shutdown_signal); + info!("Web server listening on http://{}", addr); - let client_addr = conn.remote_addr(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let garage = garage.clone(); - let metrics = metrics.clone(); + graceful.await?; + Ok(()) + } - handle_request(garage, metrics, req, client_addr) - })) + async fn handle_request( + self: Arc<Self>, + req: Request<Body>, + addr: SocketAddr, + ) -> Result<Response<Body>, Infallible> { + info!("{} {} {}", addr, req.method(), req.uri()); + + // Lots of instrumentation + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer + .span_builder(format!("Web {} request", req.method())) + .with_trace_id(gen_trace_id()) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().to_string()), + ]) + .start(&tracer); + + let metrics_tags = &[KeyValue::new("method", req.method().to_string())]; + + // The actual handler + let res = self + .serve_file(&req) + .with_context(Context::current_with_span(span)) + .record_duration(&self.metrics.request_duration, &metrics_tags[..]) + .await; + + // More instrumentation + self.metrics.request_counter.add(1, &metrics_tags[..]); + + // Returning the result + match res { + Ok(res) => { + debug!("{} {} {}", req.method(), res.status(), req.uri()); + Ok(res) + } + Err(error) => { + info!( + "{} {} {} {}", + req.method(), + error.http_status_code(), + req.uri(), + error + ); + self.metrics.error_counter.add( + 1, + &[ + metrics_tags[0].clone(), + KeyValue::new("status_code", error.http_status_code().to_string()), + ], + ); + Ok(error_to_res(error)) + } } - }); + } - let server = Server::bind(addr).serve(service); - let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("Web server listening on http://{}", addr); + async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> { + // Get http authority string (eg. [::1]:3902 or garage.tld:80) + let authority = req + .headers() + .get(HOST) + .ok_or_bad_request("HOST header required")? + .to_str()?; + + // Get bucket + let host = authority_to_host(authority)?; + + let bucket_name = host_to_bucket(&host, &self.root_domain).unwrap_or(&host); + let bucket_id = self + .garage + .bucket_alias_table + .get(&EmptyKey, &bucket_name.to_string()) + .await? + .and_then(|x| x.state.take()) + .ok_or(Error::NotFound)?; + + // Check bucket isn't deleted and has website access enabled + let bucket = self + .garage + .bucket_table + .get(&EmptyKey, &bucket_id) + .await? + .ok_or(Error::NotFound)?; + + let website_config = bucket + .params() + .ok_or(Error::NotFound)? + .website_config + .get() + .as_ref() + .ok_or(Error::NotFound)?; + + // Get path + let path = req.uri().path().to_string(); + let index = &website_config.index_document; + let key = path_to_key(&path, index)?; + + debug!( + "Selected bucket: \"{}\" {:?}, selected key: \"{}\"", + bucket_name, bucket_id, key + ); + + let ret_doc = match *req.method() { + Method::OPTIONS => handle_options_for_bucket(req, &bucket), + Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await, + Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await, + _ => Err(ApiError::bad_request("HTTP method not supported")), + } + .map_err(Error::from); + + match ret_doc { + Err(error) => { + // For a HEAD or OPTIONS method, and for non-4xx errors, + // we don't return the error document as content, + // we return above and just return the error message + // by relying on err_to_res that is called when we return an Err. + if *req.method() == Method::HEAD + || *req.method() == Method::OPTIONS + || !error.http_status_code().is_client_error() + { + return Err(error); + } - graceful.await?; - Ok(()) -} + // If no error document is set: just return the error directly + let error_document = match &website_config.error_document { + Some(ed) => ed.trim_start_matches('/').to_owned(), + None => return Err(error), + }; + + // We want to return the error document + // Create a fake HTTP request with path = the error document + let req2 = Request::builder() + .uri(format!("http://{}/{}", host, &error_document)) + .body(Body::empty()) + .unwrap(); + + match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await + { + Ok(mut error_doc) => { + // The error won't be logged back in handle_request, + // so log it here + info!( + "{} {} {} {}", + req.method(), + req.uri(), + error.http_status_code(), + error + ); + + *error_doc.status_mut() = error.http_status_code(); + error.add_headers(error_doc.headers_mut()); + + // Preserve error message in a special header + for error_line in error.to_string().split('\n') { + if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) { + error_doc.headers_mut().append("X-Garage-Error", v); + } + } -async fn handle_request( - garage: Arc<Garage>, - metrics: Arc<WebMetrics>, - req: Request<Body>, - addr: SocketAddr, -) -> Result<Response<Body>, Infallible> { - info!("{} {} {}", addr, req.method(), req.uri()); - - // Lots of instrumentation - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder(format!("Web {} request", req.method())) - .with_trace_id(gen_trace_id()) - .with_attributes(vec![ - KeyValue::new("method", format!("{}", req.method())), - KeyValue::new("uri", req.uri().to_string()), - ]) - .start(&tracer); - - let metrics_tags = &[KeyValue::new("method", req.method().to_string())]; - - // The actual handler - let res = serve_file(garage, &req) - .with_context(Context::current_with_span(span)) - .record_duration(&metrics.request_duration, &metrics_tags[..]) - .await; - - // More instrumentation - metrics.request_counter.add(1, &metrics_tags[..]); - - // Returning the result - match res { - Ok(res) => { - debug!("{} {} {}", req.method(), res.status(), req.uri()); - Ok(res) - } - Err(error) => { - info!( - "{} {} {} {}", - req.method(), - error.http_status_code(), - req.uri(), - error - ); - metrics.error_counter.add( - 1, - &[ - metrics_tags[0].clone(), - KeyValue::new("status_code", error.http_status_code().to_string()), - ], - ); - Ok(error_to_res(error)) + Ok(error_doc) + } + Err(error_doc_error) => { + warn!( + "Couldn't get error document {} for bucket {:?}: {}", + error_document, bucket_id, error_doc_error + ); + Err(error) + } + } + } + Ok(mut resp) => { + // Maybe add CORS headers + if let Some(rule) = find_matching_cors_rule(&bucket, req)? { + add_cors_headers(&mut resp, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + Ok(resp) + } } } } @@ -160,129 +296,6 @@ fn error_to_res(e: Error) -> Response<Body> { http_error } -async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response<Body>, Error> { - // Get http authority string (eg. [::1]:3902 or garage.tld:80) - let authority = req - .headers() - .get(HOST) - .ok_or_bad_request("HOST header required")? - .to_str()?; - - // Get bucket - let host = authority_to_host(authority)?; - let root = &garage.config.s3_web.root_domain; - - let bucket_name = host_to_bucket(&host, root).unwrap_or(&host); - let bucket_id = garage - .bucket_alias_table - .get(&EmptyKey, &bucket_name.to_string()) - .await? - .and_then(|x| x.state.take()) - .ok_or(Error::NotFound)?; - - // Check bucket isn't deleted and has website access enabled - let bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NotFound)?; - - let website_config = bucket - .params() - .ok_or(Error::NotFound)? - .website_config - .get() - .as_ref() - .ok_or(Error::NotFound)?; - - // Get path - let path = req.uri().path().to_string(); - let index = &website_config.index_document; - let key = path_to_key(&path, index)?; - - debug!( - "Selected bucket: \"{}\" {:?}, selected key: \"{}\"", - bucket_name, bucket_id, key - ); - - let ret_doc = match *req.method() { - Method::OPTIONS => handle_options_for_bucket(req, &bucket), - Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await, - Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await, - _ => Err(ApiError::bad_request("HTTP method not supported")), - } - .map_err(Error::from); - - match ret_doc { - Err(error) => { - // For a HEAD or OPTIONS method, and for non-4xx errors, - // we don't return the error document as content, - // we return above and just return the error message - // by relying on err_to_res that is called when we return an Err. - if *req.method() == Method::HEAD - || *req.method() == Method::OPTIONS - || !error.http_status_code().is_client_error() - { - return Err(error); - } - - // If no error document is set: just return the error directly - let error_document = match &website_config.error_document { - Some(ed) => ed.trim_start_matches('/').to_owned(), - None => return Err(error), - }; - - // We want to return the error document - // Create a fake HTTP request with path = the error document - let req2 = Request::builder() - .uri(format!("http://{}/{}", host, &error_document)) - .body(Body::empty()) - .unwrap(); - - match handle_get(garage, &req2, bucket_id, &error_document, None).await { - Ok(mut error_doc) => { - // The error won't be logged back in handle_request, - // so log it here - info!( - "{} {} {} {}", - req.method(), - req.uri(), - error.http_status_code(), - error - ); - - *error_doc.status_mut() = error.http_status_code(); - error.add_headers(error_doc.headers_mut()); - - // Preserve error message in a special header - for error_line in error.to_string().split('\n') { - if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) { - error_doc.headers_mut().append("X-Garage-Error", v); - } - } - - Ok(error_doc) - } - Err(error_doc_error) => { - warn!( - "Couldn't get error document {} for bucket {:?}: {}", - error_document, bucket_id, error_doc_error - ); - Err(error) - } - } - } - Ok(mut resp) => { - // Maybe add CORS headers - if let Some(rule) = find_matching_cors_rule(&bucket, req)? { - add_cors_headers(&mut resp, rule) - .ok_or_internal_error("Invalid bucket CORS configuration")?; - } - Ok(resp) - } - } -} - /// Path to key /// /// Convert the provided path to the internal key |