aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_server.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-10 22:01:48 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-10 22:01:48 +0200
commit3477864142ed09c36abea1111937b829fb41c8a4 (patch)
treed95221e66b9c014af7f4dba61ae4ff113c0e409a /src/rpc_server.rs
parentd66c0d6833ddbeb61e34ee222dde92a5363bda1f (diff)
downloadgarage-3477864142ed09c36abea1111937b829fb41c8a4.tar.gz
garage-3477864142ed09c36abea1111937b829fb41c8a4.zip
Fix the Sync issue. Details:
So the HTTP client future of Hyper is not Sync, thus the stream that read blocks wasn't either. However Hyper's default Body type requires a stream to be Sync for wrap_stream. Solution: reimplement a custom HTTP body type.
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r--src/rpc_server.rs53
1 files changed, 30 insertions, 23 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 7d8df658..9eeac5f3 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -1,18 +1,18 @@
use std::net::SocketAddr;
use std::sync::Arc;
-use serde::Serialize;
use bytes::IntoBuf;
-use hyper::service::{make_service_fn, service_fn};
+use futures::future::Future;
use hyper::server::conn::AddrStream;
+use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
-use futures::future::Future;
+use serde::Serialize;
-use crate::error::Error;
+use crate::block::*;
use crate::data::rmp_to_vec_all_named;
+use crate::error::Error;
use crate::proto::Message;
use crate::server::Garage;
-use crate::block::*;
fn debug_serialize<T: Serialize>(x: T) -> Result<String, Error> {
let ss = serde_json::to_string(&x)?;
@@ -30,7 +30,11 @@ fn err_to_msg(x: Result<Message, Error>) -> Message {
}
}
-async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> {
+async fn handler(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ addr: SocketAddr,
+) -> Result<Response<Body>, Error> {
if req.method() != &Method::POST {
let mut bad_request = Response::default();
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
@@ -40,7 +44,12 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?;
- eprintln!("RPC from {}: {} ({} bytes)", addr, debug_serialize(&msg)?, whole_body.len());
+ eprintln!(
+ "RPC from {}: {} ({} bytes)",
+ addr,
+ debug_serialize(&msg)?,
+ whole_body.len()
+ );
let sys = garage.system.clone();
let resp = err_to_msg(match &msg {
@@ -49,15 +58,13 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
Message::PullConfig => sys.handle_pull_config().await,
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
- Message::PutBlock(m) => {
- write_block(garage, &m.hash, &m.data).await
- }
- Message::GetBlock(h) => {
- read_block(garage, &h).await
- }
+ Message::PutBlock(m) => write_block(garage, &m.hash, &m.data).await,
+ Message::GetBlock(h) => read_block(garage, &h).await,
Message::TableRPC(table, msg) => {
if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) {
- rpc_handler.handle(&msg[..]).await
+ rpc_handler
+ .handle(&msg[..])
+ .await
.map(|rep| Message::TableRPC(table.to_string(), rep))
} else {
Ok(Message::Error(format!("Unknown table: {}", table)))
@@ -69,16 +76,16 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
eprintln!("reply to {}: {}", addr, debug_serialize(&resp)?);
- Ok(Response::new(Body::from(
- rmp_to_vec_all_named(&resp)?
- )))
+ Ok(Response::new(Body::from(rmp_to_vec_all_named(&resp)?)))
}
+pub async fn run_rpc_server(
+ garage: Arc<Garage>,
+ shutdown_signal: impl Future<Output = ()>,
+) -> Result<(), hyper::Error> {
+ let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into();
-pub async fn run_rpc_server(garage: Arc<Garage>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
- let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into();
-
- let service = make_service_fn(|conn: &AddrStream| {
+ let service = make_service_fn(|conn: &AddrStream| {
let client_addr = conn.remote_addr();
let garage = garage.clone();
async move {
@@ -89,10 +96,10 @@ pub async fn run_rpc_server(garage: Arc<Garage>, shutdown_signal: impl Future<Ou
}
});
- let server = Server::bind(&bind_addr).serve(service) ;
+ let server = Server::bind(&bind_addr).serve(service);
let graceful = server.with_graceful_shutdown(shutdown_signal);
- println!("RPC server listening on http://{}", bind_addr);
+ println!("RPC server listening on http://{}", bind_addr);
graceful.await
}