diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/background.rs | 9 | ||||
-rw-r--r-- | src/main.rs | 2 | ||||
-rw-r--r-- | src/membership.rs | 6 | ||||
-rw-r--r-- | src/rpc_client.rs | 54 | ||||
-rw-r--r-- | src/rpc_server.rs | 7 | ||||
-rw-r--r-- | src/server.rs | 6 | ||||
-rw-r--r-- | src/table.rs | 10 | ||||
-rw-r--r-- | src/table_sync.rs | 3 |
8 files changed, 66 insertions, 31 deletions
diff --git a/src/background.rs b/src/background.rs index 1f04e49b..f0dbdcb5 100644 --- a/src/background.rs +++ b/src/background.rs @@ -3,9 +3,8 @@ use std::pin::Pin; use futures::future::join_all; use std::sync::Arc; -use std::time::Duration; use tokio::sync::Mutex; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, Notify}; use crate::error::Error; @@ -18,6 +17,7 @@ pub struct BackgroundRunner { queue_in: mpsc::UnboundedSender<(Job, bool)>, queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>, + job_notify: Notify, workers: Mutex<Vec<tokio::task::JoinHandle<()>>>, } @@ -30,6 +30,7 @@ impl BackgroundRunner { stop_signal, queue_in, queue_out: Mutex::new(queue_out), + job_notify: Notify::new(), workers: Mutex::new(Vec::new()), }) } @@ -58,6 +59,7 @@ impl BackgroundRunner { { let boxed: Job = Box::pin(job); let _: Result<_, _> = self.queue_in.clone().send((boxed, false)); + self.job_notify.notify(); } pub fn spawn_cancellable<T>(&self, job: T) @@ -66,6 +68,7 @@ impl BackgroundRunner { { let boxed: Job = Box::pin(job); let _: Result<_, _> = self.queue_in.clone().send((boxed, true)); + self.job_notify.notify(); } pub async fn spawn_worker<F, T>(&self, name: String, worker: F) @@ -97,7 +100,7 @@ impl BackgroundRunner { info!("Background runner {} exiting", i); return; } - tokio::time::delay_for(Duration::from_secs(1)).await; + self.job_notify.notified().await; } } } diff --git a/src/main.rs b/src/main.rs index e0ae7db7..8985f181 100644 --- a/src/main.rs +++ b/src/main.rs @@ -238,7 +238,7 @@ async fn main() { }; let rpc_http_cli = - Arc::new(RpcHttpClient::new(&tls_config).expect("Could not create RPC client")); + Arc::new(RpcHttpClient::new(8, &tls_config).expect("Could not create RPC client")); let membership_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string()); let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string()); diff --git a/src/membership.rs b/src/membership.rs index 78c1dbe3..f9ffa3b4 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -296,8 +296,10 @@ impl System { ring.rebuild_ring(); let (update_ring, ring) = watch::channel(Arc::new(ring)); - let rpc_http_client = - Arc::new(RpcHttpClient::new(&config.rpc_tls).expect("Could not create RPC client")); + let rpc_http_client = Arc::new( + RpcHttpClient::new(config.max_concurrent_requests, &config.rpc_tls) + .expect("Could not create RPC client"), + ); let rpc_path = MEMBERSHIP_RPC_PATH.to_string(); let rpc_client = RpcClient::new( diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 4a065264..a5c44a2e 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -10,7 +10,7 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use hyper::client::{Client, HttpConnector}; use hyper::{Body, Method, Request}; -use tokio::sync::watch; +use tokio::sync::{watch, Semaphore}; use crate::background::BackgroundRunner; use crate::data::*; @@ -117,7 +117,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { if results.len() >= stop_after { // Continue requests in background // TODO: make this optionnal (only usefull for write requests) - self.clone().background.spawn(async move { + self.clone().background.spawn_cancellable(async move { resp_stream.collect::<Vec<_>>().await; Ok(()) }); @@ -164,14 +164,22 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } -pub enum RpcHttpClient { +pub struct RpcHttpClient { + request_limiter: Semaphore, + method: ClientMethod, +} + +enum ClientMethod { HTTP(Client<HttpConnector, hyper::Body>), HTTPS(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>), } impl RpcHttpClient { - pub fn new(tls_config: &Option<TlsConfig>) -> Result<Self, Error> { - if let Some(cf) = tls_config { + pub fn new( + max_concurrent_requests: usize, + tls_config: &Option<TlsConfig>, + ) -> Result<Self, Error> { + let method = if let Some(cf) = tls_config { let ca_certs = tls_util::load_certs(&cf.ca_cert)?; let node_certs = tls_util::load_certs(&cf.node_cert)?; let node_key = tls_util::load_private_key(&cf.node_key)?; @@ -187,10 +195,14 @@ impl RpcHttpClient { let connector = tls_util::HttpsConnectorFixedDnsname::<HttpConnector>::new(config, "garage"); - Ok(RpcHttpClient::HTTPS(Client::builder().build(connector))) + ClientMethod::HTTPS(Client::builder().build(connector)) } else { - Ok(RpcHttpClient::HTTP(Client::new())) - } + ClientMethod::HTTP(Client::new()) + }; + Ok(RpcHttpClient { + method, + request_limiter: Semaphore::new(max_concurrent_requests), + }) } async fn call<M, MB>( @@ -204,9 +216,9 @@ impl RpcHttpClient { MB: Borrow<M>, M: RpcMessage, { - let uri = match self { - RpcHttpClient::HTTP(_) => format!("http://{}/{}", to_addr, path), - RpcHttpClient::HTTPS(_) => format!("https://{}/{}", to_addr, path), + let uri = match self.method { + ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path), + ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path), }; let req = Request::builder() @@ -214,12 +226,22 @@ impl RpcHttpClient { .uri(uri) .body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?; - let resp_fut = match self { - RpcHttpClient::HTTP(client) => client.request(req).fuse(), - RpcHttpClient::HTTPS(client) => client.request(req).fuse(), + let resp_fut = match &self.method { + ClientMethod::HTTP(client) => client.request(req).fuse(), + ClientMethod::HTTPS(client) => client.request(req).fuse(), }; + + let slot = self.request_limiter.acquire().await; let resp = tokio::time::timeout(timeout, resp_fut) - .await? + .await + .map_err(|e| { + debug!( + "RPC timeout to {}: {}", + to_addr, + debug_serialize(msg.borrow()) + ); + e + })? .map_err(|e| { warn!( "RPC HTTP client error when connecting to {}: {}", @@ -227,6 +249,7 @@ impl RpcHttpClient { ); e })?; + drop(slot); let status = resp.status(); let body = hyper::body::to_bytes(resp.into_body()).await?; @@ -240,3 +263,4 @@ impl RpcHttpClient { } } } + diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 938eb512..51661a66 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -52,9 +52,10 @@ where Ok(resp) => { let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?; trace!( - "]RPC:{},ok ({} ms)", + "]RPC:{},ok ({} ms), request: {}", name, - (Instant::now() - begin_time).as_millis() + (Instant::now() - begin_time).as_millis(), + req_str, ); Ok(Response::new(Body::from(resp_bytes))) } @@ -68,7 +69,7 @@ where name, e, (Instant::now() - begin_time).as_millis(), - req_str + req_str, ); Ok(err_response) } diff --git a/src/server.rs b/src/server.rs index 542c8675..7b6f2240 100644 --- a/src/server.rs +++ b/src/server.rs @@ -35,6 +35,9 @@ pub struct Config { pub bootstrap_peers: Vec<SocketAddr>, + #[serde(default = "default_max_concurrent_requests")] + pub max_concurrent_requests: usize, + #[serde(default = "default_block_size")] pub block_size: usize, @@ -50,6 +53,9 @@ pub struct Config { pub rpc_tls: Option<TlsConfig>, } +fn default_max_concurrent_requests() -> usize { + 12 +} fn default_block_size() -> usize { 1048576 } diff --git a/src/table.rs b/src/table.rs index 3a21dfc7..bc375a96 100644 --- a/src/table.rs +++ b/src/table.rs @@ -269,7 +269,7 @@ where let ent2 = ret_entry.clone(); self.system .background - .spawn(async move { self2.repair_on_read(&who[..], ent2).await }); + .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } } Ok(ret) @@ -324,7 +324,7 @@ where } if !to_repair.is_empty() { let self2 = self.clone(); - self.system.background.spawn(async move { + self.system.background.spawn_cancellable(async move { for (_, v) in to_repair.iter_mut() { self2.repair_on_read(&who[..], v.take().unwrap()).await?; } @@ -472,7 +472,7 @@ where self.instance.updated(old_entry, Some(new_entry)).await?; self.system .background - .spawn(syncer.clone().invalidate(tree_key)); + .spawn_cancellable(syncer.clone().invalidate(tree_key)); } } @@ -480,7 +480,7 @@ where let self2 = self.clone(); self.system .background - .spawn(async move { self2.insert_many(&epidemic_propagate[..]).await }); + .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await }); } Ok(()) @@ -500,7 +500,7 @@ where self.instance.updated(Some(old_entry), None).await?; self.system .background - .spawn(syncer.clone().invalidate(key.to_vec())); + .spawn_cancellable(syncer.clone().invalidate(key.to_vec())); count += 1; } } diff --git a/src/table_sync.rs b/src/table_sync.rs index 6442841d..603c7aa6 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -132,10 +132,9 @@ where .await; let s3 = syncer.clone(); - table.system.background.spawn(async move { + tokio::spawn(async move { tokio::time::delay_for(Duration::from_secs(20)).await; s3.add_full_scan().await; - Ok(()) }); syncer |