diff options
author | Alex Auvolat <alex@adnab.me> | 2021-11-03 17:00:40 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-11-03 18:02:57 +0100 |
commit | 6f13d083ab188060d2a2dc5f619070a445fe61ba (patch) | |
tree | e83a9b53b61d90086c519a8ae9d8f0fffa750e0f /src/rpc | |
parent | 8c4f418fe889106758a086b34688f7c3b378ca64 (diff) | |
download | garage-6f13d083ab188060d2a2dc5f619070a445fe61ba.tar.gz garage-6f13d083ab188060d2a2dc5f619070a445fe61ba.zip |
Add semaphore to limit RAM used by buffered outgoing requestsrequest-buffer-semaphore
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/rpc_helper.rs | 33 | ||||
-rw-r--r-- | src/rpc/system.rs | 5 |
2 files changed, 31 insertions, 7 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 8c7cc681..cdac6f14 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -7,6 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use futures_util::future::FutureExt; use tokio::select; +use tokio::sync::Semaphore; pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; @@ -14,11 +15,16 @@ pub use netapp::proto::*; pub use netapp::{NetApp, NodeID}; use garage_util::background::BackgroundRunner; -use garage_util::data::Uuid; +use garage_util::data::*; use garage_util::error::Error; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +// Try to never have more than 200MB of outgoing requests +// buffered at the same time. Other requests are queued until +// space is freed. +const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024; + /// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { @@ -64,9 +70,21 @@ impl RequestStrategy { pub struct RpcHelper { pub(crate) fullmesh: Arc<FullMeshPeeringStrategy>, pub(crate) background: Arc<BackgroundRunner>, + request_buffer_semaphore: Arc<Semaphore>, } impl RpcHelper { + pub(crate) fn new( + fullmesh: Arc<FullMeshPeeringStrategy>, + background: Arc<BackgroundRunner>, + ) -> Self { + Self { + fullmesh, + background, + request_buffer_semaphore: Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)), + } + } + pub async fn call<M, H, S>( &self, endpoint: &Endpoint<M, H>, @@ -92,10 +110,19 @@ impl RpcHelper { M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { + let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; + let permit = self.request_buffer_semaphore.acquire_many(msg_size).await?; + let node_id = to.into(); select! { - res = endpoint.call(&node_id, &msg, strat.rs_priority) => Ok(res??), - _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Timeout), + res = endpoint.call(&node_id, &msg, strat.rs_priority) => { + drop(permit); + Ok(res??) + } + _ = tokio::time::sleep(strat.rs_timeout) => { + drop(permit); + Err(Error::Timeout) + } } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 8f5a1ec5..a518ef21 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -235,10 +235,7 @@ impl System { node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), fullmesh: fullmesh.clone(), - rpc: RpcHelper { - fullmesh, - background: background.clone(), - }, + rpc: RpcHelper::new(fullmesh, background.clone()), system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, |