Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--   src/block/manager.rs   32
1 file changed, 14 insertions, 18 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 9b2d9cad..50039d2b 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -17,10 +17,11 @@ use opentelemetry::{
Context, KeyValue,
};
+use garage_db as db;
+
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -91,9 +92,9 @@ pub struct BlockManager {
rc: BlockRc,
- resync_queue: SledCountedTree,
+ resync_queue: db::Tree,
resync_notify: Notify,
- resync_errors: SledCountedTree,
+ resync_errors: db::Tree,
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
@@ -108,7 +109,7 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
- db: &sled::Db,
+ db: &db::Db,
data_dir: PathBuf,
compression_level: Option<i32>,
background_tranquility: u32,
@@ -123,12 +124,10 @@ impl BlockManager {
let resync_queue = db
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- let resync_queue = SledCountedTree::new(resync_queue);
let resync_errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
- let resync_errors = SledCountedTree::new(resync_errors);
let endpoint = system
.netapp
@@ -219,7 +218,7 @@ impl BlockManager {
/// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table.
- for (i, entry) in self.rc.rc.iter().enumerate() {
+ for (i, entry) in self.rc.rc.iter()?.enumerate() {
let (hash, _) = entry?;
let hash = Hash::try_from(&hash[..]).unwrap();
self.put_to_resync(&hash, Duration::from_secs(0))?;
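With the database abstraction, both opening an iterator and reading each entry are fallible, hence the two question-mark operators above. A minimal, self-contained sketch of that pattern follows; Tree and Error here are stand-ins, not the real garage_db types.

struct Error;
struct Tree(Vec<(Vec<u8>, Vec<u8>)>);

impl Tree {
    // Opening the iterator can itself fail in the abstracted API...
    fn iter(&self) -> Result<impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>> + '_, Error> {
        Ok(self
            .0
            .iter()
            .map(|(k, v)| -> Result<(Vec<u8>, Vec<u8>), Error> { Ok((k.clone(), v.clone())) }))
    }
}

fn walk(tree: &Tree) -> Result<(), Error> {
    for (i, entry) in tree.iter()?.enumerate() {
        // ...and so can reading each individual entry, hence the inner `?`.
        let (key, _value) = entry?;
        let _ = (i, key);
    }
    Ok(())
}

fn main() {
    let tree = Tree(vec![(b"k".to_vec(), b"v".to_vec())]);
    let _ = walk(&tree);
}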
@@ -265,17 +264,17 @@ impl BlockManager {
/// Get length of resync queue
pub fn resync_queue_len(&self) -> usize {
- self.resync_queue.len()
+ self.resync_queue.len().unwrap() // TODO fix unwrap
}
/// Get number of blocks that have an error
pub fn resync_errors_len(&self) -> usize {
- self.resync_errors.len()
+ self.resync_errors.len().unwrap() // TODO fix unwrap
}
/// Get number of items in the refcount table
pub fn rc_len(&self) -> usize {
- self.rc.rc.len()
+ self.rc.rc.len().unwrap() // TODO fix unwrap
}
//// ----- Managing the reference counter ----
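The three TODO-marked unwraps above hide backend errors from callers. A hypothetical follow-up shape, not part of this commit, would surface the Result instead; Tree and Error are stand-ins for the garage_db types.

#[derive(Debug)]
struct Error;
struct Tree(Vec<(Vec<u8>, Vec<u8>)>);

impl Tree {
    // In the abstracted API, len() is fallible, as the diff suggests.
    fn len(&self) -> Result<usize, Error> {
        Ok(self.0.len())
    }
}

struct Manager {
    resync_queue: Tree,
}

impl Manager {
    // Propagate the backend error instead of unwrapping it.
    fn resync_queue_len(&self) -> Result<usize, Error> {
        self.resync_queue.len()
    }
}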
@@ -503,12 +502,12 @@ impl BlockManager {
});
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), sled::Error> {
+ fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), db::Error> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
- fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), sled::Error> {
+ fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), db::Error> {
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
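The two lines above fix the resync key layout: an 8-byte big-endian timestamp followed by the 32-byte block hash, so lexicographic key order equals retry order. An illustrative sketch of that property; the function name is made up, not the crate's API.

// Illustrative only: big-endian encoding keeps the tree sorted by retry time.
fn resync_key(when_msec: u64, hash: &[u8; 32]) -> Vec<u8> {
    let mut key = u64::to_be_bytes(when_msec).to_vec();
    key.extend_from_slice(hash);
    key // 40 bytes total: 8-byte time prefix + 32-byte hash
}

fn main() {
    let h = [0u8; 32];
    assert!(resync_key(1_000, &h) < resync_key(2_000, &h));
}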
@@ -547,11 +546,8 @@ impl BlockManager {
// - Ok(true) -> a block was processed (successfully or not)
// - Ok(false) -> no block was processed, but we are ready for the next iteration
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
- async fn resync_iter(
- &self,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<bool, sled::Error> {
- if let Some(first_pair_res) = self.resync_queue.iter().next() {
+ async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
+ if let Some(first_pair_res) = self.resync_queue.iter()?.next() {
let (time_bytes, hash_bytes) = first_pair_res?;
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
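Given the contract documented in the comments above (Ok(true), Ok(false), Err), a worker loop could drive resync_iter as sketched below. The function bodies, names, and back-off duration are placeholders, not the actual BlockManager code; a tokio runtime is assumed, as in the surrounding module.

use std::time::Duration;
use tokio::sync::watch;

// Placeholder with the same shape as the method in the diff.
async fn resync_iter(_must_exit: &mut watch::Receiver<bool>) -> Result<bool, std::io::Error> {
    Ok(false)
}

async fn resync_worker(mut must_exit: watch::Receiver<bool>) {
    while !*must_exit.borrow() {
        match resync_iter(&mut must_exit).await {
            // A block was processed (successfully or not): loop immediately.
            Ok(true) => continue,
            // Nothing due yet: wait a little before the next iteration.
            Ok(false) => tokio::time::sleep(Duration::from_secs(1)).await,
            // Backend error: log and back off instead of killing the worker.
            Err(e) => {
                eprintln!("resync error: {:?}", e);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}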
@@ -966,7 +962,7 @@ impl ErrorCounter {
}
}
- fn decode(data: sled::IVec) -> Self {
+ fn decode<'a>(data: db::Value<'a>) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
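The decoder above pins the 16-byte on-disk layout. A matching encoder, sketched under the assumption that the struct holds exactly these two u64 fields (the real crate's encoder may differ):

struct ErrorCounter {
    errors: u64,
    last_try: u64,
}

impl ErrorCounter {
    // Inverse of decode(): 8 big-endian bytes of errors, then 8 of last_try.
    fn encode(&self) -> Vec<u8> {
        [
            u64::to_be_bytes(self.errors),
            u64::to_be_bytes(self.last_try),
        ]
        .concat()
    }
}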