aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r--src/rpc_client.rs54
1 files changed, 39 insertions, 15 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 4a065264..a5c44a2e 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -10,7 +10,7 @@ use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use hyper::client::{Client, HttpConnector};
use hyper::{Body, Method, Request};
-use tokio::sync::watch;
+use tokio::sync::{watch, Semaphore};
use crate::background::BackgroundRunner;
use crate::data::*;
@@ -117,7 +117,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
if results.len() >= stop_after {
// Continue requests in background
// TODO: make this optionnal (only usefull for write requests)
- self.clone().background.spawn(async move {
+ self.clone().background.spawn_cancellable(async move {
resp_stream.collect::<Vec<_>>().await;
Ok(())
});
@@ -164,14 +164,22 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
-pub enum RpcHttpClient {
+pub struct RpcHttpClient {
+ request_limiter: Semaphore,
+ method: ClientMethod,
+}
+
+enum ClientMethod {
HTTP(Client<HttpConnector, hyper::Body>),
HTTPS(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>),
}
impl RpcHttpClient {
- pub fn new(tls_config: &Option<TlsConfig>) -> Result<Self, Error> {
- if let Some(cf) = tls_config {
+ pub fn new(
+ max_concurrent_requests: usize,
+ tls_config: &Option<TlsConfig>,
+ ) -> Result<Self, Error> {
+ let method = if let Some(cf) = tls_config {
let ca_certs = tls_util::load_certs(&cf.ca_cert)?;
let node_certs = tls_util::load_certs(&cf.node_cert)?;
let node_key = tls_util::load_private_key(&cf.node_key)?;
@@ -187,10 +195,14 @@ impl RpcHttpClient {
let connector =
tls_util::HttpsConnectorFixedDnsname::<HttpConnector>::new(config, "garage");
- Ok(RpcHttpClient::HTTPS(Client::builder().build(connector)))
+ ClientMethod::HTTPS(Client::builder().build(connector))
} else {
- Ok(RpcHttpClient::HTTP(Client::new()))
- }
+ ClientMethod::HTTP(Client::new())
+ };
+ Ok(RpcHttpClient {
+ method,
+ request_limiter: Semaphore::new(max_concurrent_requests),
+ })
}
async fn call<M, MB>(
@@ -204,9 +216,9 @@ impl RpcHttpClient {
MB: Borrow<M>,
M: RpcMessage,
{
- let uri = match self {
- RpcHttpClient::HTTP(_) => format!("http://{}/{}", to_addr, path),
- RpcHttpClient::HTTPS(_) => format!("https://{}/{}", to_addr, path),
+ let uri = match self.method {
+ ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path),
+ ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path),
};
let req = Request::builder()
@@ -214,12 +226,22 @@ impl RpcHttpClient {
.uri(uri)
.body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?;
- let resp_fut = match self {
- RpcHttpClient::HTTP(client) => client.request(req).fuse(),
- RpcHttpClient::HTTPS(client) => client.request(req).fuse(),
+ let resp_fut = match &self.method {
+ ClientMethod::HTTP(client) => client.request(req).fuse(),
+ ClientMethod::HTTPS(client) => client.request(req).fuse(),
};
+
+ let slot = self.request_limiter.acquire().await;
let resp = tokio::time::timeout(timeout, resp_fut)
- .await?
+ .await
+ .map_err(|e| {
+ debug!(
+ "RPC timeout to {}: {}",
+ to_addr,
+ debug_serialize(msg.borrow())
+ );
+ e
+ })?
.map_err(|e| {
warn!(
"RPC HTTP client error when connecting to {}: {}",
@@ -227,6 +249,7 @@ impl RpcHttpClient {
);
e
})?;
+ drop(slot);
let status = resp.status();
let body = hyper::body::to_bytes(resp.into_body()).await?;
@@ -240,3 +263,4 @@ impl RpcHttpClient {
}
}
}
+