diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-12 15:51:19 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-12 15:51:19 +0200 |
commit | d1e8f78b2cd28f4514ad6f7d54aae6aaa4ef3f15 (patch) | |
tree | 74ac969472fad3baa8f5a3cdac6bfc6b3846d2e3 /src/rpc_server.rs | |
parent | 5967c5a5af430855fbd73f380041d63bd82f5ce1 (diff) | |
download | garage-d1e8f78b2cd28f4514ad6f7d54aae6aaa4ef3f15.tar.gz garage-d1e8f78b2cd28f4514ad6f7d54aae6aaa4ef3f15.zip |
Trying to do TLS
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r-- | src/rpc_server.rs | 98 |
1 files changed, 79 insertions, 19 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs index f54b5099..f42d54ac 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -4,15 +4,20 @@ use std::sync::Arc; use bytes::IntoBuf; use futures::future::Future; use futures_util::future::*; +use futures_util::stream::*; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use serde::Serialize; +use tokio::net::{TcpListener, TcpStream}; +use tokio_rustls::server::TlsStream; +use tokio_rustls::TlsAcceptor; use crate::data::rmp_to_vec_all_named; use crate::error::Error; use crate::proto::Message; use crate::server::Garage; +use crate::tls_util; fn debug_serialize<T: Serialize>(x: T) -> String { match serde_json::to_string(&x) { @@ -71,9 +76,7 @@ async fn handler( // and the request handler simply sits there waiting for the task to finish. // (if it's cancelled, that's not an issue) // (TODO FIXME except if garage happens to shut down at that point) - let write_fut = async move { - garage.block_manager.write_block(&m.hash, &m.data).await - }; + let write_fut = async move { garage.block_manager.write_block(&m.hash, &m.data).await }; tokio::spawn(write_fut).await? } Message::GetBlock(h) => garage.block_manager.read_block(&h).await, @@ -105,25 +108,82 @@ pub async fn run_rpc_server( ) -> Result<(), Error> { let bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], garage.system.config.rpc_port).into(); - let service = make_service_fn(|conn: &AddrStream| { - let client_addr = conn.remote_addr(); - let garage = garage.clone(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let garage = garage.clone(); - handler(garage, req, client_addr).map_err(|e| { - eprintln!("RPC handler error: {}", e); - e - }) - })) + if let Some(tls_config) = &garage.system.config.rpc_tls { + let ca_certs = tls_util::load_certs(&tls_config.ca_cert)?; + let node_certs = tls_util::load_certs(&tls_config.node_cert)?; + let node_key = tls_util::load_private_key(&tls_config.node_key)?; + + let mut ca_store = rustls::RootCertStore::empty(); + for crt in ca_certs.iter() { + ca_store.add(crt)?; } - }); - let server = Server::bind(&bind_addr).serve(service); + let mut config = + rustls::ServerConfig::new(rustls::AllowAnyAuthenticatedClient::new(ca_store)); + config.set_single_cert([&ca_certs[..], &node_certs[..]].concat(), node_key)?; + let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config))); + + let mut listener = TcpListener::bind(&bind_addr).await?; + let incoming = listener.incoming().filter_map(|socket| async { + match socket { + Ok(stream) => match tls_acceptor.clone().accept(stream).await { + Ok(x) => Some(Ok::<_, hyper::Error>(x)), + Err(e) => { + eprintln!("RPC server TLS error: {}", e); + None + } + }, + Err(_) => None, + } + }); + let incoming = hyper::server::accept::from_stream(incoming); + + let service = make_service_fn(|conn: &TlsStream<TcpStream>| { + let client_addr = conn + .get_ref() + .0 + .peer_addr() + .unwrap_or(([0, 0, 0, 0], 0).into()); + let garage = garage.clone(); + async move { + Ok::<_, Error>(service_fn(move |req: Request<Body>| { + let garage = garage.clone(); + handler(garage, req, client_addr).map_err(|e| { + eprintln!("RPC handler error: {}", e); + e + }) + })) + } + }); + + let server = Server::builder(incoming).serve(service); + + let graceful = server.with_graceful_shutdown(shutdown_signal); + println!("RPC server listening on http://{}", bind_addr); + + graceful.await?; + } else { + let service = make_service_fn(|conn: &AddrStream| { + let client_addr = conn.remote_addr(); + let garage = garage.clone(); + async move { + Ok::<_, Error>(service_fn(move |req: Request<Body>| { + let garage = garage.clone(); + handler(garage, req, client_addr).map_err(|e| { + eprintln!("RPC handler error: {}", e); + e + }) + })) + } + }); + + let server = Server::bind(&bind_addr).serve(service); - let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("RPC server listening on http://{}", bind_addr); + let graceful = server.with_graceful_shutdown(shutdown_signal); + println!("RPC server listening on http://{}", bind_addr); + + graceful.await?; + } - graceful.await?; Ok(()) } |