diff options
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index c9458ee6..9f735ab4 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -8,13 +8,14 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use tokio::select; -pub use netapp::endpoint::{Endpoint, EndpointHandler, Message}; +pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::proto::*; pub use netapp::{NetApp, NodeID}; use garage_util::background::BackgroundRunner; -use garage_util::error::{Error, RpcError}; +use garage_util::error::Error; +use garage_util::data::Uuid; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); @@ -66,46 +67,47 @@ pub struct RpcHelper { } impl RpcHelper { - pub async fn call<M, H>( + pub async fn call<M, H, S>( &self, endpoint: &Endpoint<M, H>, - to: NodeID, + to: Uuid, msg: M, strat: RequestStrategy, - ) -> Result<M::Response, Error> + ) -> Result<S, Error> where - M: Message, + M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { self.call_arc(endpoint, to, Arc::new(msg), strat).await } - pub async fn call_arc<M, H>( + pub async fn call_arc<M, H, S>( &self, endpoint: &Endpoint<M, H>, - to: NodeID, + to: Uuid, msg: Arc<M>, strat: RequestStrategy, - ) -> Result<M::Response, Error> + ) -> Result<S, Error> where - M: Message, + M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { + let node_id = to.into(); select! { - res = endpoint.call(&to, &msg, strat.rs_priority) => Ok(res?), - _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Rpc(RpcError::Timeout)), + res = endpoint.call(&node_id, &msg, strat.rs_priority) => Ok(res??), + _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Timeout), } } - pub async fn call_many<M, H>( + pub async fn call_many<M, H, S>( &self, endpoint: &Endpoint<M, H>, - to: &[NodeID], + to: &[Uuid], msg: M, strat: RequestStrategy, - ) -> Vec<(NodeID, Result<M::Response, Error>)> + ) -> Vec<(Uuid, Result<S, Error>)> where - M: Message, + M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { let msg = Arc::new(msg); @@ -120,37 +122,38 @@ impl RpcHelper { .collect::<Vec<_>>() } - pub async fn broadcast<M, H>( + pub async fn broadcast<M, H, S>( &self, endpoint: &Endpoint<M, H>, msg: M, strat: RequestStrategy, - ) -> Vec<(NodeID, Result<M::Response, Error>)> + ) -> Vec<(Uuid, Result<S, Error>)> where - M: Message, + M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { let to = self .fullmesh .get_peer_list() .iter() - .map(|p| p.id) + .map(|p| p.id.into()) .collect::<Vec<_>>(); self.call_many(endpoint, &to[..], msg, strat).await } /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if /// strategy could not be respected due to too many errors - pub async fn try_call_many<M, H>( + pub async fn try_call_many<M, H, S>( &self, endpoint: &Arc<Endpoint<M, H>>, - to: &[NodeID], + to: &[Uuid], msg: M, strategy: RequestStrategy, - ) -> Result<Vec<M::Response>, Error> + ) -> Result<Vec<S>, Error> where - M: Message + 'static, + M: Rpc<Response = Result<S, Error>> + 'static, H: EndpointHandler<M> + 'static, + S: Send, { let msg = Arc::new(msg); let mut resp_stream = to @@ -200,7 +203,7 @@ impl RpcHelper { Ok(results) } else { let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>(); - Err(Error::from(RpcError::TooManyErrors(errors))) + Err(Error::TooManyErrors(errors)) } } } |