diff options
Diffstat (limited to 'src/rpc_client.rs')
-rw-r--r-- | src/rpc_client.rs | 47 |
1 files changed, 38 insertions, 9 deletions
diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 057c19e8..7995cdfa 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -16,7 +16,6 @@ use crate::membership::System; pub async fn rpc_call_many(sys: Arc<System>, to: &[UUID], msg: &Message, - stop_after: Option<usize>, timeout: Duration) -> Vec<Result<Message, Error>> { @@ -25,19 +24,49 @@ pub async fn rpc_call_many(sys: Arc<System>, .collect::<FuturesUnordered<_>>(); let mut results = vec![]; - let mut n_ok = 0; while let Some(resp) = resp_stream.next().await { - if resp.is_ok() { - n_ok += 1 - } results.push(resp); - if let Some(n) = stop_after { - if n_ok >= n { - break + } + results +} + +pub async fn rpc_try_call_many(sys: Arc<System>, + to: &[UUID], + msg: &Message, + stop_after: usize, + timeout: Duration) + -> Result<Vec<Message>, Error> +{ + let mut resp_stream = to.iter() + .map(|to| rpc_call(sys.clone(), to, msg, timeout)) + .collect::<FuturesUnordered<_>>(); + + let mut results = vec![]; + let mut errors = vec![]; + + while let Some(resp) = resp_stream.next().await { + match resp { + Ok(msg) => { + results.push(msg); + if results.len() >= stop_after { + break + } + } + Err(e) => { + errors.push(e); } } } - results + + if results.len() >= stop_after { + Ok(results) + } else { + let mut msg = "Too many failures:".to_string(); + for e in errors { + msg += &format!("\n{}", e); + } + Err(Error::Message(msg)) + } } pub async fn rpc_call(sys: Arc<System>, |