diff options
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r-- | src/rpc_server.rs | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs index f78c27f1..4541e4da 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; +use std::time::Instant; use std::sync::Arc; use bytes::IntoBuf; @@ -36,17 +37,21 @@ async fn handle_func<M, F, Fut>( handler: Arc<F>, req: Request<Body>, sockaddr: SocketAddr, + name: Arc<String>, ) -> Result<Response<Body>, Error> where M: RpcMessage + 'static, F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<M, Error>> + Send + 'static, { + let begin_time = Instant::now(); let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?; + let req_str = debug_serialize(&msg); match handler(msg, sockaddr).await { Ok(resp) => { let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?; + trace!("]RPC:{},ok ({} ms)", name, (Instant::now()-begin_time).as_millis()); Ok(Response::new(Body::from(resp_bytes))) } Err(e) => { @@ -54,6 +59,7 @@ where let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?; let mut err_response = Response::new(Body::from(rep_bytes)); *err_response.status_mut() = e.http_status_code(); + warn!("RPC error ({}): {} ({} ms), request: {}", name, e, (Instant::now()-begin_time).as_millis(), req_str); Ok(err_response) } } @@ -74,10 +80,11 @@ impl RpcServer { F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<M, Error>> + Send + 'static, { + let name2 = Arc::new(name.clone()); let handler_arc = Arc::new(handler); let handler = Box::new(move |req: Request<Body>, sockaddr: SocketAddr| { let handler2 = handler_arc.clone(); - let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr)); + let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr, name2.clone())); b }); self.handlers.insert(name, handler); @@ -107,7 +114,7 @@ impl RpcServer { let resp_waiter = tokio::spawn(handler(req, addr)); match resp_waiter.await { Err(err) => { - eprintln!("Handler await error: {}", err); + warn!("Handler await error: {}", err); let mut ise = Response::default(); *ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; Ok(ise) @@ -163,7 +170,7 @@ impl RpcServer { async move { Ok::<_, Error>(service_fn(move |req: Request<Body>| { self_arc.clone().handler(req, client_addr).map_err(|e| { - eprintln!("RPC handler error: {}", e); + warn!("RPC handler error: {}", e); e }) })) @@ -173,7 +180,7 @@ impl RpcServer { let server = Server::builder(incoming).serve(service); let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("RPC server listening on http://{}", self.bind_addr); + info!("RPC server listening on http://{}", self.bind_addr); graceful.await?; } else { @@ -184,7 +191,7 @@ impl RpcServer { async move { Ok::<_, Error>(service_fn(move |req: Request<Body>| { self_arc.clone().handler(req, client_addr).map_err(|e| { - eprintln!("RPC handler error: {}", e); + warn!("RPC handler error: {}", e); e }) })) @@ -194,7 +201,7 @@ impl RpcServer { let server = Server::bind(&self.bind_addr).serve(service); let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("RPC server listening on http://{}", self.bind_addr); + info!("RPC server listening on http://{}", self.bind_addr); graceful.await?; } |