aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r--src/rpc_server.rs21
1 files changed, 19 insertions, 2 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index d3bc174d..7d8df658 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
+use serde::Serialize;
use bytes::IntoBuf;
use hyper::service::{make_service_fn, service_fn};
use hyper::server::conn::AddrStream;
@@ -11,6 +12,16 @@ use crate::error::Error;
use crate::data::rmp_to_vec_all_named;
use crate::proto::Message;
use crate::server::Garage;
+use crate::block::*;
+
+fn debug_serialize<T: Serialize>(x: T) -> Result<String, Error> {
+ let ss = serde_json::to_string(&x)?;
+ if ss.len() > 100 {
+ Ok(ss[..100].to_string())
+ } else {
+ Ok(ss)
+ }
+}
fn err_to_msg(x: Result<Message, Error>) -> Message {
match x {
@@ -29,7 +40,7 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?;
- eprintln!("RPC from {}: {}", addr, serde_json::to_string(&msg)?);
+ eprintln!("RPC from {}: {} ({} bytes)", addr, debug_serialize(&msg)?, whole_body.len());
let sys = garage.system.clone();
let resp = err_to_msg(match &msg {
@@ -38,6 +49,12 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
Message::PullConfig => sys.handle_pull_config().await,
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
+ Message::PutBlock(m) => {
+ write_block(garage, &m.hash, &m.data).await
+ }
+ Message::GetBlock(h) => {
+ read_block(garage, &h).await
+ }
Message::TableRPC(table, msg) => {
if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) {
rpc_handler.handle(&msg[..]).await
@@ -50,7 +67,7 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
});
- eprintln!("reply to {}: {}", addr, serde_json::to_string(&resp)?);
+ eprintln!("reply to {}: {}", addr, debug_serialize(&resp)?);
Ok(Response::new(Body::from(
rmp_to_vec_all_named(&resp)?