aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-03-15 12:23:33 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-23 10:25:19 +0100
commit8fd6745745f5676b5e80920792fa23453f3a20d7 (patch)
tree1c463c705a2ddbe7b73ce3ee87d3ce3e5a8b0fea /src/block/manager.rs
parentc3982a90b6b9cece7c765070c2c2be22c816ff70 (diff)
downloadgarage-8fd6745745f5676b5e80920792fa23453f3a20d7.tar.gz
garage-8fd6745745f5676b5e80920792fa23453f3a20d7.zip
Move block RC code to separate `rc.rs`
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs151
1 files changed, 15 insertions, 136 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 9665a306..feb6fb9d 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -31,6 +31,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::metrics::*;
use crate::block::*;
+use crate::rc::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -51,7 +52,7 @@ const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves
// to delete the block locally.
-const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
+pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
@@ -86,7 +87,7 @@ pub struct BlockManager {
mutation_lock: Mutex<BlockManagerLocked>,
- rc: sled::Tree,
+ pub rc: BlockRc,
resync_queue: SledCountedTree,
resync_notify: Notify,
@@ -114,6 +115,7 @@ impl BlockManager {
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
+ let rc = BlockRc::new(rc);
let resync_queue = db
.open_tree("block_local_resync_queue")
@@ -213,7 +215,7 @@ impl BlockManager {
/// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table.
- for (i, entry) in self.rc.iter().enumerate() {
+ for (i, entry) in self.rc.rc.iter().enumerate() {
let (hash, _) = entry?;
let hash = Hash::try_from(&hash[..]).unwrap();
self.put_to_resync(&hash, Duration::from_secs(0))?;
@@ -261,7 +263,7 @@ impl BlockManager {
/// Get number of items in the refcount table
pub fn rc_len(&self) -> usize {
- self.rc.len()
+ self.rc.rc.len()
}
//// ----- Managing the reference counter ----
@@ -269,11 +271,7 @@ impl BlockManager {
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
- let old_rc = self
- .rc
- .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?;
- let old_rc = RcEntry::parse_opt(old_rc);
- if old_rc.is_zero() {
+ if self.rc.block_incref(hash)? {
// When the reference counter is incremented, there is
// normally a node that is responsible for sending us the
// data of the block. However that operation may fail,
@@ -287,35 +285,17 @@ impl BlockManager {
/// Decrement the number of time a block is used
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
- let new_rc = self
- .rc
- .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?;
- let new_rc = RcEntry::parse_opt(new_rc);
- if let RcEntry::Deletable { .. } = new_rc {
+ if self.rc.block_decref(hash)? {
+ // When the RC is decremented, it might drop to zero,
+ // indicating that we don't need the block.
+ // There is a delay before we garbage collect it;
+ // make sure that it is handled in the resync loop
+ // after that delay has passed.
self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?;
}
Ok(())
}
- /// Read a block's reference count
- fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
- Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?))
- }
-
- /// Delete an entry in the RC table if it is deletable and the
- /// deletion time has passed
- fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
- let now = now_msec();
- self.rc.update_and_fetch(&hash, |rcval| {
- let updated = match RcEntry::parse_opt(rcval) {
- RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent,
- v => v,
- };
- updated.serialize()
- })?;
- Ok(())
- }
-
// ---- Reading and writing blocks locally ----
/// Write a block to disk
@@ -659,7 +639,7 @@ impl BlockManager {
.delete_if_unneeded(hash, self)
.await?;
- self.clear_deleted_block_rc(hash)?;
+ self.rc.clear_deleted_block_rc(hash)?;
}
if needed.is_nonzero() && !exists {
@@ -773,7 +753,7 @@ impl BlockManagerLocked {
mgr: &BlockManager,
) -> Result<BlockStatus, Error> {
let exists = mgr.is_block_compressed(hash).await.is_ok();
- let needed = mgr.get_block_rc(hash)?;
+ let needed = mgr.rc.get_block_rc(hash)?;
Ok(BlockStatus { exists, needed })
}
@@ -869,107 +849,6 @@ impl BlockManagerLocked {
}
}
-/// Describes the state of the reference counter for a block
-#[derive(Clone, Copy, Debug)]
-enum RcEntry {
- /// Present: the block has `count` references, with `count` > 0.
- ///
- /// This is stored as u64::to_be_bytes(count)
- Present { count: u64 },
-
- /// Deletable: the block has zero references, and can be deleted
- /// once time (returned by now_msec) is larger than at_time
- /// (in millis since Unix epoch)
- ///
- /// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time),
- /// (this allows for the data format to be backwards compatible with
- /// previous Garage versions that didn't have this intermediate state)
- Deletable { at_time: u64 },
-
- /// Absent: the block has zero references, and can be deleted
- /// immediately
- Absent,
-}
-
-impl RcEntry {
- fn parse(bytes: &[u8]) -> Self {
- if bytes.len() == 8 {
- RcEntry::Present {
- count: u64::from_be_bytes(bytes.try_into().unwrap()),
- }
- } else if bytes.len() == 16 {
- RcEntry::Deletable {
- at_time: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
- }
- } else {
- panic!("Invalid RC entry: {:?}, database is corrupted. This is an error Garage is currently unable to recover from. Sorry, and also please report a bug.",
- bytes
- )
- }
- }
-
- fn parse_opt<V: AsRef<[u8]>>(bytes: Option<V>) -> Self {
- bytes
- .map(|b| Self::parse(b.as_ref()))
- .unwrap_or(Self::Absent)
- }
-
- fn serialize(self) -> Option<Vec<u8>> {
- match self {
- RcEntry::Present { count } => Some(u64::to_be_bytes(count).to_vec()),
- RcEntry::Deletable { at_time } => {
- Some([u64::to_be_bytes(0), u64::to_be_bytes(at_time)].concat())
- }
- RcEntry::Absent => None,
- }
- }
-
- fn increment(self) -> Self {
- let old_count = match self {
- RcEntry::Present { count } => count,
- _ => 0,
- };
- RcEntry::Present {
- count: old_count + 1,
- }
- }
-
- fn decrement(self) -> Self {
- match self {
- RcEntry::Present { count } => {
- if count > 1 {
- RcEntry::Present { count: count - 1 }
- } else {
- RcEntry::Deletable {
- at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
- }
- }
- }
- del => del,
- }
- }
-
- fn is_zero(&self) -> bool {
- matches!(self, RcEntry::Deletable { .. } | RcEntry::Absent)
- }
-
- fn is_nonzero(&self) -> bool {
- !self.is_zero()
- }
-
- fn is_deletable(&self) -> bool {
- match self {
- RcEntry::Present { .. } => false,
- RcEntry::Deletable { at_time } => now_msec() > *at_time,
- RcEntry::Absent => true,
- }
- }
-
- fn is_needed(&self) -> bool {
- !self.is_deletable()
- }
-}
-
/// Counts the number of errors when resyncing a block,
/// and the time of the last try.
/// Used to implement exponential backoff.