aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs39
1 files changed, 38 insertions, 1 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index e0fbfe74..ea70b19c 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -645,6 +645,19 @@ impl BlockManager {
None
}
+ /// Rewrite a block at the primary location for its path and delete the old path.
+ /// Returns the number of bytes read/written
+ pub(crate) async fn fix_block_location(
+ &self,
+ hash: &Hash,
+ wrong_path: DataBlockPath,
+ ) -> Result<usize, Error> {
+ self.lock_mutate(hash)
+ .await
+ .fix_block_location(hash, wrong_path, self)
+ .await
+ }
+
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
let tracer = opentelemetry::global::tracer("garage");
let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
@@ -683,6 +696,17 @@ impl BlockManagerLocked {
data: &DataBlock,
mgr: &BlockManager,
) -> Result<(), Error> {
+ let existing_path = mgr.find_block(hash).await;
+ self.write_block_inner(hash, data, mgr, existing_path).await
+ }
+
+ async fn write_block_inner(
+ &self,
+ hash: &Hash,
+ data: &DataBlock,
+ mgr: &BlockManager,
+ existing_path: Option<DataBlockPath>,
+ ) -> Result<(), Error> {
let compressed = data.is_compressed();
let data = data.inner_buffer();
@@ -694,7 +718,7 @@ impl BlockManagerLocked {
tgt_path.set_extension("zst");
}
- let to_delete = match (mgr.find_block(hash).await, compressed) {
+ let to_delete = match (existing_path, compressed) {
// If the block is stored in the wrong directory,
// write it again at the correct path and delete the old path
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
@@ -716,6 +740,7 @@ impl BlockManagerLocked {
// If the block isn't stored already, just store what is given to us
(None, _) => None,
};
+ assert!(to_delete.as_ref() != Some(&tgt_path));
let mut path_tmp = tgt_path.clone();
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
@@ -792,6 +817,18 @@ impl BlockManagerLocked {
}
Ok(())
}
+
+ async fn fix_block_location(
+ &self,
+ hash: &Hash,
+ wrong_path: DataBlockPath,
+ mgr: &BlockManager,
+ ) -> Result<usize, Error> {
+ let data = mgr.read_block_from(hash, &wrong_path).await?;
+ self.write_block_inner(hash, &data, mgr, Some(wrong_path))
+ .await?;
+ Ok(data.inner_buffer().len())
+ }
}
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {