From bacc76a057bcd90d61bfe3584bd3cdbadc748364 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Apr 2020 22:00:41 +0200 Subject: Some work in actually storing things --- src/rpc_server.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) (limited to 'src/rpc_server.rs') diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 55c7482b..eda300c4 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; +use futures_util::future::FutureExt; use bytes::IntoBuf; use hyper::service::{make_service_fn, service_fn}; use hyper::server::conn::AddrStream; @@ -9,7 +10,7 @@ use futures::future::Future; use crate::error::Error; use crate::proto::Message; -use crate::membership::System; +use crate::server::Garage; fn err_to_msg(x: Result) -> Message { match x { @@ -18,7 +19,7 @@ fn err_to_msg(x: Result) -> Message { } } -async fn handler(sys: Arc, req: Request, addr: SocketAddr) -> Result, Error> { +async fn handler(garage: Arc, req: Request, addr: SocketAddr) -> Result, Error> { if req.method() != &Method::POST { let mut bad_request = Response::default(); *bad_request.status_mut() = StatusCode::BAD_REQUEST; @@ -30,12 +31,21 @@ async fn handler(sys: Arc, req: Request, addr: SocketAddr) -> Resu eprintln!("RPC from {}: {:?}", addr, msg); + let sys = garage.system.clone(); let resp = err_to_msg(match &msg { Message::Ping(ping) => sys.handle_ping(&addr, ping).await, Message::PullStatus => sys.handle_pull_status().await, Message::PullConfig => sys.handle_pull_config().await, Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await, Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await, + Message::TableRPC(table, msg) => { + if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) { + rpc_handler.handle(&msg[..]).await + .map(|rep| Message::TableRPC(table.to_string(), rep)) + } else { + Ok(Message::Error(format!("Unknown table: {}", table))) + } + } _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), }); @@ -46,16 +56,16 @@ async fn handler(sys: Arc, req: Request, addr: SocketAddr) -> Resu } -pub async fn run_rpc_server(sys: Arc, shutdown_signal: impl Future) -> Result<(), hyper::Error> { - let bind_addr = ([0, 0, 0, 0], sys.config.rpc_port).into(); +pub async fn run_rpc_server(garage: Arc, shutdown_signal: impl Future) -> Result<(), hyper::Error> { + let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into(); let service = make_service_fn(|conn: &AddrStream| { let client_addr = conn.remote_addr(); - let sys = sys.clone(); + let garage = garage.clone(); async move { Ok::<_, Error>(service_fn(move |req: Request| { - let sys = sys.clone(); - handler(sys, req, client_addr) + let garage = garage.clone(); + handler(garage, req, client_addr) })) } }); -- cgit v1.2.3