diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-23 13:37:10 +0000 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-23 13:37:10 +0000 |
commit | 37f880bc09bc4d9eb6aaeaec299a8d1d2043f674 (patch) | |
tree | d0f8aeeb826edbaaa8e38f9eb907b3a84579e4c5 /src/block.rs | |
parent | 73574ab43e5dca999545c931b959f2a3cbacea95 (diff) | |
download | garage-37f880bc09bc4d9eb6aaeaec299a8d1d2043f674.tar.gz garage-37f880bc09bc4d9eb6aaeaec299a8d1d2043f674.zip |
RequestStrategy with possible interruption or not
Diffstat (limited to 'src/block.rs')
-rw-r--r-- | src/block.rs | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/src/block.rs b/src/block.rs index ea4f7c10..21ddf837 100644 --- a/src/block.rs +++ b/src/block.rs @@ -317,7 +317,7 @@ impl BlockManager { e ))); } - _ => { + Ok(_) => { return Err(Error::Message(format!( "Unexpected response to NeedBlockQuery RPC" ))); @@ -327,13 +327,14 @@ impl BlockManager { if need_nodes.len() > 0 { let put_block_message = self.read_block(hash).await?; - let put_responses = self - .rpc_client - .call_many(&need_nodes[..], put_block_message, BLOCK_RW_TIMEOUT) - .await; - for resp in put_responses { - resp?; - } + self.rpc_client + .try_call_many( + &need_nodes[..], + put_block_message, + RequestStrategy::with_quorum(need_nodes.len()) + .with_timeout(BLOCK_RW_TIMEOUT), + ) + .await?; } } fs::remove_file(path).await?; @@ -354,17 +355,20 @@ impl BlockManager { pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { let ring = self.system.ring.borrow().clone(); let who = ring.walk_ring(&hash, self.system.config.data_replication_factor); - let msg = Arc::new(Message::GetBlock(*hash)); - let mut resp_stream = who - .iter() - .map(|to| self.rpc_client.call(to, msg.clone(), BLOCK_RW_TIMEOUT)) - .collect::<FuturesUnordered<_>>(); - - while let Some(resp) = resp_stream.next().await { - if let Ok(Message::PutBlock(msg)) = resp { - if data::hash(&msg.data[..]) == *hash { - return Ok(msg.data); - } + let resps = self + .rpc_client + .try_call_many( + &who[..], + Message::GetBlock(*hash), + RequestStrategy::with_quorum(1) + .with_timeout(BLOCK_RW_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + + for resp in resps { + if let Message::PutBlock(msg) = resp { + return Ok(msg.data); } } Err(Error::Message(format!( @@ -380,8 +384,8 @@ impl BlockManager { .try_call_many( &who[..], Message::PutBlock(PutBlockMessage { hash, data }), - (self.system.config.data_replication_factor + 1) / 2, - BLOCK_RW_TIMEOUT, + RequestStrategy::with_quorum((self.system.config.data_replication_factor + 1) / 2) + .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; Ok(()) |