diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-23 13:37:10 +0000 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-23 13:37:10 +0000 |
commit | 37f880bc09bc4d9eb6aaeaec299a8d1d2043f674 (patch) | |
tree | d0f8aeeb826edbaaa8e38f9eb907b3a84579e4c5 /src/rpc_client.rs | |
parent | 73574ab43e5dca999545c931b959f2a3cbacea95 (diff) | |
download | garage-37f880bc09bc4d9eb6aaeaec299a8d1d2043f674.tar.gz garage-37f880bc09bc4d9eb6aaeaec299a8d1d2043f674.zip |
RequestStrategy with possible interruption or not
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r-- | src/rpc_client.rs | 56 |
1 files changed, 42 insertions, 14 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs index eb02213a..8bc3fe50 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -20,6 +20,33 @@ use crate::rpc_server::RpcMessage; use crate::server::TlsConfig; use crate::tls_util; +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Copy, Clone)] +pub struct RequestStrategy { + pub rs_timeout: Duration, + pub rs_quorum: usize, + pub rs_interrupt_after_quorum: bool, +} + +impl RequestStrategy { + pub fn with_quorum(quorum: usize) -> Self { + RequestStrategy { + rs_timeout: DEFAULT_TIMEOUT, + rs_quorum: quorum, + rs_interrupt_after_quorum: false, + } + } + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.rs_timeout = timeout; + self + } + pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { + self.rs_interrupt_after_quorum = interrupt; + self + } +} + pub struct RpcClient<M: RpcMessage> { status: watch::Receiver<Arc<Status>>, background: Arc<BackgroundRunner>, @@ -83,9 +110,10 @@ impl<M: RpcMessage + 'static> RpcClient<M> { self: &Arc<Self>, to: &[UUID], msg: M, - stop_after: usize, - timeout: Duration, + strategy: RequestStrategy, ) -> Result<Vec<M>, Error> { + let timeout = strategy.rs_timeout; + let msg = Arc::new(msg); let mut resp_stream = to .to_vec() @@ -104,7 +132,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { match resp { Ok(msg) => { results.push(msg); - if results.len() >= stop_after { + if results.len() >= strategy.rs_quorum { break; } } @@ -114,23 +142,23 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } - if results.len() >= stop_after { - // Continue requests in background - // TODO: make this optionnal (only usefull for write requests) - + if results.len() >= strategy.rs_quorum { + // Continue requests in background. // Continue the remaining requests immediately using tokio::spawn // but enqueue a task in the background runner // to ensure that the process won't exit until the requests are done // (if we had just enqueued the resp_stream.collect directly in the background runner, // the requests might have been put on hold in the background runner's queue, // in which case they might timeout or otherwise fail) - let wait_finished_fut = tokio::spawn(async move { - resp_stream.collect::<Vec<_>>().await; - Ok(()) - }); - self.clone().background.spawn(wait_finished_fut.map(|x| { - x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e)))) - })); + if !strategy.rs_interrupt_after_quorum { + let wait_finished_fut = tokio::spawn(async move { + resp_stream.collect::<Vec<_>>().await; + Ok(()) + }); + self.clone().background.spawn(wait_finished_fut.map(|x| { + x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e)))) + })); + } Ok(results) } else { |