aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-09-05 15:04:59 +0200
committerAlex Auvolat <alex@adnab.me>2023-09-06 16:35:28 +0200
commit1b8c265c14b2f788693aed6c15f65684c72d2d1c (patch)
treef7114189c8cbb99aa435a08688b7d3471d92b766
parent3199cab4c89cac7351925303f8bb6408ffe56ff0 (diff)
downloadgarage-1b8c265c14b2f788693aed6c15f65684c72d2d1c.tar.gz
garage-1b8c265c14b2f788693aed6c15f65684c72d2d1c.zip
block manager: get rid of check_block_status
-rw-r--r--src/block/manager.rs84
-rw-r--r--src/block/resync.rs17
2 files changed, 45 insertions, 56 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 798cedf9..5bad34d4 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -543,8 +543,8 @@ impl BlockManager {
}
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
- let block_path = match self.find_block(hash).await {
- Some(p) => p,
+ match self.find_block(hash).await {
+ Some(p) => self.read_block_from(hash, &p).await,
None => {
// Not found but maybe we should have had it ??
self.resync
@@ -554,9 +554,15 @@ impl BlockManager {
hash
)));
}
- };
+ }
+ }
- let (path, compressed) = match &block_path {
+ pub(crate) async fn read_block_from(
+ &self,
+ hash: &Hash,
+ block_path: &DataBlockPath,
+ ) -> Result<DataBlock, Error> {
+ let (path, compressed) = match block_path {
DataBlockPath::Plain(p) => (p, false),
DataBlockPath::Compressed(p) => (p, true),
};
@@ -581,7 +587,7 @@ impl BlockManager {
);
self.lock_mutate(hash)
.await
- .move_block_to_corrupted(&block_path)
+ .move_block_to_corrupted(block_path)
.await?;
self.resync.put_to_resync(hash, Duration::from_millis(0))?;
@@ -591,18 +597,11 @@ impl BlockManager {
Ok(data)
}
- /// Check if this node has a block and whether it needs it
- pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
- self.lock_mutate(hash)
- .await
- .check_block_status(hash, self)
- .await
- }
-
/// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
- let BlockStatus { exists, needed } = self.check_block_status(hash).await?;
- Ok(needed.is_nonzero() && !exists)
+ let rc = self.rc.get_block_rc(hash)?;
+ let exists = self.find_block(hash).await.is_some();
+ Ok(rc.is_nonzero() && !exists)
}
/// Delete block if it is not needed anymore
@@ -613,8 +612,8 @@ impl BlockManager {
.await
}
- /// Utility: check if block is stored compressed.
- async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
+ /// Find the path where a block is currently stored
+ pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
let dirs = Some(self.data_layout.primary_block_dir(hash))
.into_iter()
.chain(self.data_layout.secondary_block_dirs(hash));
@@ -687,17 +686,6 @@ pub(crate) struct BlockStatus {
}
impl BlockManagerLocked {
- async fn check_block_status(
- &self,
- hash: &Hash,
- mgr: &BlockManager,
- ) -> Result<BlockStatus, Error> {
- let exists = mgr.find_block(hash).await.is_some();
- let needed = mgr.rc.get_block_rc(hash)?;
-
- Ok(BlockStatus { exists, needed })
- }
-
async fn write_block(
&self,
hash: &Hash,
@@ -710,32 +698,32 @@ impl BlockManagerLocked {
let mut tgt_path = mgr.data_layout.primary_block_dir(hash);
let directory = tgt_path.clone();
tgt_path.push(hex::encode(hash));
- if compressed {
- tgt_path.set_extension("zst");
- }
+ if compressed {
+ tgt_path.set_extension("zst");
+ }
fs::create_dir_all(&directory).await?;
let to_delete = match (mgr.find_block(hash).await, compressed) {
- // If the block is stored in the wrong directory,
- // write it again at the correct path and delete the old path
+ // If the block is stored in the wrong directory,
+ // write it again at the correct path and delete the old path
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
(Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
- // If the block is already stored not compressed but we have a compressed
- // copy, write the compressed copy and delete the uncompressed one
+ // If the block is already stored not compressed but we have a compressed
+ // copy, write the compressed copy and delete the uncompressed one
(Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
- // If the block is already stored compressed,
- // keep the stored copy, we have nothing to do
+ // If the block is already stored compressed,
+ // keep the stored copy, we have nothing to do
(Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
- // If the block is already stored not compressed,
- // and we don't have a compressed copy either,
- // keep the stored copy, we have nothing to do
+ // If the block is already stored not compressed,
+ // and we don't have a compressed copy either,
+ // keep the stored copy, we have nothing to do
(Some(DataBlockPath::Plain(_)), false) => return Ok(()),
- // If the block isn't stored already, just store what is given to us
+ // If the block isn't stored already, just store what is given to us
(None, _) => None,
};
@@ -799,14 +787,12 @@ impl BlockManagerLocked {
}
async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
- let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
-
- if exists && needed.is_deletable() {
- let path_opt = match mgr.find_block(hash).await {
- Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p),
- None => None,
- };
- if let Some(path) = path_opt {
+ let rc = mgr.rc.get_block_rc(hash)?;
+ if rc.is_deletable() {
+ while let Some(path) = mgr.find_block(hash).await {
+ let path = match path {
+ DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
+ };
fs::remove_file(path).await?;
mgr.metrics.delete_counter.add(1);
}
diff --git a/src/block/resync.rs b/src/block/resync.rs
index ea280ad4..bb43ad7e 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -359,20 +359,23 @@ impl BlockResyncManager {
}
async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> {
- let BlockStatus { exists, needed } = manager.check_block_status(hash).await?;
+ let existing_path = manager.find_block(hash).await;
+ let exists = existing_path.is_some();
+ let rc = manager.rc.get_block_rc(hash)?;
- if exists != needed.is_needed() || exists != needed.is_nonzero() {
+ if exists != rc.is_needed() || exists != rc.is_nonzero() {
debug!(
"Resync block {:?}: exists {}, nonzero rc {}, deletable {}",
hash,
exists,
- needed.is_nonzero(),
- needed.is_deletable(),
+ rc.is_nonzero(),
+ rc.is_deletable(),
);
}
- if exists && needed.is_deletable() {
+ if exists && rc.is_deletable() {
info!("Resync block {:?}: offloading and deleting", hash);
+ let existing_path = existing_path.unwrap();
let mut who = manager.replication.write_nodes(hash);
if who.len() < manager.replication.write_quorum() {
@@ -419,7 +422,7 @@ impl BlockResyncManager {
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
}
- let block = manager.read_block(hash).await?;
+ let block = manager.read_block_from(hash, &existing_path).await?;
let (header, bytes) = block.into_parts();
let put_block_message = Req::new(BlockRpc::PutBlock {
hash: *hash,
@@ -451,7 +454,7 @@ impl BlockResyncManager {
manager.rc.clear_deleted_block_rc(hash)?;
}
- if needed.is_nonzero() && !exists {
+ if rc.is_nonzero() && !exists {
info!(
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
hash