aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_server.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-12 15:51:19 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-12 15:51:19 +0200
commitd1e8f78b2cd28f4514ad6f7d54aae6aaa4ef3f15 (patch)
tree74ac969472fad3baa8f5a3cdac6bfc6b3846d2e3 /src/rpc_server.rs
parent5967c5a5af430855fbd73f380041d63bd82f5ce1 (diff)
downloadgarage-d1e8f78b2cd28f4514ad6f7d54aae6aaa4ef3f15.tar.gz
garage-d1e8f78b2cd28f4514ad6f7d54aae6aaa4ef3f15.zip
Trying to do TLS
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r--src/rpc_server.rs98
1 files changed, 79 insertions, 19 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index f54b5099..f42d54ac 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -4,15 +4,20 @@ use std::sync::Arc;
use bytes::IntoBuf;
use futures::future::Future;
use futures_util::future::*;
+use futures_util::stream::*;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use serde::Serialize;
+use tokio::net::{TcpListener, TcpStream};
+use tokio_rustls::server::TlsStream;
+use tokio_rustls::TlsAcceptor;
use crate::data::rmp_to_vec_all_named;
use crate::error::Error;
use crate::proto::Message;
use crate::server::Garage;
+use crate::tls_util;
fn debug_serialize<T: Serialize>(x: T) -> String {
match serde_json::to_string(&x) {
@@ -71,9 +76,7 @@ async fn handler(
// and the request handler simply sits there waiting for the task to finish.
// (if it's cancelled, that's not an issue)
// (TODO FIXME except if garage happens to shut down at that point)
- let write_fut = async move {
- garage.block_manager.write_block(&m.hash, &m.data).await
- };
+ let write_fut = async move { garage.block_manager.write_block(&m.hash, &m.data).await };
tokio::spawn(write_fut).await?
}
Message::GetBlock(h) => garage.block_manager.read_block(&h).await,
@@ -105,25 +108,82 @@ pub async fn run_rpc_server(
) -> Result<(), Error> {
let bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], garage.system.config.rpc_port).into();
- let service = make_service_fn(|conn: &AddrStream| {
- let client_addr = conn.remote_addr();
- let garage = garage.clone();
- async move {
- Ok::<_, Error>(service_fn(move |req: Request<Body>| {
- let garage = garage.clone();
- handler(garage, req, client_addr).map_err(|e| {
- eprintln!("RPC handler error: {}", e);
- e
- })
- }))
+ if let Some(tls_config) = &garage.system.config.rpc_tls {
+ let ca_certs = tls_util::load_certs(&tls_config.ca_cert)?;
+ let node_certs = tls_util::load_certs(&tls_config.node_cert)?;
+ let node_key = tls_util::load_private_key(&tls_config.node_key)?;
+
+ let mut ca_store = rustls::RootCertStore::empty();
+ for crt in ca_certs.iter() {
+ ca_store.add(crt)?;
}
- });
- let server = Server::bind(&bind_addr).serve(service);
+ let mut config =
+ rustls::ServerConfig::new(rustls::AllowAnyAuthenticatedClient::new(ca_store));
+ config.set_single_cert([&ca_certs[..], &node_certs[..]].concat(), node_key)?;
+ let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config)));
+
+ let mut listener = TcpListener::bind(&bind_addr).await?;
+ let incoming = listener.incoming().filter_map(|socket| async {
+ match socket {
+ Ok(stream) => match tls_acceptor.clone().accept(stream).await {
+ Ok(x) => Some(Ok::<_, hyper::Error>(x)),
+ Err(e) => {
+ eprintln!("RPC server TLS error: {}", e);
+ None
+ }
+ },
+ Err(_) => None,
+ }
+ });
+ let incoming = hyper::server::accept::from_stream(incoming);
+
+ let service = make_service_fn(|conn: &TlsStream<TcpStream>| {
+ let client_addr = conn
+ .get_ref()
+ .0
+ .peer_addr()
+ .unwrap_or(([0, 0, 0, 0], 0).into());
+ let garage = garage.clone();
+ async move {
+ Ok::<_, Error>(service_fn(move |req: Request<Body>| {
+ let garage = garage.clone();
+ handler(garage, req, client_addr).map_err(|e| {
+ eprintln!("RPC handler error: {}", e);
+ e
+ })
+ }))
+ }
+ });
+
+ let server = Server::builder(incoming).serve(service);
+
+ let graceful = server.with_graceful_shutdown(shutdown_signal);
+ println!("RPC server listening on http://{}", bind_addr);
+
+ graceful.await?;
+ } else {
+ let service = make_service_fn(|conn: &AddrStream| {
+ let client_addr = conn.remote_addr();
+ let garage = garage.clone();
+ async move {
+ Ok::<_, Error>(service_fn(move |req: Request<Body>| {
+ let garage = garage.clone();
+ handler(garage, req, client_addr).map_err(|e| {
+ eprintln!("RPC handler error: {}", e);
+ e
+ })
+ }))
+ }
+ });
+
+ let server = Server::bind(&bind_addr).serve(service);
- let graceful = server.with_graceful_shutdown(shutdown_signal);
- println!("RPC server listening on http://{}", bind_addr);
+ let graceful = server.with_graceful_shutdown(shutdown_signal);
+ println!("RPC server listening on http://{}", bind_addr);
+
+ graceful.await?;
+ }
- graceful.await?;
Ok(())
}