aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-09-05 14:27:39 +0200
committerAlex Auvolat <alex@adnab.me>2023-09-06 16:35:28 +0200
commit887b3233f45ade24def08b3faa2d6da5fe85a3a1 (patch)
tree90d50a4eb4794aac04eb1f2a5a784aca57b13bae /src/block/manager.rs
parent6c420c0880de742b2b6416da1178df828fd977bf (diff)
downloadgarage-887b3233f45ade24def08b3faa2d6da5fe85a3a1.tar.gz
garage-887b3233f45ade24def08b3faa2d6da5fe85a3a1.zip
block manager: use data paths from layout
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs147
1 files changed, 75 insertions, 72 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 45729a00..73fefa0c 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -543,21 +543,25 @@ impl BlockManager {
}
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
- let mut path = self.block_path(hash);
- let compressed = match self.is_block_compressed(hash).await {
- Ok(c) => c,
- Err(e) => {
+ let block_path = match self.find_block(hash).await {
+ Some(p) => p,
+ None => {
// Not found but maybe we should have had it ??
self.resync
.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
- return Err(Into::into(e));
+ return Err(Error::Message(format!(
+ "block {:?} not found on node",
+ hash
+ )));
}
};
- if compressed {
- path.set_extension("zst");
- }
- let mut f = fs::File::open(&path).await?;
+ let (path, compressed) = match &block_path {
+ DataBlockPath::Plain(p) => (p, false),
+ DataBlockPath::Compressed(p) => (p, true),
+ };
+
+ let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
drop(f);
@@ -571,11 +575,16 @@ impl BlockManager {
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
+ warn!(
+ "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
+ hash
+ );
self.lock_mutate(hash)
.await
- .move_block_to_corrupted(hash, self)
+ .move_block_to_corrupted(&block_path)
.await?;
self.resync.put_to_resync(hash, Duration::from_millis(0))?;
+
return Err(Error::CorruptData(*hash));
}
@@ -604,56 +613,51 @@ impl BlockManager {
.await
}
- /// Utility: gives the path of the directory in which a block should be found
- fn block_dir(&self, hash: &Hash) -> PathBuf {
- self.data_layout.primary_data_dir(hash)
- }
-
- /// Utility: give the full path where a block should be found, minus extension if block is
- /// compressed
- fn block_path(&self, hash: &Hash) -> PathBuf {
- let mut path = self.block_dir(hash);
- path.push(hex::encode(hash.as_ref()));
- path
- }
+ /// Utility: check if block is stored compressed.
+ async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
+ let dirs = Some(self.data_layout.primary_block_dir(hash))
+ .into_iter()
+ .chain(self.data_layout.secondary_block_dirs(hash));
+ let filename = hex::encode(hash.as_ref());
- /// Utility: check if block is stored compressed. Error if block is not stored
- async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
- let mut path = self.block_path(hash);
+ for dir in dirs {
+ let mut path = dir;
+ path.push(&filename);
- // If compression is disabled on node - check for the raw block
- // first and then a compressed one (as compression may have been
- // previously enabled).
- match self.compression_level {
- None => {
+ if self.compression_level.is_none() {
+ // If compression is disabled on node - check for the raw block
+ // first and then a compressed one (as compression may have been
+ // previously enabled).
if fs::metadata(&path).await.is_ok() {
- return Ok(false);
+ return Some(DataBlockPath::Plain(path));
}
-
path.set_extension("zst");
-
- fs::metadata(&path).await.map(|_| true).map_err(Into::into)
- }
- _ => {
+ if fs::metadata(&path).await.is_ok() {
+ return Some(DataBlockPath::Compressed(path));
+ }
+ } else {
path.set_extension("zst");
-
if fs::metadata(&path).await.is_ok() {
- return Ok(true);
+ return Some(DataBlockPath::Compressed(path));
}
-
path.set_extension("");
-
- fs::metadata(&path).await.map(|_| false).map_err(Into::into)
+ if fs::metadata(&path).await.is_ok() {
+ return Some(DataBlockPath::Plain(path));
+ }
}
}
+
+ None
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
let tracer = opentelemetry::global::tracer("garage");
- self.mutation_lock[hash.as_slice()[0] as usize]
+ let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
+ % self.mutation_lock.len();
+ self.mutation_lock[ilock]
.lock()
.with_context(Context::current_with_span(
- tracer.start("Acquire mutation_lock"),
+ tracer.start(format!("Acquire mutation_lock #{}", ilock)),
))
.await
}
@@ -688,7 +692,7 @@ impl BlockManagerLocked {
hash: &Hash,
mgr: &BlockManager,
) -> Result<BlockStatus, Error> {
- let exists = mgr.is_block_compressed(hash).await.is_ok();
+ let exists = mgr.find_block(hash).await.is_some();
let needed = mgr.rc.get_block_rc(hash)?;
Ok(BlockStatus { exists, needed })
@@ -703,21 +707,17 @@ impl BlockManagerLocked {
let compressed = data.is_compressed();
let data = data.inner_buffer();
- let mut path = mgr.block_dir(hash);
+ let mut path = mgr.data_layout.primary_block_dir(hash);
let directory = path.clone();
path.push(hex::encode(hash));
fs::create_dir_all(&directory).await?;
- let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
- (Ok(true), _) => return Ok(()),
- (Ok(false), false) => return Ok(()),
- (Ok(false), true) => {
- let path_to_delete = path.clone();
- path.set_extension("zst");
- Some(path_to_delete)
- }
- (Err(_), compressed) => {
+ let to_delete = match (mgr.find_block(hash).await, compressed) {
+ (Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
+ (Some(DataBlockPath::Plain(_)), false) => return Ok(()),
+ (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
+ (None, compressed) => {
if compressed {
path.set_extension("zst");
}
@@ -766,19 +766,20 @@ impl BlockManagerLocked {
Ok(())
}
- async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
- warn!(
- "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
- hash
- );
- let mut path = mgr.block_path(hash);
- let mut path2 = path.clone();
- if mgr.is_block_compressed(hash).await? {
- path.set_extension("zst");
- path2.set_extension("zst.corrupted");
- } else {
- path2.set_extension("corrupted");
- }
+ async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
+ let (path, path2) = match block_path {
+ DataBlockPath::Plain(p) => {
+ let mut p2 = p.clone();
+ p2.set_extension("corrupted");
+ (p, p2)
+ }
+ DataBlockPath::Compressed(p) => {
+ let mut p2 = p.clone();
+ p2.set_extension("zst.corrupted");
+ (p, p2)
+ }
+ };
+
fs::rename(path, path2).await?;
Ok(())
}
@@ -787,12 +788,14 @@ impl BlockManagerLocked {
let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
if exists && needed.is_deletable() {
- let mut path = mgr.block_path(hash);
- if mgr.is_block_compressed(hash).await? {
- path.set_extension("zst");
+ let path_opt = match mgr.find_block(hash).await {
+ Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p),
+ None => None,
+ };
+ if let Some(path) = path_opt {
+ fs::remove_file(path).await?;
+ mgr.metrics.delete_counter.add(1);
}
- fs::remove_file(path).await?;
- mgr.metrics.delete_counter.add(1);
}
Ok(())
}