aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_client.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-23 13:37:10 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-23 13:37:10 +0000
commit37f880bc09bc4d9eb6aaeaec299a8d1d2043f674 (patch)
treed0f8aeeb826edbaaa8e38f9eb907b3a84579e4c5 /src/rpc_client.rs
parent73574ab43e5dca999545c931b959f2a3cbacea95 (diff)
downloadgarage-37f880bc09bc4d9eb6aaeaec299a8d1d2043f674.tar.gz
garage-37f880bc09bc4d9eb6aaeaec299a8d1d2043f674.zip
RequestStrategy with possible interruption or not
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r--src/rpc_client.rs56
1 files changed, 42 insertions, 14 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index eb02213a..8bc3fe50 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -20,6 +20,33 @@ use crate::rpc_server::RpcMessage;
use crate::server::TlsConfig;
use crate::tls_util;
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+
+#[derive(Copy, Clone)]
+pub struct RequestStrategy {
+ pub rs_timeout: Duration,
+ pub rs_quorum: usize,
+ pub rs_interrupt_after_quorum: bool,
+}
+
+impl RequestStrategy {
+ pub fn with_quorum(quorum: usize) -> Self {
+ RequestStrategy {
+ rs_timeout: DEFAULT_TIMEOUT,
+ rs_quorum: quorum,
+ rs_interrupt_after_quorum: false,
+ }
+ }
+ pub fn with_timeout(mut self, timeout: Duration) -> Self {
+ self.rs_timeout = timeout;
+ self
+ }
+ pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
+ self.rs_interrupt_after_quorum = interrupt;
+ self
+ }
+}
+
pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
@@ -83,9 +110,10 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
self: &Arc<Self>,
to: &[UUID],
msg: M,
- stop_after: usize,
- timeout: Duration,
+ strategy: RequestStrategy,
) -> Result<Vec<M>, Error> {
+ let timeout = strategy.rs_timeout;
+
let msg = Arc::new(msg);
let mut resp_stream = to
.to_vec()
@@ -104,7 +132,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
match resp {
Ok(msg) => {
results.push(msg);
- if results.len() >= stop_after {
+ if results.len() >= strategy.rs_quorum {
break;
}
}
@@ -114,23 +142,23 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
- if results.len() >= stop_after {
- // Continue requests in background
- // TODO: make this optionnal (only usefull for write requests)
-
+ if results.len() >= strategy.rs_quorum {
+ // Continue requests in background.
// Continue the remaining requests immediately using tokio::spawn
// but enqueue a task in the background runner
// to ensure that the process won't exit until the requests are done
// (if we had just enqueued the resp_stream.collect directly in the background runner,
// the requests might have been put on hold in the background runner's queue,
// in which case they might timeout or otherwise fail)
- let wait_finished_fut = tokio::spawn(async move {
- resp_stream.collect::<Vec<_>>().await;
- Ok(())
- });
- self.clone().background.spawn(wait_finished_fut.map(|x| {
- x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e))))
- }));
+ if !strategy.rs_interrupt_after_quorum {
+ let wait_finished_fut = tokio::spawn(async move {
+ resp_stream.collect::<Vec<_>>().await;
+ Ok(())
+ });
+ self.clone().background.spawn(wait_finished_fut.map(|x| {
+ x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e))))
+ }));
+ }
Ok(results)
} else {