diff options
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 32 |
1 files changed, 14 insertions, 18 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 9b2d9cad..50039d2b 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -17,10 +17,11 @@ use opentelemetry::{ Context, KeyValue, }; +use garage_db as db; + use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::sled_counter::SledCountedTree; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -91,9 +92,9 @@ pub struct BlockManager { rc: BlockRc, - resync_queue: SledCountedTree, + resync_queue: db::Tree, resync_notify: Notify, - resync_errors: SledCountedTree, + resync_errors: db::Tree, system: Arc<System>, endpoint: Arc<Endpoint<BlockRpc, Self>>, @@ -108,7 +109,7 @@ struct BlockManagerLocked(); impl BlockManager { pub fn new( - db: &sled::Db, + db: &db::Db, data_dir: PathBuf, compression_level: Option<i32>, background_tranquility: u32, @@ -123,12 +124,10 @@ impl BlockManager { let resync_queue = db .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); - let resync_queue = SledCountedTree::new(resync_queue); let resync_errors = db .open_tree("block_local_resync_errors") .expect("Unable to open block_local_resync_errors tree"); - let resync_errors = SledCountedTree::new(resync_errors); let endpoint = system .netapp @@ -219,7 +218,7 @@ impl BlockManager { /// to fix any mismatch between the two. pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { // 1. Repair blocks from RC table. - for (i, entry) in self.rc.rc.iter().enumerate() { + for (i, entry) in self.rc.rc.iter()?.enumerate() { let (hash, _) = entry?; let hash = Hash::try_from(&hash[..]).unwrap(); self.put_to_resync(&hash, Duration::from_secs(0))?; @@ -265,17 +264,17 @@ impl BlockManager { /// Get lenght of resync queue pub fn resync_queue_len(&self) -> usize { - self.resync_queue.len() + self.resync_queue.len().unwrap() // TODO fix unwrap } /// Get number of blocks that have an error pub fn resync_errors_len(&self) -> usize { - self.resync_errors.len() + self.resync_errors.len().unwrap() // TODO fix unwrap } /// Get number of items in the refcount table pub fn rc_len(&self) -> usize { - self.rc.rc.len() + self.rc.rc.len().unwrap() // TODO fix unwrap } //// ----- Managing the reference counter ---- @@ -503,12 +502,12 @@ impl BlockManager { }); } - fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), sled::Error> { + fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), db::Error> { let when = now_msec() + delay.as_millis() as u64; self.put_to_resync_at(hash, when) } - fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), sled::Error> { + fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), db::Error> { trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); @@ -547,11 +546,8 @@ impl BlockManager { // - Ok(true) -> a block was processed (successfully or not) // - Ok(false) -> no block was processed, but we are ready for the next iteration // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors - async fn resync_iter( - &self, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<bool, sled::Error> { - if let Some(first_pair_res) = self.resync_queue.iter().next() { + async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> { + if let Some(first_pair_res) = self.resync_queue.iter()?.next() { let (time_bytes, hash_bytes) = first_pair_res?; let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); @@ -966,7 +962,7 @@ impl ErrorCounter { } } - fn decode(data: sled::IVec) -> Self { + fn decode<'a>(data: db::Value<'a>) -> Self { Self { errors: u64::from_be_bytes(data[0..8].try_into().unwrap()), last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()), |