diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-02 16:58:00 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-02 16:58:00 +0200 |
commit | 9f0f5b2e372a720a807914747fd48ddc93928e04 (patch) | |
tree | a4346766c46469758b9138a09c65fa052e9ad253 | |
parent | 04901093e7315558bdc147d27adc3f56ec2c98a1 (diff) | |
download | garage-9f0f5b2e372a720a807914747fd48ddc93928e04.tar.gz garage-9f0f5b2e372a720a807914747fd48ddc93928e04.zip |
Adapt Garage to use new DB abstraction
-rw-r--r-- | Cargo.lock | 8 | ||||
-rw-r--r-- | src/block/Cargo.toml | 3 | ||||
-rw-r--r-- | src/block/manager.rs | 32 | ||||
-rw-r--r-- | src/block/metrics.rs | 8 | ||||
-rw-r--r-- | src/block/rc.rs | 51 | ||||
-rw-r--r-- | src/db/lib.rs | 114 | ||||
-rw-r--r-- | src/db/sled_adapter.rs | 89 | ||||
-rw-r--r-- | src/db/test.rs | 21 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 1 | ||||
-rw-r--r-- | src/garage/admin.rs | 2 | ||||
-rw-r--r-- | src/garage/repair.rs | 22 | ||||
-rw-r--r-- | src/garage/server.rs | 1 | ||||
-rw-r--r-- | src/model/Cargo.toml | 3 | ||||
-rw-r--r-- | src/model/garage.rs | 8 | ||||
-rw-r--r-- | src/model/index_counter.rs | 16 | ||||
-rw-r--r-- | src/model/migrate.rs | 2 | ||||
-rw-r--r-- | src/table/Cargo.toml | 2 | ||||
-rw-r--r-- | src/table/data.rs | 63 | ||||
-rw-r--r-- | src/table/gc.rs | 30 | ||||
-rw-r--r-- | src/table/merkle.rs | 54 | ||||
-rw-r--r-- | src/table/metrics.rs | 12 | ||||
-rw-r--r-- | src/table/sync.rs | 5 | ||||
-rw-r--r-- | src/table/table.rs | 4 | ||||
-rw-r--r-- | src/util/Cargo.toml | 4 | ||||
-rw-r--r-- | src/util/error.rs | 12 | ||||
-rw-r--r-- | src/util/lib.rs | 2 |
26 files changed, 355 insertions, 214 deletions
@@ -889,6 +889,7 @@ dependencies = [ "futures", "futures-util", "garage_api", + "garage_db", "garage_model 0.7.0", "garage_rpc 0.7.0", "garage_table 0.7.0", @@ -972,6 +973,7 @@ dependencies = [ "bytes 1.1.0", "futures", "futures-util", + "garage_db", "garage_rpc 0.7.0", "garage_table 0.7.0", "garage_util 0.7.0", @@ -981,7 +983,6 @@ dependencies = [ "rmp-serde 0.15.5", "serde", "serde_bytes", - "sled", "tokio", "tracing", "zstd", @@ -1034,6 +1035,7 @@ dependencies = [ "futures", "futures-util", "garage_block", + "garage_db", "garage_model 0.5.1", "garage_rpc 0.7.0", "garage_table 0.7.0", @@ -1045,7 +1047,6 @@ dependencies = [ "rmp-serde 0.15.5", "serde", "serde_bytes", - "sled", "tokio", "tracing", "zstd", @@ -1149,7 +1150,6 @@ dependencies = [ "rmp-serde 0.15.5", "serde", "serde_bytes", - "sled", "tokio", "tracing", ] @@ -1188,6 +1188,7 @@ dependencies = [ "chrono", "err-derive 0.3.1", "futures", + "garage_db", "hex", "http", "hyper", @@ -1198,7 +1199,6 @@ dependencies = [ "serde", "serde_json", "sha2", - "sled", "tokio", "toml", "tracing", diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 9cba69ee..80346aca 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_rpc = { version = "0.7.0", path = "../rpc" } garage_util = { version = "0.7.0", path = "../util" } garage_table = { version = "0.7.0", path = "../table" } @@ -27,8 +28,6 @@ tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/block/manager.rs b/src/block/manager.rs index 9b2d9cad..50039d2b 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -17,10 +17,11 @@ use opentelemetry::{ Context, KeyValue, }; +use garage_db as db; + use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::sled_counter::SledCountedTree; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -91,9 +92,9 @@ pub struct BlockManager { rc: BlockRc, - resync_queue: SledCountedTree, + resync_queue: db::Tree, resync_notify: Notify, - resync_errors: SledCountedTree, + resync_errors: db::Tree, system: Arc<System>, endpoint: Arc<Endpoint<BlockRpc, Self>>, @@ -108,7 +109,7 @@ struct BlockManagerLocked(); impl BlockManager { pub fn new( - db: &sled::Db, + db: &db::Db, data_dir: PathBuf, compression_level: Option<i32>, background_tranquility: u32, @@ -123,12 +124,10 @@ impl BlockManager { let resync_queue = db .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); - let resync_queue = SledCountedTree::new(resync_queue); let resync_errors = db .open_tree("block_local_resync_errors") .expect("Unable to open block_local_resync_errors tree"); - let resync_errors = SledCountedTree::new(resync_errors); let endpoint = system .netapp @@ -219,7 +218,7 @@ impl BlockManager { /// to fix any mismatch between the two. pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { // 1. Repair blocks from RC table. - for (i, entry) in self.rc.rc.iter().enumerate() { + for (i, entry) in self.rc.rc.iter()?.enumerate() { let (hash, _) = entry?; let hash = Hash::try_from(&hash[..]).unwrap(); self.put_to_resync(&hash, Duration::from_secs(0))?; @@ -265,17 +264,17 @@ impl BlockManager { /// Get lenght of resync queue pub fn resync_queue_len(&self) -> usize { - self.resync_queue.len() + self.resync_queue.len().unwrap() // TODO fix unwrap } /// Get number of blocks that have an error pub fn resync_errors_len(&self) -> usize { - self.resync_errors.len() + self.resync_errors.len().unwrap() // TODO fix unwrap } /// Get number of items in the refcount table pub fn rc_len(&self) -> usize { - self.rc.rc.len() + self.rc.rc.len().unwrap() // TODO fix unwrap } //// ----- Managing the reference counter ---- @@ -503,12 +502,12 @@ impl BlockManager { }); } - fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), sled::Error> { + fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), db::Error> { let when = now_msec() + delay.as_millis() as u64; self.put_to_resync_at(hash, when) } - fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), sled::Error> { + fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), db::Error> { trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); @@ -547,11 +546,8 @@ impl BlockManager { // - Ok(true) -> a block was processed (successfully or not) // - Ok(false) -> no block was processed, but we are ready for the next iteration // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors - async fn resync_iter( - &self, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<bool, sled::Error> { - if let Some(first_pair_res) = self.resync_queue.iter().next() { + async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> { + if let Some(first_pair_res) = self.resync_queue.iter()?.next() { let (time_bytes, hash_bytes) = first_pair_res?; let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); @@ -966,7 +962,7 @@ impl ErrorCounter { } } - fn decode(data: sled::IVec) -> Self { + fn decode<'a>(data: db::Value<'a>) -> Self { Self { errors: u64::from_be_bytes(data[0..8].try_into().unwrap()), last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()), diff --git a/src/block/metrics.rs b/src/block/metrics.rs index f0f541a3..1d4d0028 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -1,6 +1,6 @@ use opentelemetry::{global, metrics::*}; -use garage_util::sled_counter::SledCountedTree; +use garage_db as db; /// TableMetrics reference all counter used for metrics pub struct BlockManagerMetrics { @@ -23,12 +23,12 @@ pub struct BlockManagerMetrics { } impl BlockManagerMetrics { - pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self { + pub fn new(resync_queue: db::Tree, resync_errors: db::Tree) -> Self { let meter = global::meter("garage_model/block"); Self { _resync_queue_len: meter .u64_value_observer("block.resync_queue_length", move |observer| { - observer.observe(resync_queue.len() as u64, &[]) + observer.observe(resync_queue.len().unwrap() as u64, &[]) // TODO fix unwrap }) .with_description( "Number of block hashes queued for local check and possible resync", @@ -36,7 +36,7 @@ impl BlockManagerMetrics { .init(), _resync_errored_blocks: meter .u64_value_observer("block.resync_errored_blocks", move |observer| { - observer.observe(resync_errors.len() as u64, &[]) + observer.observe(resync_errors.len().unwrap() as u64, &[]) // TODO fix unwrap }) .with_description("Number of block hashes whose last resync resulted in an error") .init(), diff --git a/src/block/rc.rs b/src/block/rc.rs index ec3ea44e..f6d8c2aa 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -1,5 +1,7 @@ use std::convert::TryInto; +use garage_db as db; + use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -7,31 +9,47 @@ use garage_util::time::*; use crate::manager::BLOCK_GC_DELAY; pub struct BlockRc { - pub(crate) rc: sled::Tree, + pub(crate) rc: db::Tree, } impl BlockRc { - pub(crate) fn new(rc: sled::Tree) -> Self { + pub(crate) fn new(rc: db::Tree) -> Self { Self { rc } } /// Increment the reference counter associated to a hash. /// Returns true if the RC goes from zero to nonzero. pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> { - let old_rc = self - .rc - .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?; - let old_rc = RcEntry::parse_opt(old_rc); + let old_rc = self.rc.db().transaction(|tx| { + let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); + match old_rc.increment().serialize() { + Some(x) => { + tx.insert(&self.rc, &hash, x)?; + } + None => { + tx.remove(&self.rc, &hash)?; + } + }; + tx.commit(old_rc) + })?; Ok(old_rc.is_zero()) } /// Decrement the reference counter associated to a hash. /// Returns true if the RC is now zero. pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> { - let new_rc = self - .rc - .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?; - let new_rc = RcEntry::parse_opt(new_rc); + let new_rc = self.rc.db().transaction(|tx| { + let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); + match new_rc.serialize() { + Some(x) => { + tx.insert(&self.rc, &hash, x)?; + } + None => { + tx.remove(&self.rc, &hash)?; + } + }; + tx.commit(new_rc) + })?; Ok(matches!(new_rc, RcEntry::Deletable { .. })) } @@ -44,12 +62,15 @@ impl BlockRc { /// deletion time has passed pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { let now = now_msec(); - self.rc.update_and_fetch(&hash, |rcval| { - let updated = match RcEntry::parse_opt(rcval) { - RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent, - v => v, + self.rc.db().transaction(|tx| { + let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); + match rcval { + RcEntry::Deletable { at_time } if now > at_time => { + tx.remove(&self.rc, &hash)?; + } + _ => (), }; - updated.serialize() + tx.commit(()) })?; Ok(()) } diff --git a/src/db/lib.rs b/src/db/lib.rs index 4e400a1d..75c6ffa2 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -3,26 +3,31 @@ pub mod sled_adapter; #[cfg(test)] pub mod test; +use core::ops::{Bound, RangeBounds}; + use std::borrow::Cow; use std::sync::Arc; use arc_swap::ArcSwapOption; +use err_derive::Error; #[derive(Clone)] pub struct Db(pub(crate) Arc<dyn IDb>); -#[derive(Clone)] +#[derive(Clone, Copy)] pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>); #[derive(Clone)] pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize); pub type Value<'a> = Cow<'a, [u8]>; -pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + 'a>; +pub type ValueIter<'a> = + Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + Send + Sync + 'a>; // ---- -#[derive(Debug)] +#[derive(Debug, Error)] +#[error(display = "{}", _0)] pub struct Error(Cow<'static, str>); pub type Result<T> = std::result::Result<T, Error>; @@ -43,14 +48,14 @@ impl<E> From<Error> for TxError<E> { // ---- impl Db { - pub fn tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> { - let tree_id = self.0.tree(name.as_ref())?; + pub fn open_tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> { + let tree_id = self.0.open_tree(name.as_ref())?; Ok(Tree(self.0.clone(), tree_id)) } pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E> where - F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync, + F: Fn(Transaction<'_>) -> TxResult<R, E>, R: Send + Sync, E: Send + Sync, { @@ -83,20 +88,50 @@ impl Db { } impl Tree { + pub fn db(&self) -> Db { + Db(self.0.clone()) + } + pub fn get<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<Option<Value<'a>>> { self.0.get(self.1, key.as_ref()) } - pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> { - self.0.put(self.1, key.as_ref(), value.as_ref()) + pub fn len(&self) -> Result<usize> { + self.0.len(self.1) + } + + pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> { + self.0.insert(self.1, key.as_ref(), value.as_ref()) } - pub fn iter<'a>(&'a self, reverse: bool) -> Result<ValueIter<'a>> { - self.0.range(self.1, None, reverse) + pub fn remove<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<bool> { + self.0.remove(self.1, key.as_ref()) } - pub fn range<'a, T: AsRef<[u8]>>(&'a self, start: T, reverse: bool) -> Result<ValueIter<'a>> { - self.0.range(self.1, Some(start.as_ref()), reverse) + pub fn iter<'a>(&'a self) -> Result<ValueIter<'a>> { + self.0.iter(self.1) + } + pub fn iter_rev<'a>(&'a self) -> Result<ValueIter<'a>> { + self.0.iter_rev(self.1) + } + + pub fn range<'a, K, R>(&'a self, range: R) -> Result<ValueIter<'a>> + where + K: AsRef<[u8]>, + R: RangeBounds<K>, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range(self.1, get_bound(sb), get_bound(eb)) + } + pub fn range_rev<'a, K, R>(&'a self, range: R) -> Result<ValueIter<'a>> + where + K: AsRef<[u8]>, + R: RangeBounds<K>, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range_rev(self.1, get_bound(sb), get_bound(eb)) } } @@ -105,8 +140,17 @@ impl<'a> Transaction<'a> { self.0.get(tree.1, key.as_ref()) } - pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, tree: &Tree, key: T, value: U) -> Result<()> { - self.0.put(tree.1, key.as_ref(), value.as_ref()) + pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>( + &self, + tree: &Tree, + key: T, + value: U, + ) -> Result<()> { + self.0.insert(tree.1, key.as_ref(), value.as_ref()) + } + + pub fn remove<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<bool> { + self.0.remove(tree.1, key.as_ref()) } #[must_use] @@ -131,15 +175,28 @@ impl<'a> Transaction<'a> { // ---- Internal interfaces pub(crate) trait IDb: Send + Sync { - fn tree(&self, name: &str) -> Result<usize>; + fn open_tree(&self, name: &str) -> Result<usize>; fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>; - fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; - fn range<'a>( + fn len(&self, tree: usize) -> Result<usize>; + + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; + fn remove(&self, tree: usize, key: &[u8]) -> Result<bool>; + + fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>>; + fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>>; + + fn range<'a, 'r>( + &'a self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result<ValueIter<'a>>; + fn range_rev<'a, 'r>( &'a self, tree: usize, - start: Option<&[u8]>, - reverse: bool, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, ) -> Result<ValueIter<'a>>; fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; @@ -147,10 +204,11 @@ pub(crate) trait IDb: Send + Sync { pub(crate) trait ITx<'a> { fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>; - fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; + fn remove(&self, tree: usize, key: &[u8]) -> Result<bool>; } -pub(crate) trait ITxFn: Send + Sync { +pub(crate) trait ITxFn { fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult; } @@ -162,7 +220,7 @@ enum TxFnResult { struct TxFn<F, R, E> where - F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync, + F: Fn(Transaction<'_>) -> TxResult<R, E>, R: Send + Sync, E: Send + Sync, { @@ -172,7 +230,7 @@ where impl<F, R, E> ITxFn for TxFn<F, R, E> where - F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync, + F: Fn(Transaction<'_>) -> TxResult<R, E>, R: Send + Sync, E: Send + Sync, { @@ -187,3 +245,13 @@ where retval } } + +// ---- + +fn get_bound<K: AsRef<[u8]>>(b: Bound<&K>) -> Bound<&[u8]> { + match b { + Bound::Included(v) => Bound::Included(v.as_ref()), + Bound::Excluded(v) => Bound::Excluded(v.as_ref()), + Bound::Unbounded => Bound::Unbounded, + } +} diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs index 6e375f03..b1da1c2b 100644 --- a/src/db/sled_adapter.rs +++ b/src/db/sled_adapter.rs @@ -1,3 +1,5 @@ +use core::ops::Bound; + use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -42,7 +44,7 @@ impl SledDb { } impl IDb for SledDb { - fn tree(&self, name: &str) -> Result<usize> { + fn open_tree(&self, name: &str) -> Result<usize> { let mut trees = self.trees.write().unwrap(); if let Some(i) = trees.1.get(name) { Ok(*i) @@ -60,42 +62,63 @@ impl IDb for SledDb { Ok(tree.get(key)?.map(|v| v.to_vec().into())) } - fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> { + let tree = self.get_tree(tree)?; + Ok(tree.remove(key)?.is_some()) + } + + fn len(&self, tree: usize) -> Result<usize> { + let tree = self.get_tree(tree)?; + Ok(tree.len()) + } + + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { let tree = self.get_tree(tree)?; tree.insert(key, value)?; Ok(()) } - fn range<'a>( + fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> { + let tree = self.get_tree(tree)?; + Ok(Box::new(tree.iter().map(|v| { + v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) + .map_err(Into::into) + }))) + } + + fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> { + let tree = self.get_tree(tree)?; + Ok(Box::new(tree.iter().rev().map(|v| { + v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) + .map_err(Into::into) + }))) + } + + fn range<'a, 'r>( &'a self, tree: usize, - start: Option<&[u8]>, - reverse: bool, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, ) -> Result<ValueIter<'a>> { let tree = self.get_tree(tree)?; - if reverse { - match start { - Some(start) => Ok(Box::new(tree.range(..=start).rev().map(|v| { - v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) - .map_err(Into::into) - }))), - None => Ok(Box::new(tree.iter().rev().map(|v| { - v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) - .map_err(Into::into) - }))), - } - } else { - match start { - Some(start) => Ok(Box::new(tree.range(start..).map(|v| { - v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) - .map_err(Into::into) - }))), - None => Ok(Box::new(tree.iter().map(|v| { - v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) - .map_err(Into::into) - }))), - } - } + Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).map(|v| { + v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) + .map_err(Into::into) + }))) + } + fn range_rev<'a, 'r>( + &'a self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result<ValueIter<'a>> { + let tree = self.get_tree(tree)?; + Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).rev().map( + |v| { + v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) + .map_err(Into::into) + }, + ))) } fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { @@ -162,7 +185,7 @@ impl<'a> ITx<'a> for SledTx<'a> { Ok(tmp.map(|v| v.to_vec().into())) } - fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { let tree = self .trees .get(tree) @@ -170,4 +193,12 @@ impl<'a> ITx<'a> for SledTx<'a> { self.save_error(tree.insert(key, value))?; Ok(()) } + + fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> { + let tree = self + .trees + .get(tree) + .ok_or(Error("invalid tree id".into()))?; + Ok(self.save_error(tree.remove(key))?.is_some()) + } } diff --git a/src/db/test.rs b/src/db/test.rs index 7e389271..69e1d12c 100644 --- a/src/db/test.rs +++ b/src/db/test.rs @@ -3,21 +3,22 @@ use crate::*; use crate::sled_adapter::SledDb; fn test_suite(db: Db) -> Result<()> { - let tree = db.tree("tree")?; + let tree = db.open_tree("tree")?; let ka: &[u8] = &b"test"[..]; let kb: &[u8] = &b"zwello"[..]; + let kint: &[u8] = &b"tz"[..]; let va: &[u8] = &b"plop"[..]; let vb: &[u8] = &b"plip"[..]; let vc: &[u8] = &b"plup"[..]; - tree.put(ka, va)?; + tree.insert(ka, va)?; assert_eq!(tree.get(ka)?, Some(va.into())); let res = db.transaction::<_, (), _>(|tx| { assert_eq!(tx.get(&tree, ka)?, Some(va.into())); - tx.put(&tree, ka, vb)?; + tx.insert(&tree, ka, vb)?; assert_eq!(tx.get(&tree, ka)?, Some(vb.into())); @@ -29,7 +30,7 @@ fn test_suite(db: Db) -> Result<()> { let res = db.transaction::<(), _, _>(|tx| { assert_eq!(tx.get(&tree, ka)?, Some(vb.into())); - tx.put(&tree, ka, vc)?; + tx.insert(&tree, ka, vc)?; assert_eq!(tx.get(&tree, ka)?, Some(vc.into())); @@ -38,27 +39,27 @@ fn test_suite(db: Db) -> Result<()> { assert!(matches!(res, Err(TxError::Abort(42)))); assert_eq!(tree.get(ka)?, Some(vb.into())); - let mut iter = tree.iter(false)?; + let mut iter = tree.iter()?; assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into())); assert!(iter.next().is_none()); - tree.put(kb, vc)?; + tree.insert(kb, vc)?; assert_eq!(tree.get(kb)?, Some(vc.into())); - let mut iter = tree.iter(false)?; + let mut iter = tree.iter()?; assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into())); assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into())); assert!(iter.next().is_none()); - let mut iter = tree.range("tz", false)?; + let mut iter = tree.range(kint..)?; assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into())); assert!(iter.next().is_none()); - let mut iter = tree.range("tz", true)?; + let mut iter = tree.range_rev(..kint)?; assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into())); assert!(iter.next().is_none()); - let mut iter = tree.iter(true)?; + let mut iter = tree.iter_rev()?; assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into())); assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into())); assert!(iter.next().is_none()); diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 902f67f8..d34a7fa4 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -21,6 +21,7 @@ path = "tests/lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_api = { version = "0.7.0", path = "../api" } garage_model = { version = "0.7.0", path = "../model" } garage_rpc = { version = "0.7.0", path = "../rpc" } diff --git a/src/garage/admin.rs b/src/garage/admin.rs index bc1f494a..cce88b35 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -727,7 +727,7 @@ impl AdminRpcHandler { { writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap(); if opt.detailed { - writeln!(to, " number of items: {}", t.data.store.len()).unwrap(); + writeln!(to, " number of items: {}", t.data.store.len().unwrap()).unwrap(); // TODO fix len unwrap writeln!( to, " Merkle tree size: {}", diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 830eac71..04d9ee72 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -1,3 +1,4 @@ +use core::ops::Bound; use std::sync::Arc; use tokio::sync::watch; @@ -65,9 +66,15 @@ impl Repair { async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { let mut pos = vec![]; - while let Some((item_key, item_bytes)) = - self.garage.version_table.data.store.get_gt(&pos)? + while let Some(item) = self + .garage + .version_table + .data + .store + .range((Bound::Excluded(pos), Bound::Unbounded))? + .next() { + let (item_key, item_bytes) = item?; pos = item_key.to_vec(); let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; @@ -109,9 +116,16 @@ impl Repair { async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { let mut pos = vec![]; - while let Some((item_key, item_bytes)) = - self.garage.block_ref_table.data.store.get_gt(&pos)? + while let Some(item) = self + .garage + .block_ref_table + .data + .store + .range((Bound::Excluded(pos), Bound::Unbounded))? + .next() { + let (item_key, item_bytes) = item?; + pos = item_key.to_vec(); let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; diff --git a/src/garage/server.rs b/src/garage/server.rs index b58ad286..69f5d60c 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -38,6 +38,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .flush_every_ms(Some(config.sled_flush_every_ms)) .open() .expect("Unable to open sled DB"); + let db = garage_db::sled_adapter::SledDb::new(db); info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 133fe44e..d908dc01 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_rpc = { version = "0.7.0", path = "../rpc" } garage_table = { version = "0.7.0", path = "../table" } garage_block = { version = "0.7.0", path = "../block" } @@ -30,8 +31,6 @@ tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/model/garage.rs b/src/model/garage.rs index 2f99bd68..280f3dc7 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -2,6 +2,8 @@ use std::sync::Arc; use netapp::NetworkKey; +use garage_db as db; + use garage_util::background::*; use garage_util::config::*; @@ -33,7 +35,7 @@ pub struct Garage { pub config: Config, /// The local database - pub db: sled::Db, + pub db: db::Db, /// A background job runner pub background: Arc<BackgroundRunner>, /// The membership manager @@ -71,7 +73,7 @@ pub struct GarageK2V { impl Garage { /// Create and run garage - pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { + pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { let network_key = NetworkKey::from_slice( &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], ) @@ -199,7 +201,7 @@ impl Garage { #[cfg(feature = "k2v")] impl GarageK2V { - fn new(system: Arc<System>, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self { + fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); info!("Initialize K2V subscription manager..."); diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 123154d4..33de797d 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -6,6 +6,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; +use garage_db as db; + use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; @@ -135,7 +137,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { pub struct IndexCounter<T: CounterSchema> { this_node: Uuid, - local_counter: sled::Tree, + local_counter: db::Tree, propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } @@ -144,7 +146,7 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn new( system: Arc<System>, replication: TableShardedReplication, - db: &sled::Db, + db: &db::Db, ) -> Arc<Self> { let background = system.background.clone(); @@ -177,12 +179,12 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { let tree_key = self.table.data.tree_key(pk, sk); - let new_entry = self.local_counter.transaction(|tx| { - let mut entry = match tx.get(&tree_key[..])? { + let new_entry = self.local_counter.db().transaction(|tx| { + let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { Some(old_bytes) => { rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) .map_err(Error::RmpDecode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)? + .map_err(db::TxError::Abort)? } None => LocalCounterEntry { values: BTreeMap::new(), @@ -197,8 +199,8 @@ impl<T: CounterSchema> IndexCounter<T> { let new_entry_bytes = rmp_to_vec_all_named(&entry) .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - tx.insert(&tree_key[..], new_entry_bytes)?; + .map_err(db::TxError::Abort)?; + tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; Ok(entry) })?; diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 7e61957a..1f063265 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -25,7 +25,7 @@ impl Migrate { .open_tree("bucket:table") .map_err(GarageError::from)?; - for res in tree.iter() { + for res in tree.iter().map_err(GarageError::from)? { let (_k, v) = res.map_err(GarageError::from)?; let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) .map_err(GarageError::from)?; diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6ae50366..6de37cda 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -26,8 +26,6 @@ hexdump = "0.1" tracing = "0.1.30" rand = "0.8" -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/table/data.rs b/src/table/data.rs index 5cb10066..ebfae551 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -3,12 +3,12 @@ use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; -use sled::{IVec, Transactional}; use tokio::sync::Notify; +use garage_db as db; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_rpc::system::System; @@ -25,12 +25,12 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub instance: F, pub replication: R, - pub store: sled::Tree, + pub store: db::Tree, - pub(crate) merkle_tree: sled::Tree, - pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_tree: db::Tree, + pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, - pub(crate) gc_todo: SledCountedTree, + pub(crate) gc_todo: db::Tree, pub(crate) metrics: TableMetrics, } @@ -40,7 +40,7 @@ where F: TableSchema, R: TableReplication, { - pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { + pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); @@ -55,7 +55,6 @@ where let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); - let gc_todo = SledCountedTree::new(gc_todo); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); @@ -98,30 +97,30 @@ where None => partition_hash.to_vec(), Some(sk) => self.tree_key(partition_key, sk), }; - let range = self.store.range(first_key..); + let range = self.store.range(first_key..)?; self.read_range_aux(partition_hash, range, filter, limit) } EnumerationOrder::Reverse => match start { Some(sk) => { let last_key = self.tree_key(partition_key, sk); - let range = self.store.range(..=last_key).rev(); + let range = self.store.range_rev(..=last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } None => { let mut last_key = partition_hash.to_vec(); let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); - let range = self.store.range(..last_key).rev(); + let range = self.store.range_rev(..last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } }, } } - fn read_range_aux( + fn read_range_aux<'a>( &self, partition_hash: Hash, - range: impl Iterator<Item = sled::Result<(IVec, IVec)>>, + range: db::ValueIter<'a>, filter: &Option<F::Filter>, limit: usize, ) -> Result<Vec<Arc<ByteBuf>>, Error> { @@ -183,12 +182,10 @@ where tree_key: &[u8], f: impl Fn(Option<F::E>) -> F::E, ) -> Result<Option<F::E>, Error> { - let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? { + let changed = self.store.db().transaction(|tx| { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { Some(old_bytes) => { - let old_entry = self - .decode_entry(&old_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } @@ -204,13 +201,17 @@ where // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + .map_err(db::TxError::Abort)?; let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?; - store.insert(tree_key.to_vec(), new_bytes)?; + tx.insert( + &self.merkle_todo, + tree_key.to_vec(), + new_bytes_hash.as_slice(), + )?; + tx.insert(&self.store, tree_key.to_vec(), new_bytes)?; Ok(Some((old_entry, new_entry, new_bytes_hash))) } else { Ok(None) @@ -244,11 +245,11 @@ where } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { + let removed = self.store.db().transaction(|tx| { + if let Some(cur_v) = tx.get(&self.store, k)? { if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; return Ok(true); } } @@ -270,12 +271,12 @@ where k: &[u8], vhash: Hash, ) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { + let removed = self.store.db().transaction(|tx| { + if let Some(cur_v) = tx.get(&self.store, k)? { if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + return Ok(Some(cur_v.into_owned())); } } Ok(None) @@ -316,6 +317,6 @@ where } pub fn gc_todo_len(&self) -> usize { - self.gc_todo.len() + self.gc_todo.len().unwrap() // TODO fix unwrap } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 2a05b6ae..04872a38 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -12,9 +12,10 @@ use futures::select; use futures_util::future::*; use tokio::sync::watch; +use garage_db as db; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_util::time::*; use garage_rpc::system::System; @@ -106,7 +107,7 @@ where // List entries in the GC todo list // These entries are put there when a tombstone is inserted in the table // (see update_entry in data.rs) - for entry_kv in self.data.gc_todo.iter() { + for entry_kv in self.data.gc_todo.iter()? { let (k, vhash) = entry_kv?; let mut todo_entry = GcTodoEntry::parse(&k, &vhash); @@ -353,17 +354,17 @@ impl GcTodoEntry { } /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree - pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self { + pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self { Self { - tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()), - key: sled_k[8..].to_vec(), - value_hash: Hash::try_from(sled_v).unwrap(), + tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()), + key: db_k[8..].to_vec(), + value_hash: Hash::try_from(db_v).unwrap(), value: None, } } /// Saves the GcTodoEntry in the gc_todo tree - pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { + pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; Ok(()) } @@ -373,12 +374,15 @@ impl GcTodoEntry { /// This is usefull to remove a todo entry only under the condition /// that it has not changed since the time it was read, i.e. /// what we have to do is still the same - pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { - let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>( - &self.todo_table_key()[..], - Some(self.value_hash), - None, - )?; + pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { + let key = self.todo_table_key(); + gc_todo_tree.db().transaction(|tx| { + let old_val = tx.get(gc_todo_tree, &key)?; + if old_val == Some(self.value_hash.as_slice().into()) { + tx.remove(gc_todo_tree, &key)?; + } + tx.commit(()) + })?; Ok(()) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 93bf7e47..4b0b44ce 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -4,11 +4,10 @@ use std::time::Duration; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use sled::transaction::{ - ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, -}; use tokio::sync::watch; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; @@ -90,7 +89,8 @@ where async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { - if let Some(x) = self.data.merkle_todo.iter().next() { + if let Some(x) = self.data.merkle_todo.iter().unwrap().next() { + // TODO unwrap to remove match x { Ok((key, valhash)) => { if let Err(e) = self.update_item(&key[..], &valhash[..]) { @@ -137,13 +137,18 @@ where }; self.data .merkle_tree + .db() .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; - let deleted = self - .data - .merkle_todo - .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? - .is_ok(); + let deleted = self.data.merkle_todo.db().transaction(|tx| { + let old_val = tx.get(&self.data.merkle_todo, k)?; + if old_val == Some(vhash_by.into()) { + tx.remove(&self.data.merkle_todo, k)?; + tx.commit(true) + } else { + tx.commit(false) + } + })?; if !deleted { debug!( @@ -157,12 +162,12 @@ where fn update_item_rec( &self, - tx: &TransactionalTree, + tx: db::Transaction<'_>, k: &[u8], khash: &Hash, key: &MerkleNodeKey, new_vhash: Option<Hash>, - ) -> ConflictableTransactionResult<Option<Hash>, Error> { + ) -> db::TxResult<Option<Hash>, Error> { let i = key.prefix.len(); // Read node at current position (defined by the prefix stored in key) @@ -203,7 +208,7 @@ where } MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)), x @ MerkleNode::Leaf(_, _) => { - tx.remove(key_sub.encode())?; + tx.remove(&self.data.merkle_tree, key_sub.encode())?; Some(x) } } @@ -283,28 +288,27 @@ where fn read_node_txn( &self, - tx: &TransactionalTree, + tx: db::Transaction<'_>, k: &MerkleNodeKey, - ) -> ConflictableTransactionResult<MerkleNode, Error> { - let ent = tx.get(k.encode())?; - MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) + ) -> db::TxResult<MerkleNode, Error> { + let ent = tx.get(&self.data.merkle_tree, k.encode())?; + MerkleNode::decode_opt(ent).map_err(db::TxError::Abort) } fn put_node_txn( &self, - tx: &TransactionalTree, + tx: db::Transaction<'_>, k: &MerkleNodeKey, v: &MerkleNode, - ) -> ConflictableTransactionResult<Hash, Error> { + ) -> db::TxResult<Hash, Error> { trace!("Put Merkle node: {:?} => {:?}", k, v); if *v == MerkleNode::Empty { - tx.remove(k.encode())?; + tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v) - .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); - tx.insert(k.encode(), vby)?; + tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) } } @@ -316,11 +320,11 @@ where } pub fn merkle_tree_len(&self) -> usize { - self.data.merkle_tree.len() + self.data.merkle_tree.len().unwrap() // TODO fix unwrap } pub fn todo_len(&self) -> usize { - self.data.merkle_todo.len() + self.data.merkle_todo.len().unwrap() // TODO fix unwrap } } @@ -347,7 +351,7 @@ impl MerkleNodeKey { } impl MerkleNode { - fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> { + fn decode_opt(ent: Option<db::Value<'_>>) -> Result<Self, Error> { match ent { None => Ok(MerkleNode::Empty), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 752a2a6d..3318de88 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -1,6 +1,6 @@ use opentelemetry::{global, metrics::*, KeyValue}; -use garage_util::sled_counter::SledCountedTree; +use garage_db as db; /// TableMetrics reference all counter used for metrics pub struct TableMetrics { @@ -19,11 +19,7 @@ pub struct TableMetrics { pub(crate) sync_items_received: Counter<u64>, } impl TableMetrics { - pub fn new( - table_name: &'static str, - merkle_todo: sled::Tree, - gc_todo: SledCountedTree, - ) -> Self { + pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: db::Tree) -> Self { let meter = global::meter(table_name); TableMetrics { _merkle_todo_len: meter @@ -31,7 +27,7 @@ impl TableMetrics { "table.merkle_updater_todo_queue_length", move |observer| { observer.observe( - merkle_todo.len() as u64, + merkle_todo.len().unwrap() as u64, // TODO fix unwrap &[KeyValue::new("table_name", table_name)], ) }, @@ -43,7 +39,7 @@ impl TableMetrics { "table.gc_todo_queue_length", move |observer| { observer.observe( - gc_todo.len() as u64, + gc_todo.len().unwrap() as u64, // TODO fix unwrap &[KeyValue::new("table_name", table_name)], ) }, diff --git a/src/table/sync.rs b/src/table/sync.rs index 08069ad0..87dfd1d8 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -258,7 +258,7 @@ where while !*must_exit.borrow() { let mut items = Vec::new(); - for item in self.data.store.range(begin.to_vec()..end.to_vec()) { + for item in self.data.store.range(begin.to_vec()..end.to_vec())? { let (key, value) = item?; items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); @@ -603,7 +603,8 @@ impl SyncTodo { let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if data.store.range(begin..end).next().is_none() { + if data.store.range(begin..end).unwrap().next().is_none() { + // TODO fix unwrap continue; } } diff --git a/src/table/table.rs b/src/table/table.rs index 2a167604..3c211728 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -13,6 +13,8 @@ use opentelemetry::{ Context, }; +use garage_db as db; + use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -69,7 +71,7 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> { + pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { let endpoint = system .netapp .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME)); diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 95cde531..5d073436 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -14,6 +14,8 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } + blake2 = "0.9" err-derive = "0.3" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } @@ -22,8 +24,6 @@ tracing = "0.1.30" rand = "0.8" sha2 = "0.9" -sled = "0.34" - chrono = "0.4" rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/util/error.rs b/src/util/error.rs index 8734a0c8..9995c746 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -26,8 +26,8 @@ pub enum Error { #[error(display = "Netapp error: {}", _0)] Netapp(#[error(source)] netapp::error::Error), - #[error(display = "Sled error: {}", _0)] - Sled(#[error(source)] sled::Error), + #[error(display = "DB error: {}", _0)] + Db(#[error(source)] garage_db::Error), #[error(display = "Messagepack encode error: {}", _0)] RmpEncode(#[error(source)] rmp_serde::encode::Error), @@ -78,11 +78,11 @@ impl Error { } } -impl From<sled::transaction::TransactionError<Error>> for Error { - fn from(e: sled::transaction::TransactionError<Error>) -> Error { +impl From<garage_db::TxError<Error>> for Error { + fn from(e: garage_db::TxError<Error>) -> Error { match e { - sled::transaction::TransactionError::Abort(x) => x, - sled::transaction::TransactionError::Storage(x) => Error::Sled(x), + garage_db::TxError::Abort(x) => x, + garage_db::TxError::Db(x) => Error::Db(x), } } } diff --git a/src/util/lib.rs b/src/util/lib.rs index d8ffdd0b..8ca6e310 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -11,7 +11,7 @@ pub mod error; pub mod formater; pub mod metrics; pub mod persister; -pub mod sled_counter; +//pub mod sled_counter; pub mod time; pub mod token_bucket; pub mod tranquilizer; |