aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-09-05 15:57:25 +0200
committerAlex Auvolat <alex@adnab.me>2023-09-06 16:35:28 +0200
commita44f4869312678e3c6eaac1a26a7beb4652f3e69 (patch)
tree653a7ac26e3ddf5ff344452fea3c26d7226510ea /src/block
parent3a74844df02b5ecec0b96bfb8b2ff3bcdd33f7f4 (diff)
downloadgarage-a44f4869312678e3c6eaac1a26a7beb4652f3e69.tar.gz
garage-a44f4869312678e3c6eaac1a26a7beb4652f3e69.zip
block manager: refactoring & increase max worker count to 8
Diffstat (limited to 'src/block')
-rw-r--r--src/block/manager.rs26
-rw-r--r--src/block/resync.rs2
2 files changed, 14 insertions, 14 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index d18d3f4c..b42a9aa9 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -279,21 +279,21 @@ impl BlockManager {
let res = match res {
Ok(res) => res,
Err(e) => {
- debug!("Node {:?} returned error: {}", node, e);
+ debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
_ => {
- debug!("Node {:?} returned a malformed response", node);
+ debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
continue;
}
};
match f(header, stream).await {
Ok(ret) => return Ok(ret),
Err(e) => {
- debug!("Error reading stream from node {:?}: {}", node, e);
+ debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
}
}
}
@@ -302,15 +302,14 @@ impl BlockManager {
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
- debug!("Node {:?} didn't return block in time, trying next.", node);
+ debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
}
- Err(Error::Message(format!(
- "Unable to read block {:?}: no node returned a valid block",
- hash
- )))
+ let msg = format!("Get block {:?}: no node returned a valid block", hash);
+ debug!("{}", msg);
+ Err(Error::Message(msg))
}
// ---- Public interface ----
@@ -666,7 +665,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
BlockRpc::PutBlock { hash, header } => Resp::new(
self.handle_put_block(*hash, *header, message.take_stream())
.await
- .map(|_| BlockRpc::Ok),
+ .map(|()| BlockRpc::Ok),
),
BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
BlockRpc::NeedBlockQuery(h) => {
@@ -687,15 +686,14 @@ impl BlockManagerLocked {
let compressed = data.is_compressed();
let data = data.inner_buffer();
- let mut tgt_path = mgr.data_layout.primary_block_dir(hash);
- let directory = tgt_path.clone();
+ let directory = mgr.data_layout.primary_block_dir(hash);
+
+ let mut tgt_path = directory.clone();
tgt_path.push(hex::encode(hash));
if compressed {
tgt_path.set_extension("zst");
}
- fs::create_dir_all(&directory).await?;
-
let to_delete = match (mgr.find_block(hash).await, compressed) {
// If the block is stored in the wrong directory,
// write it again at the correct path and delete the old path
@@ -723,6 +721,8 @@ impl BlockManagerLocked {
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
path_tmp.set_extension(tmp_extension);
+ fs::create_dir_all(&directory).await?;
+
let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
let mut f = fs::File::create(&path_tmp).await?;
diff --git a/src/block/resync.rs b/src/block/resync.rs
index bb43ad7e..9c1da4a7 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -41,7 +41,7 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
// No more than 4 resync workers can be running in the system
-pub(crate) const MAX_RESYNC_WORKERS: usize = 4;
+pub(crate) const MAX_RESYNC_WORKERS: usize = 8;
// Resync tranquility is initially set to 2, but can be changed in the CLI
// and the updated version is persisted over Garage restarts
const INITIAL_RESYNC_TRANQUILITY: u32 = 2;