aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-06-13 17:02:42 +0200
committerAlex Auvolat <alex@adnab.me>2023-06-13 17:14:11 +0200
commit90b2d43eb49d49c3aef4f501a30cf2f181adb183 (patch)
treef7f7c15547e2f2072a1841fcb2350c272faf3c9d /src
parentbf19a44fd93584d5250a2e98e5b1d3a2de6d59d1 (diff)
parent01346143ca09eab262f0d8f8a0a744c2f6d667cc (diff)
downloadgarage-90b2d43eb49d49c3aef4f501a30cf2f181adb183.tar.gz
garage-90b2d43eb49d49c3aef4f501a30cf2f181adb183.zip
Merge branch 'main' into next
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/api_server.rs2
-rw-r--r--src/api/generic_server.rs2
-rw-r--r--src/api/k2v/batch.rs4
-rw-r--r--src/api/signature/payload.rs44
-rw-r--r--src/block/Cargo.toml2
-rw-r--r--src/block/repair.rs14
-rw-r--r--src/format-table/Cargo.toml12
-rw-r--r--src/format-table/README.md13
-rw-r--r--src/format-table/lib.rs (renamed from src/util/formater.rs)17
-rw-r--r--src/garage/Cargo.toml3
-rw-r--r--src/garage/admin/mod.rs3
-rw-r--r--src/garage/cli/cmd.rs2
-rw-r--r--src/garage/cli/layout.rs2
-rw-r--r--src/garage/cli/util.rs9
-rw-r--r--src/garage/tests/common/custom_requester.rs1
-rw-r--r--src/garage/tests/common/mod.rs14
-rw-r--r--src/garage/tests/k2v_client/mod.rs1
-rw-r--r--src/garage/tests/k2v_client/simple.rs60
-rw-r--r--src/garage/tests/lib.rs2
-rw-r--r--src/k2v-client/Cargo.toml22
-rw-r--r--src/k2v-client/bin/k2v-cli.rs44
-rw-r--r--src/k2v-client/error.rs16
-rw-r--r--src/k2v-client/lib.rs316
-rw-r--r--src/model/k2v/seen.rs2
-rw-r--r--src/rpc/consul.rs171
-rw-r--r--src/table/data.rs16
-rw-r--r--src/table/util.rs9
-rw-r--r--src/util/config.rs25
-rw-r--r--src/util/forwarded_headers.rs2
-rw-r--r--src/util/lib.rs1
30 files changed, 560 insertions, 271 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 58dd38d8..b0dfdfb7 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -105,7 +105,7 @@ impl AdminApiServer {
let bucket_id = self
.garage
.bucket_helper()
- .resolve_global_bucket_name(&domain)
+ .resolve_global_bucket_name(domain)
.await?
.ok_or(HelperError::NoSuchBucket(domain.to_string()))?;
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index d0354d28..757b85ec 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -128,7 +128,7 @@ impl<A: ApiHandler> ApiServer<A> {
let uri = req.uri().clone();
if let Ok(forwarded_for_ip_addr) =
- forwarded_headers::handle_forwarded_for_headers(&req.headers())
+ forwarded_headers::handle_forwarded_for_headers(req.headers())
{
info!(
"{} (via {}) {} {}",
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 26d678da..294380ea 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -282,8 +282,8 @@ pub(crate) async fn handle_poll_range(
if let Some((items, seen_marker)) = resp {
let resp = PollRangeResponse {
items: items
- .into_iter()
- .map(|(_k, i)| ReadBatchResponseItem::from(i))
+ .into_values()
+ .map(ReadBatchResponseItem::from)
.collect::<Vec<_>>(),
seen_marker,
};
diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs
index 4c7934e5..b50fb3bb 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/signature/payload.rs
@@ -19,7 +19,7 @@ use crate::signature::error::*;
pub async fn check_payload_signature(
garage: &Garage,
- service: &str,
+ service: &'static str,
request: &Request<Body>,
) -> Result<(Option<Key>, Option<Hash>), Error> {
let mut headers = HashMap::new();
@@ -51,6 +51,7 @@ pub async fn check_payload_signature(
};
let canonical_request = canonical_request(
+ service,
request.method(),
request.uri(),
&headers,
@@ -184,7 +185,7 @@ fn parse_query_authorization(
if duration > 7 * 24 * 3600 {
return Err(Error::bad_request(
- "X-Amz-Exprires may not exceed a week".to_string(),
+ "X-Amz-Expires may not exceed a week".to_string(),
));
}
@@ -210,7 +211,7 @@ fn parse_query_authorization(
fn parse_credential(cred: &str) -> Result<(String, String), Error> {
let first_slash = cred
.find('/')
- .ok_or_bad_request("Credentials does not contain / in authorization field")?;
+ .ok_or_bad_request("Credentials does not contain '/' in authorization field")?;
let (key_id, scope) = cred.split_at(first_slash);
Ok((
key_id.to_string(),
@@ -231,15 +232,50 @@ pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_re
}
pub fn canonical_request(
+ service: &'static str,
method: &Method,
uri: &hyper::Uri,
headers: &HashMap<String, String>,
signed_headers: &str,
content_sha256: &str,
) -> String {
+ // There seems to be evidence that in AWSv4 signatures, the path component is url-encoded
+ // a second time when building the canonical request, as specified in this documentation page:
+ // -> https://docs.aws.amazon.com/rolesanywhere/latest/userguide/authentication-sign-process.html
+ // However this documentation page is for a specific service ("roles anywhere"), and
+ // in the S3 service we know for a fact that there is no double-urlencoding, because all of
+ // the tests we made with external software work without it.
+ //
+ // The theory is that double-urlencoding occurs for all services except S3,
+ // which is what is implemented in rusoto_signature:
+ // -> https://docs.rs/rusoto_signature/latest/src/rusoto_signature/signature.rs.html#464
+ //
+ // Digging into the code of the official AWS Rust SDK, we learn that double-URI-encoding can
+ // be set or unset on a per-request basis (the signature crates, aws-sigv4 and aws-sig-auth,
+ // are agnostic to this). Grepping the codebase confirms that S3 is the only API for which
+ // double_uri_encode is set to false, meaning it is true (its default value) for all other
+ // AWS services. We will therefore implement this behavior in Garage as well.
+ //
+ // Note that this documentation page, which is touted as the "authoritative reference" on
+ // AWSv4 signatures, makes no mention of either single- or double-urlencoding:
+ // -> https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html
+ // This page of the S3 documentation does also not mention anything specific:
+ // -> https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html
+ //
+ // Note that there is also the issue of path normalization, which I hope is unrelated to the
+ // one of URI-encoding. At least in aws-sigv4 both parameters can be set independently,
+ // and rusoto_signature does not seem to do any effective path normalization, even though
+ // it mentions it in the comments (same link to the souce code as above).
+ // We make the explicit choice of NOT normalizing paths in the K2V API because doing so
+ // would make non-normalized paths invalid K2V partition keys, and we don't want that.
+ let path: std::borrow::Cow<str> = if service != "s3" {
+ uri_encode(uri.path(), false).into()
+ } else {
+ uri.path().into()
+ };
[
method.as_str(),
- uri.path(),
+ &path,
&canonical_query_string(uri),
&canonical_header_string(headers, signed_headers),
"",
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 3bf1c40a..1f8d738b 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -37,7 +37,7 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-tokio-util = { version = "0.6", features = ["io"] }
+tokio-util = { version = "0.7", features = ["io"] }
[features]
system-libs = [ "zstd/pkg-config" ]
diff --git a/src/block/repair.rs b/src/block/repair.rs
index c89484d9..71093d69 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -220,14 +220,12 @@ fn randomize_next_scrub_run_time(timestamp: u64) -> u64 {
// Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to
// balance scrub load across different cluster nodes.
- let next_run_timestamp = timestamp
+ timestamp
+ SCRUB_INTERVAL
.saturating_add(Duration::from_secs(
rand::thread_rng().gen_range(0..3600 * 24 * 10),
))
- .as_millis() as u64;
-
- next_run_timestamp
+ .as_millis() as u64
}
impl Default for ScrubWorkerPersisted {
@@ -241,18 +239,14 @@ impl Default for ScrubWorkerPersisted {
}
}
+#[derive(Default)]
enum ScrubWorkerState {
Running(BlockStoreIterator),
Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ #[default]
Finished,
}
-impl Default for ScrubWorkerState {
- fn default() -> Self {
- ScrubWorkerState::Finished
- }
-}
-
#[derive(Debug)]
pub enum ScrubWorkerCommand {
Start,
diff --git a/src/format-table/Cargo.toml b/src/format-table/Cargo.toml
new file mode 100644
index 00000000..9e31e211
--- /dev/null
+++ b/src/format-table/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "format_table"
+version = "0.1.1"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Format tables with a stupid API"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "README.md"
+
+[lib]
+path = "lib.rs"
diff --git a/src/format-table/README.md b/src/format-table/README.md
new file mode 100644
index 00000000..d918bdf4
--- /dev/null
+++ b/src/format-table/README.md
@@ -0,0 +1,13 @@
+# `format_table`
+
+Format tables with a stupid API. [Documentation](https://docs.rs/format_table).
+
+Example:
+
+```rust
+let mut table = vec!["product\tquantity\tprice".to_string()];
+for (p, q, r) in [("tomato", 12, 15), ("potato", 10, 20), ("rice", 5, 12)] {
+ table.push(format!("{}\t{}\t{}", p, q, r));
+}
+format_table::format_table(table);
+```
diff --git a/src/util/formater.rs b/src/format-table/lib.rs
index 2ea53ebb..55252ba9 100644
--- a/src/util/formater.rs
+++ b/src/format-table/lib.rs
@@ -1,3 +1,19 @@
+//! Format tables with a stupid API.
+//!
+//! Example:
+//!
+//! ```rust
+//! let mut table = vec!["product\tquantity\tprice".to_string()];
+//! for (p, q, r) in [("tomato", 12, 15), ("potato", 10, 20), ("rice", 5, 12)] {
+//! table.push(format!("{}\t{}\t{}", p, q, r));
+//! }
+//! format_table::format_table(table);
+//! ```
+//!
+//! A table to be formatted is a `Vec<String>`, containing one string per line.
+//! Table columns in each line are separated by a `\t` character.
+
+/// Format a table and return the result as a string.
pub fn format_table_to_string(data: Vec<String>) -> String {
let data = data
.iter()
@@ -27,6 +43,7 @@ pub fn format_table_to_string(data: Vec<String>) -> String {
out
}
+/// Format a table and print the result to stdout.
pub fn format_table(data: Vec<String>) {
print!("{}", format_table_to_string(data));
}
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 7f7c3287..f9fc206b 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -21,6 +21,7 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+format_table.workspace = true
garage_db.workspace = true
garage_api.workspace = true
garage_block.workspace = true
@@ -72,6 +73,8 @@ assert-json-diff = "2.0"
serde_json = "1.0"
base64 = "0.21"
+k2v-client.workspace = true
+
[features]
default = [ "bundled-libs", "metrics", "sled", "lmdb", "sqlite", "k2v" ]
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index 33c21eba..b6f9c426 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -9,10 +9,11 @@ use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use format_table::format_table_to_string;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
-use garage_util::formater::format_table_to_string;
use garage_table::replication::*;
use garage_table::*;
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 045f050c..48359614 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,8 +1,8 @@
use std::collections::HashSet;
use std::time::Duration;
+use format_table::format_table;
use garage_util::error::*;
-use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index cf8631a4..3932f115 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,8 +1,8 @@
use bytesize::ByteSize;
+use format_table::format_table;
use garage_util::crdt::Crdt;
use garage_util::error::*;
-use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index d87f9eab..2232d395 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::time::Duration;
+use format_table::format_table;
use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::formater::format_table;
use garage_util::time::*;
use garage_block::manager::BlockResyncErrorInfo;
@@ -383,13 +383,18 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in el {
+ let next_try = if e.next_try > now {
+ tf2.convert(Duration::from_millis(e.next_try - now))
+ } else {
+ "asap".to_string()
+ };
table.push(format!(
"{}\t{}\t{}\t{}\tin {}",
hex::encode(e.hash.as_slice()),
e.refcount,
e.error_count,
tf.convert(Duration::from_millis(now - e.last_try)),
- tf2.convert(Duration::from_millis(e.next_try - now))
+ next_try
));
}
format_table(table);
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 609eda97..4133bb8b 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -209,6 +209,7 @@ impl<'a> RequestBuilder<'a> {
all_headers.extend(self.unsigned_headers.clone());
let canonical_request = signature::payload::canonical_request(
+ self.service,
&self.method,
&Uri::try_from(&uri).unwrap(),
&all_headers,
diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs
index eca3e42b..0b8c6755 100644
--- a/src/garage/tests/common/mod.rs
+++ b/src/garage/tests/common/mod.rs
@@ -1,5 +1,6 @@
use aws_sdk_s3::{Client, Region};
use ext::*;
+use k2v_client::K2vClient;
#[macro_use]
pub mod macros;
@@ -68,6 +69,19 @@ impl Context {
bucket_name
}
+
+ /// Build a K2vClient for a given bucket
+ pub fn k2v_client(&self, bucket: &str) -> K2vClient {
+ let config = k2v_client::K2vClientConfig {
+ region: REGION.to_string(),
+ endpoint: self.garage.k2v_uri().to_string(),
+ aws_access_key_id: self.key.id.clone(),
+ aws_secret_access_key: self.key.secret.clone(),
+ bucket: bucket.to_string(),
+ user_agent: None,
+ };
+ K2vClient::new(config).expect("Could not create K2V client")
+ }
}
pub fn context() -> Context {
diff --git a/src/garage/tests/k2v_client/mod.rs b/src/garage/tests/k2v_client/mod.rs
new file mode 100644
index 00000000..b252f36b
--- /dev/null
+++ b/src/garage/tests/k2v_client/mod.rs
@@ -0,0 +1 @@
+pub mod simple;
diff --git a/src/garage/tests/k2v_client/simple.rs b/src/garage/tests/k2v_client/simple.rs
new file mode 100644
index 00000000..1a3118ef
--- /dev/null
+++ b/src/garage/tests/k2v_client/simple.rs
@@ -0,0 +1,60 @@
+use std::time::Duration;
+
+use k2v_client::*;
+
+use crate::common;
+
+#[tokio::test]
+async fn test_simple() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-client-simple");
+ let k2v_client = ctx.k2v_client(&bucket);
+
+ k2v_client
+ .insert_item("root", "test1", b"Hello, world!".to_vec(), None)
+ .await
+ .unwrap();
+
+ let res = k2v_client.read_item("root", "test1").await.unwrap();
+
+ assert_eq!(res.value.len(), 1);
+ assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
+}
+
+#[tokio::test]
+async fn test_special_chars() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-client-simple-special-chars");
+ let k2v_client = ctx.k2v_client(&bucket);
+
+ let (pk, sk) = ("root@plépp", "≤≤««");
+ k2v_client
+ .insert_item(pk, sk, b"Hello, world!".to_vec(), None)
+ .await
+ .unwrap();
+
+ let res = k2v_client.read_item(pk, sk).await.unwrap();
+ assert_eq!(res.value.len(), 1);
+ assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
+
+ // sleep a bit before read_index
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ let res = k2v_client.read_index(Default::default()).await.unwrap();
+ assert_eq!(res.items.len(), 1);
+ assert_eq!(res.items.keys().next().unwrap(), pk);
+
+ let res = k2v_client
+ .read_batch(&[BatchReadOp {
+ partition_key: pk,
+ filter: Default::default(),
+ single_item: false,
+ conflicts_only: false,
+ tombstones: false,
+ }])
+ .await
+ .unwrap();
+ assert_eq!(res.len(), 1);
+ let res = &res[0];
+ assert_eq!(res.items.len(), 1);
+ assert_eq!(res.items.keys().next().unwrap(), sk);
+}
diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs
index 87be1327..e450baac 100644
--- a/src/garage/tests/lib.rs
+++ b/src/garage/tests/lib.rs
@@ -8,3 +8,5 @@ mod s3;
#[cfg(feature = "k2v")]
mod k2v;
+#[cfg(feature = "k2v")]
+mod k2v_client;
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index 27e85651..2ccb9fe5 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "k2v-client"
-version = "0.1.1"
+version = "0.0.4"
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -10,24 +10,28 @@ readme = "../../README.md"
[dependencies]
base64 = "0.21"
+sha2 = "0.10"
+hex = "0.4"
http = "0.2"
log = "0.4"
-rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
-rusoto_credential = "0.48.0"
-rusoto_signature = "0.48.0"
-hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] }
-serde = "1.0"
+aws-sigv4 = "0.55"
+percent-encoding = "2.2"
+hyper = { version = "0.14", default-features = false, features = ["client", "http1", "http2"] }
+hyper-rustls = { version = "0.24", features = ["http2"] }
+serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
thiserror = "1.0"
-tokio = "1.24"
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
# cli deps
clap = { version = "4.1", optional = true, features = ["derive", "env"] }
-garage_util = { workspace = true, optional = true }
+format_table = { workspace = true, optional = true }
+tracing = { version = "0.1", optional = true }
+tracing-subscriber = { version = "0.3", optional = true, features = ["env-filter"] }
[features]
-cli = ["clap", "tokio/fs", "tokio/io-std", "garage_util"]
+cli = ["clap", "tokio/fs", "tokio/io-std", "tracing", "tracing-subscriber", "format_table"]
[lib]
path = "lib.rs"
diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs
index cdd63cce..5a2422ab 100644
--- a/src/k2v-client/bin/k2v-cli.rs
+++ b/src/k2v-client/bin/k2v-cli.rs
@@ -2,12 +2,11 @@ use std::collections::BTreeMap;
use std::process::exit;
use std::time::Duration;
-use k2v_client::*;
+use base64::prelude::*;
-use garage_util::formater::format_table;
+use k2v_client::*;
-use rusoto_core::credential::AwsCredentials;
-use rusoto_core::Region;
+use format_table::format_table;
use clap::{Parser, Subcommand};
@@ -155,7 +154,9 @@ impl Value {
if let Some(ref text) = self.text {
Ok(text.as_bytes().to_vec())
} else if let Some(ref b64) = self.b64 {
- base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into()))
+ BASE64_STANDARD
+ .decode(b64)
+ .map_err(|_| Error::Message("invalid base64 input".into()))
} else if let Some(ref path) = self.file {
use tokio::io::AsyncReadExt;
if path == "-" {
@@ -230,7 +231,7 @@ impl ReadOutputKind {
for val in val.value {
match val {
K2vValue::Value(v) => {
- println!("{}", base64::encode(&v))
+ println!("{}", BASE64_STANDARD.encode(&v))
}
K2vValue::Tombstone => {
println!();
@@ -249,7 +250,7 @@ impl ReadOutputKind {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" utf-8: {}", string);
} else {
- println!(" base64: {}", base64::encode(&v));
+ println!(" base64: {}", BASE64_STANDARD.encode(&v));
}
}
K2vValue::Tombstone => {
@@ -275,7 +276,7 @@ struct BatchOutputKind {
impl BatchOutputKind {
fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
for (key, values) in values {
- println!("key: {}", key);
+ println!("sort_key: {}", key);
let causality: String = values.causality.into();
println!("causality: {}", causality);
for value in values.value {
@@ -284,7 +285,7 @@ impl BatchOutputKind {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" value(utf-8): {}", string);
} else {
- println!(" value(base64): {}", base64::encode(&v));
+ println!(" value(base64): {}", BASE64_STANDARD.encode(&v));
}
}
K2vValue::Tombstone => {
@@ -393,16 +394,27 @@ impl Filter {
#[tokio::main]
async fn main() -> Result<(), Error> {
+ if std::env::var("RUST_LOG").is_err() {
+ std::env::set_var("RUST_LOG", "warn")
+ }
+
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
+ .init();
+
let args = Args::parse();
- let region = Region::Custom {
- name: args.region,
+ let config = K2vClientConfig {
endpoint: args.endpoint,
+ region: args.region,
+ aws_access_key_id: args.key_id,
+ aws_secret_access_key: args.secret,
+ bucket: args.bucket,
+ user_agent: None,
};
- let creds = AwsCredentials::new(args.key_id, args.secret, None, None);
-
- let client = K2vClient::new(region, args.bucket, creds, None)?;
+ let client = K2vClient::new(config)?;
match args.command {
Command::Insert {
@@ -520,7 +532,7 @@ async fn main() -> Result<(), Error> {
value
.as_object_mut()
.unwrap()
- .insert("sort_key".to_owned(), k.into());
+ .insert("partition_key".to_owned(), k.into());
value
})
.collect::<Vec<_>>();
@@ -537,7 +549,7 @@ async fn main() -> Result<(), Error> {
}
let mut to_print = Vec::new();
- to_print.push(format!("key:\tentries\tconflicts\tvalues\tbytes"));
+ to_print.push(format!("partition_key\tentries\tconflicts\tvalues\tbytes"));
for (k, v) in res.items {
to_print.push(format!(
"{}\t{}\t{}\t{}\t{}",
diff --git a/src/k2v-client/error.rs b/src/k2v-client/error.rs
index 37c221f2..564ce497 100644
--- a/src/k2v-client/error.rs
+++ b/src/k2v-client/error.rs
@@ -18,12 +18,20 @@ pub enum Error {
NotFound,
#[error("io error: {0}")]
IoError(#[from] std::io::Error),
- #[error("rusoto tls error: {0}")]
- RusotoTls(#[from] rusoto_core::request::TlsError),
- #[error("rusoto http error: {0}")]
- RusotoHttp(#[from] rusoto_core::HttpDispatchError),
+ #[error("http error: {0}")]
+ Http(#[from] http::Error),
+ #[error("hyper error: {0}")]
+ Hyper(#[from] hyper::Error),
+ #[error("invalid header: {0}")]
+ Header(#[from] hyper::header::ToStrError),
#[error("deserialization error: {0}")]
Deserialization(#[from] serde_json::Error),
+ #[error("invalid signature parameters: {0}")]
+ SignParameters(#[from] aws_sigv4::signing_params::BuildError),
+ #[error("could not sign request: {0}")]
+ SignRequest(#[from] aws_sigv4::http_request::SigningError),
+ #[error("request timed out")]
+ Timeout,
#[error("{0}")]
Message(Cow<'static, str>),
}
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs
index ca52d0cf..425c351f 100644
--- a/src/k2v-client/lib.rs
+++ b/src/k2v-client/lib.rs
@@ -1,20 +1,23 @@
use std::collections::BTreeMap;
-use std::time::Duration;
+use std::convert::TryInto;
+use std::time::{Duration, SystemTime};
-use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE};
-use http::status::StatusCode;
-use http::HeaderMap;
+use base64::prelude::*;
use log::{debug, error};
+use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
+
+use http::header::{ACCEPT, CONTENT_TYPE};
+use http::status::StatusCode;
+use http::{HeaderName, HeaderValue, Request};
+use hyper::{body::Bytes, Body};
+use hyper::{client::connect::HttpConnector, Client as HttpClient};
+use hyper_rustls::HttpsConnector;
+
+use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings};
-use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient};
-use rusoto_credential::AwsCredentials;
-use rusoto_signature::region::Region;
-use rusoto_signature::signature::SignedRequest;
use serde::de::Error as DeError;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use tokio::io::AsyncReadExt;
-
mod error;
pub use error::Error;
@@ -22,41 +25,57 @@ pub use error::Error;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300);
const SERVICE: &str = "k2v";
-const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
+const AMZ_CONTENT_SHA256: HeaderName = HeaderName::from_static("x-amz-content-sha256");
+const GARAGE_CAUSALITY_TOKEN: HeaderName = HeaderName::from_static("x-garage-causality-token");
+
+const STRICT_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
+ .remove(b'_')
+ .remove(b'-')
+ .remove(b'.')
+ .remove(b'~');
+const PATH_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
+ .remove(b'/')
+ .remove(b'_')
+ .remove(b'-')
+ .remove(b'.')
+ .remove(b'~');
+
+pub struct K2vClientConfig {
+ pub endpoint: String,
+ pub region: String,
+ pub aws_access_key_id: String,
+ pub aws_secret_access_key: String,
+ pub bucket: String,
+ pub user_agent: Option<String>,
+}
/// Client used to query a K2V server.
pub struct K2vClient {
- region: Region,
- bucket: String,
- creds: AwsCredentials,
- client: HttpClient,
+ config: K2vClientConfig,
+ user_agent: HeaderValue,
+ client: HttpClient<HttpsConnector<HttpConnector>>,
}
impl K2vClient {
/// Create a new K2V client.
- pub fn new(
- region: Region,
- bucket: String,
- creds: AwsCredentials,
- user_agent: Option<String>,
- ) -> Result<Self, Error> {
+ pub fn new(config: K2vClientConfig) -> Result<Self, Error> {
let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build();
- let mut client = HttpClient::from_connector(connector);
- if let Some(ua) = user_agent {
- client.local_agent_prepend(ua);
- } else {
- client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION")));
- }
+ let client = HttpClient::builder().build(connector);
+ let user_agent: std::borrow::Cow<str> = match &config.user_agent {
+ Some(ua) => ua.into(),
+ None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(),
+ };
+ let user_agent = HeaderValue::from_str(&user_agent)
+ .map_err(|_| Error::Message("invalid user agent".into()))?;
Ok(K2vClient {
- region,
- bucket,
- creds,
+ config,
client,
+ user_agent,
})
}
@@ -66,15 +85,10 @@ impl K2vClient {
partition_key: &str,
sort_key: &str,
) -> Result<CausalValue, Error> {
- let mut req = SignedRequest::new(
- "GET",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("sort_key", sort_key);
- req.add_header(ACCEPT, "application/octet-stream, application/json");
-
+ let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
+ let req = Request::get(url)
+ .header(ACCEPT, "application/octet-stream, application/json")
+ .body(Bytes::new())?;
let res = self.dispatch(req, None).await?;
let causality = res
@@ -91,7 +105,7 @@ impl K2vClient {
match res.content_type.as_deref() {
Some("application/octet-stream") => Ok(CausalValue {
causality,
- value: vec![K2vValue::Value(res.body)],
+ value: vec![K2vValue::Value(res.body.to_vec())],
}),
Some("application/json") => {
let value = serde_json::from_slice(&res.body)?;
@@ -115,16 +129,17 @@ impl K2vClient {
) -> Result<Option<CausalValue>, Error> {
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
- let mut req = SignedRequest::new(
- "GET",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
+ let url = self.build_url(
+ Some(partition_key),
+ &[
+ ("sort_key", sort_key),
+ ("causality_token", &causality.0),
+ ("timeout", &timeout.as_secs().to_string()),
+ ],
);
- req.add_param("sort_key", sort_key);
- req.add_param("causality_token", &causality.0);
- req.add_param("timeout", &timeout.as_secs().to_string());
- req.add_header(ACCEPT, "application/octet-stream, application/json");
+ let req = Request::get(url)
+ .header(ACCEPT, "application/octet-stream, application/json")
+ .body(Bytes::new())?;
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
@@ -146,7 +161,7 @@ impl K2vClient {
match res.content_type.as_deref() {
Some("application/octet-stream") => Ok(Some(CausalValue {
causality,
- value: vec![K2vValue::Value(res.body)],
+ value: vec![K2vValue::Value(res.body.to_vec())],
})),
Some("application/json") => {
let value = serde_json::from_slice(&res.body)?;
@@ -176,16 +191,10 @@ impl K2vClient {
timeout: timeout.as_secs(),
};
- let mut req = SignedRequest::new(
- "POST",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("poll_range", "");
-
+ let url = self.build_url(Some(partition_key), &[("poll_range", "")]);
let payload = serde_json::to_vec(&request)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(Bytes::from(payload))?;
+
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
if res.status == StatusCode::NOT_MODIFIED {
@@ -219,18 +228,12 @@ impl K2vClient {
value: Vec<u8>,
causality: Option<CausalityToken>,
) -> Result<(), Error> {
- let mut req = SignedRequest::new(
- "PUT",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("sort_key", sort_key);
- req.set_payload(Some(value));
-
+ let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
+ let mut req = Request::put(url);
if let Some(causality) = causality {
- req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
+ req = req.header(GARAGE_CAUSALITY_TOKEN, &causality.0);
}
+ let req = req.body(Bytes::from(value))?;
self.dispatch(req, None).await?;
Ok(())
@@ -243,14 +246,10 @@ impl K2vClient {
sort_key: &str,
causality: CausalityToken,
) -> Result<(), Error> {
- let mut req = SignedRequest::new(
- "DELETE",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("sort_key", sort_key);
- req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
+ let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
+ let req = Request::delete(url)
+ .header(GARAGE_CAUSALITY_TOKEN, &causality.0)
+ .body(Bytes::new())?;
self.dispatch(req, None).await?;
Ok(())
@@ -262,9 +261,9 @@ impl K2vClient {
&self,
filter: Filter<'_>,
) -> Result<PaginatedRange<PartitionInfo>, Error> {
- let mut req =
- SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket));
- filter.insert_params(&mut req);
+ let params = filter.query_params();
+ let url = self.build_url(None, &params);
+ let req = Request::get(url).body(Bytes::new())?;
let res = self.dispatch(req, None).await?;
@@ -286,11 +285,10 @@ impl K2vClient {
/// *not* atomic: it is possible for some sub-operations to fails and others to success. In
/// that case, failure is reported.
pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> {
- let mut req =
- SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
-
+ let url = self.build_url::<&str>(None, &[]);
let payload = serde_json::to_vec(operations)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(payload.into())?;
+
self.dispatch(req, None).await?;
Ok(())
}
@@ -300,12 +298,10 @@ impl K2vClient {
&self,
operations: &[BatchReadOp<'_>],
) -> Result<Vec<PaginatedRange<CausalValue>>, Error> {
- let mut req =
- SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
- req.add_param("search", "");
-
+ let url = self.build_url(None, &[("search", "")]);
let payload = serde_json::to_vec(operations)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(payload.into())?;
+
let res = self.dispatch(req, None).await?;
let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?;
@@ -334,12 +330,10 @@ impl K2vClient {
/// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without
/// providing causality information.
pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> {
- let mut req =
- SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
- req.add_param("delete", "");
-
+ let url = self.build_url(None, &[("delete", "")]);
let payload = serde_json::to_vec(operations)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(payload.into())?;
+
let res = self.dispatch(req, None).await?;
let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?;
@@ -349,33 +343,67 @@ impl K2vClient {
async fn dispatch(
&self,
- mut req: SignedRequest,
+ mut req: Request<Bytes>,
timeout: Option<Duration>,
) -> Result<Response, Error> {
- req.sign(&self.creds);
- let mut res = self
- .client
- .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT)))
- .await?;
-
- let causality_token = res
- .headers
- .remove(GARAGE_CAUSALITY_TOKEN)
- .map(CausalityToken);
- let content_type = res.headers.remove(CONTENT_TYPE);
+ req.headers_mut()
+ .insert(http::header::USER_AGENT, self.user_agent.clone());
+
+ use sha2::{Digest, Sha256};
+ let mut hasher = Sha256::new();
+ hasher.update(req.body());
+ let hash = hex::encode(&hasher.finalize());
+ req.headers_mut()
+ .insert(AMZ_CONTENT_SHA256, hash.try_into().unwrap());
+
+ debug!("request uri: {:?}", req.uri());
+
+ // Sign request
+ let signing_settings = SigningSettings::default();
+ let signing_params = SigningParams::builder()
+ .access_key(&self.config.aws_access_key_id)
+ .secret_key(&self.config.aws_secret_access_key)
+ .region(&self.config.region)
+ .service_name(SERVICE)
+ .time(SystemTime::now())
+ .settings(signing_settings)
+ .build()?;
+ // Convert the HTTP request into a signable request
+ let signable_request = SignableRequest::from(&req);
+
+ // Sign and then apply the signature to the request
+ let (signing_instructions, _signature) =
+ sign(signable_request, &signing_params)?.into_parts();
+ signing_instructions.apply_to_request(&mut req);
+
+ // Send and wait for timeout
+ let res = tokio::select! {
+ res = self.client.request(req.map(Body::from)) => res?,
+ _ = tokio::time::sleep(timeout.unwrap_or(DEFAULT_TIMEOUT)) => {
+ return Err(Error::Timeout);
+ }
+ };
+
+ let (mut res, body) = res.into_parts();
+ let causality_token = match res.headers.remove(GARAGE_CAUSALITY_TOKEN) {
+ Some(v) => Some(CausalityToken(v.to_str()?.to_string())),
+ None => None,
+ };
+ let content_type = match res.headers.remove(CONTENT_TYPE) {
+ Some(v) => Some(v.to_str()?.to_string()),
+ None => None,
+ };
let body = match res.status {
- StatusCode::OK => read_body(&mut res.headers, res.body).await?,
- StatusCode::NO_CONTENT => Vec::new(),
+ StatusCode::OK => hyper::body::to_bytes(body).await?,
+ StatusCode::NO_CONTENT => Bytes::new(),
StatusCode::NOT_FOUND => return Err(Error::NotFound),
- StatusCode::NOT_MODIFIED => Vec::new(),
+ StatusCode::NOT_MODIFIED => Bytes::new(),
s => {
- let err_body = read_body(&mut res.headers, res.body)
- .await
- .unwrap_or_default();
+ let err_body = hyper::body::to_bytes(body).await.unwrap_or_default();
let err_body_str = std::str::from_utf8(&err_body)
.map(String::from)
- .unwrap_or_else(|_| base64::encode(&err_body));
+ .unwrap_or_else(|_| BASE64_STANDARD.encode(&err_body));
if s.is_client_error() || s.is_server_error() {
error!("Error response {}: {}", res.status, err_body_str);
@@ -408,7 +436,7 @@ impl K2vClient {
"Response body: {}",
std::str::from_utf8(&body)
.map(String::from)
- .unwrap_or_else(|_| base64::encode(&body))
+ .unwrap_or_else(|_| BASE64_STANDARD.encode(&body))
);
Ok(Response {
@@ -418,16 +446,26 @@ impl K2vClient {
content_type,
})
}
-}
-async fn read_body(headers: &mut HeaderMap<String>, body: ByteStream) -> Result<Vec<u8>, Error> {
- let body_len = headers
- .get(CONTENT_LENGTH)
- .and_then(|h| h.parse().ok())
- .unwrap_or(0);
- let mut res = Vec::with_capacity(body_len);
- body.into_async_read().read_to_end(&mut res).await?;
- Ok(res)
+ fn build_url<V: AsRef<str>>(&self, partition_key: Option<&str>, query: &[(&str, V)]) -> String {
+ let mut url = format!("{}/{}", self.config.endpoint, self.config.bucket);
+ if let Some(pk) = partition_key {
+ url.push('/');
+ url.extend(utf8_percent_encode(pk, &PATH_ENCODE_SET));
+ }
+ if !query.is_empty() {
+ url.push('?');
+ for (i, (k, v)) in query.iter().enumerate() {
+ if i > 0 {
+ url.push('&');
+ }
+ url.extend(utf8_percent_encode(k, &STRICT_ENCODE_SET));
+ url.push('=');
+ url.extend(utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET));
+ }
+ }
+ url
+ }
}
/// An opaque token used to convey causality between operations.
@@ -482,9 +520,11 @@ impl<'de> Deserialize<'de> for K2vValue {
{
let val: Option<&str> = Option::deserialize(d)?;
Ok(match val {
- Some(s) => {
- K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?)
- }
+ Some(s) => K2vValue::Value(
+ BASE64_STANDARD
+ .decode(s)
+ .map_err(|_| DeError::custom("invalid base64"))?,
+ ),
None => K2vValue::Tombstone,
})
}
@@ -498,7 +538,7 @@ impl Serialize for K2vValue {
match self {
K2vValue::Tombstone => serializer.serialize_none(),
K2vValue::Value(v) => {
- let b64 = base64::encode(v);
+ let b64 = BASE64_STANDARD.encode(v);
serializer.serialize_str(&b64)
}
}
@@ -554,22 +594,24 @@ struct PollRangeResponse {
}
impl<'a> Filter<'a> {
- fn insert_params(&self, req: &mut SignedRequest) {
- if let Some(start) = &self.start {
- req.add_param("start", start);
+ fn query_params(&self) -> Vec<(&'static str, std::borrow::Cow<str>)> {
+ let mut res = Vec::<(&'static str, std::borrow::Cow<str>)>::with_capacity(8);
+ if let Some(start) = self.start.as_deref() {
+ res.push(("start", start.into()));
}
- if let Some(end) = &self.end {
- req.add_param("end", end);
+ if let Some(end) = self.end.as_deref() {
+ res.push(("end", end.into()));
}
- if let Some(prefix) = &self.prefix {
- req.add_param("prefix", prefix);
+ if let Some(prefix) = self.prefix.as_deref() {
+ res.push(("prefix", prefix.into()));
}
if let Some(limit) = &self.limit {
- req.add_param("limit", &limit.to_string());
+ res.push(("limit", limit.to_string().into()));
}
if self.reverse {
- req.add_param("reverse", "true");
+ res.push(("reverse", "true".into()));
}
+ res
}
}
@@ -691,7 +733,7 @@ struct ErrorResponse {
}
struct Response {
- body: Vec<u8>,
+ body: Bytes,
status: StatusCode,
causality_token: Option<CausalityToken>,
content_type: Option<String>,
diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs
index 8fe3a582..59d4ca5b 100644
--- a/src/model/k2v/seen.rs
+++ b/src/model/k2v/seen.rs
@@ -79,7 +79,7 @@ impl RangeSeenMarker {
let bytes = nonversioned_encode(&self)?;
let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
- Ok(BASE64_STANDARD.encode(&bytes))
+ Ok(BASE64_STANDARD.encode(bytes))
}
/// Decode from msgpack+zstd+b64 representation, returns None on error.
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index f85f789c..ab8d1112 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -8,16 +8,26 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID;
+use garage_util::config::ConsulDiscoveryAPI;
use garage_util::config::ConsulDiscoveryConfig;
+const META_PREFIX: &str = "fr-deuxfleurs-garagehq";
+
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
#[serde(rename = "Address")]
address: String,
#[serde(rename = "ServicePort")]
service_port: u16,
- #[serde(rename = "NodeMeta")]
- node_meta: HashMap<String, String>,
+ #[serde(rename = "ServiceMeta")]
+ meta: HashMap<String, String>,
+}
+
+#[derive(Serialize, Clone, Debug)]
+#[serde(untagged)]
+enum PublishRequest {
+ Catalog(ConsulPublishEntry),
+ Service(ConsulPublishService),
}
#[derive(Serialize, Clone, Debug)]
@@ -26,28 +36,43 @@ struct ConsulPublishEntry {
node: String,
#[serde(rename = "Address")]
address: IpAddr,
- #[serde(rename = "NodeMeta")]
- node_meta: HashMap<String, String>,
#[serde(rename = "Service")]
- service: ConsulPublishService,
+ service: ConsulPublishCatalogService,
}
#[derive(Serialize, Clone, Debug)]
-struct ConsulPublishService {
+struct ConsulPublishCatalogService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Service")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
+ #[serde(rename = "Meta")]
+ meta: HashMap<String, String>,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
}
-// ----
+#[derive(Serialize, Clone, Debug)]
+struct ConsulPublishService {
+ #[serde(rename = "ID")]
+ service_id: String,
+ #[serde(rename = "Name")]
+ service_name: String,
+ #[serde(rename = "Tags")]
+ tags: Vec<String>,
+ #[serde(rename = "Address")]
+ address: IpAddr,
+ #[serde(rename = "Port")]
+ port: u16,
+ #[serde(rename = "Meta")]
+ meta: HashMap<String, String>,
+}
+// ----
pub struct ConsulDiscovery {
config: ConsulDiscoveryConfig,
client: reqwest::Client,
@@ -55,44 +80,48 @@ pub struct ConsulDiscovery {
impl ConsulDiscovery {
pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
- let client = match (&config.client_cert, &config.client_key) {
- (Some(client_cert), Some(client_key)) => {
- let mut client_cert_buf = vec![];
- File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
-
- let mut client_key_buf = vec![];
- File::open(client_key)?.read_to_end(&mut client_key_buf)?;
-
- let identity = reqwest::Identity::from_pem(
- &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
- )?;
-
- if config.tls_skip_verify {
- reqwest::Client::builder()
- .use_rustls_tls()
- .danger_accept_invalid_certs(true)
- .identity(identity)
- .build()?
- } else if let Some(ca_cert) = &config.ca_cert {
- let mut ca_cert_buf = vec![];
- File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
-
- reqwest::Client::builder()
- .use_rustls_tls()
- .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
- .identity(identity)
- .build()?
- } else {
- reqwest::Client::builder()
- .use_rustls_tls()
- .identity(identity)
- .build()?
+ let mut builder: reqwest::ClientBuilder = reqwest::Client::builder().use_rustls_tls();
+ if config.tls_skip_verify {
+ builder = builder.danger_accept_invalid_certs(true);
+ } else if let Some(ca_cert) = &config.ca_cert {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
+ builder =
+ builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?);
+ }
+
+ match &config.api {
+ ConsulDiscoveryAPI::Catalog => match (&config.client_cert, &config.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)?.read_to_end(&mut client_key_buf)?;
+
+ let identity = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ builder = builder.identity(identity);
+ }
+ (None, None) => {}
+ _ => return Err(ConsulError::InvalidTLSConfig),
+ },
+ ConsulDiscoveryAPI::Agent => {
+ if let Some(token) = &config.token {
+ let mut headers = reqwest::header::HeaderMap::new();
+ headers.insert(
+ "x-consul-token",
+ reqwest::header::HeaderValue::from_str(&token)?,
+ );
+ builder = builder.default_headers(headers);
}
}
- (None, None) => reqwest::Client::new(),
- _ => return Err(ConsulError::InvalidTLSConfig),
};
+ let client: reqwest::Client = builder.build()?;
+
Ok(Self { client, config })
}
@@ -111,8 +140,8 @@ impl ConsulDiscovery {
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
- .node_meta
- .get("pubkey")
+ .meta
+ .get(&format!("{}-pubkey", META_PREFIX))
.and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
@@ -138,29 +167,49 @@ impl ConsulDiscovery {
rpc_public_addr: SocketAddr,
) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
+ let tags = [
+ vec!["advertised-by-garage".into(), hostname.into()],
+ self.config.tags.clone(),
+ ]
+ .concat();
- let advertisement = ConsulPublishEntry {
- node: node.clone(),
- address: rpc_public_addr.ip(),
- node_meta: [
- ("pubkey".to_string(), hex::encode(node_id)),
- ("hostname".to_string(), hostname.to_string()),
- ]
- .iter()
- .cloned()
- .collect(),
- service: ConsulPublishService {
+ let mut meta = self.config.meta.clone().unwrap_or_default();
+ meta.insert(format!("{}-pubkey", META_PREFIX), hex::encode(node_id));
+ meta.insert(format!("{}-hostname", META_PREFIX), hostname.to_string());
+
+ let url = format!(
+ "{}/v1/{}",
+ self.config.consul_http_addr,
+ (match &self.config.api {
+ ConsulDiscoveryAPI::Catalog => "catalog/register",
+ ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks",
+ })
+ );
+
+ let req = self.client.put(&url);
+ let advertisement: PublishRequest = match &self.config.api {
+ ConsulDiscoveryAPI::Catalog => PublishRequest::Catalog(ConsulPublishEntry {
+ node: node.clone(),
+ address: rpc_public_addr.ip(),
+ service: ConsulPublishCatalogService {
+ service_id: node.clone(),
+ service_name: self.config.service_name.clone(),
+ tags,
+ meta: meta.clone(),
+ address: rpc_public_addr.ip(),
+ port: rpc_public_addr.port(),
+ },
+ }),
+ ConsulDiscoveryAPI::Agent => PublishRequest::Service(ConsulPublishService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
- tags: vec!["advertised-by-garage".into(), hostname.into()],
+ tags,
+ meta,
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
- },
+ }),
};
-
- let url = format!("{}/v1/catalog/register", self.config.consul_http_addr);
-
- let http = self.client.put(&url).json(&advertisement).send().await?;
+ let http = req.json(&advertisement).send().await?;
http.error_for_status()?;
Ok(())
@@ -176,4 +225,6 @@ pub enum ConsulError {
Reqwest(#[error(source)] reqwest::Error),
#[error(display = "Invalid Consul TLS configuration")]
InvalidTLSConfig,
+ #[error(display = "Token error: {}", _0)]
+ Token(#[error(source)] reqwest::header::InvalidHeaderValue),
}
diff --git a/src/table/data.rs b/src/table/data.rs
index 73fa93c8..26101da4 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -44,22 +44,22 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
- .open_tree(&format!("{}:table", F::TABLE_NAME))
+ .open_tree(format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
let merkle_tree = db
- .open_tree(&format!("{}:merkle_tree", F::TABLE_NAME))
+ .open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
.expect("Unable to open DB Merkle tree tree");
let merkle_todo = db
- .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
+ .open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
let insert_queue = db
- .open_tree(&format!("{}:insert_queue", F::TABLE_NAME))
+ .open_tree(format!("{}:insert_queue", F::TABLE_NAME))
.expect("Unable to open insert queue DB tree");
let gc_todo = db
- .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
+ .open_tree(format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open GC DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
@@ -90,7 +90,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
let tree_key = self.tree_key(p, s);
- if let Some(bytes) = self.store.get(&tree_key)? {
+ if let Some(bytes) = self.store.get(tree_key)? {
Ok(Some(ByteBuf::from(bytes.to_vec())))
} else {
Ok(None)
@@ -132,10 +132,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
}
}
- fn read_range_aux<'a>(
+ fn read_range_aux(
&self,
partition_hash: Hash,
- range: db::ValueIter<'a>,
+ range: db::ValueIter,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
diff --git a/src/table/util.rs b/src/table/util.rs
index 0b10cf3f..663a7e11 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -34,8 +34,9 @@ impl DeletedFilter {
}
}
-#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum EnumerationOrder {
+ #[default]
Forward,
Reverse,
}
@@ -49,9 +50,3 @@ impl EnumerationOrder {
}
}
}
-
-impl Default for EnumerationOrder {
- fn default() -> Self {
- EnumerationOrder::Forward
- }
-}
diff --git a/src/util/config.rs b/src/util/config.rs
index 009f0574..eeb17e0e 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -142,8 +142,19 @@ pub struct AdminConfig {
pub trace_sink: Option<String>,
}
+#[derive(Deserialize, Debug, Clone, Default)]
+#[serde(rename_all = "lowercase")]
+pub enum ConsulDiscoveryAPI {
+ #[default]
+ Catalog,
+ Agent,
+}
+
#[derive(Deserialize, Debug, Clone)]
pub struct ConsulDiscoveryConfig {
+ /// The consul api to use when registering: either `catalog` (the default) or `agent`
+ #[serde(default)]
+ pub api: ConsulDiscoveryAPI,
/// Consul http or https address to connect to to discover more peers
pub consul_http_addr: String,
/// Consul service name to use
@@ -154,9 +165,17 @@ pub struct ConsulDiscoveryConfig {
pub client_cert: Option<String>,
/// Client TLS key to use when connecting to Consul
pub client_key: Option<String>,
+ /// /// Token to use for connecting to consul
+ pub token: Option<String>,
/// Skip TLS hostname verification
#[serde(default)]
pub tls_skip_verify: bool,
+ /// Additional tags to add to the service
+ #[serde(default)]
+ pub tags: Vec<String>,
+ /// Additional service metadata to add
+ #[serde(default)]
+ pub meta: Option<std::collections::HashMap<String, String>>,
}
#[derive(Deserialize, Debug, Clone)]
@@ -230,7 +249,7 @@ fn secret_from_file(
#[cfg(unix)]
if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") {
use std::os::unix::fs::MetadataExt;
- let metadata = std::fs::metadata(&file_path)?;
+ let metadata = std::fs::metadata(file_path)?;
if metadata.mode() & 0o077 != 0 {
return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into());
}
@@ -351,7 +370,7 @@ mod tests {
replication_mode = "3"
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{}"
-
+
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
@@ -395,7 +414,7 @@ mod tests {
rpc_bind_addr = "[::]:3901"
rpc_secret= "dummy"
rpc_secret_file = "dummy"
-
+
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
diff --git a/src/util/forwarded_headers.rs b/src/util/forwarded_headers.rs
index 6ae275aa..12f76434 100644
--- a/src/util/forwarded_headers.rs
+++ b/src/util/forwarded_headers.rs
@@ -13,7 +13,7 @@ pub fn handle_forwarded_for_headers(headers: &HeaderMap<HeaderValue>) -> Result<
.to_str()
.ok_or_message("Error parsing X-Forwarded-For header")?;
- let client_ip = IpAddr::from_str(&forwarded_for_ip_str)
+ let client_ip = IpAddr::from_str(forwarded_for_ip_str)
.ok_or_message("Valid IP address not found in X-Forwarded-For header")?;
Ok(client_ip.to_string())
diff --git a/src/util/lib.rs b/src/util/lib.rs
index c9110fb2..15f0f829 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -10,7 +10,6 @@ pub mod crdt;
pub mod data;
pub mod encode;
pub mod error;
-pub mod formater;
pub mod forwarded_headers;
pub mod metrics;
pub mod migrate;