diff options
author | Alex <alex@adnab.me> | 2021-04-08 15:01:13 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2021-04-08 15:01:13 +0200 |
commit | c35c472dc9b04ed49e28a78bc9fa96baa7282502 (patch) | |
tree | cb7390fabc55b047b2d75a0af3f3db9ab9d247bc /src/rpc/rpc_client.rs | |
parent | c3bd672d58d32c8fc3b3225bfc2bfb5330ec726e (diff) | |
parent | 718ae005486baeed358d56cc7cd319fedd1e76eb (diff) | |
download | garage-c35c472dc9b04ed49e28a78bc9fa96baa7282502.tar.gz garage-c35c472dc9b04ed49e28a78bc9fa96baa7282502.zip |
Merge pull request 'add doc comments' (#53) from trinity-1686a/garage:doc-comments into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/53
Diffstat (limited to 'src/rpc/rpc_client.rs')
-rw-r--r-- | src/rpc/rpc_client.rs | 26 |
1 files changed, 26 insertions, 0 deletions
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 5e76e215..8a6cc721 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -1,3 +1,4 @@ +//! Contain structs related to making RPCs use std::borrow::Borrow; use std::marker::PhantomData; use std::net::SocketAddr; @@ -26,14 +27,19 @@ use crate::tls_util; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +/// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { + /// Max time to wait for reponse pub rs_timeout: Duration, + /// Min number of response to consider the request successful pub rs_quorum: usize, + /// Should requests be dropped after enough response are received pub rs_interrupt_after_quorum: bool, } impl RequestStrategy { + /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_quorum(quorum: usize) -> Self { RequestStrategy { rs_timeout: DEFAULT_TIMEOUT, @@ -41,19 +47,25 @@ impl RequestStrategy { rs_interrupt_after_quorum: false, } } + /// Set timeout of the strategy pub fn with_timeout(mut self, timeout: Duration) -> Self { self.rs_timeout = timeout; self } + /// Set if requests can be dropped after quorum has been reached + /// In general true for read requests, and false for write pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self } } +/// Shortcut for a boxed async function taking a message, and resolving to another message or an +/// error pub type LocalHandlerFn<M> = Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>; +/// Client used to send RPC pub struct RpcClient<M: RpcMessage> { status: watch::Receiver<Arc<Status>>, background: Arc<BackgroundRunner>, @@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> { } impl<M: RpcMessage + 'static> RpcClient<M> { + /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers pub fn new( rac: RpcAddrClient<M>, background: Arc<BackgroundRunner>, @@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { }) } + /// Set the local handler, to process RPC to this node without network usage pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F) where F: Fn(Arc<M>) -> Fut + Send + Sync + 'static, @@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> { self.local_handler.swap(Some(Arc::new((my_id, handler)))); } + /// Get a RPC client to make calls using node's SocketAddr instead of its ID pub fn by_addr(&self) -> &RpcAddrClient<M> { &self.rpc_addr_client } + /// Make a RPC call pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> { self.call_arc(to, Arc::new(msg), timeout).await } + /// Make a RPC call from a message stored in an Arc pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> { if let Some(lh) = self.local_handler.load_full() { let (my_id, local_handler) = lh.as_ref(); @@ -134,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } + /// Make a RPC call to multiple servers, returning a Vec containing each result pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { let msg = Arc::new(msg); let mut resp_stream = to @@ -148,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> { results } + /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if + /// strategy could not be respected due to too many errors pub async fn try_call_many( self: &Arc<Self>, to: &[UUID], @@ -207,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } +/// Thin wrapper arround an `RpcHttpClient` specifying the path of the request pub struct RpcAddrClient<M: RpcMessage> { phantom: PhantomData<M>, @@ -215,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> { } impl<M: RpcMessage> RpcAddrClient<M> { + /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self { Self { phantom: PhantomData::default(), @@ -223,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } + /// Make a RPC pub async fn call<MB>( &self, to_addr: &SocketAddr, @@ -238,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } +/// HTTP client used to make RPCs pub struct RpcHttpClient { request_limiter: Semaphore, method: ClientMethod, @@ -249,6 +273,7 @@ enum ClientMethod { } impl RpcHttpClient { + /// Create a new RpcHttpClient pub fn new( max_concurrent_requests: usize, tls_config: &Option<TlsConfig>, @@ -279,6 +304,7 @@ impl RpcHttpClient { }) } + /// Make a RPC async fn call<M, MB>( &self, path: &str, |