aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml2
-rw-r--r--src/api/admin/api_server.rs6
-rw-r--r--src/api/admin/cluster.rs6
-rw-r--r--src/api/k2v/batch.rs2
-rw-r--r--src/api/k2v/item.rs2
-rw-r--r--src/db/test.rs13
-rw-r--r--src/garage/Cargo.toml6
-rw-r--r--src/garage/admin.rs39
-rw-r--r--src/garage/cli/structs.rs15
-rw-r--r--src/garage/main.rs58
-rw-r--r--src/garage/tests/k2v/batch.rs22
-rw-r--r--src/garage/tests/k2v/errorcodes.rs20
-rw-r--r--src/garage/tests/k2v/item.rs58
-rw-r--r--src/garage/tests/k2v/poll.rs10
-rw-r--r--src/garage/tests/k2v/simple.rs6
-rw-r--r--src/garage/tests/s3/website.rs20
-rw-r--r--src/k2v-client/Cargo.toml2
-rw-r--r--src/model/helper/bucket.rs69
-rw-r--r--src/rpc/Cargo.toml13
-rw-r--r--src/rpc/consul.rs234
-rw-r--r--src/rpc/kubernetes.rs16
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/system.rs73
-rw-r--r--src/util/config.rs45
-rw-r--r--src/web/web_server.rs2
25 files changed, 468 insertions, 272 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 7c3ed43b..4d9a6ab6 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -36,7 +36,7 @@ sha2 = "0.10"
futures = "0.3"
futures-util = "0.3"
-pin-project = "1.0"
+pin-project = "1.0.11"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = "0.1"
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 0816bda1..2896d058 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -5,7 +5,7 @@ use async_trait::async_trait;
use futures::future::Future;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
-use hyper::{Body, Request, Response};
+use hyper::{Body, Request, Response, StatusCode};
use opentelemetry::trace::SpanRef;
@@ -69,7 +69,7 @@ impl AdminApiServer {
fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
Ok(Response::builder()
- .status(204)
+ .status(StatusCode::NO_CONTENT)
.header(ALLOW, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
@@ -94,7 +94,7 @@ impl AdminApiServer {
.ok_or_internal_error("Could not serialize metrics")?;
Ok(Response::builder()
- .status(200)
+ .status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))?)
}
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index da3d8c44..61bfb8c5 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -151,7 +151,7 @@ pub async fn handle_update_cluster_layout(
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
- .status(StatusCode::OK)
+ .status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}
@@ -171,7 +171,7 @@ pub async fn handle_apply_cluster_layout(
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
- .status(StatusCode::OK)
+ .status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}
@@ -186,7 +186,7 @@ pub async fn handle_revert_cluster_layout(
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
- .status(StatusCode::OK)
+ .status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index db9901cf..78035362 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -42,7 +42,7 @@ pub async fn handle_insert_batch(
garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
Ok(Response::builder()
- .status(StatusCode::OK)
+ .status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 836d386f..f85138c7 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -153,7 +153,7 @@ pub async fn handle_insert_item(
.await?;
Ok(Response::builder()
- .status(StatusCode::OK)
+ .status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}
diff --git a/src/db/test.rs b/src/db/test.rs
index cfcee643..40e6c41e 100644
--- a/src/db/test.rs
+++ b/src/db/test.rs
@@ -1,9 +1,5 @@
use crate::*;
-use crate::lmdb_adapter::LmdbDb;
-use crate::sled_adapter::SledDb;
-use crate::sqlite_adapter::SqliteDb;
-
fn test_suite(db: Db) {
let tree = db.open_tree("tree").unwrap();
@@ -80,7 +76,10 @@ fn test_suite(db: Db) {
}
#[test]
+#[cfg(feature = "lmdb")]
fn test_lmdb_db() {
+ use crate::lmdb_adapter::LmdbDb;
+
let path = mktemp::Temp::new_dir().unwrap();
let db = heed::EnvOpenOptions::new()
.max_dbs(100)
@@ -92,7 +91,10 @@ fn test_lmdb_db() {
}
#[test]
+#[cfg(feature = "sled")]
fn test_sled_db() {
+ use crate::sled_adapter::SledDb;
+
let path = mktemp::Temp::new_dir().unwrap();
let db = SledDb::init(sled::open(path.to_path_buf()).unwrap());
test_suite(db);
@@ -100,7 +102,10 @@ fn test_sled_db() {
}
#[test]
+#[cfg(feature = "sqlite")]
fn test_sqlite_db() {
+ use crate::sqlite_adapter::SqliteDb;
+
let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap());
test_suite(db);
}
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 5ce40ff2..69852db7 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -30,9 +30,11 @@ garage_table = { version = "0.8.0", path = "../table" }
garage_util = { version = "0.8.0", path = "../util" }
garage_web = { version = "0.8.0", path = "../web" }
+backtrace = "0.3"
bytes = "1.0"
bytesize = "1.1"
timeago = "0.3"
+parse_duration = "2.1"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
@@ -58,7 +60,7 @@ opentelemetry-otlp = { version = "0.10", optional = true }
prometheus = { version = "0.13", optional = true }
[dev-dependencies]
-aws-sdk-s3 = "0.8"
+aws-sdk-s3 = "0.19"
chrono = "0.4"
http = "0.2"
hmac = "0.12"
@@ -81,6 +83,8 @@ sled = [ "garage_model/sled" ]
lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ]
+# Automatic registration and discovery via Consul API
+consul-discovery = [ "garage_rpc/consul-discovery" ]
# Automatic registration and discovery via Kubernetes API
kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
# Prometheus exporter (/metrics endpoint).
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 802a8261..e973cfe7 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -85,6 +85,9 @@ impl AdminRpcHandler {
BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
BucketOperation::Website(query) => self.handle_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
+ BucketOperation::CleanupIncompleteUploads(query) => {
+ self.handle_bucket_cleanup_incomplete_uploads(query).await
+ }
}
}
@@ -512,6 +515,42 @@ impl AdminRpcHandler {
)))
}
+ async fn handle_bucket_cleanup_incomplete_uploads(
+ &self,
+ query: &CleanupIncompleteUploadsOpt,
+ ) -> Result<AdminRpc, Error> {
+ let mut bucket_ids = vec![];
+ for b in query.buckets.iter() {
+ bucket_ids.push(
+ self.garage
+ .bucket_helper()
+ .resolve_global_bucket_name(b)
+ .await?
+ .ok_or_bad_request(format!("Bucket not found: {}", b))?,
+ );
+ }
+
+ let duration = parse_duration::parse::parse(&query.older_than)
+ .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
+
+ let mut ret = String::new();
+ for bucket in bucket_ids {
+ let count = self
+ .garage
+ .bucket_helper()
+ .cleanup_incomplete_uploads(&bucket, duration)
+ .await?;
+ writeln!(
+ &mut ret,
+ "Bucket {:?}: {} incomplete uploads aborted",
+ bucket, count
+ )
+ .unwrap();
+ }
+
+ Ok(AdminRpc::Ok(ret))
+ }
+
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 02ed8992..64798952 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -200,6 +200,10 @@ pub enum BucketOperation {
/// Set the quotas for this bucket
#[structopt(name = "set-quotas", version = garage_version())]
SetQuotas(SetQuotasOpt),
+
+ /// Clean up (abort) old incomplete multipart uploads
+ #[structopt(name = "cleanup-incomplete-uploads", version = garage_version())]
+ CleanupIncompleteUploads(CleanupIncompleteUploadsOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
@@ -302,6 +306,17 @@ pub struct SetQuotasOpt {
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct CleanupIncompleteUploadsOpt {
+ /// Abort multipart uploads older than this value
+ #[structopt(long = "older-than", default_value = "1d")]
+ pub older_than: String,
+
+ /// Name of bucket(s) to clean up
+ #[structopt(required = true)]
+ pub buckets: Vec<String>,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list", version = garage_version())]
diff --git a/src/garage/main.rs b/src/garage/main.rs
index e5cba553..edda734b 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -65,21 +65,6 @@ struct Opt {
#[tokio::main]
async fn main() {
- if std::env::var("RUST_LOG").is_err() {
- std::env::set_var("RUST_LOG", "netapp=info,garage=info")
- }
- tracing_subscriber::fmt()
- .with_writer(std::io::stderr)
- .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
- .init();
- sodiumoxide::init().expect("Unable to init sodiumoxide");
-
- // Abort on panic (same behavior as in Go)
- std::panic::set_hook(Box::new(|panic_info| {
- error!("{}", panic_info.to_string());
- std::process::abort();
- }));
-
// Initialize version and features info
let features = &[
#[cfg(feature = "k2v")]
@@ -90,6 +75,8 @@ async fn main() {
"lmdb",
#[cfg(feature = "sqlite")]
"sqlite",
+ #[cfg(feature = "consul-discovery")]
+ "consul-discovery",
#[cfg(feature = "kubernetes-discovery")]
"kubernetes-discovery",
#[cfg(feature = "metrics")]
@@ -106,12 +93,51 @@ async fn main() {
}
garage_util::version::init_features(features);
- // Parse arguments
let version = format!(
"{} [features: {}]",
garage_util::version::garage_version(),
features.join(", ")
);
+
+ // Initialize panic handler that aborts on panic and shows a nice message.
+ // By default, Tokio continues runing normally when a task panics. We want
+ // to avoid this behavior in Garage as this would risk putting the process in an
+ // unknown/uncontrollable state. We prefer to exit the process and restart it
+ // from scratch, so that it boots back into a fresh, known state.
+ let panic_version_info = version.clone();
+ std::panic::set_hook(Box::new(move |panic_info| {
+ eprintln!("======== PANIC (internal Garage error) ========");
+ eprintln!("{}", panic_info);
+ eprintln!();
+ eprintln!("Panics are internal errors that Garage is unable to handle on its own.");
+ eprintln!("They can be caused by bugs in Garage's code, or by corrupted data in");
+ eprintln!("the node's storage. If you feel that this error is likely to be a bug");
+ eprintln!("in Garage, please report it on our issue tracker a the following address:");
+ eprintln!();
+ eprintln!(" https://git.deuxfleurs.fr/Deuxfleurs/garage/issues");
+ eprintln!();
+ eprintln!("Please include the last log messages and the the full backtrace below in");
+ eprintln!("your bug report, as well as any relevant information on the context in");
+ eprintln!("which Garage was running when this error occurred.");
+ eprintln!();
+ eprintln!("GARAGE VERSION: {}", panic_version_info);
+ eprintln!();
+ eprintln!("BACKTRACE:");
+ eprintln!("{:?}", backtrace::Backtrace::new());
+ std::process::abort();
+ }));
+
+ // Initialize logging as well as other libraries used in Garage
+ if std::env::var("RUST_LOG").is_err() {
+ std::env::set_var("RUST_LOG", "netapp=info,garage=info")
+ }
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
+ .init();
+ sodiumoxide::init().expect("Unable to init sodiumoxide");
+
+ // Parse arguments and dispatch command line
let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
let res = match opt.cmd {
diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs
index acae1910..6abba1c5 100644
--- a/src/garage/tests/k2v/batch.rs
+++ b/src/garage/tests/k2v/batch.rs
@@ -6,7 +6,7 @@ use assert_json_diff::assert_json_eq;
use serde_json::json;
use super::json_body;
-use hyper::Method;
+use hyper::{Method, StatusCode};
#[tokio::test]
async fn test_batch() {
@@ -49,7 +49,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
for sk in ["a", "b", "c", "d.1", "d.2", "e"] {
let res = ctx
@@ -62,7 +62,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@@ -104,7 +104,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
let json_res = json_body(res).await;
assert_json_eq!(
json_res,
@@ -266,7 +266,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
for sk in ["b", "c", "d.1", "d.2"] {
let res = ctx
@@ -280,9 +280,9 @@ async fn test_batch() {
.await
.unwrap();
if sk == "b" {
- assert_eq!(res.status(), 204);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
} else {
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
}
ct.insert(
sk,
@@ -317,7 +317,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
let json_res = json_body(res).await;
assert_json_eq!(
json_res,
@@ -478,7 +478,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
let json_res = json_body(res).await;
assert_json_eq!(
json_res,
@@ -514,7 +514,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 204);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@@ -547,7 +547,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
let json_res = json_body(res).await;
assert_json_eq!(
json_res,
diff --git a/src/garage/tests/k2v/errorcodes.rs b/src/garage/tests/k2v/errorcodes.rs
index 2fcc45bc..ecb84729 100644
--- a/src/garage/tests/k2v/errorcodes.rs
+++ b/src/garage/tests/k2v/errorcodes.rs
@@ -1,13 +1,13 @@
use crate::common;
-use hyper::Method;
+use hyper::{Method, StatusCode};
#[tokio::test]
async fn test_error_codes() {
let ctx = common::context();
let bucket = ctx.create_bucket("test-k2v-error-codes");
- // Regular insert should work (code 200)
+ // Regular insert should work (code 204)
let res = ctx
.k2v
.request
@@ -19,7 +19,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Insert with trash causality token: invalid request
let res = ctx
@@ -34,7 +34,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 400);
+ assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Search without partition key: invalid request
let res = ctx
@@ -52,7 +52,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 400);
+ assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Search with start that is not in prefix: invalid request
let res = ctx
@@ -70,7 +70,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 400);
+ assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Search with invalid json: 400
let res = ctx
@@ -88,7 +88,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 400);
+ assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Batch insert with invalid causality token: 400
let res = ctx
@@ -105,7 +105,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 400);
+ assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Batch insert with invalid data: 400
let res = ctx
@@ -122,7 +122,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 400);
+ assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Poll with invalid causality token: 400
let res = ctx
@@ -137,5 +137,5 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 400);
+ assert_eq!(res.status(), StatusCode::BAD_REQUEST);
}
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
index 32537336..2641386f 100644
--- a/src/garage/tests/k2v/item.rs
+++ b/src/garage/tests/k2v/item.rs
@@ -6,7 +6,7 @@ use assert_json_diff::assert_json_eq;
use serde_json::json;
use super::json_body;
-use hyper::Method;
+use hyper::{Method, StatusCode};
#[tokio::test]
async fn test_items_and_indices() {
@@ -56,7 +56,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Get value back
let res = ctx
@@ -69,7 +69,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@@ -132,7 +132,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Get value back
let res = ctx
@@ -145,7 +145,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@@ -201,7 +201,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Get value back
let res = ctx
@@ -214,7 +214,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -271,7 +271,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
let ct = res
.headers()
.get("x-garage-causality-token")
@@ -292,7 +292,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 204);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -364,7 +364,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// f0: either
let res = ctx
@@ -377,7 +377,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@@ -405,7 +405,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -424,7 +424,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@@ -446,7 +446,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -466,7 +466,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// f0: either
let res = ctx
@@ -479,7 +479,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -503,7 +503,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -528,7 +528,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 409); // CONFLICT
+ assert_eq!(res.status(), StatusCode::CONFLICT); // CONFLICT
// f3: json
let res = ctx
@@ -541,7 +541,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -568,7 +568,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 204);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// f0: either
let res = ctx
@@ -581,7 +581,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -599,7 +599,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -625,7 +625,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 409); // CONFLICT
+ assert_eq!(res.status(), StatusCode::CONFLICT); // CONFLICT
// f3: json
let res = ctx
@@ -638,7 +638,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -658,7 +658,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 204);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// f0: either
let res = ctx
@@ -671,7 +671,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 204); // NO CONTENT
+ assert_eq!(res.status(), StatusCode::NO_CONTENT); // NO CONTENT
// f1: not specified
let res = ctx
@@ -683,7 +683,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@@ -702,7 +702,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 204); // NO CONTENT
+ assert_eq!(res.status(), StatusCode::NO_CONTENT); // NO CONTENT
// f3: json
let res = ctx
@@ -715,7 +715,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
index 70dc0410..e56705ae 100644
--- a/src/garage/tests/k2v/poll.rs
+++ b/src/garage/tests/k2v/poll.rs
@@ -1,4 +1,4 @@
-use hyper::Method;
+use hyper::{Method, StatusCode};
use std::time::Duration;
use crate::common;
@@ -20,7 +20,7 @@ async fn test_poll() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Retrieve initial value to get its causality token
let res2 = ctx
@@ -33,7 +33,7 @@ async fn test_poll() {
.send()
.await
.unwrap();
- assert_eq!(res2.status(), 200);
+ assert_eq!(res2.status(), StatusCode::OK);
let ct = res2
.headers()
.get("x-garage-causality-token")
@@ -80,7 +80,7 @@ async fn test_poll() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Check poll finishes with correct value
let poll_res = tokio::select! {
@@ -88,7 +88,7 @@ async fn test_poll() {
res = poll => res.unwrap().unwrap(),
};
- assert_eq!(poll_res.status(), 200);
+ assert_eq!(poll_res.status(), StatusCode::OK);
let poll_res_body = hyper::body::to_bytes(poll_res.into_body())
.await
diff --git a/src/garage/tests/k2v/simple.rs b/src/garage/tests/k2v/simple.rs
index ae9a8674..465fc24d 100644
--- a/src/garage/tests/k2v/simple.rs
+++ b/src/garage/tests/k2v/simple.rs
@@ -1,6 +1,6 @@
use crate::common;
-use hyper::Method;
+use hyper::{Method, StatusCode};
#[tokio::test]
async fn test_simple() {
@@ -18,7 +18,7 @@ async fn test_simple() {
.send()
.await
.unwrap();
- assert_eq!(res.status(), 200);
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
let res2 = ctx
.k2v
@@ -30,7 +30,7 @@ async fn test_simple() {
.send()
.await
.unwrap();
- assert_eq!(res2.status(), 200);
+ assert_eq!(res2.status(), StatusCode::OK);
let res2_body = hyper::body::to_bytes(res2.into_body())
.await
diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs
index 0570ac6a..244a2fa0 100644
--- a/src/garage/tests/s3/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -4,7 +4,7 @@ use aws_sdk_s3::{
model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
types::ByteStream,
};
-use http::Request;
+use http::{Request, StatusCode};
use hyper::{
body::{to_bytes, Body},
Client,
@@ -43,7 +43,7 @@ async fn test_website() {
let mut resp = client.request(req()).await.unwrap();
- assert_eq!(resp.status(), 404);
+ assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@@ -56,7 +56,7 @@ async fn test_website() {
.expect_success_status("Could not allow website on bucket");
resp = client.request(req()).await.unwrap();
- assert_eq!(resp.status(), 200);
+ assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@@ -69,7 +69,7 @@ async fn test_website() {
.expect_success_status("Could not deny website on bucket");
resp = client.request(req()).await.unwrap();
- assert_eq!(resp.status(), 404);
+ assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@@ -175,7 +175,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
- assert_eq!(resp.status(), 200);
+ assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get("access-control-allow-origin").unwrap(),
"*"
@@ -200,7 +200,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
- assert_eq!(resp.status(), 404);
+ assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY_ERR.as_ref()
@@ -220,7 +220,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
- assert_eq!(resp.status(), 200);
+ assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get("access-control-allow-origin").unwrap(),
"*"
@@ -244,7 +244,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
- assert_eq!(resp.status(), 403);
+ assert_eq!(resp.status(), StatusCode::FORBIDDEN);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@@ -285,7 +285,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
- assert_eq!(resp.status(), 403);
+ assert_eq!(resp.status(), StatusCode::FORBIDDEN);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@@ -311,7 +311,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
- assert_eq!(resp.status(), 404);
+ assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY_ERR.as_ref()
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index 0f0b76ae..9d2b4e30 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -12,7 +12,7 @@ readme = "../../README.md"
base64 = "0.13.0"
http = "0.2.6"
log = "0.4"
-rusoto_core = "0.48.0"
+rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
rusoto_credential = "0.48.0"
rusoto_signature = "0.48.0"
serde = "1.0.137"
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 130ba5be..4a488d7f 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
@@ -12,7 +14,7 @@ use crate::helper::error::*;
use crate::helper::key::KeyHelper;
use crate::key_table::*;
use crate::permission::BucketKeyPerm;
-use crate::s3::object_table::ObjectFilter;
+use crate::s3::object_table::*;
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
@@ -472,4 +474,69 @@ impl<'a> BucketHelper<'a> {
Ok(true)
}
+
+ // ----
+
+ /// Deletes all incomplete multipart uploads that are older than a certain time.
+ /// Returns the number of uploads aborted
+ pub async fn cleanup_incomplete_uploads(
+ &self,
+ bucket_id: &Uuid,
+ older_than: Duration,
+ ) -> Result<usize, Error> {
+ let older_than = now_msec() - older_than.as_millis() as u64;
+
+ let mut ret = 0usize;
+ let mut start = None;
+
+ loop {
+ let objects = self
+ .0
+ .object_table
+ .get_range(
+ bucket_id,
+ start,
+ Some(ObjectFilter::IsUploading),
+ 1000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ let abortions = objects
+ .iter()
+ .filter_map(|object| {
+ let aborted_versions = object
+ .versions()
+ .iter()
+ .filter(|v| v.is_uploading() && v.timestamp < older_than)
+ .map(|v| ObjectVersion {
+ state: ObjectVersionState::Aborted,
+ uuid: v.uuid,
+ timestamp: v.timestamp,
+ })
+ .collect::<Vec<_>>();
+ if !aborted_versions.is_empty() {
+ Some(Object::new(
+ object.bucket_id,
+ object.key.clone(),
+ aborted_versions,
+ ))
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+
+ ret += abortions.len();
+ self.0.object_table.insert_many(abortions).await?;
+
+ if objects.len() < 1000 {
+ break;
+ } else {
+ start = Some(objects.last().unwrap().key.clone());
+ }
+ }
+
+ Ok(ret)
+ }
}
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 5bb6aae0..5a427131 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -30,12 +30,13 @@ rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_json = "1.0"
+err-derive = { version = "0.3", optional = true }
# newer version requires rust edition 2021
-kube = { version = "0.62", features = ["runtime", "derive"], optional = true }
-k8s-openapi = { version = "0.13", features = ["v1_22"], optional = true }
-openssl = { version = "0.10", features = ["vendored"], optional = true }
+kube = { version = "0.75", default-features = false, features = ["runtime", "derive", "client", "rustls-tls"], optional = true }
+k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true }
schemars = { version = "0.8", optional = true }
+reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] }
# newer version requires rust edition 2021
pnet_datalink = "0.28"
@@ -48,9 +49,7 @@ opentelemetry = "0.17"
netapp = { version = "0.5.2", features = ["telemetry"] }
-hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
-
-
[features]
-kubernetes-discovery = [ "kube", "k8s-openapi", "openssl", "schemars" ]
+kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ]
+consul-discovery = [ "reqwest", "err-derive" ]
system-libs = [ "sodiumoxide/use-pkg-config" ]
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index 15acbcef..b1772a1a 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -1,16 +1,14 @@
use std::collections::HashMap;
+use std::fs::File;
+use std::io::Read;
use std::net::{IpAddr, SocketAddr};
-use hyper::client::Client;
-use hyper::StatusCode;
-use hyper::{Body, Method, Request};
+use err_derive::Error;
use serde::{Deserialize, Serialize};
use netapp::NodeID;
-use garage_util::error::Error;
-
-// ---- READING FROM CONSUL CATALOG ----
+use garage_util::config::ConsulDiscoveryConfig;
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
@@ -22,53 +20,6 @@ struct ConsulQueryEntry {
node_meta: HashMap<String, String>,
}
-pub async fn get_consul_nodes(
- consul_host: &str,
- consul_service_name: &str,
-) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
- let url = format!(
- "http://{}/v1/catalog/service/{}",
- consul_host, consul_service_name
- );
- let req = Request::builder()
- .uri(url)
- .method(Method::GET)
- .body(Body::default())?;
-
- let client = Client::new();
-
- let resp = client.request(req).await?;
- if resp.status() != StatusCode::OK {
- return Err(Error::Message(format!("HTTP error {}", resp.status())));
- }
-
- let body = hyper::body::to_bytes(resp.into_body()).await?;
- let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?;
-
- let mut ret = vec![];
- for ent in entries {
- let ip = ent.address.parse::<IpAddr>().ok();
- let pubkey = ent
- .node_meta
- .get("pubkey")
- .and_then(|k| hex::decode(&k).ok())
- .and_then(|k| NodeID::from_slice(&k[..]));
- if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
- ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
- } else {
- warn!(
- "Could not process node spec from Consul: {:?} (invalid IP or public key)",
- ent
- );
- }
- }
- debug!("Got nodes from Consul: {:?}", ret);
-
- Ok(ret)
-}
-
-// ---- PUBLISHING TO CONSUL CATALOG ----
-
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishEntry {
#[serde(rename = "Node")]
@@ -95,57 +46,134 @@ struct ConsulPublishService {
port: u16,
}
-pub async fn publish_consul_service(
- consul_host: &str,
- consul_service_name: &str,
- node_id: NodeID,
- hostname: &str,
- rpc_public_addr: SocketAddr,
-) -> Result<(), Error> {
- let node = format!("garage:{}", hex::encode(&node_id[..8]));
-
- let advertisment = ConsulPublishEntry {
- node: node.clone(),
- address: rpc_public_addr.ip(),
- node_meta: [
- ("pubkey".to_string(), hex::encode(node_id)),
- ("hostname".to_string(), hostname.to_string()),
- ]
- .iter()
- .cloned()
- .collect(),
- service: ConsulPublishService {
- service_id: node.clone(),
- service_name: consul_service_name.to_string(),
- tags: vec!["advertised-by-garage".into(), hostname.into()],
+// ----
+
+pub struct ConsulDiscovery {
+ config: ConsulDiscoveryConfig,
+ client: reqwest::Client,
+}
+
+impl ConsulDiscovery {
+ pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
+ let client = match (&config.client_cert, &config.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)?.read_to_end(&mut client_key_buf)?;
+
+ let identity = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ if config.tls_skip_verify {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .danger_accept_invalid_certs(true)
+ .identity(identity)
+ .build()?
+ } else if let Some(ca_cert) = &config.ca_cert {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
+
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
+ .identity(identity)
+ .build()?
+ } else {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .identity(identity)
+ .build()?
+ }
+ }
+ (None, None) => reqwest::Client::new(),
+ _ => return Err(ConsulError::InvalidTLSConfig),
+ };
+
+ Ok(Self { client, config })
+ }
+
+ // ---- READING FROM CONSUL CATALOG ----
+
+ pub async fn get_consul_nodes(&self) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
+ let url = format!(
+ "{}/v1/catalog/service/{}",
+ self.config.consul_http_addr, self.config.service_name
+ );
+
+ let http = self.client.get(&url).send().await?;
+ let entries: Vec<ConsulQueryEntry> = http.json().await?;
+
+ let mut ret = vec![];
+ for ent in entries {
+ let ip = ent.address.parse::<IpAddr>().ok();
+ let pubkey = ent
+ .node_meta
+ .get("pubkey")
+ .and_then(|k| hex::decode(&k).ok())
+ .and_then(|k| NodeID::from_slice(&k[..]));
+ if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
+ ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
+ } else {
+ warn!(
+ "Could not process node spec from Consul: {:?} (invalid IP or public key)",
+ ent
+ );
+ }
+ }
+ debug!("Got nodes from Consul: {:?}", ret);
+
+ Ok(ret)
+ }
+
+ // ---- PUBLISHING TO CONSUL CATALOG ----
+
+ pub async fn publish_consul_service(
+ &self,
+ node_id: NodeID,
+ hostname: &str,
+ rpc_public_addr: SocketAddr,
+ ) -> Result<(), ConsulError> {
+ let node = format!("garage:{}", hex::encode(&node_id[..8]));
+
+ let advertisement = ConsulPublishEntry {
+ node: node.clone(),
address: rpc_public_addr.ip(),
- port: rpc_public_addr.port(),
- },
- };
-
- let url = format!("http://{}/v1/catalog/register", consul_host);
- let req_body = serde_json::to_string(&advertisment)?;
- debug!("Request body for consul adv: {}", req_body);
-
- let req = Request::builder()
- .uri(url)
- .method(Method::PUT)
- .body(Body::from(req_body))?;
-
- let client = Client::new();
-
- let resp = client.request(req).await?;
- debug!("Response of advertising to Consul: {:?}", resp);
- let resp_code = resp.status();
- let resp_bytes = &hyper::body::to_bytes(resp.into_body()).await?;
- debug!(
- "{}",
- std::str::from_utf8(resp_bytes).unwrap_or("<invalid utf8>")
- );
-
- if resp_code != StatusCode::OK {
- return Err(Error::Message(format!("HTTP error {}", resp_code)));
+ node_meta: [
+ ("pubkey".to_string(), hex::encode(node_id)),
+ ("hostname".to_string(), hostname.to_string()),
+ ]
+ .iter()
+ .cloned()
+ .collect(),
+ service: ConsulPublishService {
+ service_id: node.clone(),
+ service_name: self.config.service_name.clone(),
+ tags: vec!["advertised-by-garage".into(), hostname.into()],
+ address: rpc_public_addr.ip(),
+ port: rpc_public_addr.port(),
+ },
+ };
+
+ let url = format!("{}/v1/catalog/register", self.config.consul_http_addr);
+
+ let http = self.client.put(&url).json(&advertisement).send().await?;
+ http.error_for_status()?;
+
+ Ok(())
}
+}
- Ok(())
+/// Regroup all Consul discovery errors
+#[derive(Debug, Error)]
+pub enum ConsulError {
+ #[error(display = "IO error: {}", _0)]
+ Io(#[error(source)] std::io::Error),
+ #[error(display = "HTTP error: {}", _0)]
+ Reqwest(#[error(source)] reqwest::Error),
+ #[error(display = "Invalid Consul TLS configuration")]
+ InvalidTLSConfig,
}
diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs
index 197245aa..63c6567d 100644
--- a/src/rpc/kubernetes.rs
+++ b/src/rpc/kubernetes.rs
@@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID;
+use garage_util::config::KubernetesDiscoveryConfig;
+
static K8S_GROUP: &str = "deuxfleurs.fr";
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
@@ -41,15 +43,14 @@ pub async fn create_kubernetes_crd() -> Result<(), kube::Error> {
}
pub async fn get_kubernetes_nodes(
- kubernetes_service_name: &str,
- kubernetes_namespace: &str,
+ kubernetes_config: &KubernetesDiscoveryConfig,
) -> Result<Vec<(NodeID, SocketAddr)>, kube::Error> {
let client = Client::try_default().await?;
- let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
+ let nodes: Api<GarageNode> = Api::namespaced(client.clone(), &kubernetes_config.namespace);
let lp = ListParams::default().labels(&format!(
"garage.{}/service={}",
- K8S_GROUP, kubernetes_service_name
+ K8S_GROUP, kubernetes_config.service_name
));
let nodes = nodes.list(&lp).await?;
@@ -73,8 +74,7 @@ pub async fn get_kubernetes_nodes(
}
pub async fn publish_kubernetes_node(
- kubernetes_service_name: &str,
- kubernetes_namespace: &str,
+ kubernetes_config: &KubernetesDiscoveryConfig,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
@@ -93,13 +93,13 @@ pub async fn publish_kubernetes_node(
let labels = node.metadata.labels.insert(BTreeMap::new());
labels.insert(
format!("garage.{}/service", K8S_GROUP),
- kubernetes_service_name.to_string(),
+ kubernetes_config.service_name.to_string(),
);
debug!("Node object to be applied: {:#?}", node);
let client = Client::try_default().await?;
- let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
+ let nodes: Api<GarageNode> = Api::namespaced(client.clone(), &kubernetes_config.namespace);
if let Ok(old_node) = nodes.get(&node_pubkey).await {
node.metadata.resource_version = old_node.metadata.resource_version;
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 17e92dd7..248b9b52 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -3,6 +3,7 @@
#[macro_use]
extern crate tracing;
+#[cfg(feature = "consul-discovery")]
mod consul;
#[cfg(feature = "kubernetes-discovery")]
mod kubernetes;
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 9e0bfa11..d6576f20 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -23,12 +23,15 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
+#[cfg(feature = "kubernetes-discovery")]
+use garage_util::config::KubernetesDiscoveryConfig;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*;
-use crate::consul::*;
+#[cfg(feature = "consul-discovery")]
+use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::*;
@@ -90,12 +93,14 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
+ #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<String>,
- consul_discovery: Option<ConsulDiscoveryParam>,
+ #[cfg(feature = "consul-discovery")]
+ consul_discovery: Option<ConsulDiscovery>,
#[cfg(feature = "kubernetes-discovery")]
- kubernetes_discovery: Option<KubernetesDiscoveryParam>,
+ kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
replication_factor: usize,
@@ -285,29 +290,21 @@ impl System {
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
- let consul_discovery = match (&config.consul_host, &config.consul_service_name) {
- (Some(ch), Some(csn)) => Some(ConsulDiscoveryParam {
- consul_host: ch.to_string(),
- service_name: csn.to_string(),
- }),
- _ => None,
- };
-
- #[cfg(feature = "kubernetes-discovery")]
- let kubernetes_discovery = match (
- &config.kubernetes_service_name,
- &config.kubernetes_namespace,
- ) {
- (Some(ksn), Some(kn)) => Some(KubernetesDiscoveryParam {
- service_name: ksn.to_string(),
- namespace: kn.to_string(),
- skip_crd: config.kubernetes_skip_crd,
- }),
- _ => None,
+ #[cfg(feature = "consul-discovery")]
+ let consul_discovery = match &config.consul_discovery {
+ Some(cfg) => Some(
+ ConsulDiscovery::new(cfg.clone())
+ .ok_or_message("Invalid Consul discovery configuration")?,
+ ),
+ None => None,
};
+ #[cfg(not(feature = "consul-discovery"))]
+ if config.consul_discovery.is_some() {
+ warn!("Consul discovery is not enabled in this build.");
+ }
#[cfg(not(feature = "kubernetes-discovery"))]
- if config.kubernetes_service_name.is_some() || config.kubernetes_namespace.is_some() {
+ if config.kubernetes_discovery.is_some() {
warn!("Kubernetes discovery is not enabled in this build.");
}
@@ -329,11 +326,13 @@ impl System {
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
+ #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(),
+ #[cfg(feature = "consul-discovery")]
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
- kubernetes_discovery,
+ kubernetes_discovery: config.kubernetes_discovery.clone(),
ring,
update_ring: Mutex::new(update_ring),
@@ -432,6 +431,7 @@ impl System {
// ---- INTERNALS ----
+ #[cfg(feature = "consul-discovery")]
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
let c = match &self.consul_discovery {
Some(c) => c,
@@ -446,9 +446,7 @@ impl System {
}
};
- publish_consul_service(
- &c.consul_host,
- &c.service_name,
+ c.publish_consul_service(
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
@@ -473,8 +471,7 @@ impl System {
};
publish_kubernetes_node(
- &k.service_name,
- &k.namespace,
+ k,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
@@ -644,8 +641,9 @@ impl System {
}
// Fetch peer list from Consul
+ #[cfg(feature = "consul-discovery")]
if let Some(c) = &self.consul_discovery {
- match get_consul_nodes(&c.consul_host, &c.service_name).await {
+ match c.get_consul_nodes().await {
Ok(node_list) => {
ping_list.extend(node_list);
}
@@ -667,7 +665,7 @@ impl System {
};
}
- match get_kubernetes_nodes(&k.service_name, &k.namespace).await {
+ match get_kubernetes_nodes(k).await {
Ok(node_list) => {
ping_list.extend(node_list);
}
@@ -691,6 +689,7 @@ impl System {
warn!("Could not save peer list to file: {}", e);
}
+ #[cfg(feature = "consul-discovery")]
self.background.spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
@@ -785,15 +784,3 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
ret
}
-
-struct ConsulDiscoveryParam {
- consul_host: String,
- service_name: String,
-}
-
-#[cfg(feature = "kubernetes-discovery")]
-struct KubernetesDiscoveryParam {
- service_name: String,
- namespace: String,
- skip_crd: bool,
-}
diff --git a/src/util/config.rs b/src/util/config.rs
index 2d4b4f57..04f8375a 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -46,20 +46,17 @@ pub struct Config {
/// Timeout for Netapp RPC calls
pub rpc_timeout_msec: Option<u64>,
+ // -- Bootstraping and discovery
/// Bootstrap peers RPC address
#[serde(default)]
pub bootstrap_peers: Vec<String>,
- /// Consul host to connect to to discover more peers
- pub consul_host: Option<String>,
- /// Consul service name to use
- pub consul_service_name: Option<String>,
- /// Kubernetes namespace the service discovery resources are be created in
- pub kubernetes_namespace: Option<String>,
- /// Service name to filter for in k8s custom resources
- pub kubernetes_service_name: Option<String>,
- /// Skip creation of the garagenodes CRD
+
+ /// Configuration for automatic node discovery through Consul
#[serde(default)]
- pub kubernetes_skip_crd: bool,
+ pub consul_discovery: Option<ConsulDiscoveryConfig>,
+ /// Configuration for automatic node discovery through Kubernetes
+ #[serde(default)]
+ pub kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
// -- DB
/// Database engine to use for metadata (options: sled, sqlite, lmdb)
@@ -129,6 +126,34 @@ pub struct AdminConfig {
pub trace_sink: Option<String>,
}
+#[derive(Deserialize, Debug, Clone)]
+pub struct ConsulDiscoveryConfig {
+ /// Consul http or https address to connect to to discover more peers
+ pub consul_http_addr: String,
+ /// Consul service name to use
+ pub service_name: String,
+ /// CA TLS certificate to use when connecting to Consul
+ pub ca_cert: Option<String>,
+ /// Client TLS certificate to use when connecting to Consul
+ pub client_cert: Option<String>,
+ /// Client TLS key to use when connecting to Consul
+ pub client_key: Option<String>,
+ /// Skip TLS hostname verification
+ #[serde(default)]
+ pub tls_skip_verify: bool,
+}
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct KubernetesDiscoveryConfig {
+ /// Kubernetes namespace the service discovery resources are be created in
+ pub namespace: String,
+ /// Service name to filter for in k8s custom resources
+ pub service_name: String,
+ /// Skip creation of the garagenodes CRD
+ #[serde(default)]
+ pub skip_crd: bool,
+}
+
fn default_db_engine() -> String {
"sled".into()
}
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index c2322073..1541c297 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -318,7 +318,7 @@ fn path_to_key<'a>(path: &'a str, index: &str) -> Result<Cow<'a, str>, Error> {
}
Some(_) => match path_utf8 {
Cow::Borrowed(pu8) => Ok((&pu8[1..]).into()),
- Cow::Owned(pu8) => Ok((&pu8[1..]).to_string().into()),
+ Cow::Owned(pu8) => Ok(pu8[1..].to_string().into()),
},
}
}