aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/consul.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/consul.rs')
-rw-r--r--src/rpc/consul.rs129
1 files changed, 81 insertions, 48 deletions
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index 15acbcef..05ed278a 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -1,14 +1,66 @@
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
-use hyper::client::Client;
-use hyper::StatusCode;
-use hyper::{Body, Method, Request};
+use tokio::fs::File;
+use tokio::io::AsyncReadExt;
+
+use err_derive::Error;
use serde::{Deserialize, Serialize};
use netapp::NodeID;
-use garage_util::error::Error;
+use garage_util::config::ConsulDiscoveryConfig;
+
+async fn make_consul_client(
+ config: &ConsulDiscoveryConfig,
+) -> Result<reqwest::Client, ConsulError> {
+ match (&config.client_cert, &config.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)
+ .await?
+ .read_to_end(&mut client_cert_buf)
+ .await?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)
+ .await?
+ .read_to_end(&mut client_key_buf)
+ .await?;
+
+ let identity = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ if config.tls_skip_verify {
+ Ok(reqwest::Client::builder()
+ .use_rustls_tls()
+ .danger_accept_invalid_certs(true)
+ .identity(identity)
+ .build()?)
+ } else if let Some(ca_cert) = &config.ca_cert {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)
+ .await?
+ .read_to_end(&mut ca_cert_buf)
+ .await?;
+
+ Ok(reqwest::Client::builder()
+ .use_rustls_tls()
+ .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
+ .identity(identity)
+ .build()?)
+ } else {
+ Ok(reqwest::Client::builder()
+ .use_rustls_tls()
+ .identity(identity)
+ .build()?)
+ }
+ }
+ (None, None) => Ok(reqwest::Client::new()),
+ _ => Err(ConsulError::InvalidTLSConfig),
+ }
+}
// ---- READING FROM CONSUL CATALOG ----
@@ -23,27 +75,16 @@ struct ConsulQueryEntry {
}
pub async fn get_consul_nodes(
- consul_host: &str,
- consul_service_name: &str,
-) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
+ consul_config: &ConsulDiscoveryConfig,
+) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
let url = format!(
"http://{}/v1/catalog/service/{}",
- consul_host, consul_service_name
+ consul_config.consul_host, consul_config.service_name
);
- let req = Request::builder()
- .uri(url)
- .method(Method::GET)
- .body(Body::default())?;
-
- let client = Client::new();
-
- let resp = client.request(req).await?;
- if resp.status() != StatusCode::OK {
- return Err(Error::Message(format!("HTTP error {}", resp.status())));
- }
- let body = hyper::body::to_bytes(resp.into_body()).await?;
- let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?;
+ let client = make_consul_client(consul_config).await?;
+ let http = client.get(&url).send().await?;
+ let entries: Vec<ConsulQueryEntry> = http.json().await?;
let mut ret = vec![];
for ent in entries {
@@ -96,15 +137,14 @@ struct ConsulPublishService {
}
pub async fn publish_consul_service(
- consul_host: &str,
- consul_service_name: &str,
+ consul_config: &ConsulDiscoveryConfig,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
-) -> Result<(), Error> {
+) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
- let advertisment = ConsulPublishEntry {
+ let advertisement = ConsulPublishEntry {
node: node.clone(),
address: rpc_public_addr.ip(),
node_meta: [
@@ -116,36 +156,29 @@ pub async fn publish_consul_service(
.collect(),
service: ConsulPublishService {
service_id: node.clone(),
- service_name: consul_service_name.to_string(),
+ service_name: consul_config.service_name.clone(),
tags: vec!["advertised-by-garage".into(), hostname.into()],
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
};
- let url = format!("http://{}/v1/catalog/register", consul_host);
- let req_body = serde_json::to_string(&advertisment)?;
- debug!("Request body for consul adv: {}", req_body);
+ let url = format!("http://{}/v1/catalog/register", consul_config.consul_host);
- let req = Request::builder()
- .uri(url)
- .method(Method::PUT)
- .body(Body::from(req_body))?;
-
- let client = Client::new();
-
- let resp = client.request(req).await?;
- debug!("Response of advertising to Consul: {:?}", resp);
- let resp_code = resp.status();
- let resp_bytes = &hyper::body::to_bytes(resp.into_body()).await?;
- debug!(
- "{}",
- std::str::from_utf8(resp_bytes).unwrap_or("<invalid utf8>")
- );
-
- if resp_code != StatusCode::OK {
- return Err(Error::Message(format!("HTTP error {}", resp_code)));
- }
+ let client = make_consul_client(consul_config).await?;
+ let http = client.put(&url).json(&advertisement).send().await?;
+ http.error_for_status()?;
Ok(())
}
+
+/// Regroup all Garage errors
+#[derive(Debug, Error)]
+pub enum ConsulError {
+ #[error(display = "IO error: {}", _0)]
+ Io(#[error(source)] std::io::Error),
+ #[error(display = "HTTP error: {}", _0)]
+ Reqwest(#[error(source)] reqwest::Error),
+ #[error(display = "Invalid Consul TLS configuration")]
+ InvalidTLSConfig,
+}