From e8214cb1807d3145907c7ed9e077fa45ada4aeea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 22 Apr 2020 16:51:52 +0000 Subject: Better concurrency: Use Notify instead of stupid sleep in background worker Use Semaphore to limit concurrent requests in rpc_client Make more background tasks cancellable --- src/rpc_client.rs | 54 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 15 deletions(-) (limited to 'src/rpc_client.rs') diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 4a065264..a5c44a2e 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -10,7 +10,7 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use hyper::client::{Client, HttpConnector}; use hyper::{Body, Method, Request}; -use tokio::sync::watch; +use tokio::sync::{watch, Semaphore}; use crate::background::BackgroundRunner; use crate::data::*; @@ -117,7 +117,7 @@ impl RpcClient { if results.len() >= stop_after { // Continue requests in background // TODO: make this optionnal (only usefull for write requests) - self.clone().background.spawn(async move { + self.clone().background.spawn_cancellable(async move { resp_stream.collect::>().await; Ok(()) }); @@ -164,14 +164,22 @@ impl RpcAddrClient { } } -pub enum RpcHttpClient { +pub struct RpcHttpClient { + request_limiter: Semaphore, + method: ClientMethod, +} + +enum ClientMethod { HTTP(Client), HTTPS(Client, hyper::Body>), } impl RpcHttpClient { - pub fn new(tls_config: &Option) -> Result { - if let Some(cf) = tls_config { + pub fn new( + max_concurrent_requests: usize, + tls_config: &Option, + ) -> Result { + let method = if let Some(cf) = tls_config { let ca_certs = tls_util::load_certs(&cf.ca_cert)?; let node_certs = tls_util::load_certs(&cf.node_cert)?; let node_key = tls_util::load_private_key(&cf.node_key)?; @@ -187,10 +195,14 @@ impl RpcHttpClient { let connector = tls_util::HttpsConnectorFixedDnsname::::new(config, "garage"); - Ok(RpcHttpClient::HTTPS(Client::builder().build(connector))) + ClientMethod::HTTPS(Client::builder().build(connector)) } else { - Ok(RpcHttpClient::HTTP(Client::new())) - } + ClientMethod::HTTP(Client::new()) + }; + Ok(RpcHttpClient { + method, + request_limiter: Semaphore::new(max_concurrent_requests), + }) } async fn call( @@ -204,9 +216,9 @@ impl RpcHttpClient { MB: Borrow, M: RpcMessage, { - let uri = match self { - RpcHttpClient::HTTP(_) => format!("http://{}/{}", to_addr, path), - RpcHttpClient::HTTPS(_) => format!("https://{}/{}", to_addr, path), + let uri = match self.method { + ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path), + ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path), }; let req = Request::builder() @@ -214,12 +226,22 @@ impl RpcHttpClient { .uri(uri) .body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?; - let resp_fut = match self { - RpcHttpClient::HTTP(client) => client.request(req).fuse(), - RpcHttpClient::HTTPS(client) => client.request(req).fuse(), + let resp_fut = match &self.method { + ClientMethod::HTTP(client) => client.request(req).fuse(), + ClientMethod::HTTPS(client) => client.request(req).fuse(), }; + + let slot = self.request_limiter.acquire().await; let resp = tokio::time::timeout(timeout, resp_fut) - .await? + .await + .map_err(|e| { + debug!( + "RPC timeout to {}: {}", + to_addr, + debug_serialize(msg.borrow()) + ); + e + })? .map_err(|e| { warn!( "RPC HTTP client error when connecting to {}: {}", @@ -227,6 +249,7 @@ impl RpcHttpClient { ); e })?; + drop(slot); let status = resp.status(); let body = hyper::body::to_bytes(resp.into_body()).await?; @@ -240,3 +263,4 @@ impl RpcHttpClient { } } } + -- cgit v1.2.3