| author | Alex Auvolat <alex@adnab.me> | 2023-06-13 17:02:42 +0200 |
|---|---|---|
| committer | Alex Auvolat <alex@adnab.me> | 2023-06-13 17:14:11 +0200 |
| commit | 90b2d43eb49d49c3aef4f501a30cf2f181adb183 (patch) | |
| tree | f7f7c15547e2f2072a1841fcb2350c272faf3c9d /src | |
| parent | bf19a44fd93584d5250a2e98e5b1d3a2de6d59d1 (diff) | |
| parent | 01346143ca09eab262f0d8f8a0a744c2f6d667cc (diff) | |
Merge branch 'main' into next
Diffstat (limited to 'src')
30 files changed, 560 insertions, 271 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 58dd38d8..b0dfdfb7 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -105,7 +105,7 @@ impl AdminApiServer { let bucket_id = self .garage .bucket_helper() - .resolve_global_bucket_name(&domain) + .resolve_global_bucket_name(domain) .await? .ok_or(HelperError::NoSuchBucket(domain.to_string()))?; diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index d0354d28..757b85ec 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -128,7 +128,7 @@ impl<A: ApiHandler> ApiServer<A> { let uri = req.uri().clone(); if let Ok(forwarded_for_ip_addr) = - forwarded_headers::handle_forwarded_for_headers(&req.headers()) + forwarded_headers::handle_forwarded_for_headers(req.headers()) { info!( "{} (via {}) {} {}", diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 26d678da..294380ea 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -282,8 +282,8 @@ pub(crate) async fn handle_poll_range( if let Some((items, seen_marker)) = resp { let resp = PollRangeResponse { items: items - .into_iter() - .map(|(_k, i)| ReadBatchResponseItem::from(i)) + .into_values() + .map(ReadBatchResponseItem::from) .collect::<Vec<_>>(), seen_marker, }; diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index 4c7934e5..b50fb3bb 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -19,7 +19,7 @@ use crate::signature::error::*; pub async fn check_payload_signature( garage: &Garage, - service: &str, + service: &'static str, request: &Request<Body>, ) -> Result<(Option<Key>, Option<Hash>), Error> { let mut headers = HashMap::new(); @@ -51,6 +51,7 @@ pub async fn check_payload_signature( }; let canonical_request = canonical_request( + service, request.method(), request.uri(), &headers, @@ -184,7 +185,7 @@ fn parse_query_authorization( if duration > 7 * 24 * 3600 { return Err(Error::bad_request( - "X-Amz-Exprires may not exceed a week".to_string(), + "X-Amz-Expires may not exceed a week".to_string(), )); } @@ -210,7 +211,7 @@ fn parse_query_authorization( fn parse_credential(cred: &str) -> Result<(String, String), Error> { let first_slash = cred .find('/') - .ok_or_bad_request("Credentials does not contain / in authorization field")?; + .ok_or_bad_request("Credentials does not contain '/' in authorization field")?; let (key_id, scope) = cred.split_at(first_slash); Ok(( key_id.to_string(), @@ -231,15 +232,50 @@ pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_re } pub fn canonical_request( + service: &'static str, method: &Method, uri: &hyper::Uri, headers: &HashMap<String, String>, signed_headers: &str, content_sha256: &str, ) -> String { + // There seems to be evidence that in AWSv4 signatures, the path component is url-encoded + // a second time when building the canonical request, as specified in this documentation page: + // -> https://docs.aws.amazon.com/rolesanywhere/latest/userguide/authentication-sign-process.html + // However this documentation page is for a specific service ("roles anywhere"), and + // in the S3 service we know for a fact that there is no double-urlencoding, because all of + // the tests we made with external software work without it. 
+ // + // The theory is that double-urlencoding occurs for all services except S3, + // which is what is implemented in rusoto_signature: + // -> https://docs.rs/rusoto_signature/latest/src/rusoto_signature/signature.rs.html#464 + // + // Digging into the code of the official AWS Rust SDK, we learn that double-URI-encoding can + // be set or unset on a per-request basis (the signature crates, aws-sigv4 and aws-sig-auth, + // are agnostic to this). Grepping the codebase confirms that S3 is the only API for which + // double_uri_encode is set to false, meaning it is true (its default value) for all other + // AWS services. We will therefore implement this behavior in Garage as well. + // + // Note that this documentation page, which is touted as the "authoritative reference" on + // AWSv4 signatures, makes no mention of either single- or double-urlencoding: + // -> https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html + // This page of the S3 documentation does also not mention anything specific: + // -> https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html + // + // Note that there is also the issue of path normalization, which I hope is unrelated to the + // one of URI-encoding. At least in aws-sigv4 both parameters can be set independently, + // and rusoto_signature does not seem to do any effective path normalization, even though + // it mentions it in the comments (same link to the souce code as above). + // We make the explicit choice of NOT normalizing paths in the K2V API because doing so + // would make non-normalized paths invalid K2V partition keys, and we don't want that. + let path: std::borrow::Cow<str> = if service != "s3" { + uri_encode(uri.path(), false).into() + } else { + uri.path().into() + }; [ method.as_str(), - uri.path(), + &path, &canonical_query_string(uri), &canonical_header_string(headers, signed_headers), "", diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 3bf1c40a..1f8d738b 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -37,7 +37,7 @@ serde_bytes = "0.11" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -tokio-util = { version = "0.6", features = ["io"] } +tokio-util = { version = "0.7", features = ["io"] } [features] system-libs = [ "zstd/pkg-config" ] diff --git a/src/block/repair.rs b/src/block/repair.rs index c89484d9..71093d69 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -220,14 +220,12 @@ fn randomize_next_scrub_run_time(timestamp: u64) -> u64 { // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to // balance scrub load across different cluster nodes. 
- let next_run_timestamp = timestamp + timestamp + SCRUB_INTERVAL .saturating_add(Duration::from_secs( rand::thread_rng().gen_range(0..3600 * 24 * 10), )) - .as_millis() as u64; - - next_run_timestamp + .as_millis() as u64 } impl Default for ScrubWorkerPersisted { @@ -241,18 +239,14 @@ impl Default for ScrubWorkerPersisted { } } +#[derive(Default)] enum ScrubWorkerState { Running(BlockStoreIterator), Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub + #[default] Finished, } -impl Default for ScrubWorkerState { - fn default() -> Self { - ScrubWorkerState::Finished - } -} - #[derive(Debug)] pub enum ScrubWorkerCommand { Start, diff --git a/src/format-table/Cargo.toml b/src/format-table/Cargo.toml new file mode 100644 index 00000000..9e31e211 --- /dev/null +++ b/src/format-table/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "format_table" +version = "0.1.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "Format tables with a stupid API" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "README.md" + +[lib] +path = "lib.rs" diff --git a/src/format-table/README.md b/src/format-table/README.md new file mode 100644 index 00000000..d918bdf4 --- /dev/null +++ b/src/format-table/README.md @@ -0,0 +1,13 @@ +# `format_table` + +Format tables with a stupid API. [Documentation](https://docs.rs/format_table). + +Example: + +```rust +let mut table = vec!["product\tquantity\tprice".to_string()]; +for (p, q, r) in [("tomato", 12, 15), ("potato", 10, 20), ("rice", 5, 12)] { + table.push(format!("{}\t{}\t{}", p, q, r)); +} +format_table::format_table(table); +``` diff --git a/src/util/formater.rs b/src/format-table/lib.rs index 2ea53ebb..55252ba9 100644 --- a/src/util/formater.rs +++ b/src/format-table/lib.rs @@ -1,3 +1,19 @@ +//! Format tables with a stupid API. +//! +//! Example: +//! +//! ```rust +//! let mut table = vec!["product\tquantity\tprice".to_string()]; +//! for (p, q, r) in [("tomato", 12, 15), ("potato", 10, 20), ("rice", 5, 12)] { +//! table.push(format!("{}\t{}\t{}", p, q, r)); +//! } +//! format_table::format_table(table); +//! ``` +//! +//! A table to be formatted is a `Vec<String>`, containing one string per line. +//! Table columns in each line are separated by a `\t` character. + +/// Format a table and return the result as a string. pub fn format_table_to_string(data: Vec<String>) -> String { let data = data .iter() @@ -27,6 +43,7 @@ pub fn format_table_to_string(data: Vec<String>) -> String { out } +/// Format a table and print the result to stdout. 
pub fn format_table(data: Vec<String>) { print!("{}", format_table_to_string(data)); } diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 7f7c3287..f9fc206b 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -21,6 +21,7 @@ path = "tests/lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +format_table.workspace = true garage_db.workspace = true garage_api.workspace = true garage_block.workspace = true @@ -72,6 +73,8 @@ assert-json-diff = "2.0" serde_json = "1.0" base64 = "0.21" +k2v-client.workspace = true + [features] default = [ "bundled-libs", "metrics", "sled", "lmdb", "sqlite", "k2v" ] diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 33c21eba..b6f9c426 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -9,10 +9,11 @@ use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use format_table::format_table_to_string; + use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error as GarageError; -use garage_util::formater::format_table_to_string; use garage_table::replication::*; use garage_table::*; diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 045f050c..48359614 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -1,8 +1,8 @@ use std::collections::HashSet; use std::time::Duration; +use format_table::format_table; use garage_util::error::*; -use garage_util::formater::format_table; use garage_rpc::layout::*; use garage_rpc::system::*; diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index cf8631a4..3932f115 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -1,8 +1,8 @@ use bytesize::ByteSize; +use format_table::format_table; use garage_util::crdt::Crdt; use garage_util::error::*; -use garage_util::formater::format_table; use garage_rpc::layout::*; use garage_rpc::system::*; diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index d87f9eab..2232d395 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -1,11 +1,11 @@ use std::collections::HashMap; use std::time::Duration; +use format_table::format_table; use garage_util::background::*; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; -use garage_util::formater::format_table; use garage_util::time::*; use garage_block::manager::BlockResyncErrorInfo; @@ -383,13 +383,18 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; for e in el { + let next_try = if e.next_try > now { + tf2.convert(Duration::from_millis(e.next_try - now)) + } else { + "asap".to_string() + }; table.push(format!( "{}\t{}\t{}\t{}\tin {}", hex::encode(e.hash.as_slice()), e.refcount, e.error_count, tf.convert(Duration::from_millis(now - e.last_try)), - tf2.convert(Duration::from_millis(e.next_try - now)) + next_try )); } format_table(table); diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 609eda97..4133bb8b 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -209,6 +209,7 @@ impl<'a> RequestBuilder<'a> { all_headers.extend(self.unsigned_headers.clone()); let canonical_request = signature::payload::canonical_request( + self.service, &self.method, &Uri::try_from(&uri).unwrap(), &all_headers, diff --git a/src/garage/tests/common/mod.rs 
b/src/garage/tests/common/mod.rs index eca3e42b..0b8c6755 100644 --- a/src/garage/tests/common/mod.rs +++ b/src/garage/tests/common/mod.rs @@ -1,5 +1,6 @@ use aws_sdk_s3::{Client, Region}; use ext::*; +use k2v_client::K2vClient; #[macro_use] pub mod macros; @@ -68,6 +69,19 @@ impl Context { bucket_name } + + /// Build a K2vClient for a given bucket + pub fn k2v_client(&self, bucket: &str) -> K2vClient { + let config = k2v_client::K2vClientConfig { + region: REGION.to_string(), + endpoint: self.garage.k2v_uri().to_string(), + aws_access_key_id: self.key.id.clone(), + aws_secret_access_key: self.key.secret.clone(), + bucket: bucket.to_string(), + user_agent: None, + }; + K2vClient::new(config).expect("Could not create K2V client") + } } pub fn context() -> Context { diff --git a/src/garage/tests/k2v_client/mod.rs b/src/garage/tests/k2v_client/mod.rs new file mode 100644 index 00000000..b252f36b --- /dev/null +++ b/src/garage/tests/k2v_client/mod.rs @@ -0,0 +1 @@ +pub mod simple; diff --git a/src/garage/tests/k2v_client/simple.rs b/src/garage/tests/k2v_client/simple.rs new file mode 100644 index 00000000..1a3118ef --- /dev/null +++ b/src/garage/tests/k2v_client/simple.rs @@ -0,0 +1,60 @@ +use std::time::Duration; + +use k2v_client::*; + +use crate::common; + +#[tokio::test] +async fn test_simple() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-client-simple"); + let k2v_client = ctx.k2v_client(&bucket); + + k2v_client + .insert_item("root", "test1", b"Hello, world!".to_vec(), None) + .await + .unwrap(); + + let res = k2v_client.read_item("root", "test1").await.unwrap(); + + assert_eq!(res.value.len(), 1); + assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec())); +} + +#[tokio::test] +async fn test_special_chars() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-client-simple-special-chars"); + let k2v_client = ctx.k2v_client(&bucket); + + let (pk, sk) = ("root@plépp", "≤≤««"); + k2v_client + .insert_item(pk, sk, b"Hello, world!".to_vec(), None) + .await + .unwrap(); + + let res = k2v_client.read_item(pk, sk).await.unwrap(); + assert_eq!(res.value.len(), 1); + assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec())); + + // sleep a bit before read_index + tokio::time::sleep(Duration::from_secs(1)).await; + let res = k2v_client.read_index(Default::default()).await.unwrap(); + assert_eq!(res.items.len(), 1); + assert_eq!(res.items.keys().next().unwrap(), pk); + + let res = k2v_client + .read_batch(&[BatchReadOp { + partition_key: pk, + filter: Default::default(), + single_item: false, + conflicts_only: false, + tombstones: false, + }]) + .await + .unwrap(); + assert_eq!(res.len(), 1); + let res = &res[0]; + assert_eq!(res.items.len(), 1); + assert_eq!(res.items.keys().next().unwrap(), sk); +} diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs index 87be1327..e450baac 100644 --- a/src/garage/tests/lib.rs +++ b/src/garage/tests/lib.rs @@ -8,3 +8,5 @@ mod s3; #[cfg(feature = "k2v")] mod k2v; +#[cfg(feature = "k2v")] +mod k2v_client; diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index 27e85651..2ccb9fe5 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "k2v-client" -version = "0.1.1" +version = "0.0.4" authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -10,24 +10,28 @@ readme = "../../README.md" [dependencies] base64 = "0.21" +sha2 = "0.10" +hex = "0.4" 
http = "0.2" log = "0.4" -rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] } -rusoto_credential = "0.48.0" -rusoto_signature = "0.48.0" -hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] } -serde = "1.0" +aws-sigv4 = "0.55" +percent-encoding = "2.2" +hyper = { version = "0.14", default-features = false, features = ["client", "http1", "http2"] } +hyper-rustls = { version = "0.24", features = ["http2"] } +serde = { version = "1.0", features = [ "derive" ] } serde_json = "1.0" thiserror = "1.0" -tokio = "1.24" +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } # cli deps clap = { version = "4.1", optional = true, features = ["derive", "env"] } -garage_util = { workspace = true, optional = true } +format_table = { workspace = true, optional = true } +tracing = { version = "0.1", optional = true } +tracing-subscriber = { version = "0.3", optional = true, features = ["env-filter"] } [features] -cli = ["clap", "tokio/fs", "tokio/io-std", "garage_util"] +cli = ["clap", "tokio/fs", "tokio/io-std", "tracing", "tracing-subscriber", "format_table"] [lib] path = "lib.rs" diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs index cdd63cce..5a2422ab 100644 --- a/src/k2v-client/bin/k2v-cli.rs +++ b/src/k2v-client/bin/k2v-cli.rs @@ -2,12 +2,11 @@ use std::collections::BTreeMap; use std::process::exit; use std::time::Duration; -use k2v_client::*; +use base64::prelude::*; -use garage_util::formater::format_table; +use k2v_client::*; -use rusoto_core::credential::AwsCredentials; -use rusoto_core::Region; +use format_table::format_table; use clap::{Parser, Subcommand}; @@ -155,7 +154,9 @@ impl Value { if let Some(ref text) = self.text { Ok(text.as_bytes().to_vec()) } else if let Some(ref b64) = self.b64 { - base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into())) + BASE64_STANDARD + .decode(b64) + .map_err(|_| Error::Message("invalid base64 input".into())) } else if let Some(ref path) = self.file { use tokio::io::AsyncReadExt; if path == "-" { @@ -230,7 +231,7 @@ impl ReadOutputKind { for val in val.value { match val { K2vValue::Value(v) => { - println!("{}", base64::encode(&v)) + println!("{}", BASE64_STANDARD.encode(&v)) } K2vValue::Tombstone => { println!(); @@ -249,7 +250,7 @@ impl ReadOutputKind { if let Ok(string) = std::str::from_utf8(&v) { println!(" utf-8: {}", string); } else { - println!(" base64: {}", base64::encode(&v)); + println!(" base64: {}", BASE64_STANDARD.encode(&v)); } } K2vValue::Tombstone => { @@ -275,7 +276,7 @@ struct BatchOutputKind { impl BatchOutputKind { fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! 
{ for (key, values) in values { - println!("key: {}", key); + println!("sort_key: {}", key); let causality: String = values.causality.into(); println!("causality: {}", causality); for value in values.value { @@ -284,7 +285,7 @@ impl BatchOutputKind { if let Ok(string) = std::str::from_utf8(&v) { println!(" value(utf-8): {}", string); } else { - println!(" value(base64): {}", base64::encode(&v)); + println!(" value(base64): {}", BASE64_STANDARD.encode(&v)); } } K2vValue::Tombstone => { @@ -393,16 +394,27 @@ impl Filter { #[tokio::main] async fn main() -> Result<(), Error> { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "warn") + } + + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env()) + .init(); + let args = Args::parse(); - let region = Region::Custom { - name: args.region, + let config = K2vClientConfig { endpoint: args.endpoint, + region: args.region, + aws_access_key_id: args.key_id, + aws_secret_access_key: args.secret, + bucket: args.bucket, + user_agent: None, }; - let creds = AwsCredentials::new(args.key_id, args.secret, None, None); - - let client = K2vClient::new(region, args.bucket, creds, None)?; + let client = K2vClient::new(config)?; match args.command { Command::Insert { @@ -520,7 +532,7 @@ async fn main() -> Result<(), Error> { value .as_object_mut() .unwrap() - .insert("sort_key".to_owned(), k.into()); + .insert("partition_key".to_owned(), k.into()); value }) .collect::<Vec<_>>(); @@ -537,7 +549,7 @@ async fn main() -> Result<(), Error> { } let mut to_print = Vec::new(); - to_print.push(format!("key:\tentries\tconflicts\tvalues\tbytes")); + to_print.push(format!("partition_key\tentries\tconflicts\tvalues\tbytes")); for (k, v) in res.items { to_print.push(format!( "{}\t{}\t{}\t{}\t{}", diff --git a/src/k2v-client/error.rs b/src/k2v-client/error.rs index 37c221f2..564ce497 100644 --- a/src/k2v-client/error.rs +++ b/src/k2v-client/error.rs @@ -18,12 +18,20 @@ pub enum Error { NotFound, #[error("io error: {0}")] IoError(#[from] std::io::Error), - #[error("rusoto tls error: {0}")] - RusotoTls(#[from] rusoto_core::request::TlsError), - #[error("rusoto http error: {0}")] - RusotoHttp(#[from] rusoto_core::HttpDispatchError), + #[error("http error: {0}")] + Http(#[from] http::Error), + #[error("hyper error: {0}")] + Hyper(#[from] hyper::Error), + #[error("invalid header: {0}")] + Header(#[from] hyper::header::ToStrError), #[error("deserialization error: {0}")] Deserialization(#[from] serde_json::Error), + #[error("invalid signature parameters: {0}")] + SignParameters(#[from] aws_sigv4::signing_params::BuildError), + #[error("could not sign request: {0}")] + SignRequest(#[from] aws_sigv4::http_request::SigningError), + #[error("request timed out")] + Timeout, #[error("{0}")] Message(Cow<'static, str>), } diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index ca52d0cf..425c351f 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -1,20 +1,23 @@ use std::collections::BTreeMap; -use std::time::Duration; +use std::convert::TryInto; +use std::time::{Duration, SystemTime}; -use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE}; -use http::status::StatusCode; -use http::HeaderMap; +use base64::prelude::*; use log::{debug, error}; +use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC}; + +use http::header::{ACCEPT, CONTENT_TYPE}; +use http::status::StatusCode; +use http::{HeaderName, HeaderValue, Request}; +use hyper::{body::Bytes, 
Body}; +use hyper::{client::connect::HttpConnector, Client as HttpClient}; +use hyper_rustls::HttpsConnector; + +use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings}; -use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient}; -use rusoto_credential::AwsCredentials; -use rusoto_signature::region::Region; -use rusoto_signature::signature::SignedRequest; use serde::de::Error as DeError; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use tokio::io::AsyncReadExt; - mod error; pub use error::Error; @@ -22,41 +25,57 @@ pub use error::Error; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); const SERVICE: &str = "k2v"; -const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; +const AMZ_CONTENT_SHA256: HeaderName = HeaderName::from_static("x-amz-content-sha256"); +const GARAGE_CAUSALITY_TOKEN: HeaderName = HeaderName::from_static("x-garage-causality-token"); + +const STRICT_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC + .remove(b'_') + .remove(b'-') + .remove(b'.') + .remove(b'~'); +const PATH_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC + .remove(b'/') + .remove(b'_') + .remove(b'-') + .remove(b'.') + .remove(b'~'); + +pub struct K2vClientConfig { + pub endpoint: String, + pub region: String, + pub aws_access_key_id: String, + pub aws_secret_access_key: String, + pub bucket: String, + pub user_agent: Option<String>, +} /// Client used to query a K2V server. pub struct K2vClient { - region: Region, - bucket: String, - creds: AwsCredentials, - client: HttpClient, + config: K2vClientConfig, + user_agent: HeaderValue, + client: HttpClient<HttpsConnector<HttpConnector>>, } impl K2vClient { /// Create a new K2V client. - pub fn new( - region: Region, - bucket: String, - creds: AwsCredentials, - user_agent: Option<String>, - ) -> Result<Self, Error> { + pub fn new(config: K2vClientConfig) -> Result<Self, Error> { let connector = hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() .https_or_http() .enable_http1() .enable_http2() .build(); - let mut client = HttpClient::from_connector(connector); - if let Some(ua) = user_agent { - client.local_agent_prepend(ua); - } else { - client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION"))); - } + let client = HttpClient::builder().build(connector); + let user_agent: std::borrow::Cow<str> = match &config.user_agent { + Some(ua) => ua.into(), + None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(), + }; + let user_agent = HeaderValue::from_str(&user_agent) + .map_err(|_| Error::Message("invalid user agent".into()))?; Ok(K2vClient { - region, - bucket, - creds, + config, client, + user_agent, }) } @@ -66,15 +85,10 @@ impl K2vClient { partition_key: &str, sort_key: &str, ) -> Result<CausalValue, Error> { - let mut req = SignedRequest::new( - "GET", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.add_header(ACCEPT, "application/octet-stream, application/json"); - + let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]); + let req = Request::get(url) + .header(ACCEPT, "application/octet-stream, application/json") + .body(Bytes::new())?; let res = self.dispatch(req, None).await?; let causality = res @@ -91,7 +105,7 @@ impl K2vClient { match res.content_type.as_deref() { Some("application/octet-stream") => Ok(CausalValue { causality, - value: vec![K2vValue::Value(res.body)], + value: 
vec![K2vValue::Value(res.body.to_vec())], }), Some("application/json") => { let value = serde_json::from_slice(&res.body)?; @@ -115,16 +129,17 @@ impl K2vClient { ) -> Result<Option<CausalValue>, Error> { let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); - let mut req = SignedRequest::new( - "GET", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), + let url = self.build_url( + Some(partition_key), + &[ + ("sort_key", sort_key), + ("causality_token", &causality.0), + ("timeout", &timeout.as_secs().to_string()), + ], ); - req.add_param("sort_key", sort_key); - req.add_param("causality_token", &causality.0); - req.add_param("timeout", &timeout.as_secs().to_string()); - req.add_header(ACCEPT, "application/octet-stream, application/json"); + let req = Request::get(url) + .header(ACCEPT, "application/octet-stream, application/json") + .body(Bytes::new())?; let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; @@ -146,7 +161,7 @@ impl K2vClient { match res.content_type.as_deref() { Some("application/octet-stream") => Ok(Some(CausalValue { causality, - value: vec![K2vValue::Value(res.body)], + value: vec![K2vValue::Value(res.body.to_vec())], })), Some("application/json") => { let value = serde_json::from_slice(&res.body)?; @@ -176,16 +191,10 @@ impl K2vClient { timeout: timeout.as_secs(), }; - let mut req = SignedRequest::new( - "POST", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("poll_range", ""); - + let url = self.build_url(Some(partition_key), &[("poll_range", "")]); let payload = serde_json::to_vec(&request)?; - req.set_payload(Some(payload)); + let req = Request::post(url).body(Bytes::from(payload))?; + let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; if res.status == StatusCode::NOT_MODIFIED { @@ -219,18 +228,12 @@ impl K2vClient { value: Vec<u8>, causality: Option<CausalityToken>, ) -> Result<(), Error> { - let mut req = SignedRequest::new( - "PUT", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.set_payload(Some(value)); - + let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]); + let mut req = Request::put(url); if let Some(causality) = causality { - req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); + req = req.header(GARAGE_CAUSALITY_TOKEN, &causality.0); } + let req = req.body(Bytes::from(value))?; self.dispatch(req, None).await?; Ok(()) @@ -243,14 +246,10 @@ impl K2vClient { sort_key: &str, causality: CausalityToken, ) -> Result<(), Error> { - let mut req = SignedRequest::new( - "DELETE", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); + let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]); + let req = Request::delete(url) + .header(GARAGE_CAUSALITY_TOKEN, &causality.0) + .body(Bytes::new())?; self.dispatch(req, None).await?; Ok(()) @@ -262,9 +261,9 @@ impl K2vClient { &self, filter: Filter<'_>, ) -> Result<PaginatedRange<PartitionInfo>, Error> { - let mut req = - SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket)); - filter.insert_params(&mut req); + let params = filter.query_params(); + let url = self.build_url(None, ¶ms); + let req = Request::get(url).body(Bytes::new())?; let res = self.dispatch(req, None).await?; @@ -286,11 +285,10 @@ impl K2vClient { /// *not* atomic: it is possible for 
some sub-operations to fails and others to success. In /// that case, failure is reported. pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> { - let mut req = - SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); - + let url = self.build_url::<&str>(None, &[]); let payload = serde_json::to_vec(operations)?; - req.set_payload(Some(payload)); + let req = Request::post(url).body(payload.into())?; + self.dispatch(req, None).await?; Ok(()) } @@ -300,12 +298,10 @@ impl K2vClient { &self, operations: &[BatchReadOp<'_>], ) -> Result<Vec<PaginatedRange<CausalValue>>, Error> { - let mut req = - SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); - req.add_param("search", ""); - + let url = self.build_url(None, &[("search", "")]); let payload = serde_json::to_vec(operations)?; - req.set_payload(Some(payload)); + let req = Request::post(url).body(payload.into())?; + let res = self.dispatch(req, None).await?; let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?; @@ -334,12 +330,10 @@ impl K2vClient { /// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without /// providing causality information. pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> { - let mut req = - SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); - req.add_param("delete", ""); - + let url = self.build_url(None, &[("delete", "")]); let payload = serde_json::to_vec(operations)?; - req.set_payload(Some(payload)); + let req = Request::post(url).body(payload.into())?; + let res = self.dispatch(req, None).await?; let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?; @@ -349,33 +343,67 @@ impl K2vClient { async fn dispatch( &self, - mut req: SignedRequest, + mut req: Request<Bytes>, timeout: Option<Duration>, ) -> Result<Response, Error> { - req.sign(&self.creds); - let mut res = self - .client - .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT))) - .await?; - - let causality_token = res - .headers - .remove(GARAGE_CAUSALITY_TOKEN) - .map(CausalityToken); - let content_type = res.headers.remove(CONTENT_TYPE); + req.headers_mut() + .insert(http::header::USER_AGENT, self.user_agent.clone()); + + use sha2::{Digest, Sha256}; + let mut hasher = Sha256::new(); + hasher.update(req.body()); + let hash = hex::encode(&hasher.finalize()); + req.headers_mut() + .insert(AMZ_CONTENT_SHA256, hash.try_into().unwrap()); + + debug!("request uri: {:?}", req.uri()); + + // Sign request + let signing_settings = SigningSettings::default(); + let signing_params = SigningParams::builder() + .access_key(&self.config.aws_access_key_id) + .secret_key(&self.config.aws_secret_access_key) + .region(&self.config.region) + .service_name(SERVICE) + .time(SystemTime::now()) + .settings(signing_settings) + .build()?; + // Convert the HTTP request into a signable request + let signable_request = SignableRequest::from(&req); + + // Sign and then apply the signature to the request + let (signing_instructions, _signature) = + sign(signable_request, &signing_params)?.into_parts(); + signing_instructions.apply_to_request(&mut req); + + // Send and wait for timeout + let res = tokio::select! 
{ + res = self.client.request(req.map(Body::from)) => res?, + _ = tokio::time::sleep(timeout.unwrap_or(DEFAULT_TIMEOUT)) => { + return Err(Error::Timeout); + } + }; + + let (mut res, body) = res.into_parts(); + let causality_token = match res.headers.remove(GARAGE_CAUSALITY_TOKEN) { + Some(v) => Some(CausalityToken(v.to_str()?.to_string())), + None => None, + }; + let content_type = match res.headers.remove(CONTENT_TYPE) { + Some(v) => Some(v.to_str()?.to_string()), + None => None, + }; let body = match res.status { - StatusCode::OK => read_body(&mut res.headers, res.body).await?, - StatusCode::NO_CONTENT => Vec::new(), + StatusCode::OK => hyper::body::to_bytes(body).await?, + StatusCode::NO_CONTENT => Bytes::new(), StatusCode::NOT_FOUND => return Err(Error::NotFound), - StatusCode::NOT_MODIFIED => Vec::new(), + StatusCode::NOT_MODIFIED => Bytes::new(), s => { - let err_body = read_body(&mut res.headers, res.body) - .await - .unwrap_or_default(); + let err_body = hyper::body::to_bytes(body).await.unwrap_or_default(); let err_body_str = std::str::from_utf8(&err_body) .map(String::from) - .unwrap_or_else(|_| base64::encode(&err_body)); + .unwrap_or_else(|_| BASE64_STANDARD.encode(&err_body)); if s.is_client_error() || s.is_server_error() { error!("Error response {}: {}", res.status, err_body_str); @@ -408,7 +436,7 @@ impl K2vClient { "Response body: {}", std::str::from_utf8(&body) .map(String::from) - .unwrap_or_else(|_| base64::encode(&body)) + .unwrap_or_else(|_| BASE64_STANDARD.encode(&body)) ); Ok(Response { @@ -418,16 +446,26 @@ impl K2vClient { content_type, }) } -} -async fn read_body(headers: &mut HeaderMap<String>, body: ByteStream) -> Result<Vec<u8>, Error> { - let body_len = headers - .get(CONTENT_LENGTH) - .and_then(|h| h.parse().ok()) - .unwrap_or(0); - let mut res = Vec::with_capacity(body_len); - body.into_async_read().read_to_end(&mut res).await?; - Ok(res) + fn build_url<V: AsRef<str>>(&self, partition_key: Option<&str>, query: &[(&str, V)]) -> String { + let mut url = format!("{}/{}", self.config.endpoint, self.config.bucket); + if let Some(pk) = partition_key { + url.push('/'); + url.extend(utf8_percent_encode(pk, &PATH_ENCODE_SET)); + } + if !query.is_empty() { + url.push('?'); + for (i, (k, v)) in query.iter().enumerate() { + if i > 0 { + url.push('&'); + } + url.extend(utf8_percent_encode(k, &STRICT_ENCODE_SET)); + url.push('='); + url.extend(utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET)); + } + } + url + } } /// An opaque token used to convey causality between operations. @@ -482,9 +520,11 @@ impl<'de> Deserialize<'de> for K2vValue { { let val: Option<&str> = Option::deserialize(d)?; Ok(match val { - Some(s) => { - K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?) 
- } + Some(s) => K2vValue::Value( + BASE64_STANDARD + .decode(s) + .map_err(|_| DeError::custom("invalid base64"))?, + ), None => K2vValue::Tombstone, }) } @@ -498,7 +538,7 @@ impl Serialize for K2vValue { match self { K2vValue::Tombstone => serializer.serialize_none(), K2vValue::Value(v) => { - let b64 = base64::encode(v); + let b64 = BASE64_STANDARD.encode(v); serializer.serialize_str(&b64) } } @@ -554,22 +594,24 @@ struct PollRangeResponse { } impl<'a> Filter<'a> { - fn insert_params(&self, req: &mut SignedRequest) { - if let Some(start) = &self.start { - req.add_param("start", start); + fn query_params(&self) -> Vec<(&'static str, std::borrow::Cow<str>)> { + let mut res = Vec::<(&'static str, std::borrow::Cow<str>)>::with_capacity(8); + if let Some(start) = self.start.as_deref() { + res.push(("start", start.into())); } - if let Some(end) = &self.end { - req.add_param("end", end); + if let Some(end) = self.end.as_deref() { + res.push(("end", end.into())); } - if let Some(prefix) = &self.prefix { - req.add_param("prefix", prefix); + if let Some(prefix) = self.prefix.as_deref() { + res.push(("prefix", prefix.into())); } if let Some(limit) = &self.limit { - req.add_param("limit", &limit.to_string()); + res.push(("limit", limit.to_string().into())); } if self.reverse { - req.add_param("reverse", "true"); + res.push(("reverse", "true".into())); } + res } } @@ -691,7 +733,7 @@ struct ErrorResponse { } struct Response { - body: Vec<u8>, + body: Bytes, status: StatusCode, causality_token: Option<CausalityToken>, content_type: Option<String>, diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs index 8fe3a582..59d4ca5b 100644 --- a/src/model/k2v/seen.rs +++ b/src/model/k2v/seen.rs @@ -79,7 +79,7 @@ impl RangeSeenMarker { let bytes = nonversioned_encode(&self)?; let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?; - Ok(BASE64_STANDARD.encode(&bytes)) + Ok(BASE64_STANDARD.encode(bytes)) } /// Decode from msgpack+zstd+b64 representation, returns None on error. 
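The test added in `src/garage/tests/k2v_client/simple.rs` above doubles as a usage guide for the reworked client constructor, which now takes a plain `K2vClientConfig` instead of rusoto types. Here is a minimal standalone sketch along the same lines; the endpoint, credentials and bucket name are placeholders and would need to point at a real Garage deployment with the K2V API enabled.

```rust
use k2v_client::{K2vClient, K2vClientConfig, K2vValue};

#[tokio::main]
async fn main() -> Result<(), k2v_client::Error> {
    // All values below are placeholders; point them at a real Garage deployment.
    let client = K2vClient::new(K2vClientConfig {
        endpoint: "http://127.0.0.1:3904".to_string(),
        region: "garage".to_string(),
        aws_access_key_id: "GK_EXAMPLE_KEY_ID".to_string(),
        aws_secret_access_key: "example-secret".to_string(),
        bucket: "my-bucket".to_string(),
        user_agent: None,
    })?;

    // Insert a value without causality information (blind write).
    client
        .insert_item("partition", "sort-key", b"Hello, world!".to_vec(), None)
        .await?;

    // Read it back; a CausalValue may hold several concurrent values,
    // here we only expect the one we just wrote.
    let res = client.read_item("partition", "sort-key").await?;
    assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
    Ok(())
}
```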
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index f85f789c..ab8d1112 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -8,16 +8,26 @@ use serde::{Deserialize, Serialize}; use netapp::NodeID; +use garage_util::config::ConsulDiscoveryAPI; use garage_util::config::ConsulDiscoveryConfig; +const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; + #[derive(Deserialize, Clone, Debug)] struct ConsulQueryEntry { #[serde(rename = "Address")] address: String, #[serde(rename = "ServicePort")] service_port: u16, - #[serde(rename = "NodeMeta")] - node_meta: HashMap<String, String>, + #[serde(rename = "ServiceMeta")] + meta: HashMap<String, String>, +} + +#[derive(Serialize, Clone, Debug)] +#[serde(untagged)] +enum PublishRequest { + Catalog(ConsulPublishEntry), + Service(ConsulPublishService), } #[derive(Serialize, Clone, Debug)] @@ -26,28 +36,43 @@ struct ConsulPublishEntry { node: String, #[serde(rename = "Address")] address: IpAddr, - #[serde(rename = "NodeMeta")] - node_meta: HashMap<String, String>, #[serde(rename = "Service")] - service: ConsulPublishService, + service: ConsulPublishCatalogService, } #[derive(Serialize, Clone, Debug)] -struct ConsulPublishService { +struct ConsulPublishCatalogService { #[serde(rename = "ID")] service_id: String, #[serde(rename = "Service")] service_name: String, #[serde(rename = "Tags")] tags: Vec<String>, + #[serde(rename = "Meta")] + meta: HashMap<String, String>, #[serde(rename = "Address")] address: IpAddr, #[serde(rename = "Port")] port: u16, } -// ---- +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Name")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec<String>, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, + #[serde(rename = "Meta")] + meta: HashMap<String, String>, +} +// ---- pub struct ConsulDiscovery { config: ConsulDiscoveryConfig, client: reqwest::Client, @@ -55,44 +80,48 @@ pub struct ConsulDiscovery { impl ConsulDiscovery { pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> { - let client = match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - - let mut client_key_buf = vec![]; - File::open(client_key)?.read_to_end(&mut client_key_buf)?; - - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; - - if config.tls_skip_verify { - reqwest::Client::builder() - .use_rustls_tls() - .danger_accept_invalid_certs(true) - .identity(identity) - .build()? - } else if let Some(ca_cert) = &config.ca_cert { - let mut ca_cert_buf = vec![]; - File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - - reqwest::Client::builder() - .use_rustls_tls() - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) - .identity(identity) - .build()? - } else { - reqwest::Client::builder() - .use_rustls_tls() - .identity(identity) - .build()? 
+ let mut builder: reqwest::ClientBuilder = reqwest::Client::builder().use_rustls_tls(); + if config.tls_skip_verify { + builder = builder.danger_accept_invalid_certs(true); + } else if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + builder = + builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); + } + + match &config.api { + ConsulDiscoveryAPI::Catalog => match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + builder = builder.identity(identity); + } + (None, None) => {} + _ => return Err(ConsulError::InvalidTLSConfig), + }, + ConsulDiscoveryAPI::Agent => { + if let Some(token) = &config.token { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + "x-consul-token", + reqwest::header::HeaderValue::from_str(&token)?, + ); + builder = builder.default_headers(headers); } } - (None, None) => reqwest::Client::new(), - _ => return Err(ConsulError::InvalidTLSConfig), }; + let client: reqwest::Client = builder.build()?; + Ok(Self { client, config }) } @@ -111,8 +140,8 @@ impl ConsulDiscovery { for ent in entries { let ip = ent.address.parse::<IpAddr>().ok(); let pubkey = ent - .node_meta - .get("pubkey") + .meta + .get(&format!("{}-pubkey", META_PREFIX)) .and_then(|k| hex::decode(k).ok()) .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { @@ -138,29 +167,49 @@ impl ConsulDiscovery { rpc_public_addr: SocketAddr, ) -> Result<(), ConsulError> { let node = format!("garage:{}", hex::encode(&node_id[..8])); + let tags = [ + vec!["advertised-by-garage".into(), hostname.into()], + self.config.tags.clone(), + ] + .concat(); - let advertisement = ConsulPublishEntry { - node: node.clone(), - address: rpc_public_addr.ip(), - node_meta: [ - ("pubkey".to_string(), hex::encode(node_id)), - ("hostname".to_string(), hostname.to_string()), - ] - .iter() - .cloned() - .collect(), - service: ConsulPublishService { + let mut meta = self.config.meta.clone().unwrap_or_default(); + meta.insert(format!("{}-pubkey", META_PREFIX), hex::encode(node_id)); + meta.insert(format!("{}-hostname", META_PREFIX), hostname.to_string()); + + let url = format!( + "{}/v1/{}", + self.config.consul_http_addr, + (match &self.config.api { + ConsulDiscoveryAPI::Catalog => "catalog/register", + ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks", + }) + ); + + let req = self.client.put(&url); + let advertisement: PublishRequest = match &self.config.api { + ConsulDiscoveryAPI::Catalog => PublishRequest::Catalog(ConsulPublishEntry { + node: node.clone(), + address: rpc_public_addr.ip(), + service: ConsulPublishCatalogService { + service_id: node.clone(), + service_name: self.config.service_name.clone(), + tags, + meta: meta.clone(), + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }), + ConsulDiscoveryAPI::Agent => PublishRequest::Service(ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), - tags: vec!["advertised-by-garage".into(), hostname.into()], + tags, + meta, address: rpc_public_addr.ip(), port: rpc_public_addr.port(), - }, + }), 
}; - - let url = format!("{}/v1/catalog/register", self.config.consul_http_addr); - - let http = self.client.put(&url).json(&advertisement).send().await?; + let http = req.json(&advertisement).send().await?; http.error_for_status()?; Ok(()) @@ -176,4 +225,6 @@ pub enum ConsulError { Reqwest(#[error(source)] reqwest::Error), #[error(display = "Invalid Consul TLS configuration")] InvalidTLSConfig, + #[error(display = "Token error: {}", _0)] + Token(#[error(source)] reqwest::header::InvalidHeaderValue), } diff --git a/src/table/data.rs b/src/table/data.rs index 73fa93c8..26101da4 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -44,22 +44,22 @@ pub struct TableData<F: TableSchema, R: TableReplication> { impl<F: TableSchema, R: TableReplication> TableData<F, R> { pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db - .open_tree(&format!("{}:table", F::TABLE_NAME)) + .open_tree(format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); let merkle_tree = db - .open_tree(&format!("{}:merkle_tree", F::TABLE_NAME)) + .open_tree(format!("{}:merkle_tree", F::TABLE_NAME)) .expect("Unable to open DB Merkle tree tree"); let merkle_todo = db - .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME)) + .open_tree(format!("{}:merkle_todo", F::TABLE_NAME)) .expect("Unable to open DB Merkle TODO tree"); let insert_queue = db - .open_tree(&format!("{}:insert_queue", F::TABLE_NAME)) + .open_tree(format!("{}:insert_queue", F::TABLE_NAME)) .expect("Unable to open insert queue DB tree"); let gc_todo = db - .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) + .open_tree(format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open GC DB tree"); let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); @@ -90,7 +90,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { let tree_key = self.tree_key(p, s); - if let Some(bytes) = self.store.get(&tree_key)? { + if let Some(bytes) = self.store.get(tree_key)? 
{ Ok(Some(ByteBuf::from(bytes.to_vec()))) } else { Ok(None) @@ -132,10 +132,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { } } - fn read_range_aux<'a>( + fn read_range_aux( &self, partition_hash: Hash, - range: db::ValueIter<'a>, + range: db::ValueIter, filter: &Option<F::Filter>, limit: usize, ) -> Result<Vec<Arc<ByteBuf>>, Error> { diff --git a/src/table/util.rs b/src/table/util.rs index 0b10cf3f..663a7e11 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -34,8 +34,9 @@ impl DeletedFilter { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)] pub enum EnumerationOrder { + #[default] Forward, Reverse, } @@ -49,9 +50,3 @@ impl EnumerationOrder { } } } - -impl Default for EnumerationOrder { - fn default() -> Self { - EnumerationOrder::Forward - } -} diff --git a/src/util/config.rs b/src/util/config.rs index 009f0574..eeb17e0e 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -142,8 +142,19 @@ pub struct AdminConfig { pub trace_sink: Option<String>, } +#[derive(Deserialize, Debug, Clone, Default)] +#[serde(rename_all = "lowercase")] +pub enum ConsulDiscoveryAPI { + #[default] + Catalog, + Agent, +} + #[derive(Deserialize, Debug, Clone)] pub struct ConsulDiscoveryConfig { + /// The consul api to use when registering: either `catalog` (the default) or `agent` + #[serde(default)] + pub api: ConsulDiscoveryAPI, /// Consul http or https address to connect to to discover more peers pub consul_http_addr: String, /// Consul service name to use @@ -154,9 +165,17 @@ pub struct ConsulDiscoveryConfig { pub client_cert: Option<String>, /// Client TLS key to use when connecting to Consul pub client_key: Option<String>, + /// /// Token to use for connecting to consul + pub token: Option<String>, /// Skip TLS hostname verification #[serde(default)] pub tls_skip_verify: bool, + /// Additional tags to add to the service + #[serde(default)] + pub tags: Vec<String>, + /// Additional service metadata to add + #[serde(default)] + pub meta: Option<std::collections::HashMap<String, String>>, } #[derive(Deserialize, Debug, Clone)] @@ -230,7 +249,7 @@ fn secret_from_file( #[cfg(unix)] if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") { use std::os::unix::fs::MetadataExt; - let metadata = std::fs::metadata(&file_path)?; + let metadata = std::fs::metadata(file_path)?; if metadata.mode() & 0o077 != 0 { return Err(format!("File {} is world-readable! 
(mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into()); } @@ -351,7 +370,7 @@ mod tests { replication_mode = "3" rpc_bind_addr = "[::]:3901" rpc_secret_file = "{}" - + [s3_api] s3_region = "garage" api_bind_addr = "[::]:3900" @@ -395,7 +414,7 @@ mod tests { rpc_bind_addr = "[::]:3901" rpc_secret= "dummy" rpc_secret_file = "dummy" - + [s3_api] s3_region = "garage" api_bind_addr = "[::]:3900" diff --git a/src/util/forwarded_headers.rs b/src/util/forwarded_headers.rs index 6ae275aa..12f76434 100644 --- a/src/util/forwarded_headers.rs +++ b/src/util/forwarded_headers.rs @@ -13,7 +13,7 @@ pub fn handle_forwarded_for_headers(headers: &HeaderMap<HeaderValue>) -> Result< .to_str() .ok_or_message("Error parsing X-Forwarded-For header")?; - let client_ip = IpAddr::from_str(&forwarded_for_ip_str) + let client_ip = IpAddr::from_str(forwarded_for_ip_str) .ok_or_message("Valid IP address not found in X-Forwarded-For header")?; Ok(client_ip.to_string()) diff --git a/src/util/lib.rs b/src/util/lib.rs index c9110fb2..15f0f829 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -10,7 +10,6 @@ pub mod crdt; pub mod data; pub mod encode; pub mod error; -pub mod formater; pub mod forwarded_headers; pub mod metrics; pub mod migrate; |
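The long comment added to `canonical_request` in `src/api/signature/payload.rs` is the most subtle part of this merge: every AWS service except S3 is expected to URI-encode the request path a second time when building the canonical request. A minimal sketch of that rule follows, assuming a hand-rolled `uri_encode` helper; Garage uses its own helper for this, which is not shown in the diff.

```rust
use std::borrow::Cow;

// Stand-in for Garage's uri_encode helper (an assumption, not the real one):
// percent-encode everything except unreserved characters, and keep '/' when
// encode_slash is false.
fn uri_encode(s: &str, encode_slash: bool) -> String {
    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char)
            }
            b'/' if !encode_slash => out.push('/'),
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

// The rule implemented in the diff: S3 keeps the path as received (already
// percent-encoded once), every other service encodes it a second time.
fn canonical_path<'a>(service: &str, path: &'a str) -> Cow<'a, str> {
    if service != "s3" {
        uri_encode(path, false).into()
    } else {
        path.into()
    }
}

fn main() {
    // Single encoding for S3, double encoding for K2V: the '%' itself gets
    // re-encoded to "%25" on the second pass.
    assert_eq!(canonical_path("s3", "/bucket/key%20name"), "/bucket/key%20name");
    assert_eq!(canonical_path("k2v", "/bucket/key%20name"), "/bucket/key%2520name");
}
```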
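On the client side, the same signature is now produced with the `aws-sigv4` crate instead of `rusoto_signature`. The sketch below condenses the new `K2vClient::dispatch` flow from the diff above into a standalone function; the credentials and service name are placeholders, and error handling is collapsed into a boxed error for brevity.

```rust
use std::time::SystemTime;

use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings};
use http::Request;
use hyper::body::Bytes;

/// Sign a K2V request the way the new `K2vClient::dispatch` does.
/// Condensed sketch only; credentials and service name are placeholders.
fn sign_request(
    mut req: Request<Bytes>,
    access_key: &str,
    secret_key: &str,
    region: &str,
) -> Result<Request<Bytes>, Box<dyn std::error::Error>> {
    // AWSv4 requires the payload hash in the x-amz-content-sha256 header.
    use sha2::{Digest, Sha256};
    let hash = hex::encode(Sha256::digest(req.body()));
    req.headers_mut()
        .insert("x-amz-content-sha256", hash.parse()?);

    let params = SigningParams::builder()
        .access_key(access_key)
        .secret_key(secret_key)
        .region(region)
        .service_name("k2v")
        .time(SystemTime::now())
        .settings(SigningSettings::default())
        .build()?;

    // Compute the signature and copy the resulting headers onto the request.
    let (instructions, _signature) = sign(SignableRequest::from(&req), &params)?.into_parts();
    instructions.apply_to_request(&mut req);
    Ok(req)
}
```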
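Because requests are now built by hand as `http::Request`s, the client percent-encodes partition keys and query parameters itself, using the two `AsciiSet` constants added to `src/k2v-client/lib.rs`. A small sketch of how those sets behave, with a placeholder endpoint and bucket:

```rust
use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};

// Same sets as in the new lib.rs: unreserved characters are kept, and
// PATH_ENCODE_SET additionally keeps '/' so the partition key can be
// appended as a path segment.
const STRICT_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
    .remove(b'_')
    .remove(b'-')
    .remove(b'.')
    .remove(b'~');
const PATH_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
    .remove(b'/')
    .remove(b'_')
    .remove(b'-')
    .remove(b'.')
    .remove(b'~');

fn main() {
    // Partition key goes in the path, sort key in the query string.
    let pk = "root@plépp";
    let sk = "a b/c";
    let url = format!(
        "http://localhost:3904/my-bucket/{}?sort_key={}",
        utf8_percent_encode(pk, &PATH_ENCODE_SET),
        utf8_percent_encode(sk, &STRICT_ENCODE_SET),
    );
    println!("{}", url);
    // -> http://localhost:3904/my-bucket/root%40pl%C3%A9pp?sort_key=a%20b%2Fc
}
```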
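Several hunks above migrate from the free functions of base64 0.13 (`base64::encode` / `base64::decode`) to the engine-based API of base64 0.21. The equivalent calls, in isolation:

```rust
use base64::prelude::*;

fn main() {
    let data = b"Hello, world!";

    // base64 0.21 replaces the old free functions with methods on an engine,
    // as used throughout this commit.
    let encoded = BASE64_STANDARD.encode(data);
    assert_eq!(encoded, "SGVsbG8sIHdvcmxkIQ==");

    let decoded = BASE64_STANDARD.decode(&encoded).expect("valid base64");
    assert_eq!(decoded, data.to_vec());
}
```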
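The `ScrubWorkerState` and `EnumerationOrder` changes both replace a hand-written `impl Default` with `#[derive(Default)]` and a `#[default]` attribute on the default variant (stable since Rust 1.62). In miniature:

```rust
// Equivalent of the EnumerationOrder / ScrubWorkerState cleanup in this
// commit: derive Default on the enum and mark the default variant instead
// of writing the impl by hand.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum EnumerationOrder {
    #[default]
    Forward,
    Reverse,
}

fn main() {
    assert_eq!(EnumerationOrder::default(), EnumerationOrder::Forward);
}
```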
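Finally, the new Consul discovery options (`api`, `token`, `tags` and `meta` in `ConsulDiscoveryConfig`) can be exercised with a trimmed-down copy of the struct. This sketch assumes the `serde` derive feature and the `toml` crate; every value in the embedded TOML is a placeholder, not a recommended configuration, and the real struct has more fields than shown here.

```rust
use std::collections::HashMap;

use serde::Deserialize;

// Trimmed-down version of the new ConsulDiscoveryConfig, kept only to show
// how the options added in this commit look in garage.toml.
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase")]
enum ConsulDiscoveryAPI {
    #[default]
    Catalog,
    Agent,
}

#[derive(Deserialize, Debug)]
struct ConsulDiscoveryConfig {
    consul_http_addr: String,
    service_name: String,
    #[serde(default)]
    api: ConsulDiscoveryAPI,
    token: Option<String>,
    #[serde(default)]
    tags: Vec<String>,
    #[serde(default)]
    meta: Option<HashMap<String, String>>,
}

fn main() {
    let cfg: ConsulDiscoveryConfig = toml::from_str(
        r#"
        consul_http_addr = "https://consul.example.invalid:8501"
        service_name = "garage-rpc"
        api = "agent"
        token = "placeholder-token"
        tags = [ "dc1", "ssd" ]
        meta = { rack = "r12" }
        "#,
    )
    .expect("valid consul_discovery section");
    println!("{:?}", cfg);
}
```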