aboutsummaryrefslogtreecommitdiff
path: root/src/block/rc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/rc.rs')
-rw-r--r--src/block/rc.rs65
1 files changed, 63 insertions, 2 deletions
diff --git a/src/block/rc.rs b/src/block/rc.rs
index b6afb277..bf5aeced 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -1,5 +1,7 @@
use std::convert::TryInto;
+use arc_swap::ArcSwapOption;
+
use garage_db as db;
use garage_util::data::*;
@@ -8,13 +10,20 @@ use garage_util::time::*;
use crate::manager::BLOCK_GC_DELAY;
+pub type CalculateRefcount =
+ Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>;
+
pub struct BlockRc {
- pub(crate) rc: db::Tree,
+ pub rc: db::Tree,
+ pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>,
}
impl BlockRc {
pub(crate) fn new(rc: db::Tree) -> Self {
- Self { rc }
+ Self {
+ rc,
+ recalc_rc: ArcSwapOption::new(None),
+ }
}
/// Increment the reference counter associated to a hash.
@@ -68,6 +77,58 @@ impl BlockRc {
})?;
Ok(())
}
+
+ /// Recalculate the reference counter of a block
+ /// to fix potential inconsistencies
+ pub fn recalculate_rc(&self, hash: &Hash) -> Result<(usize, bool), Error> {
+ if let Some(recalc_fns) = self.recalc_rc.load().as_ref() {
+ trace!("Repair block RC for {:?}", hash);
+ let res = self
+ .rc
+ .db()
+ .transaction(|tx| {
+ let mut cnt = 0;
+ for f in recalc_fns.iter() {
+ cnt += f(&tx, hash)?;
+ }
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
+ trace!(
+ "Block RC for {:?}: stored={}, calculated={}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ if cnt as u64 != old_rc.as_u64() {
+ warn!(
+ "Fixing inconsistent block RC for {:?}: was {}, should be {}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ let new_rc = if cnt > 0 {
+ RcEntry::Present { count: cnt as u64 }
+ } else {
+ RcEntry::Deletable {
+ at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
+ }
+ };
+ tx.insert(&self.rc, hash, new_rc.serialize().unwrap())?;
+ Ok((cnt, true))
+ } else {
+ Ok((cnt, false))
+ }
+ })
+ .map_err(Error::from);
+ if let Err(e) = &res {
+ error!("Failed to fix RC for block {:?}: {}", hash, e);
+ }
+ res
+ } else {
+ Err(Error::Message(
+ "Block RC recalculation is not available at this point".into(),
+ ))
+ }
+ }
}
/// Describes the state of the reference counter for a block