aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/rpc_server.rs')
-rw-r--r--src/rpc/rpc_server.rs14
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 1c6bc8d2..0d82d796 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -4,7 +4,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
-use bytes::IntoBuf;
use futures::future::Future;
use futures_util::future::*;
use futures_util::stream::*;
@@ -15,6 +14,7 @@ use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
+use tokio_stream::wrappers::TcpListenerStream;
use garage_util::config::TlsConfig;
use garage_util::data::*;
@@ -47,11 +47,15 @@ where
{
let begin_time = Instant::now();
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
- let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?;
+ let msg = rmp_serde::decode::from_read::<_, M>(&whole_body[..])?;
trace!(
"Request message: {}",
- serde_json::to_string(&msg).unwrap_or("<json error>".into())
+ serde_json::to_string(&msg)
+ .unwrap_or("<json error>".into())
+ .chars()
+ .take(100)
+ .collect::<String>()
);
match handler(msg, sockaddr).await {
@@ -171,8 +175,8 @@ impl RpcServer {
config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?;
let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config)));
- let mut listener = TcpListener::bind(&self.bind_addr).await?;
- let incoming = listener.incoming().filter_map(|socket| async {
+ let listener = TcpListener::bind(&self.bind_addr).await?;
+ let incoming = TcpListenerStream::new(listener).filter_map(|socket| async {
match socket {
Ok(stream) => match tls_acceptor.clone().accept(stream).await {
Ok(x) => Some(Ok::<_, hyper::Error>(x)),