diff options
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/lib.rs | 1 | ||||
-rw-r--r-- | src/block/manager.rs | 18 | ||||
-rw-r--r-- | src/block/rc.rs | 83 | ||||
-rw-r--r-- | src/block/repair.rs | 2 | ||||
-rw-r--r-- | src/block/resync.rs | 17 |
5 files changed, 102 insertions, 19 deletions
diff --git a/src/block/lib.rs b/src/block/lib.rs index 6c4711ef..944f0d83 100644 --- a/src/block/lib.rs +++ b/src/block/lib.rs @@ -11,3 +11,4 @@ mod metrics; mod rc; pub use block::zstd_encode; +pub use rc::CalculateRefcount; diff --git a/src/block/manager.rs b/src/block/manager.rs index eeacf8b9..8ee33096 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -88,7 +88,7 @@ pub struct BlockManager { mutation_lock: Vec<Mutex<BlockManagerLocked>>, - pub(crate) rc: BlockRc, + pub rc: BlockRc, pub resync: BlockResyncManager, pub(crate) system: Arc<System>, @@ -156,7 +156,7 @@ impl BlockManager { let metrics = BlockManagerMetrics::new( config.compression_level, - rc.rc.clone(), + rc.rc_table.clone(), resync.queue.clone(), resync.errors.clone(), ); @@ -229,6 +229,12 @@ impl BlockManager { } } + /// Initialization: set how block references are recalculated + /// for repair operations + pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) { + self.rc.recalc_rc.store(Some(Arc::new(recalc))); + } + /// Ask nodes that might have a (possibly compressed) block for it /// Return it as a stream with a header async fn rpc_get_raw_block_streaming( @@ -316,9 +322,9 @@ impl BlockManager { }; } - let msg = format!("Get block {:?}: no node returned a valid block", hash); - debug!("{}", msg); - Err(Error::Message(msg)) + let err = Error::MissingBlock(*hash); + debug!("{}", err); + Err(err) } // ---- Public interface ---- @@ -381,7 +387,7 @@ impl BlockManager { /// Get number of items in the refcount table pub fn rc_len(&self) -> Result<usize, Error> { - Ok(self.rc.rc.len()?) + Ok(self.rc.rc_table.len()?) } /// Send command to start/stop/manager scrub worker diff --git a/src/block/rc.rs b/src/block/rc.rs index b6afb277..4a55ee29 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -1,5 +1,7 @@ use std::convert::TryInto; +use arc_swap::ArcSwapOption; + use garage_db as db; use garage_util::data::*; @@ -8,13 +10,20 @@ use garage_util::time::*; use crate::manager::BLOCK_GC_DELAY; +pub type CalculateRefcount = + Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>; + pub struct BlockRc { - pub(crate) rc: db::Tree, + pub rc_table: db::Tree, + pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>, } impl BlockRc { pub(crate) fn new(rc: db::Tree) -> Self { - Self { rc } + Self { + rc_table: rc, + recalc_rc: ArcSwapOption::new(None), + } } /// Increment the reference counter associated to a hash. @@ -24,9 +33,9 @@ impl BlockRc { tx: &mut db::Transaction, hash: &Hash, ) -> db::TxOpResult<bool> { - let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?); + let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?); match old_rc.increment().serialize() { - Some(x) => tx.insert(&self.rc, hash, x)?, + Some(x) => tx.insert(&self.rc_table, hash, x)?, None => unreachable!(), }; Ok(old_rc.is_zero()) @@ -39,28 +48,28 @@ impl BlockRc { tx: &mut db::Transaction, hash: &Hash, ) -> db::TxOpResult<bool> { - let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement(); + let new_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?).decrement(); match new_rc.serialize() { - Some(x) => tx.insert(&self.rc, hash, x)?, - None => tx.remove(&self.rc, hash)?, + Some(x) => tx.insert(&self.rc_table, hash, x)?, + None => tx.remove(&self.rc_table, hash)?, }; Ok(matches!(new_rc, RcEntry::Deletable { .. })) } /// Read a block's reference count pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> { - Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?)) + Ok(RcEntry::parse_opt(self.rc_table.get(hash.as_ref())?)) } /// Delete an entry in the RC table if it is deletable and the /// deletion time has passed pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { let now = now_msec(); - self.rc.db().transaction(|tx| { - let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?); + self.rc_table.db().transaction(|tx| { + let rcval = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?); match rcval { RcEntry::Deletable { at_time } if now > at_time => { - tx.remove(&self.rc, hash)?; + tx.remove(&self.rc_table, hash)?; } _ => (), }; @@ -68,6 +77,58 @@ impl BlockRc { })?; Ok(()) } + + /// Recalculate the reference counter of a block + /// to fix potential inconsistencies + pub fn recalculate_rc(&self, hash: &Hash) -> Result<(usize, bool), Error> { + if let Some(recalc_fns) = self.recalc_rc.load().as_ref() { + trace!("Repair block RC for {:?}", hash); + let res = self + .rc_table + .db() + .transaction(|tx| { + let mut cnt = 0; + for f in recalc_fns.iter() { + cnt += f(&tx, hash)?; + } + let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?); + trace!( + "Block RC for {:?}: stored={}, calculated={}", + hash, + old_rc.as_u64(), + cnt + ); + if cnt as u64 != old_rc.as_u64() { + warn!( + "Fixing inconsistent block RC for {:?}: was {}, should be {}", + hash, + old_rc.as_u64(), + cnt + ); + let new_rc = if cnt > 0 { + RcEntry::Present { count: cnt as u64 } + } else { + RcEntry::Deletable { + at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64, + } + }; + tx.insert(&self.rc_table, hash, new_rc.serialize().unwrap())?; + Ok((cnt, true)) + } else { + Ok((cnt, false)) + } + }) + .map_err(Error::from); + if let Err(e) = &res { + error!("Failed to fix RC for block {:?}: {}", hash, e); + } + res + } else { + Err(Error::Message( + "Block RC recalculation is not available at this point".into(), + )) + } + } } /// Describes the state of the reference counter for a block diff --git a/src/block/repair.rs b/src/block/repair.rs index 2c8acbc9..ef271094 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -107,7 +107,7 @@ impl Worker for RepairWorker { for entry in self .manager .rc - .rc + .rc_table .range::<&[u8], _>((start_bound, Bound::Unbounded))? { let (hash, _) = entry?; diff --git a/src/block/resync.rs b/src/block/resync.rs index 48c2cef1..b4108213 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -367,6 +367,13 @@ impl BlockResyncManager { } if exists && rc.is_deletable() { + if manager.rc.recalculate_rc(hash)?.0 > 0 { + return Err(Error::Message(format!( + "Refcount for block {:?} was inconsistent, retrying later", + hash + ))); + } + info!("Resync block {:?}: offloading and deleting", hash); let existing_path = existing_path.unwrap(); @@ -453,7 +460,15 @@ impl BlockResyncManager { hash ); - let block_data = manager.rpc_get_raw_block(hash, None).await?; + let block_data = manager.rpc_get_raw_block(hash, None).await; + if matches!(block_data, Err(Error::MissingBlock(_))) { + warn!( + "Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.", + hash + ); + manager.rc.recalculate_rc(hash)?; + } + let block_data = block_data?; manager.metrics.resync_recv_counter.add(1); |