diff options
Diffstat (limited to 'src/rpc/consul.rs')
-rw-r--r-- | src/rpc/consul.rs | 129 |
1 files changed, 81 insertions, 48 deletions
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index 15acbcef..05ed278a 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -1,14 +1,66 @@ use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; -use hyper::client::Client; -use hyper::StatusCode; -use hyper::{Body, Method, Request}; +use tokio::fs::File; +use tokio::io::AsyncReadExt; + +use err_derive::Error; use serde::{Deserialize, Serialize}; use netapp::NodeID; -use garage_util::error::Error; +use garage_util::config::ConsulDiscoveryConfig; + +async fn make_consul_client( + config: &ConsulDiscoveryConfig, +) -> Result<reqwest::Client, ConsulError> { + match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert) + .await? + .read_to_end(&mut client_cert_buf) + .await?; + + let mut client_key_buf = vec![]; + File::open(client_key) + .await? + .read_to_end(&mut client_key_buf) + .await?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + if config.tls_skip_verify { + Ok(reqwest::Client::builder() + .use_rustls_tls() + .danger_accept_invalid_certs(true) + .identity(identity) + .build()?) + } else if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert) + .await? + .read_to_end(&mut ca_cert_buf) + .await?; + + Ok(reqwest::Client::builder() + .use_rustls_tls() + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) + .identity(identity) + .build()?) + } else { + Ok(reqwest::Client::builder() + .use_rustls_tls() + .identity(identity) + .build()?) + } + } + (None, None) => Ok(reqwest::Client::new()), + _ => Err(ConsulError::InvalidTLSConfig), + } +} // ---- READING FROM CONSUL CATALOG ---- @@ -23,27 +75,16 @@ struct ConsulQueryEntry { } pub async fn get_consul_nodes( - consul_host: &str, - consul_service_name: &str, -) -> Result<Vec<(NodeID, SocketAddr)>, Error> { + consul_config: &ConsulDiscoveryConfig, +) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> { let url = format!( "http://{}/v1/catalog/service/{}", - consul_host, consul_service_name + consul_config.consul_host, consul_config.service_name ); - let req = Request::builder() - .uri(url) - .method(Method::GET) - .body(Body::default())?; - - let client = Client::new(); - - let resp = client.request(req).await?; - if resp.status() != StatusCode::OK { - return Err(Error::Message(format!("HTTP error {}", resp.status()))); - } - let body = hyper::body::to_bytes(resp.into_body()).await?; - let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?; + let client = make_consul_client(consul_config).await?; + let http = client.get(&url).send().await?; + let entries: Vec<ConsulQueryEntry> = http.json().await?; let mut ret = vec![]; for ent in entries { @@ -96,15 +137,14 @@ struct ConsulPublishService { } pub async fn publish_consul_service( - consul_host: &str, - consul_service_name: &str, + consul_config: &ConsulDiscoveryConfig, node_id: NodeID, hostname: &str, rpc_public_addr: SocketAddr, -) -> Result<(), Error> { +) -> Result<(), ConsulError> { let node = format!("garage:{}", hex::encode(&node_id[..8])); - let advertisment = ConsulPublishEntry { + let advertisement = ConsulPublishEntry { node: node.clone(), address: rpc_public_addr.ip(), node_meta: [ @@ -116,36 +156,29 @@ pub async fn publish_consul_service( .collect(), service: ConsulPublishService { service_id: node.clone(), - service_name: consul_service_name.to_string(), + service_name: consul_config.service_name.clone(), tags: vec!["advertised-by-garage".into(), hostname.into()], address: rpc_public_addr.ip(), port: rpc_public_addr.port(), }, }; - let url = format!("http://{}/v1/catalog/register", consul_host); - let req_body = serde_json::to_string(&advertisment)?; - debug!("Request body for consul adv: {}", req_body); + let url = format!("http://{}/v1/catalog/register", consul_config.consul_host); - let req = Request::builder() - .uri(url) - .method(Method::PUT) - .body(Body::from(req_body))?; - - let client = Client::new(); - - let resp = client.request(req).await?; - debug!("Response of advertising to Consul: {:?}", resp); - let resp_code = resp.status(); - let resp_bytes = &hyper::body::to_bytes(resp.into_body()).await?; - debug!( - "{}", - std::str::from_utf8(resp_bytes).unwrap_or("<invalid utf8>") - ); - - if resp_code != StatusCode::OK { - return Err(Error::Message(format!("HTTP error {}", resp_code))); - } + let client = make_consul_client(consul_config).await?; + let http = client.put(&url).json(&advertisement).send().await?; + http.error_for_status()?; Ok(()) } + +/// Regroup all Garage errors +#[derive(Debug, Error)] +pub enum ConsulError { + #[error(display = "IO error: {}", _0)] + Io(#[error(source)] std::io::Error), + #[error(display = "HTTP error: {}", _0)] + Reqwest(#[error(source)] reqwest::Error), + #[error(display = "Invalid Consul TLS configuration")] + InvalidTLSConfig, +} |