aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block.rs46
-rw-r--r--src/rpc_client.rs56
-rw-r--r--src/table.rs17
3 files changed, 76 insertions, 43 deletions
diff --git a/src/block.rs b/src/block.rs
index ea4f7c10..21ddf837 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -317,7 +317,7 @@ impl BlockManager {
e
)));
}
- _ => {
+ Ok(_) => {
return Err(Error::Message(format!(
"Unexpected response to NeedBlockQuery RPC"
)));
@@ -327,13 +327,14 @@ impl BlockManager {
if need_nodes.len() > 0 {
let put_block_message = self.read_block(hash).await?;
- let put_responses = self
- .rpc_client
- .call_many(&need_nodes[..], put_block_message, BLOCK_RW_TIMEOUT)
- .await;
- for resp in put_responses {
- resp?;
- }
+ self.rpc_client
+ .try_call_many(
+ &need_nodes[..],
+ put_block_message,
+ RequestStrategy::with_quorum(need_nodes.len())
+ .with_timeout(BLOCK_RW_TIMEOUT),
+ )
+ .await?;
}
}
fs::remove_file(path).await?;
@@ -354,17 +355,20 @@ impl BlockManager {
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let ring = self.system.ring.borrow().clone();
let who = ring.walk_ring(&hash, self.system.config.data_replication_factor);
- let msg = Arc::new(Message::GetBlock(*hash));
- let mut resp_stream = who
- .iter()
- .map(|to| self.rpc_client.call(to, msg.clone(), BLOCK_RW_TIMEOUT))
- .collect::<FuturesUnordered<_>>();
-
- while let Some(resp) = resp_stream.next().await {
- if let Ok(Message::PutBlock(msg)) = resp {
- if data::hash(&msg.data[..]) == *hash {
- return Ok(msg.data);
- }
+ let resps = self
+ .rpc_client
+ .try_call_many(
+ &who[..],
+ Message::GetBlock(*hash),
+ RequestStrategy::with_quorum(1)
+ .with_timeout(BLOCK_RW_TIMEOUT)
+ .interrupt_after_quorum(true),
+ )
+ .await?;
+
+ for resp in resps {
+ if let Message::PutBlock(msg) = resp {
+ return Ok(msg.data);
}
}
Err(Error::Message(format!(
@@ -380,8 +384,8 @@ impl BlockManager {
.try_call_many(
&who[..],
Message::PutBlock(PutBlockMessage { hash, data }),
- (self.system.config.data_replication_factor + 1) / 2,
- BLOCK_RW_TIMEOUT,
+ RequestStrategy::with_quorum((self.system.config.data_replication_factor + 1) / 2)
+ .with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
Ok(())
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index eb02213a..8bc3fe50 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -20,6 +20,33 @@ use crate::rpc_server::RpcMessage;
use crate::server::TlsConfig;
use crate::tls_util;
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+
+#[derive(Copy, Clone)]
+pub struct RequestStrategy {
+ pub rs_timeout: Duration,
+ pub rs_quorum: usize,
+ pub rs_interrupt_after_quorum: bool,
+}
+
+impl RequestStrategy {
+ pub fn with_quorum(quorum: usize) -> Self {
+ RequestStrategy {
+ rs_timeout: DEFAULT_TIMEOUT,
+ rs_quorum: quorum,
+ rs_interrupt_after_quorum: false,
+ }
+ }
+ pub fn with_timeout(mut self, timeout: Duration) -> Self {
+ self.rs_timeout = timeout;
+ self
+ }
+ pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
+ self.rs_interrupt_after_quorum = interrupt;
+ self
+ }
+}
+
pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
@@ -83,9 +110,10 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
self: &Arc<Self>,
to: &[UUID],
msg: M,
- stop_after: usize,
- timeout: Duration,
+ strategy: RequestStrategy,
) -> Result<Vec<M>, Error> {
+ let timeout = strategy.rs_timeout;
+
let msg = Arc::new(msg);
let mut resp_stream = to
.to_vec()
@@ -104,7 +132,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
match resp {
Ok(msg) => {
results.push(msg);
- if results.len() >= stop_after {
+ if results.len() >= strategy.rs_quorum {
break;
}
}
@@ -114,23 +142,23 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
- if results.len() >= stop_after {
- // Continue requests in background
- // TODO: make this optionnal (only usefull for write requests)
-
+ if results.len() >= strategy.rs_quorum {
+ // Continue requests in background.
// Continue the remaining requests immediately using tokio::spawn
// but enqueue a task in the background runner
// to ensure that the process won't exit until the requests are done
// (if we had just enqueued the resp_stream.collect directly in the background runner,
// the requests might have been put on hold in the background runner's queue,
// in which case they might timeout or otherwise fail)
- let wait_finished_fut = tokio::spawn(async move {
- resp_stream.collect::<Vec<_>>().await;
- Ok(())
- });
- self.clone().background.spawn(wait_finished_fut.map(|x| {
- x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e))))
- }));
+ if !strategy.rs_interrupt_after_quorum {
+ let wait_finished_fut = tokio::spawn(async move {
+ resp_stream.collect::<Vec<_>>().await;
+ Ok(())
+ });
+ self.clone().background.spawn(wait_finished_fut.map(|x| {
+ x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e))))
+ }));
+ }
Ok(results)
} else {
diff --git a/src/table.rs b/src/table.rs
index bc375a96..53e17396 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -179,8 +179,8 @@ where
.try_call_many(
&who[..],
rpc,
- self.replication.write_quorum(),
- TABLE_RPC_TIMEOUT,
+ RequestStrategy::with_quorum(self.replication.write_quorum())
+ .with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
Ok(())
@@ -237,8 +237,9 @@ where
.try_call_many(
&who[..],
rpc,
- self.replication.read_quorum(),
- TABLE_RPC_TIMEOUT,
+ RequestStrategy::with_quorum(self.replication.read_quorum())
+ .with_timeout(TABLE_RPC_TIMEOUT)
+ .interrupt_after_quorum(true),
)
.await?;
@@ -292,8 +293,9 @@ where
.try_call_many(
&who[..],
rpc,
- self.replication.read_quorum(),
- TABLE_RPC_TIMEOUT,
+ RequestStrategy::with_quorum(self.replication.read_quorum())
+ .with_timeout(TABLE_RPC_TIMEOUT)
+ .interrupt_after_quorum(true),
)
.await?;
@@ -347,8 +349,7 @@ where
.try_call_many(
&who[..],
TableRPC::<F>::Update(vec![what_enc]),
- who.len(),
- TABLE_RPC_TIMEOUT,
+ RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
Ok(())