aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r--src/rpc_server.rs24
1 files changed, 17 insertions, 7 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 55c7482b..eda300c4 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
+use futures_util::future::FutureExt;
use bytes::IntoBuf;
use hyper::service::{make_service_fn, service_fn};
use hyper::server::conn::AddrStream;
@@ -9,7 +10,7 @@ use futures::future::Future;
use crate::error::Error;
use crate::proto::Message;
-use crate::membership::System;
+use crate::server::Garage;
fn err_to_msg(x: Result<Message, Error>) -> Message {
match x {
@@ -18,7 +19,7 @@ fn err_to_msg(x: Result<Message, Error>) -> Message {
}
}
-async fn handler(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> {
+async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> {
if req.method() != &Method::POST {
let mut bad_request = Response::default();
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
@@ -30,12 +31,21 @@ async fn handler(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Resu
eprintln!("RPC from {}: {:?}", addr, msg);
+ let sys = garage.system.clone();
let resp = err_to_msg(match &msg {
Message::Ping(ping) => sys.handle_ping(&addr, ping).await,
Message::PullStatus => sys.handle_pull_status().await,
Message::PullConfig => sys.handle_pull_config().await,
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
+ Message::TableRPC(table, msg) => {
+ if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) {
+ rpc_handler.handle(&msg[..]).await
+ .map(|rep| Message::TableRPC(table.to_string(), rep))
+ } else {
+ Ok(Message::Error(format!("Unknown table: {}", table)))
+ }
+ }
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
});
@@ -46,16 +56,16 @@ async fn handler(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Resu
}
-pub async fn run_rpc_server(sys: Arc<System>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
- let bind_addr = ([0, 0, 0, 0], sys.config.rpc_port).into();
+pub async fn run_rpc_server(garage: Arc<Garage>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
+ let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into();
let service = make_service_fn(|conn: &AddrStream| {
let client_addr = conn.remote_addr();
- let sys = sys.clone();
+ let garage = garage.clone();
async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
- let sys = sys.clone();
- handler(sys, req, client_addr)
+ let garage = garage.clone();
+ handler(garage, req, client_addr)
}))
}
});