aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs14
-rw-r--r--src/block/rc.rs65
-rw-r--r--src/block/resync.rs17
4 files changed, 90 insertions, 7 deletions
diff --git a/src/block/lib.rs b/src/block/lib.rs
index 6c4711ef..944f0d83 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -11,3 +11,4 @@ mod metrics;
mod rc;
pub use block::zstd_encode;
+pub use rc::CalculateRefcount;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index eeacf8b9..628ffc71 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -88,7 +88,7 @@ pub struct BlockManager {
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
- pub(crate) rc: BlockRc,
+ pub rc: BlockRc,
pub resync: BlockResyncManager,
pub(crate) system: Arc<System>,
@@ -229,6 +229,12 @@ impl BlockManager {
}
}
+ /// Initialization: set how block references are recalculated
+ /// for repair operations
+ pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) {
+ self.rc.recalc_rc.store(Some(Arc::new(recalc)));
+ }
+
/// Ask nodes that might have a (possibly compressed) block for it
/// Return it as a stream with a header
async fn rpc_get_raw_block_streaming(
@@ -316,9 +322,9 @@ impl BlockManager {
};
}
- let msg = format!("Get block {:?}: no node returned a valid block", hash);
- debug!("{}", msg);
- Err(Error::Message(msg))
+ let err = Error::MissingBlock(*hash);
+ debug!("{}", err);
+ Err(err)
}
// ---- Public interface ----
diff --git a/src/block/rc.rs b/src/block/rc.rs
index b6afb277..bf5aeced 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -1,5 +1,7 @@
use std::convert::TryInto;
+use arc_swap::ArcSwapOption;
+
use garage_db as db;
use garage_util::data::*;
@@ -8,13 +10,20 @@ use garage_util::time::*;
use crate::manager::BLOCK_GC_DELAY;
+pub type CalculateRefcount =
+ Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>;
+
pub struct BlockRc {
- pub(crate) rc: db::Tree,
+ pub rc: db::Tree,
+ pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>,
}
impl BlockRc {
pub(crate) fn new(rc: db::Tree) -> Self {
- Self { rc }
+ Self {
+ rc,
+ recalc_rc: ArcSwapOption::new(None),
+ }
}
/// Increment the reference counter associated to a hash.
@@ -68,6 +77,58 @@ impl BlockRc {
})?;
Ok(())
}
+
+ /// Recalculate the reference counter of a block
+ /// to fix potential inconsistencies
+ pub fn recalculate_rc(&self, hash: &Hash) -> Result<(usize, bool), Error> {
+ if let Some(recalc_fns) = self.recalc_rc.load().as_ref() {
+ trace!("Repair block RC for {:?}", hash);
+ let res = self
+ .rc
+ .db()
+ .transaction(|tx| {
+ let mut cnt = 0;
+ for f in recalc_fns.iter() {
+ cnt += f(&tx, hash)?;
+ }
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
+ trace!(
+ "Block RC for {:?}: stored={}, calculated={}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ if cnt as u64 != old_rc.as_u64() {
+ warn!(
+ "Fixing inconsistent block RC for {:?}: was {}, should be {}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ let new_rc = if cnt > 0 {
+ RcEntry::Present { count: cnt as u64 }
+ } else {
+ RcEntry::Deletable {
+ at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
+ }
+ };
+ tx.insert(&self.rc, hash, new_rc.serialize().unwrap())?;
+ Ok((cnt, true))
+ } else {
+ Ok((cnt, false))
+ }
+ })
+ .map_err(Error::from);
+ if let Err(e) = &res {
+ error!("Failed to fix RC for block {:?}: {}", hash, e);
+ }
+ res
+ } else {
+ Err(Error::Message(
+ "Block RC recalculation is not available at this point".into(),
+ ))
+ }
+ }
}
/// Describes the state of the reference counter for a block
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 48c2cef1..b4108213 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -367,6 +367,13 @@ impl BlockResyncManager {
}
if exists && rc.is_deletable() {
+ if manager.rc.recalculate_rc(hash)?.0 > 0 {
+ return Err(Error::Message(format!(
+ "Refcount for block {:?} was inconsistent, retrying later",
+ hash
+ )));
+ }
+
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
@@ -453,7 +460,15 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash, None).await?;
+ let block_data = manager.rpc_get_raw_block(hash, None).await;
+ if matches!(block_data, Err(Error::MissingBlock(_))) {
+ warn!(
+ "Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",
+ hash
+ );
+ manager.rc.recalculate_rc(hash)?;
+ }
+ let block_data = block_data?;
manager.metrics.resync_recv_counter.add(1);