diff options
-rw-r--r-- | TODO | 4 | ||||
-rw-r--r-- | src/block.rs | 46 | ||||
-rw-r--r-- | src/rpc_client.rs | 56 | ||||
-rw-r--r-- | src/table.rs | 17 |
4 files changed, 76 insertions, 47 deletions
@@ -12,10 +12,6 @@ Membership: keep IP addresses of failed nodes and try to reping them regularly RPC client/server: do not go through the serialization+HTTP+TLS+deserialization when doing a request to ourself. -RPC requests: unify quorum + timeout in a "RequestStrategy" class, -and add to the request strategy whether or not the request should continue in the background -once `quorum` valid responses have been received - Attaining S3 compatibility -------------------------- diff --git a/src/block.rs b/src/block.rs index ea4f7c10..21ddf837 100644 --- a/src/block.rs +++ b/src/block.rs @@ -317,7 +317,7 @@ impl BlockManager { e ))); } - _ => { + Ok(_) => { return Err(Error::Message(format!( "Unexpected response to NeedBlockQuery RPC" ))); @@ -327,13 +327,14 @@ impl BlockManager { if need_nodes.len() > 0 { let put_block_message = self.read_block(hash).await?; - let put_responses = self - .rpc_client - .call_many(&need_nodes[..], put_block_message, BLOCK_RW_TIMEOUT) - .await; - for resp in put_responses { - resp?; - } + self.rpc_client + .try_call_many( + &need_nodes[..], + put_block_message, + RequestStrategy::with_quorum(need_nodes.len()) + .with_timeout(BLOCK_RW_TIMEOUT), + ) + .await?; } } fs::remove_file(path).await?; @@ -354,17 +355,20 @@ impl BlockManager { pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { let ring = self.system.ring.borrow().clone(); let who = ring.walk_ring(&hash, self.system.config.data_replication_factor); - let msg = Arc::new(Message::GetBlock(*hash)); - let mut resp_stream = who - .iter() - .map(|to| self.rpc_client.call(to, msg.clone(), BLOCK_RW_TIMEOUT)) - .collect::<FuturesUnordered<_>>(); - - while let Some(resp) = resp_stream.next().await { - if let Ok(Message::PutBlock(msg)) = resp { - if data::hash(&msg.data[..]) == *hash { - return Ok(msg.data); - } + let resps = self + .rpc_client + .try_call_many( + &who[..], + Message::GetBlock(*hash), + RequestStrategy::with_quorum(1) + .with_timeout(BLOCK_RW_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + + for resp in resps { + if let Message::PutBlock(msg) = resp { + return Ok(msg.data); } } Err(Error::Message(format!( @@ -380,8 +384,8 @@ impl BlockManager { .try_call_many( &who[..], Message::PutBlock(PutBlockMessage { hash, data }), - (self.system.config.data_replication_factor + 1) / 2, - BLOCK_RW_TIMEOUT, + RequestStrategy::with_quorum((self.system.config.data_replication_factor + 1) / 2) + .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; Ok(()) diff --git a/src/rpc_client.rs b/src/rpc_client.rs index eb02213a..8bc3fe50 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -20,6 +20,33 @@ use crate::rpc_server::RpcMessage; use crate::server::TlsConfig; use crate::tls_util; +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Copy, Clone)] +pub struct RequestStrategy { + pub rs_timeout: Duration, + pub rs_quorum: usize, + pub rs_interrupt_after_quorum: bool, +} + +impl RequestStrategy { + pub fn with_quorum(quorum: usize) -> Self { + RequestStrategy { + rs_timeout: DEFAULT_TIMEOUT, + rs_quorum: quorum, + rs_interrupt_after_quorum: false, + } + } + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.rs_timeout = timeout; + self + } + pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { + self.rs_interrupt_after_quorum = interrupt; + self + } +} + pub struct RpcClient<M: RpcMessage> { status: watch::Receiver<Arc<Status>>, background: Arc<BackgroundRunner>, @@ -83,9 +110,10 @@ impl<M: RpcMessage + 'static> RpcClient<M> { self: &Arc<Self>, to: &[UUID], msg: M, - stop_after: usize, - timeout: Duration, + strategy: RequestStrategy, ) -> Result<Vec<M>, Error> { + let timeout = strategy.rs_timeout; + let msg = Arc::new(msg); let mut resp_stream = to .to_vec() @@ -104,7 +132,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { match resp { Ok(msg) => { results.push(msg); - if results.len() >= stop_after { + if results.len() >= strategy.rs_quorum { break; } } @@ -114,23 +142,23 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } - if results.len() >= stop_after { - // Continue requests in background - // TODO: make this optionnal (only usefull for write requests) - + if results.len() >= strategy.rs_quorum { + // Continue requests in background. // Continue the remaining requests immediately using tokio::spawn // but enqueue a task in the background runner // to ensure that the process won't exit until the requests are done // (if we had just enqueued the resp_stream.collect directly in the background runner, // the requests might have been put on hold in the background runner's queue, // in which case they might timeout or otherwise fail) - let wait_finished_fut = tokio::spawn(async move { - resp_stream.collect::<Vec<_>>().await; - Ok(()) - }); - self.clone().background.spawn(wait_finished_fut.map(|x| { - x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e)))) - })); + if !strategy.rs_interrupt_after_quorum { + let wait_finished_fut = tokio::spawn(async move { + resp_stream.collect::<Vec<_>>().await; + Ok(()) + }); + self.clone().background.spawn(wait_finished_fut.map(|x| { + x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e)))) + })); + } Ok(results) } else { diff --git a/src/table.rs b/src/table.rs index bc375a96..53e17396 100644 --- a/src/table.rs +++ b/src/table.rs @@ -179,8 +179,8 @@ where .try_call_many( &who[..], rpc, - self.replication.write_quorum(), - TABLE_RPC_TIMEOUT, + RequestStrategy::with_quorum(self.replication.write_quorum()) + .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; Ok(()) @@ -237,8 +237,9 @@ where .try_call_many( &who[..], rpc, - self.replication.read_quorum(), - TABLE_RPC_TIMEOUT, + RequestStrategy::with_quorum(self.replication.read_quorum()) + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), ) .await?; @@ -292,8 +293,9 @@ where .try_call_many( &who[..], rpc, - self.replication.read_quorum(), - TABLE_RPC_TIMEOUT, + RequestStrategy::with_quorum(self.replication.read_quorum()) + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), ) .await?; @@ -347,8 +349,7 @@ where .try_call_many( &who[..], TableRPC::<F>::Update(vec![what_enc]), - who.len(), - TABLE_RPC_TIMEOUT, + RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), ) .await?; Ok(()) |