diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-18 19:21:34 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-18 19:21:34 +0200 |
commit | f41583e1b731574b4bb13a20d4b3fd9fe3a899f5 (patch) | |
tree | a2c1d32284fa0dc30fdf5408afad8255d50e51f6 /src/rpc_server.rs | |
parent | 3f40ef149f6dd4d61ceb326b5691e186aec178c3 (diff) | |
download | garage-f41583e1b731574b4bb13a20d4b3fd9fe3a899f5.tar.gz garage-f41583e1b731574b4bb13a20d4b3fd9fe3a899f5.zip |
Massive RPC refactoring
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r-- | src/rpc_server.rs | 310 |
1 files changed, 167 insertions, 143 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 3410ab97..83f8ddc9 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,4 +1,6 @@ +use std::collections::HashMap; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; use bytes::IntoBuf; @@ -8,175 +10,197 @@ use futures_util::stream::*; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use serde::{Deserialize, Serialize}; use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; use crate::data::*; use crate::error::Error; -use crate::proto::Message; -use crate::server::Garage; +use crate::server::TlsConfig; use crate::tls_util; -fn err_to_msg(x: Result<Message, Error>) -> Message { - match x { - Err(e) => Message::Error(format!("{}", e)), - Ok(msg) => msg, - } +pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {} + +type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>; +type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>; + +pub struct RpcServer { + pub bind_addr: SocketAddr, + pub tls_config: Option<TlsConfig>, + + handlers: HashMap<String, Handler>, } -async fn handler( - garage: Arc<Garage>, +async fn handle_func<M, F, Fut>( + handler: Arc<F>, req: Request<Body>, - addr: SocketAddr, -) -> Result<Response<Body>, Error> { - if req.method() != &Method::POST { - let mut bad_request = Response::default(); - *bad_request.status_mut() = StatusCode::BAD_REQUEST; - return Ok(bad_request); - } - + sockaddr: SocketAddr, +) -> Result<Response<Body>, Error> +where + M: RpcMessage + 'static, + F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static, + Fut: Future<Output = Result<M, Error>> + Send + 'static, +{ let whole_body = hyper::body::to_bytes(req.into_body()).await?; - let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?; - - // eprintln!( - // "RPC from {}: {} ({} bytes)", - // addr, - // debug_serialize(&msg), - // whole_body.len() - // ); - - let sys = garage.system.clone(); - let resp = err_to_msg(match msg { - Message::Ping(ping) => sys.handle_ping(&addr, &ping).await, - - Message::PullStatus => sys.handle_pull_status(), - Message::PullConfig => sys.handle_pull_config(), - Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(&adv).await, - Message::AdvertiseConfig(adv) => sys.handle_advertise_config(&adv).await, - - Message::PutBlock(m) => { - // A RPC can be interrupted in the middle, however we don't want to write partial blocks, - // which might happen if the write_block() future is cancelled in the middle. - // To solve this, the write itself is in a spawned task that has its own separate lifetime, - // and the request handler simply sits there waiting for the task to finish. - // (if it's cancelled, that's not an issue) - // (TODO FIXME except if garage happens to shut down at that point) - let write_fut = async move { garage.block_manager.write_block(&m.hash, &m.data).await }; - tokio::spawn(write_fut).await? + let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?; + match handler(msg, sockaddr).await { + Ok(resp) => { + let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?; + Ok(Response::new(Body::from(resp_bytes))) } - Message::GetBlock(h) => garage.block_manager.read_block(&h).await, - Message::NeedBlockQuery(h) => garage - .block_manager - .need_block(&h) - .await - .map(Message::NeedBlockReply), - - Message::TableRPC(table, msg) => { - // Same trick for table RPCs than for PutBlock - let op_fut = async move { - if let Some(rpc_handler) = garage.table_rpc_handlers.get(&table) { - rpc_handler - .handle(&msg[..]) - .await - .map(|rep| Message::TableRPC(table.to_string(), rep)) - } else { - Ok(Message::Error(format!("Unknown table: {}", table))) - } - }; - tokio::spawn(op_fut).await? + Err(e) => { + let err_str = format!("{}", e); + let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?; + let mut err_response = Response::new(Body::from(rep_bytes)); + *err_response.status_mut() = e.http_status_code(); + Ok(err_response) } - - _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), - }); - - // eprintln!("reply to {}: {}", addr, debug_serialize(&resp)); - - Ok(Response::new(Body::from(rmp_to_vec_all_named(&resp)?))) + } } -pub async fn run_rpc_server( - garage: Arc<Garage>, - shutdown_signal: impl Future<Output = ()>, -) -> Result<(), Error> { - let bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], garage.system.config.rpc_port).into(); +impl RpcServer { + pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self { + Self { + bind_addr, + tls_config, + handlers: HashMap::new(), + } + } - if let Some(tls_config) = &garage.system.config.rpc_tls { - let ca_certs = tls_util::load_certs(&tls_config.ca_cert)?; - let node_certs = tls_util::load_certs(&tls_config.node_cert)?; - let node_key = tls_util::load_private_key(&tls_config.node_key)?; + pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F) + where + M: RpcMessage + 'static, + F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static, + Fut: Future<Output = Result<M, Error>> + Send + 'static, + { + let handler_arc = Arc::new(handler); + let handler = Box::new(move |req: Request<Body>, sockaddr: SocketAddr| { + let handler2 = handler_arc.clone(); + let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr)); + b + }); + self.handlers.insert(name, handler); + } - let mut ca_store = rustls::RootCertStore::empty(); - for crt in ca_certs.iter() { - ca_store.add(crt)?; + async fn handler( + self: Arc<Self>, + req: Request<Body>, + addr: SocketAddr, + ) -> Result<Response<Body>, Error> { + if req.method() != &Method::POST { + let mut bad_request = Response::default(); + *bad_request.status_mut() = StatusCode::BAD_REQUEST; + return Ok(bad_request); } - let mut config = - rustls::ServerConfig::new(rustls::AllowAnyAuthenticatedClient::new(ca_store)); - config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?; - let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config))); - - let mut listener = TcpListener::bind(&bind_addr).await?; - let incoming = listener.incoming().filter_map(|socket| async { - match socket { - Ok(stream) => match tls_acceptor.clone().accept(stream).await { - Ok(x) => Some(Ok::<_, hyper::Error>(x)), - Err(e) => { - eprintln!("RPC server TLS error: {}", e); - None - } - }, - Err(_) => None, + let path = &req.uri().path()[1..]; + let handler = match self.handlers.get(path) { + Some(h) => h, + None => { + let mut not_found = Response::default(); + *not_found.status_mut() = StatusCode::NOT_FOUND; + return Ok(not_found); } - }); - let incoming = hyper::server::accept::from_stream(incoming); - - let service = make_service_fn(|conn: &TlsStream<TcpStream>| { - let client_addr = conn - .get_ref() - .0 - .peer_addr() - .unwrap_or(([0, 0, 0, 0], 0).into()); - let garage = garage.clone(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let garage = garage.clone(); - handler(garage, req, client_addr).map_err(|e| { - eprintln!("RPC handler error: {}", e); - e - }) - })) + }; + + let resp_waiter = tokio::spawn(handler(req, addr)); + match resp_waiter.await { + Err(_err) => { + let mut ise = Response::default(); + *ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + Ok(ise) } - }); + Ok(Err(err)) => { + let mut bad_request = Response::new(Body::from(format!("{}", err))); + *bad_request.status_mut() = StatusCode::BAD_REQUEST; + Ok(bad_request) + } + Ok(Ok(resp)) => Ok(resp), + } + } - let server = Server::builder(incoming).serve(service); - - let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("RPC server listening on http://{}", bind_addr); - - graceful.await?; - } else { - let service = make_service_fn(|conn: &AddrStream| { - let client_addr = conn.remote_addr(); - let garage = garage.clone(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let garage = garage.clone(); - handler(garage, req, client_addr).map_err(|e| { - eprintln!("RPC handler error: {}", e); - e - }) - })) + pub async fn run( + self: Arc<Self>, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), Error> { + if let Some(tls_config) = self.tls_config.as_ref() { + let ca_certs = tls_util::load_certs(&tls_config.ca_cert)?; + let node_certs = tls_util::load_certs(&tls_config.node_cert)?; + let node_key = tls_util::load_private_key(&tls_config.node_key)?; + + let mut ca_store = rustls::RootCertStore::empty(); + for crt in ca_certs.iter() { + ca_store.add(crt)?; } - }); - let server = Server::bind(&bind_addr).serve(service); + let mut config = + rustls::ServerConfig::new(rustls::AllowAnyAuthenticatedClient::new(ca_store)); + config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?; + let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config))); + + let mut listener = TcpListener::bind(&self.bind_addr).await?; + let incoming = listener.incoming().filter_map(|socket| async { + match socket { + Ok(stream) => match tls_acceptor.clone().accept(stream).await { + Ok(x) => Some(Ok::<_, hyper::Error>(x)), + Err(e) => { + eprintln!("RPC server TLS error: {}", e); + None + } + }, + Err(_) => None, + } + }); + let incoming = hyper::server::accept::from_stream(incoming); + + let self_arc = self.clone(); + let service = make_service_fn(|conn: &TlsStream<TcpStream>| { + let client_addr = conn + .get_ref() + .0 + .peer_addr() + .unwrap_or(([0, 0, 0, 0], 0).into()); + let self_arc = self_arc.clone(); + async move { + Ok::<_, Error>(service_fn(move |req: Request<Body>| { + self_arc.clone().handler(req, client_addr).map_err(|e| { + eprintln!("RPC handler error: {}", e); + e + }) + })) + } + }); + + let server = Server::builder(incoming).serve(service); + + let graceful = server.with_graceful_shutdown(shutdown_signal); + println!("RPC server listening on http://{}", self.bind_addr); + + graceful.await?; + } else { + let self_arc = self.clone(); + let service = make_service_fn(move |conn: &AddrStream| { + let client_addr = conn.remote_addr(); + let self_arc = self_arc.clone(); + async move { + Ok::<_, Error>(service_fn(move |req: Request<Body>| { + self_arc.clone().handler(req, client_addr).map_err(|e| { + eprintln!("RPC handler error: {}", e); + e + }) + })) + } + }); - let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("RPC server listening on http://{}", bind_addr); + let server = Server::bind(&self.bind_addr).serve(service); - graceful.await?; - } + let graceful = server.with_graceful_shutdown(shutdown_signal); + println!("RPC server listening on http://{}", self.bind_addr); - Ok(()) + graceful.await?; + } + + Ok(()) + } } |