From 69b89fb46df3b2b0ab189218812efdcf5852f8f2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 26 Oct 2021 19:13:41 +0200 Subject: Fix race in block resync --- src/model/block.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index 35d3871a..08002911 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -12,7 +12,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; use garage_util::data::*; -use garage_util::error::Error; +use garage_util::error::*; use garage_util::time::*; use garage_util::token_bucket::TokenBucket; @@ -28,7 +28,7 @@ use crate::garage::Garage; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; -pub const BACKGROUND_WORKERS: u64 = 2; +pub const BACKGROUND_WORKERS: u64 = 1; const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); @@ -393,8 +393,7 @@ impl BlockManager { } async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result<(), Error> { - if let Some(first_item) = self.resync_queue.iter().next() { - let (time_bytes, hash_bytes) = first_item?; + if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { let time_msec = u64_from_be_bytes(&time_bytes[0..8]); let now = now_msec(); if now >= time_msec { @@ -404,9 +403,9 @@ impl BlockManager { warn!("Error when resyncing {:?}: {}", hash, e); self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; } - self.resync_queue.remove(&time_bytes)?; res?; // propagate error to delay main loop } else { + self.resync_queue.insert(time_bytes, hash_bytes)?; let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { _ = delay.fuse() => {}, @@ -453,7 +452,7 @@ impl BlockManager { &self.endpoint, *to, msg.clone(), - RequestStrategy::with_priority(PRIO_NORMAL) + RequestStrategy::with_priority(PRIO_BACKGROUND) .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), ) }); @@ -461,7 +460,7 @@ impl BlockManager { let mut need_nodes = vec![]; for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { - match needed? { + match needed.err_context("NeedBlockQuery RPC")? { BlockRpc::NeedBlockReply(needed) => { if needed { need_nodes.push(*node); @@ -482,14 +481,14 @@ impl BlockManager { need_nodes.len() ); - let put_block_message = self.read_block(hash).await?; + let put_block_message = self.read_block(hash).await.err_context("PutBlock RPC")?; self.system .rpc .try_call_many( &self.endpoint, &need_nodes[..], put_block_message, - RequestStrategy::with_priority(PRIO_NORMAL) + RequestStrategy::with_priority(PRIO_BACKGROUND) .with_quorum(need_nodes.len()) .with_timeout(BLOCK_RW_TIMEOUT), ) -- cgit v1.2.3