aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/background.rs9
-rw-r--r--src/main.rs2
-rw-r--r--src/membership.rs6
-rw-r--r--src/rpc_client.rs54
-rw-r--r--src/rpc_server.rs7
-rw-r--r--src/server.rs6
-rw-r--r--src/table.rs10
-rw-r--r--src/table_sync.rs3
8 files changed, 66 insertions, 31 deletions
diff --git a/src/background.rs b/src/background.rs
index 1f04e49b..f0dbdcb5 100644
--- a/src/background.rs
+++ b/src/background.rs
@@ -3,9 +3,8 @@ use std::pin::Pin;
use futures::future::join_all;
use std::sync::Arc;
-use std::time::Duration;
use tokio::sync::Mutex;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, watch, Notify};
use crate::error::Error;
@@ -18,6 +17,7 @@ pub struct BackgroundRunner {
queue_in: mpsc::UnboundedSender<(Job, bool)>,
queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>,
+ job_notify: Notify,
workers: Mutex<Vec<tokio::task::JoinHandle<()>>>,
}
@@ -30,6 +30,7 @@ impl BackgroundRunner {
stop_signal,
queue_in,
queue_out: Mutex::new(queue_out),
+ job_notify: Notify::new(),
workers: Mutex::new(Vec::new()),
})
}
@@ -58,6 +59,7 @@ impl BackgroundRunner {
{
let boxed: Job = Box::pin(job);
let _: Result<_, _> = self.queue_in.clone().send((boxed, false));
+ self.job_notify.notify();
}
pub fn spawn_cancellable<T>(&self, job: T)
@@ -66,6 +68,7 @@ impl BackgroundRunner {
{
let boxed: Job = Box::pin(job);
let _: Result<_, _> = self.queue_in.clone().send((boxed, true));
+ self.job_notify.notify();
}
pub async fn spawn_worker<F, T>(&self, name: String, worker: F)
@@ -97,7 +100,7 @@ impl BackgroundRunner {
info!("Background runner {} exiting", i);
return;
}
- tokio::time::delay_for(Duration::from_secs(1)).await;
+ self.job_notify.notified().await;
}
}
}
diff --git a/src/main.rs b/src/main.rs
index e0ae7db7..8985f181 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -238,7 +238,7 @@ async fn main() {
};
let rpc_http_cli =
- Arc::new(RpcHttpClient::new(&tls_config).expect("Could not create RPC client"));
+ Arc::new(RpcHttpClient::new(8, &tls_config).expect("Could not create RPC client"));
let membership_rpc_cli =
RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string());
let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string());
diff --git a/src/membership.rs b/src/membership.rs
index 78c1dbe3..f9ffa3b4 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -296,8 +296,10 @@ impl System {
ring.rebuild_ring();
let (update_ring, ring) = watch::channel(Arc::new(ring));
- let rpc_http_client =
- Arc::new(RpcHttpClient::new(&config.rpc_tls).expect("Could not create RPC client"));
+ let rpc_http_client = Arc::new(
+ RpcHttpClient::new(config.max_concurrent_requests, &config.rpc_tls)
+ .expect("Could not create RPC client"),
+ );
let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
let rpc_client = RpcClient::new(
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 4a065264..a5c44a2e 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -10,7 +10,7 @@ use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use hyper::client::{Client, HttpConnector};
use hyper::{Body, Method, Request};
-use tokio::sync::watch;
+use tokio::sync::{watch, Semaphore};
use crate::background::BackgroundRunner;
use crate::data::*;
@@ -117,7 +117,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
if results.len() >= stop_after {
// Continue requests in background
// TODO: make this optionnal (only usefull for write requests)
- self.clone().background.spawn(async move {
+ self.clone().background.spawn_cancellable(async move {
resp_stream.collect::<Vec<_>>().await;
Ok(())
});
@@ -164,14 +164,22 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
-pub enum RpcHttpClient {
+pub struct RpcHttpClient {
+ request_limiter: Semaphore,
+ method: ClientMethod,
+}
+
+enum ClientMethod {
HTTP(Client<HttpConnector, hyper::Body>),
HTTPS(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>),
}
impl RpcHttpClient {
- pub fn new(tls_config: &Option<TlsConfig>) -> Result<Self, Error> {
- if let Some(cf) = tls_config {
+ pub fn new(
+ max_concurrent_requests: usize,
+ tls_config: &Option<TlsConfig>,
+ ) -> Result<Self, Error> {
+ let method = if let Some(cf) = tls_config {
let ca_certs = tls_util::load_certs(&cf.ca_cert)?;
let node_certs = tls_util::load_certs(&cf.node_cert)?;
let node_key = tls_util::load_private_key(&cf.node_key)?;
@@ -187,10 +195,14 @@ impl RpcHttpClient {
let connector =
tls_util::HttpsConnectorFixedDnsname::<HttpConnector>::new(config, "garage");
- Ok(RpcHttpClient::HTTPS(Client::builder().build(connector)))
+ ClientMethod::HTTPS(Client::builder().build(connector))
} else {
- Ok(RpcHttpClient::HTTP(Client::new()))
- }
+ ClientMethod::HTTP(Client::new())
+ };
+ Ok(RpcHttpClient {
+ method,
+ request_limiter: Semaphore::new(max_concurrent_requests),
+ })
}
async fn call<M, MB>(
@@ -204,9 +216,9 @@ impl RpcHttpClient {
MB: Borrow<M>,
M: RpcMessage,
{
- let uri = match self {
- RpcHttpClient::HTTP(_) => format!("http://{}/{}", to_addr, path),
- RpcHttpClient::HTTPS(_) => format!("https://{}/{}", to_addr, path),
+ let uri = match self.method {
+ ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path),
+ ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path),
};
let req = Request::builder()
@@ -214,12 +226,22 @@ impl RpcHttpClient {
.uri(uri)
.body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?;
- let resp_fut = match self {
- RpcHttpClient::HTTP(client) => client.request(req).fuse(),
- RpcHttpClient::HTTPS(client) => client.request(req).fuse(),
+ let resp_fut = match &self.method {
+ ClientMethod::HTTP(client) => client.request(req).fuse(),
+ ClientMethod::HTTPS(client) => client.request(req).fuse(),
};
+
+ let slot = self.request_limiter.acquire().await;
let resp = tokio::time::timeout(timeout, resp_fut)
- .await?
+ .await
+ .map_err(|e| {
+ debug!(
+ "RPC timeout to {}: {}",
+ to_addr,
+ debug_serialize(msg.borrow())
+ );
+ e
+ })?
.map_err(|e| {
warn!(
"RPC HTTP client error when connecting to {}: {}",
@@ -227,6 +249,7 @@ impl RpcHttpClient {
);
e
})?;
+ drop(slot);
let status = resp.status();
let body = hyper::body::to_bytes(resp.into_body()).await?;
@@ -240,3 +263,4 @@ impl RpcHttpClient {
}
}
}
+
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 938eb512..51661a66 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -52,9 +52,10 @@ where
Ok(resp) => {
let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
trace!(
- "]RPC:{},ok ({} ms)",
+ "]RPC:{},ok ({} ms), request: {}",
name,
- (Instant::now() - begin_time).as_millis()
+ (Instant::now() - begin_time).as_millis(),
+ req_str,
);
Ok(Response::new(Body::from(resp_bytes)))
}
@@ -68,7 +69,7 @@ where
name,
e,
(Instant::now() - begin_time).as_millis(),
- req_str
+ req_str,
);
Ok(err_response)
}
diff --git a/src/server.rs b/src/server.rs
index 542c8675..7b6f2240 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -35,6 +35,9 @@ pub struct Config {
pub bootstrap_peers: Vec<SocketAddr>,
+ #[serde(default = "default_max_concurrent_requests")]
+ pub max_concurrent_requests: usize,
+
#[serde(default = "default_block_size")]
pub block_size: usize,
@@ -50,6 +53,9 @@ pub struct Config {
pub rpc_tls: Option<TlsConfig>,
}
+fn default_max_concurrent_requests() -> usize {
+ 12
+}
fn default_block_size() -> usize {
1048576
}
diff --git a/src/table.rs b/src/table.rs
index 3a21dfc7..bc375a96 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -269,7 +269,7 @@ where
let ent2 = ret_entry.clone();
self.system
.background
- .spawn(async move { self2.repair_on_read(&who[..], ent2).await });
+ .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
}
Ok(ret)
@@ -324,7 +324,7 @@ where
}
if !to_repair.is_empty() {
let self2 = self.clone();
- self.system.background.spawn(async move {
+ self.system.background.spawn_cancellable(async move {
for (_, v) in to_repair.iter_mut() {
self2.repair_on_read(&who[..], v.take().unwrap()).await?;
}
@@ -472,7 +472,7 @@ where
self.instance.updated(old_entry, Some(new_entry)).await?;
self.system
.background
- .spawn(syncer.clone().invalidate(tree_key));
+ .spawn_cancellable(syncer.clone().invalidate(tree_key));
}
}
@@ -480,7 +480,7 @@ where
let self2 = self.clone();
self.system
.background
- .spawn(async move { self2.insert_many(&epidemic_propagate[..]).await });
+ .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await });
}
Ok(())
@@ -500,7 +500,7 @@ where
self.instance.updated(Some(old_entry), None).await?;
self.system
.background
- .spawn(syncer.clone().invalidate(key.to_vec()));
+ .spawn_cancellable(syncer.clone().invalidate(key.to_vec()));
count += 1;
}
}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 6442841d..603c7aa6 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -132,10 +132,9 @@ where
.await;
let s3 = syncer.clone();
- table.system.background.spawn(async move {
+ tokio::spawn(async move {
tokio::time::delay_for(Duration::from_secs(20)).await;
s3.add_full_scan().await;
- Ok(())
});
syncer