aboutsummaryrefslogtreecommitdiff
path: root/src/block/rc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/rc.rs')
-rw-r--r--src/block/rc.rs83
1 files changed, 72 insertions, 11 deletions
diff --git a/src/block/rc.rs b/src/block/rc.rs
index b6afb277..4a55ee29 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -1,5 +1,7 @@
use std::convert::TryInto;
+use arc_swap::ArcSwapOption;
+
use garage_db as db;
use garage_util::data::*;
@@ -8,13 +10,20 @@ use garage_util::time::*;
use crate::manager::BLOCK_GC_DELAY;
+pub type CalculateRefcount =
+ Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>;
+
pub struct BlockRc {
- pub(crate) rc: db::Tree,
+ pub rc_table: db::Tree,
+ pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>,
}
impl BlockRc {
pub(crate) fn new(rc: db::Tree) -> Self {
- Self { rc }
+ Self {
+ rc_table: rc,
+ recalc_rc: ArcSwapOption::new(None),
+ }
}
/// Increment the reference counter associated to a hash.
@@ -24,9 +33,9 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
- let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
match old_rc.increment().serialize() {
- Some(x) => tx.insert(&self.rc, hash, x)?,
+ Some(x) => tx.insert(&self.rc_table, hash, x)?,
None => unreachable!(),
};
Ok(old_rc.is_zero())
@@ -39,28 +48,28 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
- let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement();
+ let new_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?).decrement();
match new_rc.serialize() {
- Some(x) => tx.insert(&self.rc, hash, x)?,
- None => tx.remove(&self.rc, hash)?,
+ Some(x) => tx.insert(&self.rc_table, hash, x)?,
+ None => tx.remove(&self.rc_table, hash)?,
};
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
}
/// Read a block's reference count
pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
- Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?))
+ Ok(RcEntry::parse_opt(self.rc_table.get(hash.as_ref())?))
}
/// Delete an entry in the RC table if it is deletable and the
/// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
- self.rc.db().transaction(|tx| {
- let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
+ self.rc_table.db().transaction(|tx| {
+ let rcval = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
match rcval {
RcEntry::Deletable { at_time } if now > at_time => {
- tx.remove(&self.rc, hash)?;
+ tx.remove(&self.rc_table, hash)?;
}
_ => (),
};
@@ -68,6 +77,58 @@ impl BlockRc {
})?;
Ok(())
}
+
+ /// Recalculate the reference counter of a block
+ /// to fix potential inconsistencies
+ pub fn recalculate_rc(&self, hash: &Hash) -> Result<(usize, bool), Error> {
+ if let Some(recalc_fns) = self.recalc_rc.load().as_ref() {
+ trace!("Repair block RC for {:?}", hash);
+ let res = self
+ .rc_table
+ .db()
+ .transaction(|tx| {
+ let mut cnt = 0;
+ for f in recalc_fns.iter() {
+ cnt += f(&tx, hash)?;
+ }
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
+ trace!(
+ "Block RC for {:?}: stored={}, calculated={}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ if cnt as u64 != old_rc.as_u64() {
+ warn!(
+ "Fixing inconsistent block RC for {:?}: was {}, should be {}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ let new_rc = if cnt > 0 {
+ RcEntry::Present { count: cnt as u64 }
+ } else {
+ RcEntry::Deletable {
+ at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
+ }
+ };
+ tx.insert(&self.rc_table, hash, new_rc.serialize().unwrap())?;
+ Ok((cnt, true))
+ } else {
+ Ok((cnt, false))
+ }
+ })
+ .map_err(Error::from);
+ if let Err(e) = &res {
+ error!("Failed to fix RC for block {:?}: {}", hash, e);
+ }
+ res
+ } else {
+ Err(Error::Message(
+ "Block RC recalculation is not available at this point".into(),
+ ))
+ }
+ }
}
/// Describes the state of the reference counter for a block