aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_helper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r--src/rpc/rpc_helper.rs33
1 files changed, 30 insertions, 3 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 8c7cc681..cdac6f14 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -7,6 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
+use tokio::sync::Semaphore;
pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
@@ -14,11 +15,16 @@ pub use netapp::proto::*;
pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner;
-use garage_util::data::Uuid;
+use garage_util::data::*;
use garage_util::error::Error;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+// Try to never have more than 200MB of outgoing requests
+// buffered at the same time. Other requests are queued until
+// space is freed.
+const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024;
+
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
@@ -64,9 +70,21 @@ impl RequestStrategy {
pub struct RpcHelper {
pub(crate) fullmesh: Arc<FullMeshPeeringStrategy>,
pub(crate) background: Arc<BackgroundRunner>,
+ request_buffer_semaphore: Arc<Semaphore>,
}
impl RpcHelper {
+ pub(crate) fn new(
+ fullmesh: Arc<FullMeshPeeringStrategy>,
+ background: Arc<BackgroundRunner>,
+ ) -> Self {
+ Self {
+ fullmesh,
+ background,
+ request_buffer_semaphore: Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)),
+ }
+ }
+
pub async fn call<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
@@ -92,10 +110,19 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
+ let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
+ let permit = self.request_buffer_semaphore.acquire_many(msg_size).await?;
+
let node_id = to.into();
select! {
- res = endpoint.call(&node_id, &msg, strat.rs_priority) => Ok(res??),
- _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Timeout),
+ res = endpoint.call(&node_id, &msg, strat.rs_priority) => {
+ drop(permit);
+ Ok(res??)
+ }
+ _ = tokio::time::sleep(strat.rs_timeout) => {
+ drop(permit);
+ Err(Error::Timeout)
+ }
}
}