aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs18
-rw-r--r--src/block/rc.rs83
-rw-r--r--src/block/repair.rs2
-rw-r--r--src/block/resync.rs17
5 files changed, 102 insertions, 19 deletions
diff --git a/src/block/lib.rs b/src/block/lib.rs
index 6c4711ef..944f0d83 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -11,3 +11,4 @@ mod metrics;
mod rc;
pub use block::zstd_encode;
+pub use rc::CalculateRefcount;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index eeacf8b9..8ee33096 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -88,7 +88,7 @@ pub struct BlockManager {
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
- pub(crate) rc: BlockRc,
+ pub rc: BlockRc,
pub resync: BlockResyncManager,
pub(crate) system: Arc<System>,
@@ -156,7 +156,7 @@ impl BlockManager {
let metrics = BlockManagerMetrics::new(
config.compression_level,
- rc.rc.clone(),
+ rc.rc_table.clone(),
resync.queue.clone(),
resync.errors.clone(),
);
@@ -229,6 +229,12 @@ impl BlockManager {
}
}
+ /// Initialization: set how block references are recalculated
+ /// for repair operations
+ pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) {
+ self.rc.recalc_rc.store(Some(Arc::new(recalc)));
+ }
+
/// Ask nodes that might have a (possibly compressed) block for it
/// Return it as a stream with a header
async fn rpc_get_raw_block_streaming(
@@ -316,9 +322,9 @@ impl BlockManager {
};
}
- let msg = format!("Get block {:?}: no node returned a valid block", hash);
- debug!("{}", msg);
- Err(Error::Message(msg))
+ let err = Error::MissingBlock(*hash);
+ debug!("{}", err);
+ Err(err)
}
// ---- Public interface ----
@@ -381,7 +387,7 @@ impl BlockManager {
/// Get number of items in the refcount table
pub fn rc_len(&self) -> Result<usize, Error> {
- Ok(self.rc.rc.len()?)
+ Ok(self.rc.rc_table.len()?)
}
/// Send command to start/stop/manager scrub worker
diff --git a/src/block/rc.rs b/src/block/rc.rs
index b6afb277..4a55ee29 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -1,5 +1,7 @@
use std::convert::TryInto;
+use arc_swap::ArcSwapOption;
+
use garage_db as db;
use garage_util::data::*;
@@ -8,13 +10,20 @@ use garage_util::time::*;
use crate::manager::BLOCK_GC_DELAY;
+pub type CalculateRefcount =
+ Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>;
+
pub struct BlockRc {
- pub(crate) rc: db::Tree,
+ pub rc_table: db::Tree,
+ pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>,
}
impl BlockRc {
pub(crate) fn new(rc: db::Tree) -> Self {
- Self { rc }
+ Self {
+ rc_table: rc,
+ recalc_rc: ArcSwapOption::new(None),
+ }
}
/// Increment the reference counter associated to a hash.
@@ -24,9 +33,9 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
- let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
match old_rc.increment().serialize() {
- Some(x) => tx.insert(&self.rc, hash, x)?,
+ Some(x) => tx.insert(&self.rc_table, hash, x)?,
None => unreachable!(),
};
Ok(old_rc.is_zero())
@@ -39,28 +48,28 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
- let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement();
+ let new_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?).decrement();
match new_rc.serialize() {
- Some(x) => tx.insert(&self.rc, hash, x)?,
- None => tx.remove(&self.rc, hash)?,
+ Some(x) => tx.insert(&self.rc_table, hash, x)?,
+ None => tx.remove(&self.rc_table, hash)?,
};
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
}
/// Read a block's reference count
pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
- Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?))
+ Ok(RcEntry::parse_opt(self.rc_table.get(hash.as_ref())?))
}
/// Delete an entry in the RC table if it is deletable and the
/// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
- self.rc.db().transaction(|tx| {
- let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
+ self.rc_table.db().transaction(|tx| {
+ let rcval = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
match rcval {
RcEntry::Deletable { at_time } if now > at_time => {
- tx.remove(&self.rc, hash)?;
+ tx.remove(&self.rc_table, hash)?;
}
_ => (),
};
@@ -68,6 +77,58 @@ impl BlockRc {
})?;
Ok(())
}
+
+ /// Recalculate the reference counter of a block
+ /// to fix potential inconsistencies
+ pub fn recalculate_rc(&self, hash: &Hash) -> Result<(usize, bool), Error> {
+ if let Some(recalc_fns) = self.recalc_rc.load().as_ref() {
+ trace!("Repair block RC for {:?}", hash);
+ let res = self
+ .rc_table
+ .db()
+ .transaction(|tx| {
+ let mut cnt = 0;
+ for f in recalc_fns.iter() {
+ cnt += f(&tx, hash)?;
+ }
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
+ trace!(
+ "Block RC for {:?}: stored={}, calculated={}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ if cnt as u64 != old_rc.as_u64() {
+ warn!(
+ "Fixing inconsistent block RC for {:?}: was {}, should be {}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ let new_rc = if cnt > 0 {
+ RcEntry::Present { count: cnt as u64 }
+ } else {
+ RcEntry::Deletable {
+ at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
+ }
+ };
+ tx.insert(&self.rc_table, hash, new_rc.serialize().unwrap())?;
+ Ok((cnt, true))
+ } else {
+ Ok((cnt, false))
+ }
+ })
+ .map_err(Error::from);
+ if let Err(e) = &res {
+ error!("Failed to fix RC for block {:?}: {}", hash, e);
+ }
+ res
+ } else {
+ Err(Error::Message(
+ "Block RC recalculation is not available at this point".into(),
+ ))
+ }
+ }
}
/// Describes the state of the reference counter for a block
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 2c8acbc9..ef271094 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -107,7 +107,7 @@ impl Worker for RepairWorker {
for entry in self
.manager
.rc
- .rc
+ .rc_table
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
{
let (hash, _) = entry?;
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 48c2cef1..b4108213 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -367,6 +367,13 @@ impl BlockResyncManager {
}
if exists && rc.is_deletable() {
+ if manager.rc.recalculate_rc(hash)?.0 > 0 {
+ return Err(Error::Message(format!(
+ "Refcount for block {:?} was inconsistent, retrying later",
+ hash
+ )));
+ }
+
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
@@ -453,7 +460,15 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash, None).await?;
+ let block_data = manager.rpc_get_raw_block(hash, None).await;
+ if matches!(block_data, Err(Error::MissingBlock(_))) {
+ warn!(
+ "Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",
+ hash
+ );
+ manager.rc.recalculate_rc(hash)?;
+ }
+ let block_data = block_data?;
manager.metrics.resync_recv_counter.add(1);