diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 18:20:27 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-29 12:25:02 +0200 |
commit | 605a630333c8ee60c55fe011a375c01277bba173 (patch) | |
tree | 1ae21efe1070806ddc9af7b85cda718e64e105c8 /src/rpc/rpc_helper.rs | |
parent | a35d4da721db3550a2833d8576d4283bc999e8df (diff) | |
download | garage-605a630333c8ee60c55fe011a375c01277bba173.tar.gz garage-605a630333c8ee60c55fe011a375c01277bba173.zip |
Use streaming in block manager
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 079cdc70..6e098446 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,10 +15,13 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler}; -pub use netapp::message::{Message as Rpc, *}; +pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; +use netapp::message::IntoReq; +pub use netapp::message::{ + Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, +}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -pub use netapp::{NetApp, NodeID}; +pub use netapp::{self, NetApp, NodeID}; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -117,7 +120,7 @@ impl RpcHelper { where M: Rpc<Response = Result<S, Error>>, N: IntoReq<M> + Send, - H: EndpointHandler<M>, + H: StreamingEndpointHandler<M>, { let metric_tags = [ KeyValue::new("rpc_endpoint", endpoint.path().to_string()), @@ -172,7 +175,7 @@ impl RpcHelper { where M: Rpc<Response = Result<S, Error>>, N: IntoReq<M>, - H: EndpointHandler<M>, + H: StreamingEndpointHandler<M>, { let msg = msg.into_req().map_err(netapp::error::Error::from)?; @@ -197,7 +200,7 @@ impl RpcHelper { where M: Rpc<Response = Result<S, Error>>, N: IntoReq<M>, - H: EndpointHandler<M>, + H: StreamingEndpointHandler<M>, { let to = self .0 @@ -211,16 +214,17 @@ impl RpcHelper { /// Make a RPC call to multiple servers, returning either a Vec of responses, /// or an error if quorum could not be reached due to too many errors - pub async fn try_call_many<M, H, S>( + pub async fn try_call_many<M, N, H, S>( &self, endpoint: &Arc<Endpoint<M, H>>, to: &[Uuid], - msg: M, + msg: N, strategy: RequestStrategy, ) -> Result<Vec<S>, Error> where M: Rpc<Response = Result<S, Error>> + 'static, - H: EndpointHandler<M> + 'static, + N: IntoReq<M>, + H: StreamingEndpointHandler<M> + 'static, S: Send + 'static, { let quorum = strategy.rs_quorum.unwrap_or(to.len()); @@ -261,7 +265,7 @@ impl RpcHelper { where M: Rpc<Response = Result<S, Error>> + 'static, N: IntoReq<M>, - H: EndpointHandler<M> + 'static, + H: StreamingEndpointHandler<M> + 'static, S: Send + 'static, { let msg = msg.into_req().map_err(netapp::error::Error::from)?; |