aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r--src/rpc_client.rs47
1 files changed, 38 insertions, 9 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 057c19e8..7995cdfa 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -16,7 +16,6 @@ use crate::membership::System;
pub async fn rpc_call_many(sys: Arc<System>,
to: &[UUID],
msg: &Message,
- stop_after: Option<usize>,
timeout: Duration)
-> Vec<Result<Message, Error>>
{
@@ -25,19 +24,49 @@ pub async fn rpc_call_many(sys: Arc<System>,
.collect::<FuturesUnordered<_>>();
let mut results = vec![];
- let mut n_ok = 0;
while let Some(resp) = resp_stream.next().await {
- if resp.is_ok() {
- n_ok += 1
- }
results.push(resp);
- if let Some(n) = stop_after {
- if n_ok >= n {
- break
+ }
+ results
+}
+
+pub async fn rpc_try_call_many(sys: Arc<System>,
+ to: &[UUID],
+ msg: &Message,
+ stop_after: usize,
+ timeout: Duration)
+ -> Result<Vec<Message>, Error>
+{
+ let mut resp_stream = to.iter()
+ .map(|to| rpc_call(sys.clone(), to, msg, timeout))
+ .collect::<FuturesUnordered<_>>();
+
+ let mut results = vec![];
+ let mut errors = vec![];
+
+ while let Some(resp) = resp_stream.next().await {
+ match resp {
+ Ok(msg) => {
+ results.push(msg);
+ if results.len() >= stop_after {
+ break
+ }
+ }
+ Err(e) => {
+ errors.push(e);
}
}
}
- results
+
+ if results.len() >= stop_after {
+ Ok(results)
+ } else {
+ let mut msg = "Too many failures:".to_string();
+ for e in errors {
+ msg += &format!("\n{}", e);
+ }
+ Err(Error::Message(msg))
+ }
}
pub async fn rpc_call(sys: Arc<System>,