aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r--src/rpc_server.rs19
1 files changed, 13 insertions, 6 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index f78c27f1..4541e4da 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
+use std::time::Instant;
use std::sync::Arc;
use bytes::IntoBuf;
@@ -36,17 +37,21 @@ async fn handle_func<M, F, Fut>(
handler: Arc<F>,
req: Request<Body>,
sockaddr: SocketAddr,
+ name: Arc<String>,
) -> Result<Response<Body>, Error>
where
M: RpcMessage + 'static,
F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<M, Error>> + Send + 'static,
{
+ let begin_time = Instant::now();
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?;
+ let req_str = debug_serialize(&msg);
match handler(msg, sockaddr).await {
Ok(resp) => {
let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
+ trace!("]RPC:{},ok ({} ms)", name, (Instant::now()-begin_time).as_millis());
Ok(Response::new(Body::from(resp_bytes)))
}
Err(e) => {
@@ -54,6 +59,7 @@ where
let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?;
let mut err_response = Response::new(Body::from(rep_bytes));
*err_response.status_mut() = e.http_status_code();
+ warn!("RPC error ({}): {} ({} ms), request: {}", name, e, (Instant::now()-begin_time).as_millis(), req_str);
Ok(err_response)
}
}
@@ -74,10 +80,11 @@ impl RpcServer {
F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<M, Error>> + Send + 'static,
{
+ let name2 = Arc::new(name.clone());
let handler_arc = Arc::new(handler);
let handler = Box::new(move |req: Request<Body>, sockaddr: SocketAddr| {
let handler2 = handler_arc.clone();
- let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr));
+ let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr, name2.clone()));
b
});
self.handlers.insert(name, handler);
@@ -107,7 +114,7 @@ impl RpcServer {
let resp_waiter = tokio::spawn(handler(req, addr));
match resp_waiter.await {
Err(err) => {
- eprintln!("Handler await error: {}", err);
+ warn!("Handler await error: {}", err);
let mut ise = Response::default();
*ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(ise)
@@ -163,7 +170,7 @@ impl RpcServer {
async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
self_arc.clone().handler(req, client_addr).map_err(|e| {
- eprintln!("RPC handler error: {}", e);
+ warn!("RPC handler error: {}", e);
e
})
}))
@@ -173,7 +180,7 @@ impl RpcServer {
let server = Server::builder(incoming).serve(service);
let graceful = server.with_graceful_shutdown(shutdown_signal);
- println!("RPC server listening on http://{}", self.bind_addr);
+ info!("RPC server listening on http://{}", self.bind_addr);
graceful.await?;
} else {
@@ -184,7 +191,7 @@ impl RpcServer {
async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
self_arc.clone().handler(req, client_addr).map_err(|e| {
- eprintln!("RPC handler error: {}", e);
+ warn!("RPC handler error: {}", e);
e
})
}))
@@ -194,7 +201,7 @@ impl RpcServer {
let server = Server::bind(&self.bind_addr).serve(service);
let graceful = server.with_graceful_shutdown(shutdown_signal);
- println!("RPC server listening on http://{}", self.bind_addr);
+ info!("RPC server listening on http://{}", self.bind_addr);
graceful.await?;
}