diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-10 22:01:48 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-10 22:01:48 +0200 |
commit | 3477864142ed09c36abea1111937b829fb41c8a4 (patch) | |
tree | d95221e66b9c014af7f4dba61ae4ff113c0e409a /src/rpc_client.rs | |
parent | d66c0d6833ddbeb61e34ee222dde92a5363bda1f (diff) | |
download | garage-3477864142ed09c36abea1111937b829fb41c8a4.tar.gz garage-3477864142ed09c36abea1111937b829fb41c8a4.zip |
Fix the Sync issue. Details:
So the HTTP client future of Hyper is not Sync, thus the stream
that read blocks wasn't either. However Hyper's default Body type
requires a stream to be Sync for wrap_stream. Solution: reimplement
a custom HTTP body type.
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r-- | src/rpc_client.rs | 71 |
1 files changed, 37 insertions, 34 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 134f8e98..7587782e 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -3,23 +3,25 @@ use std::sync::Arc; use std::time::Duration; use bytes::IntoBuf; -use hyper::{Body, Method, Request, StatusCode}; -use hyper::client::Client; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; +use futures_util::future::FutureExt; +use hyper::client::Client; +use hyper::{Body, Method, Request, StatusCode}; use crate::data::*; use crate::error::Error; -use crate::proto::Message; use crate::membership::System; +use crate::proto::Message; -pub async fn rpc_call_many(sys: Arc<System>, - to: &[UUID], - msg: &Message, - timeout: Duration) - -> Vec<Result<Message, Error>> -{ - let mut resp_stream = to.iter() +pub async fn rpc_call_many( + sys: Arc<System>, + to: &[UUID], + msg: &Message, + timeout: Duration, +) -> Vec<Result<Message, Error>> { + let mut resp_stream = to + .iter() .map(|to| rpc_call(sys.clone(), to, msg, timeout)) .collect::<FuturesUnordered<_>>(); @@ -30,14 +32,15 @@ pub async fn rpc_call_many(sys: Arc<System>, results } -pub async fn rpc_try_call_many(sys: Arc<System>, - to: &[UUID], - msg: &Message, - stop_after: usize, - timeout: Duration) - -> Result<Vec<Message>, Error> -{ - let mut resp_stream = to.iter() +pub async fn rpc_try_call_many( + sys: Arc<System>, + to: &[UUID], + msg: &Message, + stop_after: usize, + timeout: Duration, +) -> Result<Vec<Message>, Error> { + let mut resp_stream = to + .iter() .map(|to| rpc_call(sys.clone(), to, msg, timeout)) .collect::<FuturesUnordered<_>>(); @@ -49,7 +52,7 @@ pub async fn rpc_try_call_many(sys: Arc<System>, Ok(msg) => { results.push(msg); if results.len() >= stop_after { - break + break; } } Err(e) => { @@ -69,12 +72,12 @@ pub async fn rpc_try_call_many(sys: Arc<System>, } } -pub async fn rpc_call(sys: Arc<System>, - to: &UUID, - msg: &Message, - timeout: Duration) - -> Result<Message, Error> -{ +pub async fn rpc_call( + sys: Arc<System>, + to: &UUID, + msg: &Message, + timeout: Duration, +) -> Result<Message, Error> { let addr = { let members = sys.members.read().await; match members.status.get(to) { @@ -91,24 +94,24 @@ pub struct RpcClient { impl RpcClient { pub fn new() -> Self { - RpcClient{ + RpcClient { client: Client::new(), } } - pub async fn call(&self, - to_addr: &SocketAddr, - msg: &Message, - timeout: Duration) - -> Result<Message, Error> - { + pub async fn call( + &self, + to_addr: &SocketAddr, + msg: &Message, + timeout: Duration, + ) -> Result<Message, Error> { let uri = format!("http://{}/rpc", to_addr); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(rmp_to_vec_all_named(msg)?))?; - let resp_fut = self.client.request(req); + let resp_fut = self.client.request(req).fuse(); let resp = tokio::time::timeout(timeout, resp_fut).await??; if resp.status() == StatusCode::OK { @@ -116,7 +119,7 @@ impl RpcClient { let msg = rmp_serde::decode::from_read::<_, Message>(body.into_buf())?; match msg { Message::Error(e) => Err(Error::RPCError(e)), - x => Ok(x) + x => Ok(x), } } else { Err(Error::RPCError(format!("Status code {}", resp.status()))) |