aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_client.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-10 22:01:48 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-10 22:01:48 +0200
commit3477864142ed09c36abea1111937b829fb41c8a4 (patch)
treed95221e66b9c014af7f4dba61ae4ff113c0e409a /src/rpc_client.rs
parentd66c0d6833ddbeb61e34ee222dde92a5363bda1f (diff)
downloadgarage-3477864142ed09c36abea1111937b829fb41c8a4.tar.gz
garage-3477864142ed09c36abea1111937b829fb41c8a4.zip
Fix the Sync issue. Details:
So the HTTP client future of Hyper is not Sync, thus the stream that read blocks wasn't either. However Hyper's default Body type requires a stream to be Sync for wrap_stream. Solution: reimplement a custom HTTP body type.
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r--src/rpc_client.rs71
1 files changed, 37 insertions, 34 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 134f8e98..7587782e 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -3,23 +3,25 @@ use std::sync::Arc;
use std::time::Duration;
use bytes::IntoBuf;
-use hyper::{Body, Method, Request, StatusCode};
-use hyper::client::Client;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
+use futures_util::future::FutureExt;
+use hyper::client::Client;
+use hyper::{Body, Method, Request, StatusCode};
use crate::data::*;
use crate::error::Error;
-use crate::proto::Message;
use crate::membership::System;
+use crate::proto::Message;
-pub async fn rpc_call_many(sys: Arc<System>,
- to: &[UUID],
- msg: &Message,
- timeout: Duration)
- -> Vec<Result<Message, Error>>
-{
- let mut resp_stream = to.iter()
+pub async fn rpc_call_many(
+ sys: Arc<System>,
+ to: &[UUID],
+ msg: &Message,
+ timeout: Duration,
+) -> Vec<Result<Message, Error>> {
+ let mut resp_stream = to
+ .iter()
.map(|to| rpc_call(sys.clone(), to, msg, timeout))
.collect::<FuturesUnordered<_>>();
@@ -30,14 +32,15 @@ pub async fn rpc_call_many(sys: Arc<System>,
results
}
-pub async fn rpc_try_call_many(sys: Arc<System>,
- to: &[UUID],
- msg: &Message,
- stop_after: usize,
- timeout: Duration)
- -> Result<Vec<Message>, Error>
-{
- let mut resp_stream = to.iter()
+pub async fn rpc_try_call_many(
+ sys: Arc<System>,
+ to: &[UUID],
+ msg: &Message,
+ stop_after: usize,
+ timeout: Duration,
+) -> Result<Vec<Message>, Error> {
+ let mut resp_stream = to
+ .iter()
.map(|to| rpc_call(sys.clone(), to, msg, timeout))
.collect::<FuturesUnordered<_>>();
@@ -49,7 +52,7 @@ pub async fn rpc_try_call_many(sys: Arc<System>,
Ok(msg) => {
results.push(msg);
if results.len() >= stop_after {
- break
+ break;
}
}
Err(e) => {
@@ -69,12 +72,12 @@ pub async fn rpc_try_call_many(sys: Arc<System>,
}
}
-pub async fn rpc_call(sys: Arc<System>,
- to: &UUID,
- msg: &Message,
- timeout: Duration)
- -> Result<Message, Error>
-{
+pub async fn rpc_call(
+ sys: Arc<System>,
+ to: &UUID,
+ msg: &Message,
+ timeout: Duration,
+) -> Result<Message, Error> {
let addr = {
let members = sys.members.read().await;
match members.status.get(to) {
@@ -91,24 +94,24 @@ pub struct RpcClient {
impl RpcClient {
pub fn new() -> Self {
- RpcClient{
+ RpcClient {
client: Client::new(),
}
}
- pub async fn call(&self,
- to_addr: &SocketAddr,
- msg: &Message,
- timeout: Duration)
- -> Result<Message, Error>
- {
+ pub async fn call(
+ &self,
+ to_addr: &SocketAddr,
+ msg: &Message,
+ timeout: Duration,
+ ) -> Result<Message, Error> {
let uri = format!("http://{}/rpc", to_addr);
let req = Request::builder()
.method(Method::POST)
.uri(uri)
.body(Body::from(rmp_to_vec_all_named(msg)?))?;
- let resp_fut = self.client.request(req);
+ let resp_fut = self.client.request(req).fuse();
let resp = tokio::time::timeout(timeout, resp_fut).await??;
if resp.status() == StatusCode::OK {
@@ -116,7 +119,7 @@ impl RpcClient {
let msg = rmp_serde::decode::from_read::<_, Message>(body.into_buf())?;
match msg {
Message::Error(e) => Err(Error::RPCError(e)),
- x => Ok(x)
+ x => Ok(x),
}
} else {
Err(Error::RPCError(format!("Status code {}", resp.status())))