diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-22 16:51:52 +0000 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-22 16:51:52 +0000 |
commit | e8214cb1807d3145907c7ed9e077fa45ada4aeea (patch) | |
tree | 00192416f1c2d2157988a1c07df4601475a30a73 /src/rpc_client.rs | |
parent | c0335ac6904598b9ac367e17651da477d4d970d7 (diff) | |
download | garage-e8214cb1807d3145907c7ed9e077fa45ada4aeea.tar.gz garage-e8214cb1807d3145907c7ed9e077fa45ada4aeea.zip |
Better concurrency:
Use Notify instead of stupid sleep in background worker
Use Semaphore to limit concurrent requests in rpc_client
Make more background tasks cancellable
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r-- | src/rpc_client.rs | 54 |
1 files changed, 39 insertions, 15 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 4a065264..a5c44a2e 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -10,7 +10,7 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use hyper::client::{Client, HttpConnector}; use hyper::{Body, Method, Request}; -use tokio::sync::watch; +use tokio::sync::{watch, Semaphore}; use crate::background::BackgroundRunner; use crate::data::*; @@ -117,7 +117,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { if results.len() >= stop_after { // Continue requests in background // TODO: make this optionnal (only usefull for write requests) - self.clone().background.spawn(async move { + self.clone().background.spawn_cancellable(async move { resp_stream.collect::<Vec<_>>().await; Ok(()) }); @@ -164,14 +164,22 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } -pub enum RpcHttpClient { +pub struct RpcHttpClient { + request_limiter: Semaphore, + method: ClientMethod, +} + +enum ClientMethod { HTTP(Client<HttpConnector, hyper::Body>), HTTPS(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>), } impl RpcHttpClient { - pub fn new(tls_config: &Option<TlsConfig>) -> Result<Self, Error> { - if let Some(cf) = tls_config { + pub fn new( + max_concurrent_requests: usize, + tls_config: &Option<TlsConfig>, + ) -> Result<Self, Error> { + let method = if let Some(cf) = tls_config { let ca_certs = tls_util::load_certs(&cf.ca_cert)?; let node_certs = tls_util::load_certs(&cf.node_cert)?; let node_key = tls_util::load_private_key(&cf.node_key)?; @@ -187,10 +195,14 @@ impl RpcHttpClient { let connector = tls_util::HttpsConnectorFixedDnsname::<HttpConnector>::new(config, "garage"); - Ok(RpcHttpClient::HTTPS(Client::builder().build(connector))) + ClientMethod::HTTPS(Client::builder().build(connector)) } else { - Ok(RpcHttpClient::HTTP(Client::new())) - } + ClientMethod::HTTP(Client::new()) + }; + Ok(RpcHttpClient { + method, + request_limiter: Semaphore::new(max_concurrent_requests), + }) } async fn call<M, MB>( @@ -204,9 +216,9 @@ impl RpcHttpClient { MB: Borrow<M>, M: RpcMessage, { - let uri = match self { - RpcHttpClient::HTTP(_) => format!("http://{}/{}", to_addr, path), - RpcHttpClient::HTTPS(_) => format!("https://{}/{}", to_addr, path), + let uri = match self.method { + ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path), + ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path), }; let req = Request::builder() @@ -214,12 +226,22 @@ impl RpcHttpClient { .uri(uri) .body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?; - let resp_fut = match self { - RpcHttpClient::HTTP(client) => client.request(req).fuse(), - RpcHttpClient::HTTPS(client) => client.request(req).fuse(), + let resp_fut = match &self.method { + ClientMethod::HTTP(client) => client.request(req).fuse(), + ClientMethod::HTTPS(client) => client.request(req).fuse(), }; + + let slot = self.request_limiter.acquire().await; let resp = tokio::time::timeout(timeout, resp_fut) - .await? + .await + .map_err(|e| { + debug!( + "RPC timeout to {}: {}", + to_addr, + debug_serialize(msg.borrow()) + ); + e + })? .map_err(|e| { warn!( "RPC HTTP client error when connecting to {}: {}", @@ -227,6 +249,7 @@ impl RpcHttpClient { ); e })?; + drop(slot); let status = resp.status(); let body = hyper::body::to_bytes(resp.into_body()).await?; @@ -240,3 +263,4 @@ impl RpcHttpClient { } } } + |