diff options
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/Cargo.toml | 3 | ||||
-rw-r--r-- | src/block/manager.rs | 127 | ||||
-rw-r--r-- | src/block/metrics.rs | 4 | ||||
-rw-r--r-- | src/block/rc.rs | 49 |
4 files changed, 127 insertions, 56 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 9cba69ee..80346aca 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_rpc = { version = "0.7.0", path = "../rpc" } garage_util = { version = "0.7.0", path = "../util" } garage_table = { version = "0.7.0", path = "../table" } @@ -27,8 +28,6 @@ tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/block/manager.rs b/src/block/manager.rs index 9b2d9cad..32ba0431 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,3 +1,5 @@ +use core::ops::Bound; + use std::convert::TryInto; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -17,10 +19,12 @@ use opentelemetry::{ Context, KeyValue, }; +use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; + use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::sled_counter::SledCountedTree; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -91,9 +95,9 @@ pub struct BlockManager { rc: BlockRc, - resync_queue: SledCountedTree, + resync_queue: CountedTree, resync_notify: Notify, - resync_errors: SledCountedTree, + resync_errors: CountedTree, system: Arc<System>, endpoint: Arc<Endpoint<BlockRpc, Self>>, @@ -108,7 +112,7 @@ struct BlockManagerLocked(); impl BlockManager { pub fn new( - db: &sled::Db, + db: &db::Db, data_dir: PathBuf, compression_level: Option<i32>, background_tranquility: u32, @@ -123,12 +127,14 @@ impl BlockManager { let resync_queue = db .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); - let resync_queue = SledCountedTree::new(resync_queue); + let resync_queue = + CountedTree::new(resync_queue).expect("Could not count block_local_resync_queue"); let resync_errors = db .open_tree("block_local_resync_errors") .expect("Unable to open block_local_resync_errors tree"); - let resync_errors = SledCountedTree::new(resync_errors); + let resync_errors = + CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors"); let endpoint = system .netapp @@ -219,11 +225,44 @@ impl BlockManager { /// to fix any mismatch between the two. pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { // 1. Repair blocks from RC table. - for (i, entry) in self.rc.rc.iter().enumerate() { - let (hash, _) = entry?; - let hash = Hash::try_from(&hash[..]).unwrap(); - self.put_to_resync(&hash, Duration::from_secs(0))?; - if i & 0xFF == 0 && *must_exit.borrow() { + let mut next_start: Option<Hash> = None; + loop { + // We have to do this complicated two-step process where we first read a bunch + // of hashes from the RC table, and then insert them in the to-resync queue, + // because of SQLite. Basically, as long as we have an iterator on a DB table, + // we can't do anything else on the DB. The naive approach (which we had previously) + // of just iterating on the RC table and inserting items one to one in the resync + // queue can't work here, it would just provoke a deadlock in the SQLite adapter code. + // This is mostly because the Rust bindings for SQLite assume a worst-case scenario + // where SQLite is not compiled in thread-safe mode, so we have to wrap everything + // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322). + let mut batch_of_hashes = vec![]; + let start_bound = match next_start.as_ref() { + None => Bound::Unbounded, + Some(x) => Bound::Excluded(x.as_slice()), + }; + for entry in self + .rc + .rc + .range::<&[u8], _>((start_bound, Bound::Unbounded))? + { + let (hash, _) = entry?; + let hash = Hash::try_from(&hash[..]).unwrap(); + batch_of_hashes.push(hash); + if batch_of_hashes.len() >= 1000 { + break; + } + } + if batch_of_hashes.is_empty() { + break; + } + + for hash in batch_of_hashes.into_iter() { + self.put_to_resync(&hash, Duration::from_secs(0))?; + next_start = Some(hash) + } + + if *must_exit.borrow() { return Ok(()); } } @@ -264,46 +303,69 @@ impl BlockManager { } /// Get lenght of resync queue - pub fn resync_queue_len(&self) -> usize { - self.resync_queue.len() + pub fn resync_queue_len(&self) -> Result<usize, Error> { + // This currently can't return an error because the CountedTree hack + // doesn't error on .len(), but this will change when we remove the hack + // (hopefully someday!) + Ok(self.resync_queue.len()) } /// Get number of blocks that have an error - pub fn resync_errors_len(&self) -> usize { - self.resync_errors.len() + pub fn resync_errors_len(&self) -> Result<usize, Error> { + // (see resync_queue_len comment) + Ok(self.resync_errors.len()) } /// Get number of items in the refcount table - pub fn rc_len(&self) -> usize { - self.rc.rc.len() + pub fn rc_len(&self) -> Result<usize, Error> { + Ok(self.rc.rc.len()?) } //// ----- Managing the reference counter ---- /// Increment the number of time a block is used, putting it to resynchronization if it is /// required, but not known - pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - if self.rc.block_incref(hash)? { + pub fn block_incref( + self: &Arc<Self>, + tx: &mut db::Transaction, + hash: Hash, + ) -> db::TxOpResult<()> { + if self.rc.block_incref(tx, &hash)? { // When the reference counter is incremented, there is // normally a node that is responsible for sending us the // data of the block. However that operation may fail, // so in all cases we add the block here to the todo list // to check later that it arrived correctly, and if not // we will fecth it from someone. - self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; + let this = self.clone(); + tokio::spawn(async move { + if let Err(e) = this.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) { + error!("Block {:?} could not be put in resync queue: {}.", hash, e); + } + }); } Ok(()) } /// Decrement the number of time a block is used - pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - if self.rc.block_decref(hash)? { + pub fn block_decref( + self: &Arc<Self>, + tx: &mut db::Transaction, + hash: Hash, + ) -> db::TxOpResult<()> { + if self.rc.block_decref(tx, &hash)? { // When the RC is decremented, it might drop to zero, // indicating that we don't need the block. // There is a delay before we garbage collect it; // make sure that it is handled in the resync loop // after that delay has passed. - self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?; + let this = self.clone(); + tokio::spawn(async move { + if let Err(e) = this.put_to_resync(&hash, BLOCK_GC_DELAY + Duration::from_secs(10)) + { + error!("Block {:?} could not be put in resync queue: {}.", hash, e); + } + }); } Ok(()) } @@ -503,12 +565,12 @@ impl BlockManager { }); } - fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), sled::Error> { + fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> { let when = now_msec() + delay.as_millis() as u64; self.put_to_resync_at(hash, when) } - fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), sled::Error> { + fn put_to_resync_at(&self, hash: &Hash, when: u64) -> db::Result<()> { trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); @@ -547,13 +609,8 @@ impl BlockManager { // - Ok(true) -> a block was processed (successfully or not) // - Ok(false) -> no block was processed, but we are ready for the next iteration // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors - async fn resync_iter( - &self, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<bool, sled::Error> { - if let Some(first_pair_res) = self.resync_queue.iter().next() { - let (time_bytes, hash_bytes) = first_pair_res?; - + async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> { + if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? { let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); @@ -561,7 +618,7 @@ impl BlockManager { let hash = Hash::try_from(&hash_bytes[..]).unwrap(); if let Some(ec) = self.resync_errors.get(hash.as_slice())? { - let ec = ErrorCounter::decode(ec); + let ec = ErrorCounter::decode(&ec); if now < ec.next_try() { // if next retry after an error is not yet, // don't do resync and return early, but still @@ -602,7 +659,7 @@ impl BlockManager { warn!("Error when resyncing {:?}: {}", hash, e); let err_counter = match self.resync_errors.get(hash.as_slice())? { - Some(ec) => ErrorCounter::decode(ec).add1(now + 1), + Some(ec) => ErrorCounter::decode(&ec).add1(now + 1), None => ErrorCounter::new(now + 1), }; @@ -966,7 +1023,7 @@ impl ErrorCounter { } } - fn decode(data: sled::IVec) -> Self { + fn decode(data: &[u8]) -> Self { Self { errors: u64::from_be_bytes(data[0..8].try_into().unwrap()), last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()), diff --git a/src/block/metrics.rs b/src/block/metrics.rs index f0f541a3..477add66 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -1,6 +1,6 @@ use opentelemetry::{global, metrics::*}; -use garage_util::sled_counter::SledCountedTree; +use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct BlockManagerMetrics { @@ -23,7 +23,7 @@ pub struct BlockManagerMetrics { } impl BlockManagerMetrics { - pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self { + pub fn new(resync_queue: CountedTree, resync_errors: CountedTree) -> Self { let meter = global::meter("garage_model/block"); Self { _resync_queue_len: meter diff --git a/src/block/rc.rs b/src/block/rc.rs index ec3ea44e..ce6defad 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -1,5 +1,7 @@ use std::convert::TryInto; +use garage_db as db; + use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -7,31 +9,41 @@ use garage_util::time::*; use crate::manager::BLOCK_GC_DELAY; pub struct BlockRc { - pub(crate) rc: sled::Tree, + pub(crate) rc: db::Tree, } impl BlockRc { - pub(crate) fn new(rc: sled::Tree) -> Self { + pub(crate) fn new(rc: db::Tree) -> Self { Self { rc } } /// Increment the reference counter associated to a hash. /// Returns true if the RC goes from zero to nonzero. - pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> { - let old_rc = self - .rc - .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?; - let old_rc = RcEntry::parse_opt(old_rc); + pub(crate) fn block_incref( + &self, + tx: &mut db::Transaction, + hash: &Hash, + ) -> db::TxOpResult<bool> { + let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); + match old_rc.increment().serialize() { + Some(x) => tx.insert(&self.rc, &hash, x)?, + None => unreachable!(), + }; Ok(old_rc.is_zero()) } /// Decrement the reference counter associated to a hash. /// Returns true if the RC is now zero. - pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> { - let new_rc = self - .rc - .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?; - let new_rc = RcEntry::parse_opt(new_rc); + pub(crate) fn block_decref( + &self, + tx: &mut db::Transaction, + hash: &Hash, + ) -> db::TxOpResult<bool> { + let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); + match new_rc.serialize() { + Some(x) => tx.insert(&self.rc, &hash, x)?, + None => tx.remove(&self.rc, &hash)?, + }; Ok(matches!(new_rc, RcEntry::Deletable { .. })) } @@ -44,12 +56,15 @@ impl BlockRc { /// deletion time has passed pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { let now = now_msec(); - self.rc.update_and_fetch(&hash, |rcval| { - let updated = match RcEntry::parse_opt(rcval) { - RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent, - v => v, + self.rc.db().transaction(|mut tx| { + let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); + match rcval { + RcEntry::Deletable { at_time } if now > at_time => { + tx.remove(&self.rc, &hash)?; + } + _ => (), }; - updated.serialize() + tx.commit(()) })?; Ok(()) } |