diff options
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r-- | src/rpc_client.rs | 53 |
1 files changed, 44 insertions, 9 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 8d8b724b..247f114e 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -6,13 +6,16 @@ use bytes::IntoBuf; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use futures_util::future::FutureExt; -use hyper::client::Client; +use hyper::client::{Client, HttpConnector}; use hyper::{Body, Method, Request, StatusCode}; +use hyper_rustls::HttpsConnector; use crate::data::*; use crate::error::Error; use crate::membership::System; use crate::proto::Message; +use crate::server::*; +use crate::tls_util; pub async fn rpc_call_many( sys: Arc<System>, @@ -88,14 +91,34 @@ pub async fn rpc_call( sys.rpc_client.call(&addr, msg, timeout).await } -pub struct RpcClient { - pub client: Client<hyper::client::HttpConnector, hyper::Body>, +pub enum RpcClient { + HTTP(Client<HttpConnector, hyper::Body>), + HTTPS(Client<HttpsConnector<HttpConnector>, hyper::Body>), } impl RpcClient { - pub fn new() -> Self { - RpcClient { - client: Client::new(), + pub fn new(tls_config: &Option<TlsConfig>) -> Result<Self, Error> { + if let Some(cf) = tls_config { + let ca_certs = tls_util::load_certs(&cf.ca_cert)?; + let node_certs = tls_util::load_certs(&cf.node_cert)?; + let node_key = tls_util::load_private_key(&cf.node_key)?; + + let mut config = rustls::ClientConfig::new(); + + for crt in ca_certs.iter() { + config.root_store.add(crt)?; + } + + config.set_single_client_cert([&ca_certs[..], &node_certs[..]].concat(), node_key)?; + + let mut http_connector = HttpConnector::new(); + http_connector.enforce_http(false); + let connector = + HttpsConnector::<HttpConnector>::from((http_connector, Arc::new(config))); + + Ok(RpcClient::HTTPS(Client::builder().build(connector))) + } else { + Ok(RpcClient::HTTP(Client::new())) } } @@ -105,14 +128,26 @@ impl RpcClient { msg: &Message, timeout: Duration, ) -> Result<Message, Error> { - let uri = format!("http://{}/rpc", to_addr); + let uri = match self { + RpcClient::HTTP(_) => format!("http://{}/rpc", to_addr), + RpcClient::HTTPS(_) => format!("https://{}/rpc", to_addr), + }; + let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(rmp_to_vec_all_named(msg)?))?; - let resp_fut = self.client.request(req).fuse(); - let resp = tokio::time::timeout(timeout, resp_fut).await??; + let resp_fut = match self { + RpcClient::HTTP(client) => client.request(req).fuse(), + RpcClient::HTTPS(client) => client.request(req).fuse(), + }; + let resp = tokio::time::timeout(timeout, resp_fut) + .await? + .map_err(|e| { + eprintln!("RPC client error: {}", e); + e + })?; if resp.status() == StatusCode::OK { let body = hyper::body::to_bytes(resp.into_body()).await?; |