diff options
Diffstat (limited to 'src/rpc/rpc_client.rs')
-rw-r--r-- | src/rpc/rpc_client.rs | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index eb4f6620..261dec7a 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -1,3 +1,4 @@ +//! Contain structs related to making RPCs use std::borrow::Borrow; use std::marker::PhantomData; use std::net::SocketAddr; @@ -26,14 +27,19 @@ use crate::tls_util; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +/// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { + /// Max time to wait for reponse pub rs_timeout: Duration, + /// Min number of response to consider the request successful pub rs_quorum: usize, + /// Should requests be dropped after enough response are received pub rs_interrupt_after_quorum: bool, } impl RequestStrategy { + /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_quorum(quorum: usize) -> Self { RequestStrategy { rs_timeout: DEFAULT_TIMEOUT, @@ -41,19 +47,24 @@ impl RequestStrategy { rs_interrupt_after_quorum: false, } } + /// Set timeout of the strategy pub fn with_timeout(mut self, timeout: Duration) -> Self { self.rs_timeout = timeout; self } + /// Set if requests can be dropped after quorum has been reached pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self } } +/// Shortcut for a boxed async function taking a message, and resolving to another message or an +/// error pub type LocalHandlerFn<M> = Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>; +/// Client used to send RPC pub struct RpcClient<M: RpcMessage> { status: watch::Receiver<Arc<Status>>, background: Arc<BackgroundRunner>, @@ -64,6 +75,7 @@ pub struct RpcClient<M: RpcMessage> { } impl<M: RpcMessage + 'static> RpcClient<M> { + /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers pub fn new( rac: RpcAddrClient<M>, background: Arc<BackgroundRunner>, @@ -77,6 +89,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { }) } + /// Set the local handler, to process RPC to this node without network usage pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F) where F: Fn(Arc<M>) -> Fut + Send + Sync + 'static, @@ -90,14 +103,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> { self.local_handler.swap(Some(Arc::new((my_id, handler)))); } - pub fn by_addr(&self) -> &RpcAddrClient<M> { + /// Get the server address this client connect to + pub fn get_addr(&self) -> &RpcAddrClient<M> { &self.rpc_addr_client } + /// Make a RPC call pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> { self.call_arc(to, Arc::new(msg), timeout).await } + /// Make a RPC call from a message stored in an Arc pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> { if let Some(lh) = self.local_handler.load_full() { let (my_id, local_handler) = lh.as_ref(); @@ -135,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } + /// Make a RPC call to multiple servers, returning a Vec containing each result pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { let msg = Arc::new(msg); let mut resp_stream = to @@ -149,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> { results } + /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if + /// strategy could not be respected due to to many errors pub async fn try_call_many( self: &Arc<Self>, to: &[UUID], @@ -208,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } +/// Endpoint to which send RPC pub struct RpcAddrClient<M: RpcMessage> { phantom: PhantomData<M>, @@ -216,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> { } impl<M: RpcMessage> RpcAddrClient<M> { + /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self { Self { phantom: PhantomData::default(), @@ -224,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } + /// Make a RPC pub async fn call<MB>( &self, to_addr: &SocketAddr, @@ -239,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } +/// HTTP client used to make RPCs pub struct RpcHttpClient { request_limiter: Semaphore, method: ClientMethod, @@ -250,6 +273,7 @@ enum ClientMethod { } impl RpcHttpClient { + /// Create a new RpcHttpClient pub fn new( max_concurrent_requests: usize, tls_config: &Option<TlsConfig>, @@ -280,6 +304,7 @@ impl RpcHttpClient { }) } + /// Make a RPC async fn call<M, MB>( &self, path: &str, |