From d1e8f78b2cd28f4514ad6f7d54aae6aaa4ef3f15 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 12 Apr 2020 15:51:19 +0200 Subject: Trying to do TLS --- src/rpc_server.rs | 98 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 79 insertions(+), 19 deletions(-) (limited to 'src/rpc_server.rs') diff --git a/src/rpc_server.rs b/src/rpc_server.rs index f54b5099..f42d54ac 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -4,15 +4,20 @@ use std::sync::Arc; use bytes::IntoBuf; use futures::future::Future; use futures_util::future::*; +use futures_util::stream::*; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use serde::Serialize; +use tokio::net::{TcpListener, TcpStream}; +use tokio_rustls::server::TlsStream; +use tokio_rustls::TlsAcceptor; use crate::data::rmp_to_vec_all_named; use crate::error::Error; use crate::proto::Message; use crate::server::Garage; +use crate::tls_util; fn debug_serialize(x: T) -> String { match serde_json::to_string(&x) { @@ -71,9 +76,7 @@ async fn handler( // and the request handler simply sits there waiting for the task to finish. // (if it's cancelled, that's not an issue) // (TODO FIXME except if garage happens to shut down at that point) - let write_fut = async move { - garage.block_manager.write_block(&m.hash, &m.data).await - }; + let write_fut = async move { garage.block_manager.write_block(&m.hash, &m.data).await }; tokio::spawn(write_fut).await? } Message::GetBlock(h) => garage.block_manager.read_block(&h).await, @@ -105,25 +108,82 @@ pub async fn run_rpc_server( ) -> Result<(), Error> { let bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], garage.system.config.rpc_port).into(); - let service = make_service_fn(|conn: &AddrStream| { - let client_addr = conn.remote_addr(); - let garage = garage.clone(); - async move { - Ok::<_, Error>(service_fn(move |req: Request| { - let garage = garage.clone(); - handler(garage, req, client_addr).map_err(|e| { - eprintln!("RPC handler error: {}", e); - e - }) - })) + if let Some(tls_config) = &garage.system.config.rpc_tls { + let ca_certs = tls_util::load_certs(&tls_config.ca_cert)?; + let node_certs = tls_util::load_certs(&tls_config.node_cert)?; + let node_key = tls_util::load_private_key(&tls_config.node_key)?; + + let mut ca_store = rustls::RootCertStore::empty(); + for crt in ca_certs.iter() { + ca_store.add(crt)?; } - }); - let server = Server::bind(&bind_addr).serve(service); + let mut config = + rustls::ServerConfig::new(rustls::AllowAnyAuthenticatedClient::new(ca_store)); + config.set_single_cert([&ca_certs[..], &node_certs[..]].concat(), node_key)?; + let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config))); + + let mut listener = TcpListener::bind(&bind_addr).await?; + let incoming = listener.incoming().filter_map(|socket| async { + match socket { + Ok(stream) => match tls_acceptor.clone().accept(stream).await { + Ok(x) => Some(Ok::<_, hyper::Error>(x)), + Err(e) => { + eprintln!("RPC server TLS error: {}", e); + None + } + }, + Err(_) => None, + } + }); + let incoming = hyper::server::accept::from_stream(incoming); + + let service = make_service_fn(|conn: &TlsStream| { + let client_addr = conn + .get_ref() + .0 + .peer_addr() + .unwrap_or(([0, 0, 0, 0], 0).into()); + let garage = garage.clone(); + async move { + Ok::<_, Error>(service_fn(move |req: Request| { + let garage = garage.clone(); + handler(garage, req, client_addr).map_err(|e| { + eprintln!("RPC handler error: {}", e); + e + }) + })) + } + }); + + let server = Server::builder(incoming).serve(service); + + let graceful = server.with_graceful_shutdown(shutdown_signal); + println!("RPC server listening on http://{}", bind_addr); + + graceful.await?; + } else { + let service = make_service_fn(|conn: &AddrStream| { + let client_addr = conn.remote_addr(); + let garage = garage.clone(); + async move { + Ok::<_, Error>(service_fn(move |req: Request| { + let garage = garage.clone(); + handler(garage, req, client_addr).map_err(|e| { + eprintln!("RPC handler error: {}", e); + e + }) + })) + } + }); + + let server = Server::bind(&bind_addr).serve(service); - let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("RPC server listening on http://{}", bind_addr); + let graceful = server.with_graceful_shutdown(shutdown_signal); + println!("RPC server listening on http://{}", bind_addr); + + graceful.await?; + } - graceful.await?; Ok(()) } -- cgit v1.2.3