diff options
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r-- | src/rpc_client.rs | 39 |
1 files changed, 25 insertions, 14 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs index a1c5dde0..81d20966 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use std::borrow::Borrow; use bytes::IntoBuf; use futures::stream::futures_unordered::FuturesUnordered; @@ -19,12 +20,13 @@ use crate::tls_util; pub async fn rpc_call_many( sys: Arc<System>, to: &[UUID], - msg: &Message, + msg: Message, timeout: Duration, ) -> Vec<Result<Message, Error>> { + let msg = Arc::new(msg); let mut resp_stream = to .iter() - .map(|to| rpc_call(sys.clone(), to, msg, timeout)) + .map(|to| rpc_call(sys.clone(), to, msg.clone(), timeout)) .collect::<FuturesUnordered<_>>(); let mut results = vec![]; @@ -37,13 +39,15 @@ pub async fn rpc_call_many( pub async fn rpc_try_call_many( sys: Arc<System>, to: &[UUID], - msg: &Message, + msg: Message, stop_after: usize, timeout: Duration, ) -> Result<Vec<Message>, Error> { - let mut resp_stream = to - .iter() - .map(|to| rpc_call(sys.clone(), to, msg, timeout)) + let sys2 = sys.clone(); + let msg = Arc::new(msg); + let mut resp_stream = to.to_vec() + .into_iter() + .map(move |to| rpc_call(sys2.clone(), to.clone(), msg.clone(), timeout)) .collect::<FuturesUnordered<_>>(); let mut results = vec![]; @@ -64,6 +68,13 @@ pub async fn rpc_try_call_many( } if results.len() >= stop_after { + // Continue requests in background + // TODO: make this optionnal (only usefull for write requests) + sys.background.spawn(async move { + resp_stream.collect::<Vec<_>>().await; + Ok(()) + }); + Ok(results) } else { let mut msg = "Too many failures:".to_string(); @@ -74,17 +85,17 @@ pub async fn rpc_try_call_many( } } -pub async fn rpc_call( +pub async fn rpc_call<M: Borrow<Message>, N: Borrow<UUID>>( sys: Arc<System>, - to: &UUID, - msg: &Message, + to: N, + msg: M, timeout: Duration, ) -> Result<Message, Error> { let addr = { let status = sys.status.borrow().clone(); - match status.nodes.get(to) { + match status.nodes.get(to.borrow()) { Some(status) => status.addr.clone(), - None => return Err(Error::Message(format!("Peer ID not found: {:?}", to))), + None => return Err(Error::Message(format!("Peer ID not found: {:?}", to.borrow()))), } }; sys.rpc_client.call(&addr, msg, timeout).await @@ -119,10 +130,10 @@ impl RpcClient { } } - pub async fn call( + pub async fn call<M: Borrow<Message>>( &self, to_addr: &SocketAddr, - msg: &Message, + msg: M, timeout: Duration, ) -> Result<Message, Error> { let uri = match self { @@ -133,7 +144,7 @@ impl RpcClient { let req = Request::builder() .method(Method::POST) .uri(uri) - .body(Body::from(rmp_to_vec_all_named(msg)?))?; + .body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?; let resp_fut = match self { RpcClient::HTTP(client) => client.request(req).fuse(), |