From 605a630333c8ee60c55fe011a375c01277bba173 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 18:20:27 +0200 Subject: Use streaming in block manager --- src/rpc/rpc_helper.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 079cdc70..6e098446 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,10 +15,13 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler}; -pub use netapp::message::{Message as Rpc, *}; +pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; +use netapp::message::IntoReq; +pub use netapp::message::{ + Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, +}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -pub use netapp::{NetApp, NodeID}; +pub use netapp::{self, NetApp, NodeID}; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -117,7 +120,7 @@ impl RpcHelper { where M: Rpc>, N: IntoReq + Send, - H: EndpointHandler, + H: StreamingEndpointHandler, { let metric_tags = [ KeyValue::new("rpc_endpoint", endpoint.path().to_string()), @@ -172,7 +175,7 @@ impl RpcHelper { where M: Rpc>, N: IntoReq, - H: EndpointHandler, + H: StreamingEndpointHandler, { let msg = msg.into_req().map_err(netapp::error::Error::from)?; @@ -197,7 +200,7 @@ impl RpcHelper { where M: Rpc>, N: IntoReq, - H: EndpointHandler, + H: StreamingEndpointHandler, { let to = self .0 @@ -211,16 +214,17 @@ impl RpcHelper { /// Make a RPC call to multiple servers, returning either a Vec of responses, /// or an error if quorum could not be reached due to too many errors - pub async fn try_call_many( + pub async fn try_call_many( &self, endpoint: &Arc>, to: &[Uuid], - msg: M, + msg: N, strategy: RequestStrategy, ) -> Result, Error> where M: Rpc> + 'static, - H: EndpointHandler + 'static, + N: IntoReq, + H: StreamingEndpointHandler + 'static, S: Send + 'static, { let quorum = strategy.rs_quorum.unwrap_or(to.len()); @@ -261,7 +265,7 @@ impl RpcHelper { where M: Rpc> + 'static, N: IntoReq, - H: EndpointHandler + 'static, + H: StreamingEndpointHandler + 'static, S: Send + 'static, { let msg = msg.into_req().map_err(netapp::error::Error::from)?; -- cgit v1.2.3