aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-11-03 17:00:40 +0100
committerAlex Auvolat <alex@adnab.me>2021-11-03 18:02:57 +0100
commit6f13d083ab188060d2a2dc5f619070a445fe61ba (patch)
treee83a9b53b61d90086c519a8ae9d8f0fffa750e0f /src/rpc
parent8c4f418fe889106758a086b34688f7c3b378ca64 (diff)
downloadgarage-request-buffer-semaphore.tar.gz
garage-request-buffer-semaphore.zip
Add semaphore to limit RAM used by buffered outgoing requestsrequest-buffer-semaphore
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/rpc_helper.rs33
-rw-r--r--src/rpc/system.rs5
2 files changed, 31 insertions, 7 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 8c7cc681..cdac6f14 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -7,6 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
+use tokio::sync::Semaphore;
pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
@@ -14,11 +15,16 @@ pub use netapp::proto::*;
pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner;
-use garage_util::data::Uuid;
+use garage_util::data::*;
use garage_util::error::Error;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+// Try to never have more than 200MB of outgoing requests
+// buffered at the same time. Other requests are queued until
+// space is freed.
+const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024;
+
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
@@ -64,9 +70,21 @@ impl RequestStrategy {
pub struct RpcHelper {
pub(crate) fullmesh: Arc<FullMeshPeeringStrategy>,
pub(crate) background: Arc<BackgroundRunner>,
+ request_buffer_semaphore: Arc<Semaphore>,
}
impl RpcHelper {
+ pub(crate) fn new(
+ fullmesh: Arc<FullMeshPeeringStrategy>,
+ background: Arc<BackgroundRunner>,
+ ) -> Self {
+ Self {
+ fullmesh,
+ background,
+ request_buffer_semaphore: Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)),
+ }
+ }
+
pub async fn call<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
@@ -92,10 +110,19 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
+ let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
+ let permit = self.request_buffer_semaphore.acquire_many(msg_size).await?;
+
let node_id = to.into();
select! {
- res = endpoint.call(&node_id, &msg, strat.rs_priority) => Ok(res??),
- _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Timeout),
+ res = endpoint.call(&node_id, &msg, strat.rs_priority) => {
+ drop(permit);
+ Ok(res??)
+ }
+ _ = tokio::time::sleep(strat.rs_timeout) => {
+ drop(permit);
+ Err(Error::Timeout)
+ }
}
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 8f5a1ec5..a518ef21 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -235,10 +235,7 @@ impl System {
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
- rpc: RpcHelper {
- fullmesh,
- background: background.clone(),
- },
+ rpc: RpcHelper::new(fullmesh, background.clone()),
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,