diff options
Diffstat (limited to 'src/rpc/rpc_server.rs')
-rw-r--r-- | src/rpc/rpc_server.rs | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 1c6bc8d2..0d82d796 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -4,7 +4,6 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Instant; -use bytes::IntoBuf; use futures::future::Future; use futures_util::future::*; use futures_util::stream::*; @@ -15,6 +14,7 @@ use serde::{Deserialize, Serialize}; use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; +use tokio_stream::wrappers::TcpListenerStream; use garage_util::config::TlsConfig; use garage_util::data::*; @@ -47,11 +47,15 @@ where { let begin_time = Instant::now(); let whole_body = hyper::body::to_bytes(req.into_body()).await?; - let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?; + let msg = rmp_serde::decode::from_read::<_, M>(&whole_body[..])?; trace!( "Request message: {}", - serde_json::to_string(&msg).unwrap_or("<json error>".into()) + serde_json::to_string(&msg) + .unwrap_or("<json error>".into()) + .chars() + .take(100) + .collect::<String>() ); match handler(msg, sockaddr).await { @@ -171,8 +175,8 @@ impl RpcServer { config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?; let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config))); - let mut listener = TcpListener::bind(&self.bind_addr).await?; - let incoming = listener.incoming().filter_map(|socket| async { + let listener = TcpListener::bind(&self.bind_addr).await?; + let incoming = TcpListenerStream::new(listener).filter_map(|socket| async { match socket { Ok(stream) => match tls_acceptor.clone().accept(stream).await { Ok(x) => Some(Ok::<_, hyper::Error>(x)), |