aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/rpc_client.rs')
-rw-r--r--src/rpc/rpc_client.rs38
1 files changed, 19 insertions, 19 deletions
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index f68e4c03..5ed43d44 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -19,7 +19,7 @@ use tokio::sync::{watch, Semaphore};
use garage_util::background::BackgroundRunner;
use garage_util::config::TlsConfig;
use garage_util::data::*;
-use garage_util::error::{Error, RPCError};
+use garage_util::error::{Error, RpcError};
use crate::membership::Status;
use crate::rpc_server::RpcMessage;
@@ -70,7 +70,7 @@ pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
- local_handler: ArcSwapOption<(UUID, LocalHandlerFn<M>)>,
+ local_handler: ArcSwapOption<(Uuid, LocalHandlerFn<M>)>,
rpc_addr_client: RpcAddrClient<M>,
}
@@ -91,7 +91,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
/// Set the local handler, to process RPC to this node without network usage
- pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
+ pub fn set_local_handler<F, Fut>(&self, my_id: Uuid, handler: F)
where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<M, Error>> + Send + 'static,
@@ -110,12 +110,12 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
/// Make a RPC call
- pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
+ pub async fn call(&self, to: Uuid, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await
}
/// Make a RPC call from a message stored in an Arc
- pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
+ pub async fn call_arc(&self, to: Uuid, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref();
if to.borrow() == my_id {
@@ -128,7 +128,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
if node_status.is_up() {
node_status
} else {
- return Err(Error::from(RPCError::NodeDown(to)));
+ return Err(Error::from(RpcError::NodeDown(to)));
}
}
None => {
@@ -152,7 +152,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
/// Make a RPC call to multiple servers, returning a Vec containing each result
- pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
+ pub async fn call_many(&self, to: &[Uuid], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg);
let mut resp_stream = to
.iter()
@@ -170,7 +170,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
/// strategy could not be respected due to too many errors
pub async fn try_call_many(
self: &Arc<Self>,
- to: &[UUID],
+ to: &[Uuid],
msg: M,
strategy: RequestStrategy,
) -> Result<Vec<M>, Error> {
@@ -222,7 +222,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
Ok(results)
} else {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::from(RPCError::TooManyErrors(errors)))
+ Err(Error::from(RpcError::TooManyErrors(errors)))
}
}
}
@@ -251,7 +251,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
to_addr: &SocketAddr,
msg: MB,
timeout: Duration,
- ) -> Result<Result<M, Error>, RPCError>
+ ) -> Result<Result<M, Error>, RpcError>
where
MB: Borrow<M>,
{
@@ -268,8 +268,8 @@ pub struct RpcHttpClient {
}
enum ClientMethod {
- HTTP(Client<HttpConnector, hyper::Body>),
- HTTPS(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>),
+ Http(Client<HttpConnector, hyper::Body>),
+ Https(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>),
}
impl RpcHttpClient {
@@ -294,9 +294,9 @@ impl RpcHttpClient {
let connector =
tls_util::HttpsConnectorFixedDnsname::<HttpConnector>::new(config, "garage");
- ClientMethod::HTTPS(Client::builder().build(connector))
+ ClientMethod::Https(Client::builder().build(connector))
} else {
- ClientMethod::HTTP(Client::new())
+ ClientMethod::Http(Client::new())
};
Ok(RpcHttpClient {
method,
@@ -311,14 +311,14 @@ impl RpcHttpClient {
to_addr: &SocketAddr,
msg: MB,
timeout: Duration,
- ) -> Result<Result<M, Error>, RPCError>
+ ) -> Result<Result<M, Error>, RpcError>
where
MB: Borrow<M>,
M: RpcMessage,
{
let uri = match self.method {
- ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path),
- ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path),
+ ClientMethod::Http(_) => format!("http://{}/{}", to_addr, path),
+ ClientMethod::Https(_) => format!("https://{}/{}", to_addr, path),
};
let req = Request::builder()
@@ -327,8 +327,8 @@ impl RpcHttpClient {
.body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?;
let resp_fut = match &self.method {
- ClientMethod::HTTP(client) => client.request(req).fuse(),
- ClientMethod::HTTPS(client) => client.request(req).fuse(),
+ ClientMethod::Http(client) => client.request(req).fuse(),
+ ClientMethod::Https(client) => client.request(req).fuse(),
};
trace!("({}) Acquiring request_limiter slot...", path);