aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs14
-rw-r--r--src/block/rc.rs65
-rw-r--r--src/block/resync.rs17
-rw-r--r--src/garage/cli/structs.rs5
-rw-r--r--src/garage/repair/online.rs106
-rw-r--r--src/model/garage.rs8
-rw-r--r--src/model/s3/block_ref_table.rs39
-rw-r--r--src/util/data.rs35
-rw-r--r--src/util/error.rs3
10 files changed, 285 insertions, 8 deletions
diff --git a/src/block/lib.rs b/src/block/lib.rs
index 6c4711ef..944f0d83 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -11,3 +11,4 @@ mod metrics;
mod rc;
pub use block::zstd_encode;
+pub use rc::CalculateRefcount;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index eeacf8b9..628ffc71 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -88,7 +88,7 @@ pub struct BlockManager {
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
- pub(crate) rc: BlockRc,
+ pub rc: BlockRc,
pub resync: BlockResyncManager,
pub(crate) system: Arc<System>,
@@ -229,6 +229,12 @@ impl BlockManager {
}
}
+ /// Initialization: set how block references are recalculated
+ /// for repair operations
+ pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) {
+ self.rc.recalc_rc.store(Some(Arc::new(recalc)));
+ }
+
/// Ask nodes that might have a (possibly compressed) block for it
/// Return it as a stream with a header
async fn rpc_get_raw_block_streaming(
@@ -316,9 +322,9 @@ impl BlockManager {
};
}
- let msg = format!("Get block {:?}: no node returned a valid block", hash);
- debug!("{}", msg);
- Err(Error::Message(msg))
+ let err = Error::MissingBlock(*hash);
+ debug!("{}", err);
+ Err(err)
}
// ---- Public interface ----
diff --git a/src/block/rc.rs b/src/block/rc.rs
index b6afb277..bf5aeced 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -1,5 +1,7 @@
use std::convert::TryInto;
+use arc_swap::ArcSwapOption;
+
use garage_db as db;
use garage_util::data::*;
@@ -8,13 +10,20 @@ use garage_util::time::*;
use crate::manager::BLOCK_GC_DELAY;
+pub type CalculateRefcount =
+ Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>;
+
pub struct BlockRc {
- pub(crate) rc: db::Tree,
+ pub rc: db::Tree,
+ pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>,
}
impl BlockRc {
pub(crate) fn new(rc: db::Tree) -> Self {
- Self { rc }
+ Self {
+ rc,
+ recalc_rc: ArcSwapOption::new(None),
+ }
}
/// Increment the reference counter associated to a hash.
@@ -68,6 +77,58 @@ impl BlockRc {
})?;
Ok(())
}
+
+ /// Recalculate the reference counter of a block
+ /// to fix potential inconsistencies
+ pub fn recalculate_rc(&self, hash: &Hash) -> Result<(usize, bool), Error> {
+ if let Some(recalc_fns) = self.recalc_rc.load().as_ref() {
+ trace!("Repair block RC for {:?}", hash);
+ let res = self
+ .rc
+ .db()
+ .transaction(|tx| {
+ let mut cnt = 0;
+ for f in recalc_fns.iter() {
+ cnt += f(&tx, hash)?;
+ }
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
+ trace!(
+ "Block RC for {:?}: stored={}, calculated={}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ if cnt as u64 != old_rc.as_u64() {
+ warn!(
+ "Fixing inconsistent block RC for {:?}: was {}, should be {}",
+ hash,
+ old_rc.as_u64(),
+ cnt
+ );
+ let new_rc = if cnt > 0 {
+ RcEntry::Present { count: cnt as u64 }
+ } else {
+ RcEntry::Deletable {
+ at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
+ }
+ };
+ tx.insert(&self.rc, hash, new_rc.serialize().unwrap())?;
+ Ok((cnt, true))
+ } else {
+ Ok((cnt, false))
+ }
+ })
+ .map_err(Error::from);
+ if let Err(e) = &res {
+ error!("Failed to fix RC for block {:?}: {}", hash, e);
+ }
+ res
+ } else {
+ Err(Error::Message(
+ "Block RC recalculation is not available at this point".into(),
+ ))
+ }
+ }
}
/// Describes the state of the reference counter for a block
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 48c2cef1..b4108213 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -367,6 +367,13 @@ impl BlockResyncManager {
}
if exists && rc.is_deletable() {
+ if manager.rc.recalculate_rc(hash)?.0 > 0 {
+ return Err(Error::Message(format!(
+ "Refcount for block {:?} was inconsistent, retrying later",
+ hash
+ )));
+ }
+
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
@@ -453,7 +460,15 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash, None).await?;
+ let block_data = manager.rpc_get_raw_block(hash, None).await;
+ if matches!(block_data, Err(Error::MissingBlock(_))) {
+ warn!(
+ "Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",
+ hash
+ );
+ manager.rc.recalculate_rc(hash)?;
+ }
+ let block_data = block_data?;
manager.metrics.resync_recv_counter.add(1);
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 1f572a9a..8380b5e2 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -473,8 +473,11 @@ pub enum RepairWhat {
#[structopt(name = "mpu", version = garage_version())]
MultipartUploads,
/// Repropagate version deletions to the block ref table
- #[structopt(name = "block_refs", version = garage_version())]
+ #[structopt(name = "block-refs", version = garage_version())]
BlockRefs,
+ /// Recalculate block reference counters
+ #[structopt(name = "block-rc", version = garage_version())]
+ BlockRc,
/// Verify integrity of all blocks on disc
#[structopt(name = "scrub", version = garage_version())]
Scrub {
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 9e4de873..ecccdf6d 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -4,6 +4,7 @@ use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::watch;
+use garage_block::manager::BlockManager;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::garage::Garage;
@@ -16,11 +17,14 @@ use garage_table::replication::*;
use garage_table::*;
use garage_util::background::*;
+use garage_util::data::*;
use garage_util::error::Error;
use garage_util::migrate::Migrate;
use crate::*;
+const RC_REPAIR_ITER_COUNT: usize = 64;
+
pub async fn launch_online_repair(
garage: &Arc<Garage>,
bg: &BackgroundRunner,
@@ -47,6 +51,13 @@ pub async fn launch_online_repair(
info!("Repairing the block refs table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
}
+ RepairWhat::BlockRc => {
+ info!("Repairing the block reference counters");
+ bg.spawn_worker(BlockRcRepair::new(
+ garage.block_manager.clone(),
+ garage.block_ref_table.clone(),
+ ));
+ }
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
bg.spawn_worker(garage_block::repair::RepairWorker::new(
@@ -282,3 +293,98 @@ impl TableRepair for RepairMpu {
Ok(false)
}
}
+
+// ===== block reference counter repair =====
+
+pub struct BlockRcRepair {
+ block_manager: Arc<BlockManager>,
+ block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+ cursor: Hash,
+ counter: u64,
+ repairs: u64,
+}
+
+impl BlockRcRepair {
+ fn new(
+ block_manager: Arc<BlockManager>,
+ block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+ ) -> Self {
+ Self {
+ block_manager,
+ block_ref_table,
+ cursor: [0u8; 32].into(),
+ counter: 0,
+ repairs: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for BlockRcRepair {
+ fn name(&self) -> String {
+ format!("Block refcount repair worker")
+ }
+
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ progress: Some(format!("{} ({})", self.counter, self.repairs)),
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ for _i in 0..RC_REPAIR_ITER_COUNT {
+ let next1 = self
+ .block_manager
+ .rc
+ .rc
+ .range(self.cursor.as_slice()..)?
+ .next()
+ .transpose()?
+ .map(|(k, _)| Hash::try_from(k.as_slice()).unwrap());
+ let next2 = self
+ .block_ref_table
+ .data
+ .store
+ .range(self.cursor.as_slice()..)?
+ .next()
+ .transpose()?
+ .map(|(k, _)| Hash::try_from(&k[..32]).unwrap());
+ let next = match (next1, next2) {
+ (Some(k1), Some(k2)) => std::cmp::min(k1, k2),
+ (Some(k), None) | (None, Some(k)) => k,
+ (None, None) => {
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
+ return Ok(WorkerState::Done);
+ }
+ };
+
+ if self.block_manager.rc.recalculate_rc(&next)?.1 {
+ self.repairs += 1;
+ }
+ self.counter += 1;
+ if let Some(next_incr) = next.increment() {
+ self.cursor = next_incr;
+ } else {
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
+ return Ok(WorkerState::Done);
+ }
+ }
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ unreachable!()
+ }
+}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 4405d22d..273690db 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -247,6 +247,14 @@ impl Garage {
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
+ // ---- setup block refcount recalculation ----
+ // this function can be used to fix inconsistencies in the RC table
+ block_manager.set_recalc_rc(vec![
+ block_ref_recount_fn(&block_ref_table),
+ // other functions could be added here if we had other tables
+ // that hold references to data blocks
+ ]);
+
// -- done --
Ok(Arc::new(Self {
config,
diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index 7b023d87..57eb7b16 100644
--- a/src/model/s3/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -3,8 +3,12 @@ use std::sync::Arc;
use garage_db as db;
use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::migrate::Migrate;
+use garage_block::CalculateRefcount;
use garage_table::crdt::Crdt;
+use garage_table::replication::TableShardedReplication;
use garage_table::*;
use garage_block::manager::*;
@@ -84,3 +88,38 @@ impl TableSchema for BlockRefTable {
filter.apply(entry.deleted.get())
}
}
+
+pub fn block_ref_recount_fn(
+ block_ref_table: &Arc<Table<BlockRefTable, TableShardedReplication>>,
+) -> CalculateRefcount {
+ let table = Arc::downgrade(block_ref_table);
+ Box::new(move |tx: &db::Transaction, block: &Hash| {
+ let table = table
+ .upgrade()
+ .ok_or_message("cannot upgrade weak ptr to block_ref_table")
+ .map_err(db::TxError::Abort)?;
+ Ok(calculate_refcount(&table, tx, block)?)
+ })
+}
+
+fn calculate_refcount(
+ block_ref_table: &Table<BlockRefTable, TableShardedReplication>,
+ tx: &db::Transaction,
+ block: &Hash,
+) -> db::TxResult<usize, Error> {
+ let mut result = 0;
+ for entry in tx.range(&block_ref_table.data.store, block.as_slice()..)? {
+ let (key, value) = entry?;
+ if &key[..32] != block.as_slice() {
+ break;
+ }
+ let value = BlockRef::decode(&value)
+ .ok_or_message("could not decode block_ref")
+ .map_err(db::TxError::Abort)?;
+ assert_eq!(value.block, *block);
+ if !value.deleted.get() {
+ result += 1;
+ }
+ }
+ Ok(result)
+}
diff --git a/src/util/data.rs b/src/util/data.rs
index 2579fd1b..1fe7dfe0 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -83,6 +83,19 @@ impl FixedBytes32 {
ret.copy_from_slice(by);
Some(Self(ret))
}
+ /// Return the next hash
+ pub fn increment(&self) -> Option<Self> {
+ let mut ret = *self;
+ for byte in ret.0.iter_mut().rev() {
+ if *byte == u8::MAX {
+ *byte = 0;
+ } else {
+ *byte = *byte + 1;
+ return Some(ret);
+ }
+ }
+ return None;
+ }
}
impl From<garage_net::NodeID> for FixedBytes32 {
@@ -140,3 +153,25 @@ pub fn fasthash(data: &[u8]) -> FastHash {
pub fn gen_uuid() -> Uuid {
rand::thread_rng().gen::<[u8; 32]>().into()
}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_increment() {
+ let zero: FixedBytes32 = [0u8; 32].into();
+ let mut one: FixedBytes32 = [0u8; 32].into();
+ one.0[31] = 1;
+ let max: FixedBytes32 = [0xFFu8; 32].into();
+ assert_eq!(zero.increment(), Some(one));
+ assert_eq!(max.increment(), None);
+
+ let mut test: FixedBytes32 = [0u8; 32].into();
+ let i = 0x198DF97209F8FFFFu64;
+ test.0[24..32].copy_from_slice(&u64::to_be_bytes(i));
+ let mut test2: FixedBytes32 = [0u8; 32].into();
+ test2.0[24..32].copy_from_slice(&u64::to_be_bytes(i + 1));
+ assert_eq!(test.increment(), Some(test2));
+ }
+}
diff --git a/src/util/error.rs b/src/util/error.rs
index da9eda10..75fd3f9c 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -70,6 +70,9 @@ pub enum Error {
#[error(display = "Corrupt data: does not match hash {:?}", _0)]
CorruptData(Hash),
+ #[error(display = "Missing block {:?}: no node returned a valid block", _0)]
+ MissingBlock(Hash),
+
#[error(display = "{}", _0)]
Message(String),
}