diff options
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r-- | src/rpc_server.rs | 53 |
1 files changed, 30 insertions, 23 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 7d8df658..9eeac5f3 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,18 +1,18 @@ use std::net::SocketAddr; use std::sync::Arc; -use serde::Serialize; use bytes::IntoBuf; -use hyper::service::{make_service_fn, service_fn}; +use futures::future::Future; use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use futures::future::Future; +use serde::Serialize; -use crate::error::Error; +use crate::block::*; use crate::data::rmp_to_vec_all_named; +use crate::error::Error; use crate::proto::Message; use crate::server::Garage; -use crate::block::*; fn debug_serialize<T: Serialize>(x: T) -> Result<String, Error> { let ss = serde_json::to_string(&x)?; @@ -30,7 +30,11 @@ fn err_to_msg(x: Result<Message, Error>) -> Message { } } -async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> { +async fn handler( + garage: Arc<Garage>, + req: Request<Body>, + addr: SocketAddr, +) -> Result<Response<Body>, Error> { if req.method() != &Method::POST { let mut bad_request = Response::default(); *bad_request.status_mut() = StatusCode::BAD_REQUEST; @@ -40,7 +44,12 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?; - eprintln!("RPC from {}: {} ({} bytes)", addr, debug_serialize(&msg)?, whole_body.len()); + eprintln!( + "RPC from {}: {} ({} bytes)", + addr, + debug_serialize(&msg)?, + whole_body.len() + ); let sys = garage.system.clone(); let resp = err_to_msg(match &msg { @@ -49,15 +58,13 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R Message::PullConfig => sys.handle_pull_config().await, Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await, Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await, - Message::PutBlock(m) => { - write_block(garage, &m.hash, &m.data).await - } - Message::GetBlock(h) => { - read_block(garage, &h).await - } + Message::PutBlock(m) => write_block(garage, &m.hash, &m.data).await, + Message::GetBlock(h) => read_block(garage, &h).await, Message::TableRPC(table, msg) => { if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) { - rpc_handler.handle(&msg[..]).await + rpc_handler + .handle(&msg[..]) + .await .map(|rep| Message::TableRPC(table.to_string(), rep)) } else { Ok(Message::Error(format!("Unknown table: {}", table))) @@ -69,16 +76,16 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R eprintln!("reply to {}: {}", addr, debug_serialize(&resp)?); - Ok(Response::new(Body::from( - rmp_to_vec_all_named(&resp)? - ))) + Ok(Response::new(Body::from(rmp_to_vec_all_named(&resp)?))) } +pub async fn run_rpc_server( + garage: Arc<Garage>, + shutdown_signal: impl Future<Output = ()>, +) -> Result<(), hyper::Error> { + let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into(); -pub async fn run_rpc_server(garage: Arc<Garage>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> { - let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into(); - - let service = make_service_fn(|conn: &AddrStream| { + let service = make_service_fn(|conn: &AddrStream| { let client_addr = conn.remote_addr(); let garage = garage.clone(); async move { @@ -89,10 +96,10 @@ pub async fn run_rpc_server(garage: Arc<Garage>, shutdown_signal: impl Future<Ou } }); - let server = Server::bind(&bind_addr).serve(service) ; + let server = Server::bind(&bind_addr).serve(service); let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("RPC server listening on http://{}", bind_addr); + println!("RPC server listening on http://{}", bind_addr); graceful.await } |