aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/rpc_client.rs')
-rw-r--r--src/rpc/rpc_client.rs27
1 files changed, 26 insertions, 1 deletions
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index eb4f6620..261dec7a 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -1,3 +1,4 @@
+//! Contain structs related to making RPCs
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::net::SocketAddr;
@@ -26,14 +27,19 @@ use crate::tls_util;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
+ /// Max time to wait for reponse
pub rs_timeout: Duration,
+ /// Min number of response to consider the request successful
pub rs_quorum: usize,
+ /// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool,
}
impl RequestStrategy {
+ /// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_quorum(quorum: usize) -> Self {
RequestStrategy {
rs_timeout: DEFAULT_TIMEOUT,
@@ -41,19 +47,24 @@ impl RequestStrategy {
rs_interrupt_after_quorum: false,
}
}
+ /// Set timeout of the strategy
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.rs_timeout = timeout;
self
}
+ /// Set if requests can be dropped after quorum has been reached
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
}
+/// Shortcut for a boxed async function taking a message, and resolving to another message or an
+/// error
pub type LocalHandlerFn<M> =
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
+/// Client used to send RPC
pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
@@ -64,6 +75,7 @@ pub struct RpcClient<M: RpcMessage> {
}
impl<M: RpcMessage + 'static> RpcClient<M> {
+ /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
pub fn new(
rac: RpcAddrClient<M>,
background: Arc<BackgroundRunner>,
@@ -77,6 +89,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
})
}
+ /// Set the local handler, to process RPC to this node without network usage
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
@@ -90,14 +103,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
self.local_handler.swap(Some(Arc::new((my_id, handler))));
}
- pub fn by_addr(&self) -> &RpcAddrClient<M> {
+ /// Get the server address this client connect to
+ pub fn get_addr(&self) -> &RpcAddrClient<M> {
&self.rpc_addr_client
}
+ /// Make a RPC call
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await
}
+ /// Make a RPC call from a message stored in an Arc
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref();
@@ -135,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
+ /// Make a RPC call to multiple servers, returning a Vec containing each result
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg);
let mut resp_stream = to
@@ -149,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
results
}
+ /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
+ /// strategy could not be respected due to to many errors
pub async fn try_call_many(
self: &Arc<Self>,
to: &[UUID],
@@ -208,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
+/// Endpoint to which send RPC
pub struct RpcAddrClient<M: RpcMessage> {
phantom: PhantomData<M>,
@@ -216,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
}
impl<M: RpcMessage> RpcAddrClient<M> {
+ /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
Self {
phantom: PhantomData::default(),
@@ -224,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
+ /// Make a RPC
pub async fn call<MB>(
&self,
to_addr: &SocketAddr,
@@ -239,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
+/// HTTP client used to make RPCs
pub struct RpcHttpClient {
request_limiter: Semaphore,
method: ClientMethod,
@@ -250,6 +273,7 @@ enum ClientMethod {
}
impl RpcHttpClient {
+ /// Create a new RpcHttpClient
pub fn new(
max_concurrent_requests: usize,
tls_config: &Option<TlsConfig>,
@@ -280,6 +304,7 @@ impl RpcHttpClient {
})
}
+ /// Make a RPC
async fn call<M, MB>(
&self,
path: &str,