From 8fd6745745f5676b5e80920792fa23453f3a20d7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 15 Mar 2022 12:23:33 +0100 Subject: Move block RC code to separate `rc.rs` --- src/block/lib.rs | 1 + src/block/manager.rs | 151 +++++------------------------------------------- src/block/rc.rs | 159 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 175 insertions(+), 136 deletions(-) create mode 100644 src/block/rc.rs diff --git a/src/block/lib.rs b/src/block/lib.rs index 0c67c956..dc685657 100644 --- a/src/block/lib.rs +++ b/src/block/lib.rs @@ -5,3 +5,4 @@ pub mod manager; mod block; mod metrics; +mod rc; diff --git a/src/block/manager.rs b/src/block/manager.rs index 9665a306..feb6fb9d 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -31,6 +31,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::metrics::*; use crate::block::*; +use crate::rc::*; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; @@ -51,7 +52,7 @@ const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); // The delay between the moment when the reference counter // drops to zero, and the moment where we allow ourselves // to delete the block locally. -const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); +pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] @@ -86,7 +87,7 @@ pub struct BlockManager { mutation_lock: Mutex, - rc: sled::Tree, + pub rc: BlockRc, resync_queue: SledCountedTree, resync_notify: Notify, @@ -114,6 +115,7 @@ impl BlockManager { let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); + let rc = BlockRc::new(rc); let resync_queue = db .open_tree("block_local_resync_queue") @@ -213,7 +215,7 @@ impl BlockManager { /// to fix any mismatch between the two. pub async fn repair_data_store(&self, must_exit: &watch::Receiver) -> Result<(), Error> { // 1. Repair blocks from RC table. - for (i, entry) in self.rc.iter().enumerate() { + for (i, entry) in self.rc.rc.iter().enumerate() { let (hash, _) = entry?; let hash = Hash::try_from(&hash[..]).unwrap(); self.put_to_resync(&hash, Duration::from_secs(0))?; @@ -261,7 +263,7 @@ impl BlockManager { /// Get number of items in the refcount table pub fn rc_len(&self) -> usize { - self.rc.len() + self.rc.rc.len() } //// ----- Managing the reference counter ---- @@ -269,11 +271,7 @@ impl BlockManager { /// Increment the number of time a block is used, putting it to resynchronization if it is /// required, but not known pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - let old_rc = self - .rc - .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?; - let old_rc = RcEntry::parse_opt(old_rc); - if old_rc.is_zero() { + if self.rc.block_incref(hash)? { // When the reference counter is incremented, there is // normally a node that is responsible for sending us the // data of the block. However that operation may fail, @@ -287,35 +285,17 @@ impl BlockManager { /// Decrement the number of time a block is used pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - let new_rc = self - .rc - .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?; - let new_rc = RcEntry::parse_opt(new_rc); - if let RcEntry::Deletable { .. } = new_rc { + if self.rc.block_decref(hash)? { + // When the RC is decremented, it might drop to zero, + // indicating that we don't need the block. + // There is a delay before we garbage collect it; + // make sure that it is handled in the resync loop + // after that delay has passed. self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?; } Ok(()) } - /// Read a block's reference count - fn get_block_rc(&self, hash: &Hash) -> Result { - Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?)) - } - - /// Delete an entry in the RC table if it is deletable and the - /// deletion time has passed - fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { - let now = now_msec(); - self.rc.update_and_fetch(&hash, |rcval| { - let updated = match RcEntry::parse_opt(rcval) { - RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent, - v => v, - }; - updated.serialize() - })?; - Ok(()) - } - // ---- Reading and writing blocks locally ---- /// Write a block to disk @@ -659,7 +639,7 @@ impl BlockManager { .delete_if_unneeded(hash, self) .await?; - self.clear_deleted_block_rc(hash)?; + self.rc.clear_deleted_block_rc(hash)?; } if needed.is_nonzero() && !exists { @@ -773,7 +753,7 @@ impl BlockManagerLocked { mgr: &BlockManager, ) -> Result { let exists = mgr.is_block_compressed(hash).await.is_ok(); - let needed = mgr.get_block_rc(hash)?; + let needed = mgr.rc.get_block_rc(hash)?; Ok(BlockStatus { exists, needed }) } @@ -869,107 +849,6 @@ impl BlockManagerLocked { } } -/// Describes the state of the reference counter for a block -#[derive(Clone, Copy, Debug)] -enum RcEntry { - /// Present: the block has `count` references, with `count` > 0. - /// - /// This is stored as u64::to_be_bytes(count) - Present { count: u64 }, - - /// Deletable: the block has zero references, and can be deleted - /// once time (returned by now_msec) is larger than at_time - /// (in millis since Unix epoch) - /// - /// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time), - /// (this allows for the data format to be backwards compatible with - /// previous Garage versions that didn't have this intermediate state) - Deletable { at_time: u64 }, - - /// Absent: the block has zero references, and can be deleted - /// immediately - Absent, -} - -impl RcEntry { - fn parse(bytes: &[u8]) -> Self { - if bytes.len() == 8 { - RcEntry::Present { - count: u64::from_be_bytes(bytes.try_into().unwrap()), - } - } else if bytes.len() == 16 { - RcEntry::Deletable { - at_time: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), - } - } else { - panic!("Invalid RC entry: {:?}, database is corrupted. This is an error Garage is currently unable to recover from. Sorry, and also please report a bug.", - bytes - ) - } - } - - fn parse_opt>(bytes: Option) -> Self { - bytes - .map(|b| Self::parse(b.as_ref())) - .unwrap_or(Self::Absent) - } - - fn serialize(self) -> Option> { - match self { - RcEntry::Present { count } => Some(u64::to_be_bytes(count).to_vec()), - RcEntry::Deletable { at_time } => { - Some([u64::to_be_bytes(0), u64::to_be_bytes(at_time)].concat()) - } - RcEntry::Absent => None, - } - } - - fn increment(self) -> Self { - let old_count = match self { - RcEntry::Present { count } => count, - _ => 0, - }; - RcEntry::Present { - count: old_count + 1, - } - } - - fn decrement(self) -> Self { - match self { - RcEntry::Present { count } => { - if count > 1 { - RcEntry::Present { count: count - 1 } - } else { - RcEntry::Deletable { - at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64, - } - } - } - del => del, - } - } - - fn is_zero(&self) -> bool { - matches!(self, RcEntry::Deletable { .. } | RcEntry::Absent) - } - - fn is_nonzero(&self) -> bool { - !self.is_zero() - } - - fn is_deletable(&self) -> bool { - match self { - RcEntry::Present { .. } => false, - RcEntry::Deletable { at_time } => now_msec() > *at_time, - RcEntry::Absent => true, - } - } - - fn is_needed(&self) -> bool { - !self.is_deletable() - } -} - /// Counts the number of errors when resyncing a block, /// and the time of the last try. /// Used to implement exponential backoff. diff --git a/src/block/rc.rs b/src/block/rc.rs new file mode 100644 index 00000000..0f497c9b --- /dev/null +++ b/src/block/rc.rs @@ -0,0 +1,159 @@ +use std::convert::TryInto; + +use garage_util::error::*; +use garage_util::data::*; +use garage_util::time::*; + +use crate::manager::BLOCK_GC_DELAY; + +pub struct BlockRc { + pub(crate) rc: sled::Tree, +} + +impl BlockRc { + pub(crate) fn new(rc: sled::Tree) -> Self { + Self { + rc + } + } + + /// Increment the reference counter associated to a hash. + /// Returns true if the RC goes from zero to nonzero. + pub(crate) fn block_incref(&self, hash: &Hash) -> Result { + let old_rc = self + .rc + .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?; + let old_rc = RcEntry::parse_opt(old_rc); + Ok(old_rc.is_zero()) + } + + /// Decrement the reference counter associated to a hash. + /// Returns true if the RC is now zero. + pub(crate) fn block_decref(&self, hash: &Hash) -> Result { + let new_rc = self + .rc + .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?; + let new_rc = RcEntry::parse_opt(new_rc); + Ok(matches!(new_rc, RcEntry::Deletable {..})) + } + + /// Read a block's reference count + pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result { + Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?)) + } + + /// Delete an entry in the RC table if it is deletable and the + /// deletion time has passed + pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { + let now = now_msec(); + self.rc.update_and_fetch(&hash, |rcval| { + let updated = match RcEntry::parse_opt(rcval) { + RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent, + v => v, + }; + updated.serialize() + })?; + Ok(()) + } +} + +/// Describes the state of the reference counter for a block +#[derive(Clone, Copy, Debug)] +pub(crate) enum RcEntry { + /// Present: the block has `count` references, with `count` > 0. + /// + /// This is stored as u64::to_be_bytes(count) + Present { count: u64 }, + + /// Deletable: the block has zero references, and can be deleted + /// once time (returned by now_msec) is larger than at_time + /// (in millis since Unix epoch) + /// + /// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time), + /// (this allows for the data format to be backwards compatible with + /// previous Garage versions that didn't have this intermediate state) + Deletable { at_time: u64 }, + + /// Absent: the block has zero references, and can be deleted + /// immediately + Absent, +} + +impl RcEntry { + fn parse(bytes: &[u8]) -> Self { + if bytes.len() == 8 { + RcEntry::Present { + count: u64::from_be_bytes(bytes.try_into().unwrap()), + } + } else if bytes.len() == 16 { + RcEntry::Deletable { + at_time: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), + } + } else { + panic!("Invalid RC entry: {:?}, database is corrupted. This is an error Garage is currently unable to recover from. Sorry, and also please report a bug.", + bytes + ) + } + } + + fn parse_opt>(bytes: Option) -> Self { + bytes + .map(|b| Self::parse(b.as_ref())) + .unwrap_or(Self::Absent) + } + + fn serialize(self) -> Option> { + match self { + RcEntry::Present { count } => Some(u64::to_be_bytes(count).to_vec()), + RcEntry::Deletable { at_time } => { + Some([u64::to_be_bytes(0), u64::to_be_bytes(at_time)].concat()) + } + RcEntry::Absent => None, + } + } + + fn increment(self) -> Self { + let old_count = match self { + RcEntry::Present { count } => count, + _ => 0, + }; + RcEntry::Present { + count: old_count + 1, + } + } + + fn decrement(self) -> Self { + match self { + RcEntry::Present { count } => { + if count > 1 { + RcEntry::Present { count: count - 1 } + } else { + RcEntry::Deletable { + at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64, + } + } + } + del => del, + } + } + + pub(crate) fn is_zero(&self) -> bool { + matches!(self, RcEntry::Deletable { .. } | RcEntry::Absent) + } + + pub(crate) fn is_nonzero(&self) -> bool { + !self.is_zero() + } + + pub(crate) fn is_deletable(&self) -> bool { + match self { + RcEntry::Present { .. } => false, + RcEntry::Deletable { at_time } => now_msec() > *at_time, + RcEntry::Absent => true, + } + } + + pub(crate) fn is_needed(&self) -> bool { + !self.is_deletable() + } +} -- cgit v1.2.3