aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_helper.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 18:20:27 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:25:02 +0200
commit605a630333c8ee60c55fe011a375c01277bba173 (patch)
tree1ae21efe1070806ddc9af7b85cda718e64e105c8 /src/rpc/rpc_helper.rs
parenta35d4da721db3550a2833d8576d4283bc999e8df (diff)
downloadgarage-605a630333c8ee60c55fe011a375c01277bba173.tar.gz
garage-605a630333c8ee60c55fe011a375c01277bba173.zip
Use streaming in block manager
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r--src/rpc/rpc_helper.rs24
1 files changed, 14 insertions, 10 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 079cdc70..6e098446 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -15,10 +15,13 @@ use opentelemetry::{
Context,
};
-pub use netapp::endpoint::{Endpoint, EndpointHandler};
-pub use netapp::message::{Message as Rpc, *};
+pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
+use netapp::message::IntoReq;
+pub use netapp::message::{
+ Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
+};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-pub use netapp::{NetApp, NodeID};
+pub use netapp::{self, NetApp, NodeID};
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -117,7 +120,7 @@ impl RpcHelper {
where
M: Rpc<Response = Result<S, Error>>,
N: IntoReq<M> + Send,
- H: EndpointHandler<M>,
+ H: StreamingEndpointHandler<M>,
{
let metric_tags = [
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
@@ -172,7 +175,7 @@ impl RpcHelper {
where
M: Rpc<Response = Result<S, Error>>,
N: IntoReq<M>,
- H: EndpointHandler<M>,
+ H: StreamingEndpointHandler<M>,
{
let msg = msg.into_req().map_err(netapp::error::Error::from)?;
@@ -197,7 +200,7 @@ impl RpcHelper {
where
M: Rpc<Response = Result<S, Error>>,
N: IntoReq<M>,
- H: EndpointHandler<M>,
+ H: StreamingEndpointHandler<M>,
{
let to = self
.0
@@ -211,16 +214,17 @@ impl RpcHelper {
/// Make a RPC call to multiple servers, returning either a Vec of responses,
/// or an error if quorum could not be reached due to too many errors
- pub async fn try_call_many<M, H, S>(
+ pub async fn try_call_many<M, N, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
- msg: M,
+ msg: N,
strategy: RequestStrategy,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
- H: EndpointHandler<M> + 'static,
+ N: IntoReq<M>,
+ H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
{
let quorum = strategy.rs_quorum.unwrap_or(to.len());
@@ -261,7 +265,7 @@ impl RpcHelper {
where
M: Rpc<Response = Result<S, Error>> + 'static,
N: IntoReq<M>,
- H: EndpointHandler<M> + 'static,
+ H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
{
let msg = msg.into_req().map_err(netapp::error::Error::from)?;