aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_helper.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-15 11:05:09 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-19 23:38:42 +0200
commit65070f3c05775f6692f4a16b6a304991d0510301 (patch)
treeaad55f988f3d29e3643e4141af6c53e79fd89257 /src/rpc/rpc_helper.rs
parente6da0dc90098a7c830e14f6f0dce61e8e7132d3a (diff)
downloadgarage-65070f3c05775f6692f4a16b6a304991d0510301.tar.gz
garage-65070f3c05775f6692f4a16b6a304991d0510301.zip
Improvements to CLI and various fixes for netapp version
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r--src/rpc/rpc_helper.rs53
1 files changed, 28 insertions, 25 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index c9458ee6..9f735ab4 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -8,13 +8,14 @@ use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
-pub use netapp::endpoint::{Endpoint, EndpointHandler, Message};
+pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::proto::*;
pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner;
-use garage_util::error::{Error, RpcError};
+use garage_util::error::Error;
+use garage_util::data::Uuid;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -66,46 +67,47 @@ pub struct RpcHelper {
}
impl RpcHelper {
- pub async fn call<M, H>(
+ pub async fn call<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
- to: NodeID,
+ to: Uuid,
msg: M,
strat: RequestStrategy,
- ) -> Result<M::Response, Error>
+ ) -> Result<S, Error>
where
- M: Message,
+ M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
self.call_arc(endpoint, to, Arc::new(msg), strat).await
}
- pub async fn call_arc<M, H>(
+ pub async fn call_arc<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
- to: NodeID,
+ to: Uuid,
msg: Arc<M>,
strat: RequestStrategy,
- ) -> Result<M::Response, Error>
+ ) -> Result<S, Error>
where
- M: Message,
+ M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
+ let node_id = to.into();
select! {
- res = endpoint.call(&to, &msg, strat.rs_priority) => Ok(res?),
- _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Rpc(RpcError::Timeout)),
+ res = endpoint.call(&node_id, &msg, strat.rs_priority) => Ok(res??),
+ _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Timeout),
}
}
- pub async fn call_many<M, H>(
+ pub async fn call_many<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
- to: &[NodeID],
+ to: &[Uuid],
msg: M,
strat: RequestStrategy,
- ) -> Vec<(NodeID, Result<M::Response, Error>)>
+ ) -> Vec<(Uuid, Result<S, Error>)>
where
- M: Message,
+ M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
let msg = Arc::new(msg);
@@ -120,37 +122,38 @@ impl RpcHelper {
.collect::<Vec<_>>()
}
- pub async fn broadcast<M, H>(
+ pub async fn broadcast<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
msg: M,
strat: RequestStrategy,
- ) -> Vec<(NodeID, Result<M::Response, Error>)>
+ ) -> Vec<(Uuid, Result<S, Error>)>
where
- M: Message,
+ M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
let to = self
.fullmesh
.get_peer_list()
.iter()
- .map(|p| p.id)
+ .map(|p| p.id.into())
.collect::<Vec<_>>();
self.call_many(endpoint, &to[..], msg, strat).await
}
/// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
/// strategy could not be respected due to too many errors
- pub async fn try_call_many<M, H>(
+ pub async fn try_call_many<M, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
- to: &[NodeID],
+ to: &[Uuid],
msg: M,
strategy: RequestStrategy,
- ) -> Result<Vec<M::Response>, Error>
+ ) -> Result<Vec<S>, Error>
where
- M: Message + 'static,
+ M: Rpc<Response = Result<S, Error>> + 'static,
H: EndpointHandler<M> + 'static,
+ S: Send,
{
let msg = Arc::new(msg);
let mut resp_stream = to
@@ -200,7 +203,7 @@ impl RpcHelper {
Ok(results)
} else {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::from(RpcError::TooManyErrors(errors)))
+ Err(Error::TooManyErrors(errors))
}
}
}