From a44f4869312678e3c6eaac1a26a7beb4652f3e69 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 15:57:25 +0200 Subject: block manager: refactoring & increase max worker count to 8 --- src/block/manager.rs | 26 +++++++++++++------------- src/block/resync.rs | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index d18d3f4c..b42a9aa9 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -279,21 +279,21 @@ impl BlockManager { let res = match res { Ok(res) => res, Err(e) => { - debug!("Node {:?} returned error: {}", node, e); + debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e); continue; } }; let (header, stream) = match res.into_parts() { (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), _ => { - debug!("Node {:?} returned a malformed response", node); + debug!("Get block {:?}: node {:?} returned a malformed response", hash, node); continue; } }; match f(header, stream).await { Ok(ret) => return Ok(ret), Err(e) => { - debug!("Error reading stream from node {:?}: {}", node, e); + debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e); } } } @@ -302,15 +302,14 @@ impl BlockManager { // TODO: keep first request running when initiating a new one and take the // one that finishes earlier _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { - debug!("Node {:?} didn't return block in time, trying next.", node); + debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node); } }; } - Err(Error::Message(format!( - "Unable to read block {:?}: no node returned a valid block", - hash - ))) + let msg = format!("Get block {:?}: no node returned a valid block", hash); + debug!("{}", msg); + Err(Error::Message(msg)) } // ---- Public interface ---- @@ -666,7 +665,7 @@ impl StreamingEndpointHandler for BlockManager { BlockRpc::PutBlock { hash, header } => Resp::new( self.handle_put_block(*hash, *header, message.take_stream()) .await - .map(|_| BlockRpc::Ok), + .map(|()| BlockRpc::Ok), ), BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await, BlockRpc::NeedBlockQuery(h) => { @@ -687,15 +686,14 @@ impl BlockManagerLocked { let compressed = data.is_compressed(); let data = data.inner_buffer(); - let mut tgt_path = mgr.data_layout.primary_block_dir(hash); - let directory = tgt_path.clone(); + let directory = mgr.data_layout.primary_block_dir(hash); + + let mut tgt_path = directory.clone(); tgt_path.push(hex::encode(hash)); if compressed { tgt_path.set_extension("zst"); } - fs::create_dir_all(&directory).await?; - let to_delete = match (mgr.find_block(hash).await, compressed) { // If the block is stored in the wrong directory, // write it again at the correct path and delete the old path @@ -723,6 +721,8 @@ impl BlockManagerLocked { let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); path_tmp.set_extension(tmp_extension); + fs::create_dir_all(&directory).await?; + let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone())); let mut f = fs::File::create(&path_tmp).await?; diff --git a/src/block/resync.rs b/src/block/resync.rs index bb43ad7e..9c1da4a7 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -41,7 +41,7 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6; // No more than 4 resync workers can be running in the system -pub(crate) const MAX_RESYNC_WORKERS: usize = 4; +pub(crate) const MAX_RESYNC_WORKERS: usize = 8; // Resync tranquility is initially set to 2, but can be changed in the CLI // and the updated version is persisted over Garage restarts const INITIAL_RESYNC_TRANQUILITY: u32 = 2; -- cgit v1.2.3