aboutsummaryrefslogtreecommitdiff
path: root/src/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-23 13:37:10 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-23 13:37:10 +0000
commit37f880bc09bc4d9eb6aaeaec299a8d1d2043f674 (patch)
treed0f8aeeb826edbaaa8e38f9eb907b3a84579e4c5 /src/block.rs
parent73574ab43e5dca999545c931b959f2a3cbacea95 (diff)
downloadgarage-37f880bc09bc4d9eb6aaeaec299a8d1d2043f674.tar.gz
garage-37f880bc09bc4d9eb6aaeaec299a8d1d2043f674.zip
RequestStrategy with possible interruption or not
Diffstat (limited to 'src/block.rs')
-rw-r--r--src/block.rs46
1 files changed, 25 insertions, 21 deletions
diff --git a/src/block.rs b/src/block.rs
index ea4f7c10..21ddf837 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -317,7 +317,7 @@ impl BlockManager {
e
)));
}
- _ => {
+ Ok(_) => {
return Err(Error::Message(format!(
"Unexpected response to NeedBlockQuery RPC"
)));
@@ -327,13 +327,14 @@ impl BlockManager {
if need_nodes.len() > 0 {
let put_block_message = self.read_block(hash).await?;
- let put_responses = self
- .rpc_client
- .call_many(&need_nodes[..], put_block_message, BLOCK_RW_TIMEOUT)
- .await;
- for resp in put_responses {
- resp?;
- }
+ self.rpc_client
+ .try_call_many(
+ &need_nodes[..],
+ put_block_message,
+ RequestStrategy::with_quorum(need_nodes.len())
+ .with_timeout(BLOCK_RW_TIMEOUT),
+ )
+ .await?;
}
}
fs::remove_file(path).await?;
@@ -354,17 +355,20 @@ impl BlockManager {
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let ring = self.system.ring.borrow().clone();
let who = ring.walk_ring(&hash, self.system.config.data_replication_factor);
- let msg = Arc::new(Message::GetBlock(*hash));
- let mut resp_stream = who
- .iter()
- .map(|to| self.rpc_client.call(to, msg.clone(), BLOCK_RW_TIMEOUT))
- .collect::<FuturesUnordered<_>>();
-
- while let Some(resp) = resp_stream.next().await {
- if let Ok(Message::PutBlock(msg)) = resp {
- if data::hash(&msg.data[..]) == *hash {
- return Ok(msg.data);
- }
+ let resps = self
+ .rpc_client
+ .try_call_many(
+ &who[..],
+ Message::GetBlock(*hash),
+ RequestStrategy::with_quorum(1)
+ .with_timeout(BLOCK_RW_TIMEOUT)
+ .interrupt_after_quorum(true),
+ )
+ .await?;
+
+ for resp in resps {
+ if let Message::PutBlock(msg) = resp {
+ return Ok(msg.data);
}
}
Err(Error::Message(format!(
@@ -380,8 +384,8 @@ impl BlockManager {
.try_call_many(
&who[..],
Message::PutBlock(PutBlockMessage { hash, data }),
- (self.system.config.data_replication_factor + 1) / 2,
- BLOCK_RW_TIMEOUT,
+ RequestStrategy::with_quorum((self.system.config.data_replication_factor + 1) / 2)
+ .with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
Ok(())