aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/cluster.rs2
-rw-r--r--src/block/Cargo.toml3
-rw-r--r--src/block/manager.rs127
-rw-r--r--src/block/metrics.rs4
-rw-r--r--src/block/rc.rs49
-rw-r--r--src/db/Cargo.toml36
-rw-r--r--src/db/bin/convert.rs76
-rw-r--r--src/db/counted_tree_hack.rs127
-rw-r--r--src/db/lib.rs400
-rw-r--r--src/db/lmdb_adapter.rs329
-rw-r--r--src/db/sled_adapter.rs260
-rw-r--r--src/db/sqlite_adapter.rs500
-rw-r--r--src/db/test.rs106
-rw-r--r--src/garage/Cargo.toml3
-rw-r--r--src/garage/admin.rs45
-rw-r--r--src/garage/repair.rs50
-rw-r--r--src/garage/server.rs54
-rw-r--r--src/garage/tests/bucket.rs8
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/garage.rs8
-rw-r--r--src/model/index_counter.rs62
-rw-r--r--src/model/k2v/item_table.rs24
-rw-r--r--src/model/migrate.rs6
-rw-r--r--src/model/s3/block_ref_table.rs21
-rw-r--r--src/model/s3/object_table.rs12
-rw-r--r--src/model/s3/version_table.rs13
-rw-r--r--src/table/Cargo.toml3
-rw-r--r--src/table/data.rs113
-rw-r--r--src/table/gc.rs41
-rw-r--r--src/table/merkle.rs101
-rw-r--r--src/table/metrics.rs21
-rw-r--r--src/table/schema.rs19
-rw-r--r--src/table/sync.rs16
-rw-r--r--src/table/table.rs4
-rw-r--r--src/util/Cargo.toml4
-rw-r--r--src/util/config.rs11
-rw-r--r--src/util/error.rs12
-rw-r--r--src/util/lib.rs2
-rw-r--r--src/util/sled_counter.rs100
39 files changed, 2362 insertions, 413 deletions
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 6d01317d..4b7716a3 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -19,6 +19,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
let res = GetClusterStatusResponse {
node: hex::encode(garage.system.id),
garage_version: garage.system.garage_version(),
+ db_engine: garage.db.engine(),
known_nodes: garage
.system
.get_known_nodes()
@@ -98,6 +99,7 @@ fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
struct GetClusterStatusResponse {
node: String,
garage_version: &'static str,
+ db_engine: String,
known_nodes: HashMap<String, KnownNodeResp>,
layout: GetClusterLayoutResponse,
}
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 9cba69ee..80346aca 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.7.0", path = "../util" }
garage_table = { version = "0.7.0", path = "../table" }
@@ -27,8 +28,6 @@ tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 9b2d9cad..32ba0431 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,5 @@
+use core::ops::Bound;
+
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -17,10 +19,12 @@ use opentelemetry::{
Context, KeyValue,
};
+use garage_db as db;
+use garage_db::counted_tree_hack::CountedTree;
+
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -91,9 +95,9 @@ pub struct BlockManager {
rc: BlockRc,
- resync_queue: SledCountedTree,
+ resync_queue: CountedTree,
resync_notify: Notify,
- resync_errors: SledCountedTree,
+ resync_errors: CountedTree,
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
@@ -108,7 +112,7 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
- db: &sled::Db,
+ db: &db::Db,
data_dir: PathBuf,
compression_level: Option<i32>,
background_tranquility: u32,
@@ -123,12 +127,14 @@ impl BlockManager {
let resync_queue = db
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- let resync_queue = SledCountedTree::new(resync_queue);
+ let resync_queue =
+ CountedTree::new(resync_queue).expect("Could not count block_local_resync_queue");
let resync_errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
- let resync_errors = SledCountedTree::new(resync_errors);
+ let resync_errors =
+ CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
let endpoint = system
.netapp
@@ -219,11 +225,44 @@ impl BlockManager {
/// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table.
- for (i, entry) in self.rc.rc.iter().enumerate() {
- let (hash, _) = entry?;
- let hash = Hash::try_from(&hash[..]).unwrap();
- self.put_to_resync(&hash, Duration::from_secs(0))?;
- if i & 0xFF == 0 && *must_exit.borrow() {
+ let mut next_start: Option<Hash> = None;
+ loop {
+ // We have to do this complicated two-step process where we first read a bunch
+ // of hashes from the RC table, and then insert them in the to-resync queue,
+ // because of SQLite. Basically, as long as we have an iterator on a DB table,
+ // we can't do anything else on the DB. The naive approach (which we had previously)
+ // of just iterating on the RC table and inserting items one to one in the resync
+ // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
+ // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
+ // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
+ // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
+ let mut batch_of_hashes = vec![];
+ let start_bound = match next_start.as_ref() {
+ None => Bound::Unbounded,
+ Some(x) => Bound::Excluded(x.as_slice()),
+ };
+ for entry in self
+ .rc
+ .rc
+ .range::<&[u8], _>((start_bound, Bound::Unbounded))?
+ {
+ let (hash, _) = entry?;
+ let hash = Hash::try_from(&hash[..]).unwrap();
+ batch_of_hashes.push(hash);
+ if batch_of_hashes.len() >= 1000 {
+ break;
+ }
+ }
+ if batch_of_hashes.is_empty() {
+ break;
+ }
+
+ for hash in batch_of_hashes.into_iter() {
+ self.put_to_resync(&hash, Duration::from_secs(0))?;
+ next_start = Some(hash)
+ }
+
+ if *must_exit.borrow() {
return Ok(());
}
}
@@ -264,46 +303,69 @@ impl BlockManager {
}
/// Get lenght of resync queue
- pub fn resync_queue_len(&self) -> usize {
- self.resync_queue.len()
+ pub fn resync_queue_len(&self) -> Result<usize, Error> {
+ // This currently can't return an error because the CountedTree hack
+ // doesn't error on .len(), but this will change when we remove the hack
+ // (hopefully someday!)
+ Ok(self.resync_queue.len())
}
/// Get number of blocks that have an error
- pub fn resync_errors_len(&self) -> usize {
- self.resync_errors.len()
+ pub fn resync_errors_len(&self) -> Result<usize, Error> {
+ // (see resync_queue_len comment)
+ Ok(self.resync_errors.len())
}
/// Get number of items in the refcount table
- pub fn rc_len(&self) -> usize {
- self.rc.rc.len()
+ pub fn rc_len(&self) -> Result<usize, Error> {
+ Ok(self.rc.rc.len()?)
}
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
- pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
- if self.rc.block_incref(hash)? {
+ pub fn block_incref(
+ self: &Arc<Self>,
+ tx: &mut db::Transaction,
+ hash: Hash,
+ ) -> db::TxOpResult<()> {
+ if self.rc.block_incref(tx, &hash)? {
// When the reference counter is incremented, there is
// normally a node that is responsible for sending us the
// data of the block. However that operation may fail,
// so in all cases we add the block here to the todo list
// to check later that it arrived correctly, and if not
// we will fecth it from someone.
- self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
+ let this = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = this.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) {
+ error!("Block {:?} could not be put in resync queue: {}.", hash, e);
+ }
+ });
}
Ok(())
}
/// Decrement the number of time a block is used
- pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
- if self.rc.block_decref(hash)? {
+ pub fn block_decref(
+ self: &Arc<Self>,
+ tx: &mut db::Transaction,
+ hash: Hash,
+ ) -> db::TxOpResult<()> {
+ if self.rc.block_decref(tx, &hash)? {
// When the RC is decremented, it might drop to zero,
// indicating that we don't need the block.
// There is a delay before we garbage collect it;
// make sure that it is handled in the resync loop
// after that delay has passed.
- self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?;
+ let this = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = this.put_to_resync(&hash, BLOCK_GC_DELAY + Duration::from_secs(10))
+ {
+ error!("Block {:?} could not be put in resync queue: {}.", hash, e);
+ }
+ });
}
Ok(())
}
@@ -503,12 +565,12 @@ impl BlockManager {
});
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), sled::Error> {
+ fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
- fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), sled::Error> {
+ fn put_to_resync_at(&self, hash: &Hash, when: u64) -> db::Result<()> {
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
@@ -547,13 +609,8 @@ impl BlockManager {
// - Ok(true) -> a block was processed (successfully or not)
// - Ok(false) -> no block was processed, but we are ready for the next iteration
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
- async fn resync_iter(
- &self,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<bool, sled::Error> {
- if let Some(first_pair_res) = self.resync_queue.iter().next() {
- let (time_bytes, hash_bytes) = first_pair_res?;
-
+ async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
+ if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@@ -561,7 +618,7 @@ impl BlockManager {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
- let ec = ErrorCounter::decode(ec);
+ let ec = ErrorCounter::decode(&ec);
if now < ec.next_try() {
// if next retry after an error is not yet,
// don't do resync and return early, but still
@@ -602,7 +659,7 @@ impl BlockManager {
warn!("Error when resyncing {:?}: {}", hash, e);
let err_counter = match self.resync_errors.get(hash.as_slice())? {
- Some(ec) => ErrorCounter::decode(ec).add1(now + 1),
+ Some(ec) => ErrorCounter::decode(&ec).add1(now + 1),
None => ErrorCounter::new(now + 1),
};
@@ -966,7 +1023,7 @@ impl ErrorCounter {
}
}
- fn decode(data: sled::IVec) -> Self {
+ fn decode(data: &[u8]) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index f0f541a3..477add66 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,6 +1,6 @@
use opentelemetry::{global, metrics::*};
-use garage_util::sled_counter::SledCountedTree;
+use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics {
@@ -23,7 +23,7 @@ pub struct BlockManagerMetrics {
}
impl BlockManagerMetrics {
- pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self {
+ pub fn new(resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
let meter = global::meter("garage_model/block");
Self {
_resync_queue_len: meter
diff --git a/src/block/rc.rs b/src/block/rc.rs
index ec3ea44e..ce6defad 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -1,5 +1,7 @@
use std::convert::TryInto;
+use garage_db as db;
+
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -7,31 +9,41 @@ use garage_util::time::*;
use crate::manager::BLOCK_GC_DELAY;
pub struct BlockRc {
- pub(crate) rc: sled::Tree,
+ pub(crate) rc: db::Tree,
}
impl BlockRc {
- pub(crate) fn new(rc: sled::Tree) -> Self {
+ pub(crate) fn new(rc: db::Tree) -> Self {
Self { rc }
}
/// Increment the reference counter associated to a hash.
/// Returns true if the RC goes from zero to nonzero.
- pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> {
- let old_rc = self
- .rc
- .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?;
- let old_rc = RcEntry::parse_opt(old_rc);
+ pub(crate) fn block_incref(
+ &self,
+ tx: &mut db::Transaction,
+ hash: &Hash,
+ ) -> db::TxOpResult<bool> {
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+ match old_rc.increment().serialize() {
+ Some(x) => tx.insert(&self.rc, &hash, x)?,
+ None => unreachable!(),
+ };
Ok(old_rc.is_zero())
}
/// Decrement the reference counter associated to a hash.
/// Returns true if the RC is now zero.
- pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> {
- let new_rc = self
- .rc
- .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?;
- let new_rc = RcEntry::parse_opt(new_rc);
+ pub(crate) fn block_decref(
+ &self,
+ tx: &mut db::Transaction,
+ hash: &Hash,
+ ) -> db::TxOpResult<bool> {
+ let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
+ match new_rc.serialize() {
+ Some(x) => tx.insert(&self.rc, &hash, x)?,
+ None => tx.remove(&self.rc, &hash)?,
+ };
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
}
@@ -44,12 +56,15 @@ impl BlockRc {
/// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
- self.rc.update_and_fetch(&hash, |rcval| {
- let updated = match RcEntry::parse_opt(rcval) {
- RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent,
- v => v,
+ self.rc.db().transaction(|mut tx| {
+ let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+ match rcval {
+ RcEntry::Deletable { at_time } if now > at_time => {
+ tx.remove(&self.rc, &hash)?;
+ }
+ _ => (),
};
- updated.serialize()
+ tx.commit(())
})?;
Ok(())
}
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
new file mode 100644
index 00000000..6d8f64be
--- /dev/null
+++ b/src/db/Cargo.toml
@@ -0,0 +1,36 @@
+[package]
+name = "garage_db"
+version = "0.8.0"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Abstraction over multiple key/value storage engines that supports transactions"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+[[bin]]
+name = "convert"
+path = "bin/convert.rs"
+required-features = ["cli"]
+
+[dependencies]
+err-derive = "0.3"
+hexdump = "0.1"
+log = "0.4"
+
+heed = "0.11"
+rusqlite = { version = "0.27", features = ["bundled"] }
+sled = "0.34"
+
+# cli deps
+clap = { version = "3.1.18", optional = true, features = ["derive", "env"] }
+pretty_env_logger = { version = "0.4", optional = true }
+
+[dev-dependencies]
+mktemp = "0.4"
+
+[features]
+cli = ["clap", "pretty_env_logger"]
diff --git a/src/db/bin/convert.rs b/src/db/bin/convert.rs
new file mode 100644
index 00000000..9e45e61f
--- /dev/null
+++ b/src/db/bin/convert.rs
@@ -0,0 +1,76 @@
+use std::path::PathBuf;
+
+use garage_db::*;
+
+use clap::Parser;
+
+/// K2V command line interface
+#[derive(Parser, Debug)]
+#[clap(author, version, about, long_about = None)]
+struct Args {
+ /// Input DB path
+ #[clap(short = 'i')]
+ input_path: PathBuf,
+ /// Input DB engine
+ #[clap(short = 'a')]
+ input_engine: String,
+
+ /// Output DB path
+ #[clap(short = 'o')]
+ output_path: PathBuf,
+ /// Output DB engine
+ #[clap(short = 'b')]
+ output_engine: String,
+}
+
+fn main() {
+ let args = Args::parse();
+ pretty_env_logger::init();
+
+ match do_conversion(args) {
+ Ok(()) => println!("Success!"),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+}
+
+fn do_conversion(args: Args) -> Result<()> {
+ let input = open_db(args.input_path, args.input_engine)?;
+ let output = open_db(args.output_path, args.output_engine)?;
+ output.import(&input)?;
+ Ok(())
+}
+
+fn open_db(path: PathBuf, engine: String) -> Result<Db> {
+ match engine.as_str() {
+ "sled" => {
+ let db = sled_adapter::sled::Config::default().path(&path).open()?;
+ Ok(sled_adapter::SledDb::init(db))
+ }
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ let db = sqlite_adapter::rusqlite::Connection::open(&path)?;
+ Ok(sqlite_adapter::SqliteDb::init(db))
+ }
+ "lmdb" | "heed" => {
+ std::fs::create_dir_all(&path).map_err(|e| {
+ Error(format!("Unable to create LMDB data directory: {}", e).into())
+ })?;
+
+ let map_size = if u32::MAX as usize == usize::MAX {
+ eprintln!(
+ "LMDB is not recommended on 32-bit systems, database size will be limited"
+ );
+ 1usize << 30 // 1GB for 32-bit systems
+ } else {
+ 1usize << 40 // 1TB for 64-bit systems
+ };
+
+ let db = lmdb_adapter::heed::EnvOpenOptions::new()
+ .max_dbs(100)
+ .map_size(map_size)
+ .open(&path)
+ .unwrap();
+ Ok(lmdb_adapter::LmdbDb::init(db))
+ }
+ e => Err(Error(format!("Invalid DB engine: {}", e).into())),
+ }
+}
diff --git a/src/db/counted_tree_hack.rs b/src/db/counted_tree_hack.rs
new file mode 100644
index 00000000..bbe943a2
--- /dev/null
+++ b/src/db/counted_tree_hack.rs
@@ -0,0 +1,127 @@
+//! This hack allows a db tree to keep in RAM a counter of the number of entries
+//! it contains, which is used to call .len() on it. This is usefull only for
+//! the sled backend where .len() otherwise would have to traverse the whole
+//! tree to count items. For sqlite and lmdb, this is mostly useless (but
+//! hopefully not harmfull!). Note that a CountedTree cannot be part of a
+//! transaction.
+
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+
+use crate::{Result, Tree, TxError, Value, ValueIter};
+
+#[derive(Clone)]
+pub struct CountedTree(Arc<CountedTreeInternal>);
+
+struct CountedTreeInternal {
+ tree: Tree,
+ len: AtomicUsize,
+}
+
+impl CountedTree {
+ pub fn new(tree: Tree) -> Result<Self> {
+ let len = tree.len()?;
+ Ok(Self(Arc::new(CountedTreeInternal {
+ tree,
+ len: AtomicUsize::new(len),
+ })))
+ }
+
+ pub fn len(&self) -> usize {
+ self.0.len.load(Ordering::SeqCst)
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Value>> {
+ self.0.tree.get(key)
+ }
+
+ pub fn first(&self) -> Result<Option<(Value, Value)>> {
+ self.0.tree.first()
+ }
+
+ pub fn iter(&self) -> Result<ValueIter<'_>> {
+ self.0.tree.iter()
+ }
+
+ // ---- writing functions ----
+
+ pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<Value>>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ let old_val = self.0.tree.insert(key, value)?;
+ if old_val.is_none() {
+ self.0.len.fetch_add(1, Ordering::SeqCst);
+ }
+ Ok(old_val)
+ }
+
+ pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Value>> {
+ let old_val = self.0.tree.remove(key)?;
+ if old_val.is_some() {
+ self.0.len.fetch_sub(1, Ordering::SeqCst);
+ }
+ Ok(old_val)
+ }
+
+ pub fn compare_and_swap<K, OV, NV>(
+ &self,
+ key: K,
+ expected_old: Option<OV>,
+ new: Option<NV>,
+ ) -> Result<bool>
+ where
+ K: AsRef<[u8]>,
+ OV: AsRef<[u8]>,
+ NV: AsRef<[u8]>,
+ {
+ let old_some = expected_old.is_some();
+ let new_some = new.is_some();
+
+ let tx_res = self.0.tree.db().transaction(|mut tx| {
+ let old_val = tx.get(&self.0.tree, &key)?;
+ let is_same = match (&old_val, &expected_old) {
+ (None, None) => true,
+ (Some(x), Some(y)) if x == y.as_ref() => true,
+ _ => false,
+ };
+ if is_same {
+ match &new {
+ Some(v) => {
+ tx.insert(&self.0.tree, &key, v)?;
+ }
+ None => {
+ tx.remove(&self.0.tree, &key)?;
+ }
+ }
+ tx.commit(())
+ } else {
+ tx.abort(())
+ }
+ });
+
+ match tx_res {
+ Ok(()) => {
+ match (old_some, new_some) {
+ (false, true) => {
+ self.0.len.fetch_add(1, Ordering::SeqCst);
+ }
+ (true, false) => {
+ self.0.len.fetch_sub(1, Ordering::SeqCst);
+ }
+ _ => (),
+ }
+ Ok(true)
+ }
+ Err(TxError::Abort(())) => Ok(false),
+ Err(TxError::Db(e)) => Err(e),
+ }
+ }
+}
diff --git a/src/db/lib.rs b/src/db/lib.rs
new file mode 100644
index 00000000..e9d3ea18
--- /dev/null
+++ b/src/db/lib.rs
@@ -0,0 +1,400 @@
+pub mod lmdb_adapter;
+pub mod sled_adapter;
+pub mod sqlite_adapter;
+
+pub mod counted_tree_hack;
+
+#[cfg(test)]
+pub mod test;
+
+use core::ops::{Bound, RangeBounds};
+
+use std::borrow::Cow;
+use std::cell::Cell;
+use std::sync::Arc;
+
+use err_derive::Error;
+
+#[derive(Clone)]
+pub struct Db(pub(crate) Arc<dyn IDb>);
+
+pub struct Transaction<'a>(&'a mut dyn ITx);
+
+#[derive(Clone)]
+pub struct Tree(Arc<dyn IDb>, usize);
+
+pub type Value = Vec<u8>;
+pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value, Value)>> + 'a>;
+pub type TxValueIter<'a> = Box<dyn std::iter::Iterator<Item = TxOpResult<(Value, Value)>> + 'a>;
+
+// ----
+
+#[derive(Debug, Error)]
+#[error(display = "{}", _0)]
+pub struct Error(pub Cow<'static, str>);
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Debug, Error)]
+#[error(display = "{}", _0)]
+pub struct TxOpError(pub(crate) Error);
+pub type TxOpResult<T> = std::result::Result<T, TxOpError>;
+
+pub enum TxError<E> {
+ Abort(E),
+ Db(Error),
+}
+pub type TxResult<R, E> = std::result::Result<R, TxError<E>>;
+
+impl<E> From<TxOpError> for TxError<E> {
+ fn from(e: TxOpError) -> TxError<E> {
+ TxError::Db(e.0)
+ }
+}
+
+pub fn unabort<R, E>(res: TxResult<R, E>) -> TxOpResult<std::result::Result<R, E>> {
+ match res {
+ Ok(v) => Ok(Ok(v)),
+ Err(TxError::Abort(e)) => Ok(Err(e)),
+ Err(TxError::Db(e)) => Err(TxOpError(e)),
+ }
+}
+
+// ----
+
+impl Db {
+ pub fn engine(&self) -> String {
+ self.0.engine()
+ }
+
+ pub fn open_tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> {
+ let tree_id = self.0.open_tree(name.as_ref())?;
+ Ok(Tree(self.0.clone(), tree_id))
+ }
+
+ pub fn list_trees(&self) -> Result<Vec<String>> {
+ self.0.list_trees()
+ }
+
+ pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
+ where
+ F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ {
+ let f = TxFn {
+ function: fun,
+ result: Cell::new(None),
+ };
+ let tx_res = self.0.transaction(&f);
+ let ret = f
+ .result
+ .into_inner()
+ .expect("Transaction did not store result");
+
+ match tx_res {
+ Ok(()) => {
+ assert!(matches!(ret, Ok(_)));
+ ret
+ }
+ Err(TxError::Abort(())) => {
+ assert!(matches!(ret, Err(TxError::Abort(_))));
+ ret
+ }
+ Err(TxError::Db(e2)) => match ret {
+ // Ok was stored -> the error occured when finalizing
+ // transaction
+ Ok(_) => Err(TxError::Db(e2)),
+ // An error was already stored: that's the one we want to
+ // return
+ Err(TxError::Db(e)) => Err(TxError::Db(e)),
+ _ => unreachable!(),
+ },
+ }
+ }
+
+ pub fn import(&self, other: &Db) -> Result<()> {
+ let existing_trees = self.list_trees()?;
+ if !existing_trees.is_empty() {
+ return Err(Error(
+ format!(
+ "destination database already contains data: {:?}",
+ existing_trees
+ )
+ .into(),
+ ));
+ }
+
+ let tree_names = other.list_trees()?;
+ for name in tree_names {
+ let tree = self.open_tree(&name)?;
+ if tree.len()? > 0 {
+ return Err(Error(format!("tree {} already contains data", name).into()));
+ }
+
+ let ex_tree = other.open_tree(&name)?;
+
+ let tx_res = self.transaction(|mut tx| {
+ let mut i = 0;
+ for item in ex_tree.iter().map_err(TxError::Abort)? {
+ let (k, v) = item.map_err(TxError::Abort)?;
+ tx.insert(&tree, k, v)?;
+ i += 1;
+ if i % 1000 == 0 {
+ println!("{}: imported {}", name, i);
+ }
+ }
+ tx.commit(i)
+ });
+ let total = match tx_res {
+ Err(TxError::Db(e)) => return Err(e),
+ Err(TxError::Abort(e)) => return Err(e),
+ Ok(x) => x,
+ };
+
+ println!("{}: finished importing, {} items", name, total);
+ }
+ Ok(())
+ }
+}
+
+#[allow(clippy::len_without_is_empty)]
+impl Tree {
+ #[inline]
+ pub fn db(&self) -> Db {
+ Db(self.0.clone())
+ }
+
+ #[inline]
+ pub fn get<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
+ self.0.get(self.1, key.as_ref())
+ }
+ #[inline]
+ pub fn len(&self) -> Result<usize> {
+ self.0.len(self.1)
+ }
+
+ #[inline]
+ pub fn first(&self) -> Result<Option<(Value, Value)>> {
+ self.iter()?.next().transpose()
+ }
+ #[inline]
+ pub fn get_gt<T: AsRef<[u8]>>(&self, from: T) -> Result<Option<(Value, Value)>> {
+ self.range((Bound::Excluded(from), Bound::Unbounded))?
+ .next()
+ .transpose()
+ }
+
+ /// Returns the old value if there was one
+ #[inline]
+ pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
+ &self,
+ key: T,
+ value: U,
+ ) -> Result<Option<Value>> {
+ self.0.insert(self.1, key.as_ref(), value.as_ref())
+ }
+ /// Returns the old value if there was one
+ #[inline]
+ pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
+ self.0.remove(self.1, key.as_ref())
+ }
+
+ #[inline]
+ pub fn iter(&self) -> Result<ValueIter<'_>> {
+ self.0.iter(self.1)
+ }
+ #[inline]
+ pub fn iter_rev(&self) -> Result<ValueIter<'_>> {
+ self.0.iter_rev(self.1)
+ }
+
+ #[inline]
+ pub fn range<K, R>(&self, range: R) -> Result<ValueIter<'_>>
+ where
+ K: AsRef<[u8]>,
+ R: RangeBounds<K>,
+ {
+ let sb = range.start_bound();
+ let eb = range.end_bound();
+ self.0.range(self.1, get_bound(sb), get_bound(eb))
+ }
+ #[inline]
+ pub fn range_rev<K, R>(&self, range: R) -> Result<ValueIter<'_>>
+ where
+ K: AsRef<[u8]>,
+ R: RangeBounds<K>,
+ {
+ let sb = range.start_bound();
+ let eb = range.end_bound();
+ self.0.range_rev(self.1, get_bound(sb), get_bound(eb))
+ }
+}
+
+#[allow(clippy::len_without_is_empty)]
+impl<'a> Transaction<'a> {
+ #[inline]
+ pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
+ self.0.get(tree.1, key.as_ref())
+ }
+ #[inline]
+ pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
+ self.0.len(tree.1)
+ }
+
+ /// Returns the old value if there was one
+ #[inline]
+ pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
+ &mut self,
+ tree: &Tree,
+ key: T,
+ value: U,
+ ) -> TxOpResult<Option<Value>> {
+ self.0.insert(tree.1, key.as_ref(), value.as_ref())
+ }
+ /// Returns the old value if there was one
+ #[inline]
+ pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
+ self.0.remove(tree.1, key.as_ref())
+ }
+
+ #[inline]
+ pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
+ self.0.iter(tree.1)
+ }
+ #[inline]
+ pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
+ self.0.iter_rev(tree.1)
+ }
+
+ #[inline]
+ pub fn range<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
+ where
+ K: AsRef<[u8]>,
+ R: RangeBounds<K>,
+ {
+ let sb = range.start_bound();
+ let eb = range.end_bound();
+ self.0.range(tree.1, get_bound(sb), get_bound(eb))
+ }
+ #[inline]
+ pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
+ where
+ K: AsRef<[u8]>,
+ R: RangeBounds<K>,
+ {
+ let sb = range.start_bound();
+ let eb = range.end_bound();
+ self.0.range_rev(tree.1, get_bound(sb), get_bound(eb))
+ }
+
+ // ----
+
+ #[inline]
+ pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
+ Err(TxError::Abort(e))
+ }
+
+ #[inline]
+ pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
+ Ok(r)
+ }
+}
+
+// ---- Internal interfaces
+
+pub(crate) trait IDb: Send + Sync {
+ fn engine(&self) -> String;
+ fn open_tree(&self, name: &str) -> Result<usize>;
+ fn list_trees(&self) -> Result<Vec<String>>;
+
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
+ fn len(&self, tree: usize) -> Result<usize>;
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>;
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
+
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>>;
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>>;
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
+}
+
+pub(crate) trait ITx {
+ fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
+ fn len(&self, tree: usize) -> TxOpResult<usize>;
+
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>>;
+ fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
+
+ fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
+ fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>>;
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>>;
+}
+
+pub(crate) trait ITxFn {
+ fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult;
+}
+
+pub(crate) enum TxFnResult {
+ Ok,
+ Abort,
+ DbErr,
+}
+
+struct TxFn<F, R, E>
+where
+ F: Fn(Transaction<'_>) -> TxResult<R, E>,
+{
+ function: F,
+ result: Cell<Option<TxResult<R, E>>>,
+}
+
+impl<F, R, E> ITxFn for TxFn<F, R, E>
+where
+ F: Fn(Transaction<'_>) -> TxResult<R, E>,
+{
+ fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
+ let res = (self.function)(Transaction(tx));
+ let res2 = match &res {
+ Ok(_) => TxFnResult::Ok,
+ Err(TxError::Abort(_)) => TxFnResult::Abort,
+ Err(TxError::Db(_)) => TxFnResult::DbErr,
+ };
+ self.result.set(Some(res));
+ res2
+ }
+}
+
+// ----
+
+fn get_bound<K: AsRef<[u8]>>(b: Bound<&K>) -> Bound<&[u8]> {
+ match b {
+ Bound::Included(v) => Bound::Included(v.as_ref()),
+ Bound::Excluded(v) => Bound::Excluded(v.as_ref()),
+ Bound::Unbounded => Bound::Unbounded,
+ }
+}
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
new file mode 100644
index 00000000..74622919
--- /dev/null
+++ b/src/db/lmdb_adapter.rs
@@ -0,0 +1,329 @@
+use core::ops::Bound;
+use core::ptr::NonNull;
+
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::sync::{Arc, RwLock};
+
+use heed::types::ByteSlice;
+use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
+
+use crate::{
+ Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
+ TxValueIter, Value, ValueIter,
+};
+
+pub use heed;
+
+// -- err
+
+impl From<heed::Error> for Error {
+ fn from(e: heed::Error) -> Error {
+ Error(format!("LMDB: {}", e).into())
+ }
+}
+
+impl From<heed::Error> for TxOpError {
+ fn from(e: heed::Error) -> TxOpError {
+ TxOpError(e.into())
+ }
+}
+
+// -- db
+
+pub struct LmdbDb {
+ db: heed::Env,
+ trees: RwLock<(Vec<Database>, HashMap<String, usize>)>,
+}
+
+impl LmdbDb {
+ pub fn init(db: Env) -> Db {
+ let s = Self {
+ db,
+ trees: RwLock::new((Vec::new(), HashMap::new())),
+ };
+ Db(Arc::new(s))
+ }
+
+ fn get_tree(&self, i: usize) -> Result<Database> {
+ self.trees
+ .read()
+ .unwrap()
+ .0
+ .get(i)
+ .cloned()
+ .ok_or_else(|| Error("invalid tree id".into()))
+ }
+}
+
+impl IDb for LmdbDb {
+ fn engine(&self) -> String {
+ "LMDB (using Heed crate)".into()
+ }
+
+ fn open_tree(&self, name: &str) -> Result<usize> {
+ let mut trees = self.trees.write().unwrap();
+ if let Some(i) = trees.1.get(name) {
+ Ok(*i)
+ } else {
+ let tree = self.db.create_database(Some(name))?;
+ let i = trees.0.len();
+ trees.0.push(tree);
+ trees.1.insert(name.to_string(), i);
+ Ok(i)
+ }
+ }
+
+ fn list_trees(&self) -> Result<Vec<String>> {
+ let tree0 = match self.db.open_database::<heed::types::Str, ByteSlice>(None)? {
+ Some(x) => x,
+ None => return Ok(vec![]),
+ };
+
+ let mut ret = vec![];
+ let tx = self.db.read_txn()?;
+ for item in tree0.iter(&tx)? {
+ let (tree_name, _) = item?;
+ ret.push(tree_name.to_string());
+ }
+ drop(tx);
+
+ let mut ret2 = vec![];
+ for tree_name in ret {
+ if self
+ .db
+ .open_database::<ByteSlice, ByteSlice>(Some(&tree_name))?
+ .is_some()
+ {
+ ret2.push(tree_name);
+ }
+ }
+
+ Ok(ret2)
+ }
+
+ // ----
+
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+
+ let tx = self.db.read_txn()?;
+ let val = tree.get(&tx, key)?;
+ match val {
+ None => Ok(None),
+ Some(v) => Ok(Some(v.to_vec())),
+ }
+ }
+
+ fn len(&self, tree: usize) -> Result<usize> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ Ok(tree.len(&tx)?.try_into().unwrap())
+ }
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let mut tx = self.db.write_txn()?;
+ let old_val = tree.get(&tx, key)?.map(Vec::from);
+ tree.put(&mut tx, key, value)?;
+ tx.commit()?;
+ Ok(old_val)
+ }
+
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let mut tx = self.db.write_txn()?;
+ let old_val = tree.get(&tx, key)?.map(Vec::from);
+ tree.delete(&mut tx, key)?;
+ tx.commit()?;
+ Ok(old_val)
+ }
+
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ TxAndIterator::make(tx, |tx| Ok(tree.iter(tx)?))
+ }
+
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ TxAndIterator::make(tx, |tx| Ok(tree.rev_iter(tx)?))
+ }
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ TxAndIterator::make(tx, |tx| Ok(tree.range(tx, &(low, high))?))
+ }
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ TxAndIterator::make(tx, |tx| Ok(tree.rev_range(tx, &(low, high))?))
+ }
+
+ // ----
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ let trees = self.trees.read().unwrap();
+ let mut tx = LmdbTx {
+ trees: &trees.0[..],
+ tx: self
+ .db
+ .write_txn()
+ .map_err(Error::from)
+ .map_err(TxError::Db)?,
+ };
+
+ let res = f.try_on(&mut tx);
+ match res {
+ TxFnResult::Ok => {
+ tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
+ Ok(())
+ }
+ TxFnResult::Abort => {
+ tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
+ Err(TxError::Abort(()))
+ }
+ TxFnResult::DbErr => {
+ tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
+ Err(TxError::Db(Error(
+ "(this message will be discarded)".into(),
+ )))
+ }
+ }
+ }
+}
+
+// ----
+
+struct LmdbTx<'a> {
+ trees: &'a [Database],
+ tx: RwTxn<'a, 'a>,
+}
+
+impl<'a> LmdbTx<'a> {
+ fn get_tree(&self, i: usize) -> TxOpResult<&Database> {
+ self.trees.get(i).ok_or_else(|| {
+ TxOpError(Error(
+ "invalid tree id (it might have been openned after the transaction started)".into(),
+ ))
+ })
+ }
+}
+
+impl<'a> ITx for LmdbTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ match tree.get(&self.tx, key)? {
+ Some(v) => Ok(Some(v.to_vec())),
+ None => Ok(None),
+ }
+ }
+ fn len(&self, _tree: usize) -> TxOpResult<usize> {
+ unimplemented!(".len() in transaction not supported with LMDB backend")
+ }
+
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = *self.get_tree(tree)?;
+ let old_val = tree.get(&self.tx, key)?.map(Vec::from);
+ tree.put(&mut self.tx, key, value)?;
+ Ok(old_val)
+ }
+ fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = *self.get_tree(tree)?;
+ let old_val = tree.get(&self.tx, key)?.map(Vec::from);
+ tree.delete(&mut self.tx, key)?;
+ Ok(old_val)
+ }
+
+ fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+ fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+
+ fn range<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+ fn range_rev<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+}
+
+// ----
+
+type IteratorItem<'a> = heed::Result<(
+ <ByteSlice as BytesDecode<'a>>::DItem,
+ <ByteSlice as BytesDecode<'a>>::DItem,
+)>;
+
+struct TxAndIterator<'a, I>
+where
+ I: Iterator<Item = IteratorItem<'a>> + 'a,
+{
+ tx: RoTxn<'a>,
+ iter: Option<I>,
+}
+
+impl<'a, I> TxAndIterator<'a, I>
+where
+ I: Iterator<Item = IteratorItem<'a>> + 'a,
+{
+ fn make<F>(tx: RoTxn<'a>, iterfun: F) -> Result<ValueIter<'a>>
+ where
+ F: FnOnce(&'a RoTxn<'a>) -> Result<I>,
+ {
+ let mut res = TxAndIterator { tx, iter: None };
+
+ let tx = unsafe { NonNull::from(&res.tx).as_ref() };
+ res.iter = Some(iterfun(tx)?);
+
+ Ok(Box::new(res))
+ }
+}
+
+impl<'a, I> Drop for TxAndIterator<'a, I>
+where
+ I: Iterator<Item = IteratorItem<'a>> + 'a,
+{
+ fn drop(&mut self) {
+ drop(self.iter.take());
+ }
+}
+
+impl<'a, I> Iterator for TxAndIterator<'a, I>
+where
+ I: Iterator<Item = IteratorItem<'a>> + 'a,
+{
+ type Item = Result<(Value, Value)>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self.iter.as_mut().unwrap().next() {
+ None => None,
+ Some(Err(e)) => Some(Err(e.into())),
+ Some(Ok((k, v))) => Some(Ok((k.to_vec(), v.to_vec()))),
+ }
+ }
+}
diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs
new file mode 100644
index 00000000..982f8d82
--- /dev/null
+++ b/src/db/sled_adapter.rs
@@ -0,0 +1,260 @@
+use core::ops::Bound;
+
+use std::cell::Cell;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use sled::transaction::{
+ ConflictableTransactionError, TransactionError, Transactional, TransactionalTree,
+ UnabortableTransactionError,
+};
+
+use crate::{
+ Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
+ TxValueIter, Value, ValueIter,
+};
+
+pub use sled;
+
+// -- err
+
+impl From<sled::Error> for Error {
+ fn from(e: sled::Error) -> Error {
+ Error(format!("Sled: {}", e).into())
+ }
+}
+
+impl From<sled::Error> for TxOpError {
+ fn from(e: sled::Error) -> TxOpError {
+ TxOpError(e.into())
+ }
+}
+
+// -- db
+
+pub struct SledDb {
+ db: sled::Db,
+ trees: RwLock<(Vec<sled::Tree>, HashMap<String, usize>)>,
+}
+
+impl SledDb {
+ pub fn init(db: sled::Db) -> Db {
+ let s = Self {
+ db,
+ trees: RwLock::new((Vec::new(), HashMap::new())),
+ };
+ Db(Arc::new(s))
+ }
+
+ fn get_tree(&self, i: usize) -> Result<sled::Tree> {
+ self.trees
+ .read()
+ .unwrap()
+ .0
+ .get(i)
+ .cloned()
+ .ok_or_else(|| Error("invalid tree id".into()))
+ }
+}
+
+impl IDb for SledDb {
+ fn engine(&self) -> String {
+ "Sled".into()
+ }
+
+ fn open_tree(&self, name: &str) -> Result<usize> {
+ let mut trees = self.trees.write().unwrap();
+ if let Some(i) = trees.1.get(name) {
+ Ok(*i)
+ } else {
+ let tree = self.db.open_tree(name)?;
+ let i = trees.0.len();
+ trees.0.push(tree);
+ trees.1.insert(name.to_string(), i);
+ Ok(i)
+ }
+ }
+
+ fn list_trees(&self) -> Result<Vec<String>> {
+ let mut trees = vec![];
+ for name in self.db.tree_names() {
+ let name = std::str::from_utf8(&name)
+ .map_err(|e| Error(format!("{}", e).into()))?
+ .to_string();
+ if name != "__sled__default" {
+ trees.push(name);
+ }
+ }
+ Ok(trees)
+ }
+
+ // ----
+
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let val = tree.get(key)?;
+ Ok(val.map(|x| x.to_vec()))
+ }
+
+ fn len(&self, tree: usize) -> Result<usize> {
+ let tree = self.get_tree(tree)?;
+ Ok(tree.len())
+ }
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = tree.insert(key, value)?;
+ Ok(old_val.map(|x| x.to_vec()))
+ }
+
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = tree.remove(key)?;
+ Ok(old_val.map(|x| x.to_vec()))
+ }
+
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ Ok(Box::new(tree.iter().map(|v| {
+ v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into)
+ })))
+ }
+
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ Ok(Box::new(tree.iter().rev().map(|v| {
+ v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into)
+ })))
+ }
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).map(|v| {
+ v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into)
+ })))
+ }
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).rev().map(
+ |v| v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into),
+ )))
+ }
+
+ // ----
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ let trees = self.trees.read().unwrap();
+ let res = trees.0.transaction(|txtrees| {
+ let mut tx = SledTx {
+ trees: txtrees,
+ err: Cell::new(None),
+ };
+ match f.try_on(&mut tx) {
+ TxFnResult::Ok => {
+ assert!(tx.err.into_inner().is_none());
+ Ok(())
+ }
+ TxFnResult::Abort => {
+ assert!(tx.err.into_inner().is_none());
+ Err(ConflictableTransactionError::Abort(()))
+ }
+ TxFnResult::DbErr => {
+ let e = tx.err.into_inner().expect("No DB error");
+ Err(e.into())
+ }
+ }
+ });
+ match res {
+ Ok(()) => Ok(()),
+ Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
+ Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
+ }
+ }
+}
+
+// ----
+
+struct SledTx<'a> {
+ trees: &'a [TransactionalTree],
+ err: Cell<Option<UnabortableTransactionError>>,
+}
+
+impl<'a> SledTx<'a> {
+ fn get_tree(&self, i: usize) -> TxOpResult<&TransactionalTree> {
+ self.trees.get(i).ok_or_else(|| {
+ TxOpError(Error(
+ "invalid tree id (it might have been openned after the transaction started)".into(),
+ ))
+ })
+ }
+
+ fn save_error<R>(
+ &self,
+ v: std::result::Result<R, UnabortableTransactionError>,
+ ) -> TxOpResult<R> {
+ match v {
+ Ok(x) => Ok(x),
+ Err(e) => {
+ let txt = format!("{}", e);
+ self.err.set(Some(e));
+ Err(TxOpError(Error(txt.into())))
+ }
+ }
+ }
+}
+
+impl<'a> ITx for SledTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let tmp = self.save_error(tree.get(key))?;
+ Ok(tmp.map(|x| x.to_vec()))
+ }
+ fn len(&self, _tree: usize) -> TxOpResult<usize> {
+ unimplemented!(".len() in transaction not supported with Sled backend")
+ }
+
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = self.save_error(tree.insert(key, value))?;
+ Ok(old_val.map(|x| x.to_vec()))
+ }
+ fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = self.save_error(tree.remove(key))?;
+ Ok(old_val.map(|x| x.to_vec()))
+ }
+
+ fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with Sled backend");
+ }
+ fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with Sled backend");
+ }
+
+ fn range<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with Sled backend");
+ }
+ fn range_rev<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with Sled backend");
+ }
+}
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
new file mode 100644
index 00000000..14bf35ff
--- /dev/null
+++ b/src/db/sqlite_adapter.rs
@@ -0,0 +1,500 @@
+use core::ops::Bound;
+
+use std::borrow::BorrowMut;
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::ptr::NonNull;
+use std::sync::{Arc, Mutex, MutexGuard};
+
+use log::trace;
+
+use rusqlite::{params, Connection, Rows, Statement, Transaction};
+
+use crate::{
+ Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
+ TxValueIter, Value, ValueIter,
+};
+
+pub use rusqlite;
+
+// --- err
+
+impl From<rusqlite::Error> for Error {
+ fn from(e: rusqlite::Error) -> Error {
+ Error(format!("Sqlite: {}", e).into())
+ }
+}
+
+impl From<rusqlite::Error> for TxOpError {
+ fn from(e: rusqlite::Error) -> TxOpError {
+ TxOpError(e.into())
+ }
+}
+
+// -- db
+
+pub struct SqliteDb(Mutex<SqliteDbInner>);
+
+struct SqliteDbInner {
+ db: Connection,
+ trees: Vec<String>,
+}
+
+impl SqliteDb {
+ pub fn init(db: rusqlite::Connection) -> Db {
+ let s = Self(Mutex::new(SqliteDbInner {
+ db,
+ trees: Vec::new(),
+ }));
+ Db(Arc::new(s))
+ }
+}
+
+impl SqliteDbInner {
+ fn get_tree(&self, i: usize) -> Result<&'_ str> {
+ self.trees
+ .get(i)
+ .map(String::as_str)
+ .ok_or_else(|| Error("invalid tree id".into()))
+ }
+
+ fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> {
+ let mut stmt = self
+ .db
+ .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ let mut res_iter = stmt.query([key])?;
+ match res_iter.next()? {
+ None => Ok(None),
+ Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)),
+ }
+ }
+}
+
+impl IDb for SqliteDb {
+ fn engine(&self) -> String {
+ format!("sqlite3 v{} (using rusqlite crate)", rusqlite::version())
+ }
+
+ fn open_tree(&self, name: &str) -> Result<usize> {
+ let name = format!("tree_{}", name.replace(':', "_COLON_"));
+ let mut this = self.0.lock().unwrap();
+
+ if let Some(i) = this.trees.iter().position(|x| x == &name) {
+ Ok(i)
+ } else {
+ trace!("create table {}", name);
+ this.db.execute(
+ &format!(
+ "CREATE TABLE IF NOT EXISTS {} (
+ k BLOB PRIMARY KEY,
+ v BLOB
+ )",
+ name
+ ),
+ [],
+ )?;
+ trace!("table created: {}, unlocking", name);
+
+ let i = this.trees.len();
+ this.trees.push(name.to_string());
+ Ok(i)
+ }
+ }
+
+ fn list_trees(&self) -> Result<Vec<String>> {
+ let mut trees = vec![];
+
+ trace!("list_trees: lock db");
+ let this = self.0.lock().unwrap();
+ trace!("list_trees: lock acquired");
+
+ let mut stmt = this.db.prepare(
+ "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
+ )?;
+ let mut rows = stmt.query([])?;
+ while let Some(row) = rows.next()? {
+ let name = row.get::<_, String>(0)?;
+ let name = name.replace("_COLON_", ":");
+ let name = name.strip_prefix("tree_").unwrap().to_string();
+ trees.push(name);
+ }
+ Ok(trees)
+ }
+
+ // ----
+
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ trace!("get {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("get {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ this.internal_get(tree, key)
+ }
+
+ fn len(&self, tree: usize) -> Result<usize> {
+ trace!("len {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("len {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let mut res_iter = stmt.query([])?;
+ match res_iter.next()? {
+ None => Ok(0),
+ Some(v) => Ok(v.get::<_, usize>(0)?),
+ }
+ }
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
+ trace!("insert {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("insert {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let old_val = this.internal_get(tree, key)?;
+
+ let sql = match &old_val {
+ Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree),
+ None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree),
+ };
+ let n = this.db.execute(&sql, params![key, value])?;
+ assert_eq!(n, 1);
+
+ Ok(old_val)
+ }
+
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ trace!("remove {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("remove {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let old_val = this.internal_get(tree, key)?;
+
+ if old_val.is_some() {
+ let n = this
+ .db
+ .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ assert_eq!(n, 1);
+ }
+
+ Ok(old_val)
+ }
+
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
+ trace!("iter {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("iter {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
+ DbValueIterator::make(this, &sql, [])
+ }
+
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
+ trace!("iter_rev {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("iter_rev {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
+ DbValueIterator::make(this, &sql, [])
+ }
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ trace!("range {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("range {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+
+ let (bounds_sql, params) = bounds_sql(low, high);
+ let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
+
+ let params = params
+ .iter()
+ .map(|x| x as &dyn rusqlite::ToSql)
+ .collect::<Vec<_>>();
+
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ }
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ trace!("range_rev {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("range_rev {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+
+ let (bounds_sql, params) = bounds_sql(low, high);
+ let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
+
+ let params = params
+ .iter()
+ .map(|x| x as &dyn rusqlite::ToSql)
+ .collect::<Vec<_>>();
+
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ }
+
+ // ----
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ trace!("transaction: lock db");
+ let mut this = self.0.lock().unwrap();
+ trace!("transaction: lock acquired");
+
+ let this_mut_ref: &mut SqliteDbInner = this.borrow_mut();
+
+ let mut tx = SqliteTx {
+ tx: this_mut_ref
+ .db
+ .transaction()
+ .map_err(Error::from)
+ .map_err(TxError::Db)?,
+ trees: &this_mut_ref.trees,
+ };
+ let res = match f.try_on(&mut tx) {
+ TxFnResult::Ok => {
+ tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
+ Ok(())
+ }
+ TxFnResult::Abort => {
+ tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
+ Err(TxError::Abort(()))
+ }
+ TxFnResult::DbErr => {
+ tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
+ Err(TxError::Db(Error(
+ "(this message will be discarded)".into(),
+ )))
+ }
+ };
+
+ trace!("transaction done");
+ res
+ }
+}
+
+// ----
+
+struct SqliteTx<'a> {
+ tx: Transaction<'a>,
+ trees: &'a [String],
+}
+
+impl<'a> SqliteTx<'a> {
+ fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> {
+ self.trees.get(i).map(String::as_ref).ok_or_else(|| {
+ TxOpError(Error(
+ "invalid tree id (it might have been openned after the transaction started)".into(),
+ ))
+ })
+ }
+
+ fn internal_get(&self, tree: &str, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let mut stmt = self
+ .tx
+ .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ let mut res_iter = stmt.query([key])?;
+ match res_iter.next()? {
+ None => Ok(None),
+ Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)),
+ }
+ }
+}
+
+impl<'a> ITx for SqliteTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ self.internal_get(tree, key)
+ }
+ fn len(&self, tree: usize) -> TxOpResult<usize> {
+ let tree = self.get_tree(tree)?;
+ let mut stmt = self.tx.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let mut res_iter = stmt.query([])?;
+ match res_iter.next()? {
+ None => Ok(0),
+ Some(v) => Ok(v.get::<_, usize>(0)?),
+ }
+ }
+
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = self.internal_get(tree, key)?;
+
+ let sql = match &old_val {
+ Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree),
+ None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree),
+ };
+ let n = self.tx.execute(&sql, params![key, value])?;
+ assert_eq!(n, 1);
+
+ Ok(old_val)
+ }
+ fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = self.internal_get(tree, key)?;
+
+ if old_val.is_some() {
+ let n = self
+ .tx
+ .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ assert_eq!(n, 1);
+ }
+
+ Ok(old_val)
+ }
+
+ fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!();
+ }
+ fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!();
+ }
+
+ fn range<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!();
+ }
+ fn range_rev<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!();
+ }
+}
+
+// ----
+
+struct DbValueIterator<'a> {
+ db: MutexGuard<'a, SqliteDbInner>,
+ stmt: Option<Statement<'a>>,
+ iter: Option<Rows<'a>>,
+ _pin: PhantomPinned,
+}
+
+impl<'a> DbValueIterator<'a> {
+ fn make<P: rusqlite::Params>(
+ db: MutexGuard<'a, SqliteDbInner>,
+ sql: &str,
+ args: P,
+ ) -> Result<ValueIter<'a>> {
+ let res = DbValueIterator {
+ db,
+ stmt: None,
+ iter: None,
+ _pin: PhantomPinned,
+ };
+ let mut boxed = Box::pin(res);
+ trace!("make iterator with sql: {}", sql);
+
+ unsafe {
+ let db = NonNull::from(&boxed.db);
+ let stmt = db.as_ref().db.prepare(sql)?;
+
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
+ Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt);
+
+ let mut stmt = NonNull::from(&boxed.stmt);
+ let iter = stmt.as_mut().as_mut().unwrap().query(args)?;
+
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
+ Pin::get_unchecked_mut(mut_ref).iter = Some(iter);
+ }
+
+ Ok(Box::new(DbValueIteratorPin(boxed)))
+ }
+}
+
+impl<'a> Drop for DbValueIterator<'a> {
+ fn drop(&mut self) {
+ trace!("drop iter");
+ drop(self.iter.take());
+ drop(self.stmt.take());
+ }
+}
+
+struct DbValueIteratorPin<'a>(Pin<Box<DbValueIterator<'a>>>);
+
+impl<'a> Iterator for DbValueIteratorPin<'a> {
+ type Item = Result<(Value, Value)>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let next = unsafe {
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0);
+ Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next()
+ };
+ let row = match next {
+ Err(e) => return Some(Err(e.into())),
+ Ok(None) => return None,
+ Ok(Some(r)) => r,
+ };
+ let k = match row.get::<_, Vec<u8>>(0) {
+ Err(e) => return Some(Err(e.into())),
+ Ok(x) => x,
+ };
+ let v = match row.get::<_, Vec<u8>>(1) {
+ Err(e) => return Some(Err(e.into())),
+ Ok(y) => y,
+ };
+ Some(Ok((k, v)))
+ }
+}
+
+// ----
+
+fn bounds_sql<'r>(low: Bound<&'r [u8]>, high: Bound<&'r [u8]>) -> (String, Vec<Vec<u8>>) {
+ let mut sql = String::new();
+ let mut params: Vec<Vec<u8>> = vec![];
+
+ match low {
+ Bound::Included(b) => {
+ sql.push_str(" WHERE k >= ?1");
+ params.push(b.to_vec());
+ }
+ Bound::Excluded(b) => {
+ sql.push_str(" WHERE k > ?1");
+ params.push(b.to_vec());
+ }
+ Bound::Unbounded => (),
+ };
+
+ match high {
+ Bound::Included(b) => {
+ if !params.is_empty() {
+ sql.push_str(" AND k <= ?2");
+ } else {
+ sql.push_str(" WHERE k <= ?1");
+ }
+ params.push(b.to_vec());
+ }
+ Bound::Excluded(b) => {
+ if !params.is_empty() {
+ sql.push_str(" AND k < ?2");
+ } else {
+ sql.push_str(" WHERE k < ?1");
+ }
+ params.push(b.to_vec());
+ }
+ Bound::Unbounded => (),
+ }
+
+ (sql, params)
+}
diff --git a/src/db/test.rs b/src/db/test.rs
new file mode 100644
index 00000000..cfcee643
--- /dev/null
+++ b/src/db/test.rs
@@ -0,0 +1,106 @@
+use crate::*;
+
+use crate::lmdb_adapter::LmdbDb;
+use crate::sled_adapter::SledDb;
+use crate::sqlite_adapter::SqliteDb;
+
+fn test_suite(db: Db) {
+ let tree = db.open_tree("tree").unwrap();
+
+ let ka: &[u8] = &b"test"[..];
+ let kb: &[u8] = &b"zwello"[..];
+ let kint: &[u8] = &b"tz"[..];
+ let va: &[u8] = &b"plop"[..];
+ let vb: &[u8] = &b"plip"[..];
+ let vc: &[u8] = &b"plup"[..];
+
+ assert!(tree.insert(ka, va).unwrap().is_none());
+ assert_eq!(tree.get(ka).unwrap().unwrap(), va);
+
+ let res = db.transaction::<_, (), _>(|mut tx| {
+ assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
+
+ assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va);
+
+ assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
+
+ tx.commit(12)
+ });
+ assert!(matches!(res, Ok(12)));
+ assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
+
+ let res = db.transaction::<(), _, _>(|mut tx| {
+ assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
+
+ assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb);
+
+ assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc);
+
+ tx.abort(42)
+ });
+ assert!(matches!(res, Err(TxError::Abort(42))));
+ assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
+
+ let mut iter = tree.iter().unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ assert!(iter.next().is_none());
+ drop(iter);
+
+ assert!(tree.insert(kb, vc).unwrap().is_none());
+ assert_eq!(tree.get(kb).unwrap().unwrap(), vc);
+
+ let mut iter = tree.iter().unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
+ assert!(iter.next().is_none());
+ drop(iter);
+
+ let mut iter = tree.range(kint..).unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
+ assert!(iter.next().is_none());
+ drop(iter);
+
+ let mut iter = tree.range_rev(..kint).unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ assert!(iter.next().is_none());
+ drop(iter);
+
+ let mut iter = tree.iter_rev().unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ assert!(iter.next().is_none());
+ drop(iter);
+}
+
+#[test]
+fn test_lmdb_db() {
+ let path = mktemp::Temp::new_dir().unwrap();
+ let db = heed::EnvOpenOptions::new()
+ .max_dbs(100)
+ .open(&path)
+ .unwrap();
+ let db = LmdbDb::init(db);
+ test_suite(db);
+ drop(path);
+}
+
+#[test]
+fn test_sled_db() {
+ let path = mktemp::Temp::new_dir().unwrap();
+ let db = SledDb::init(sled::open(path.to_path_buf()).unwrap());
+ test_suite(db);
+ drop(path);
+}
+
+#[test]
+fn test_sqlite_db() {
+ let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap());
+ test_suite(db);
+}
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 902f67f8..eb643160 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -21,6 +21,7 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_api = { version = "0.7.0", path = "../api" }
garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
@@ -36,8 +37,6 @@ rand = "0.8"
async-trait = "0.1.7"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index bc1f494a..c662aa00 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -660,11 +660,11 @@ impl AdminRpcHandler {
}
Ok(AdminRpc::Ok(ret))
} else {
- Ok(AdminRpc::Ok(self.gather_stats_local(opt)))
+ Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
}
}
- fn gather_stats_local(&self, opt: StatsOpt) -> String {
+ fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
let mut ret = String::new();
writeln!(
&mut ret,
@@ -672,6 +672,7 @@ impl AdminRpcHandler {
self.garage.system.garage_version(),
)
.unwrap();
+ writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather ring statistics
let ring = self.garage.system.ring.borrow().clone();
@@ -689,59 +690,71 @@ impl AdminRpcHandler {
writeln!(&mut ret, " {:?} {}", n, c).unwrap();
}
- self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.key_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.object_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.version_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt);
+ self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
if opt.detailed {
writeln!(
&mut ret,
" number of RC entries (~= number of blocks): {}",
- self.garage.block_manager.rc_len()
+ self.garage.block_manager.rc_len()?
)
.unwrap();
}
writeln!(
&mut ret,
" resync queue length: {}",
- self.garage.block_manager.resync_queue_len()
+ self.garage.block_manager.resync_queue_len()?
)
.unwrap();
writeln!(
&mut ret,
" blocks with resync errors: {}",
- self.garage.block_manager.resync_errors_len()
+ self.garage.block_manager.resync_errors_len()?
)
.unwrap();
- ret
+ Ok(ret)
}
- fn gather_table_stats<F, R>(&self, to: &mut String, t: &Arc<Table<F, R>>, opt: &StatsOpt)
+ fn gather_table_stats<F, R>(
+ &self,
+ to: &mut String,
+ t: &Arc<Table<F, R>>,
+ opt: &StatsOpt,
+ ) -> Result<(), Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap();
if opt.detailed {
- writeln!(to, " number of items: {}", t.data.store.len()).unwrap();
+ writeln!(
+ to,
+ " number of items: {}",
+ t.data.store.len().map_err(GarageError::from)?
+ )
+ .unwrap();
writeln!(
to,
" Merkle tree size: {}",
- t.merkle_updater.merkle_tree_len()
+ t.merkle_updater.merkle_tree_len()?
)
.unwrap();
}
writeln!(
to,
" Merkle updater todo queue length: {}",
- t.merkle_updater.todo_len()
+ t.merkle_updater.todo_len()?
)
.unwrap();
- writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
+ writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap();
+
+ Ok(())
}
}
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 830eac71..17e14b8b 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -64,13 +64,23 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
+ let mut i = 0;
- while let Some((item_key, item_bytes)) =
- self.garage.version_table.data.store.get_gt(&pos)?
- {
- pos = item_key.to_vec();
+ while !*must_exit.borrow() {
+ let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? {
+ Some((k, v)) => {
+ pos = k;
+ v
+ }
+ None => break,
+ };
+
+ i += 1;
+ if i % 1000 == 0 {
+ info!("repair_versions: {}", i);
+ }
- let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
+ let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if version.deleted.get() {
continue;
}
@@ -98,23 +108,30 @@ impl Repair {
))
.await?;
}
-
- if *must_exit.borrow() {
- break;
- }
}
+ info!("repair_versions: finished, done {}", i);
Ok(())
}
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
+ let mut i = 0;
- while let Some((item_key, item_bytes)) =
- self.garage.block_ref_table.data.store.get_gt(&pos)?
- {
- pos = item_key.to_vec();
+ while !*must_exit.borrow() {
+ let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? {
+ Some((k, v)) => {
+ pos = k;
+ v
+ }
+ None => break,
+ };
+
+ i += 1;
+ if i % 1000 == 0 {
+ info!("repair_block_ref: {}", i);
+ }
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if block_ref.deleted.get() {
continue;
}
@@ -139,11 +156,8 @@ impl Repair {
})
.await?;
}
-
- if *must_exit.borrow() {
- break;
- }
}
+ info!("repair_block_ref: finished, done {}", i);
Ok(())
}
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index b58ad286..697d3358 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -2,6 +2,8 @@ use std::path::PathBuf;
use tokio::sync::watch;
+use garage_db as db;
+
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
@@ -31,13 +33,51 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
- db_path.push("db");
- let db = sled::Config::default()
- .path(&db_path)
- .cache_capacity(config.sled_cache_capacity)
- .flush_every_ms(Some(config.sled_flush_every_ms))
- .open()
- .expect("Unable to open sled DB");
+ std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
+ let db = match config.db_engine.as_str() {
+ "sled" => {
+ db_path.push("db");
+ info!("Opening Sled database at: {}", db_path.display());
+ let db = db::sled_adapter::sled::Config::default()
+ .path(&db_path)
+ .cache_capacity(config.sled_cache_capacity)
+ .flush_every_ms(Some(config.sled_flush_every_ms))
+ .open()
+ .expect("Unable to open sled DB");
+ db::sled_adapter::SledDb::init(db)
+ }
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ db_path.push("db.sqlite");
+ info!("Opening Sqlite database at: {}", db_path.display());
+ let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+ .expect("Unable to open sqlite DB");
+ db::sqlite_adapter::SqliteDb::init(db)
+ }
+ "lmdb" | "heed" => {
+ db_path.push("db.lmdb");
+ info!("Opening LMDB database at: {}", db_path.display());
+ std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
+ let map_size = if u32::MAX as usize == usize::MAX {
+ warn!("LMDB is not recommended on 32-bit systems, database size will be limited");
+ 1usize << 30 // 1GB for 32-bit systems
+ } else {
+ 1usize << 40 // 1TB for 64-bit systems
+ };
+
+ let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
+ .max_dbs(100)
+ .map_size(map_size)
+ .open(&db_path)
+ .expect("Unable to open LMDB DB");
+ db::lmdb_adapter::LmdbDb::init(db)
+ }
+ e => {
+ return Err(Error::Message(format!(
+ "Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
+ e
+ )));
+ }
+ };
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs
index ff5cc8da..b32af068 100644
--- a/src/garage/tests/bucket.rs
+++ b/src/garage/tests/bucket.rs
@@ -29,8 +29,7 @@ async fn test_bucket_all() {
.unwrap()
.iter()
.filter(|x| x.name.as_ref().is_some())
- .find(|x| x.name.as_ref().unwrap() == "hello")
- .is_some());
+ .any(|x| x.name.as_ref().unwrap() == "hello"));
}
{
// Get its location
@@ -75,13 +74,12 @@ async fn test_bucket_all() {
{
// Check bucket is deleted with List buckets
let r = ctx.client.list_buckets().send().await.unwrap();
- assert!(r
+ assert!(!r
.buckets
.as_ref()
.unwrap()
.iter()
.filter(|x| x.name.as_ref().is_some())
- .find(|x| x.name.as_ref().unwrap() == "hello")
- .is_none());
+ .any(|x| x.name.as_ref().unwrap() == "hello"));
}
}
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 133fe44e..d908dc01 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_block = { version = "0.7.0", path = "../block" }
@@ -30,8 +31,6 @@ tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 2f99bd68..280f3dc7 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -2,6 +2,8 @@ use std::sync::Arc;
use netapp::NetworkKey;
+use garage_db as db;
+
use garage_util::background::*;
use garage_util::config::*;
@@ -33,7 +35,7 @@ pub struct Garage {
pub config: Config,
/// The local database
- pub db: sled::Db,
+ pub db: db::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>,
/// The membership manager
@@ -71,7 +73,7 @@ pub struct GarageK2V {
impl Garage {
/// Create and run garage
- pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
)
@@ -199,7 +201,7 @@ impl Garage {
#[cfg(feature = "k2v")]
impl GarageK2V {
- fn new(system: Arc<System>, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self {
+ fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
info!("Initialize K2V subscription manager...");
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 123154d4..2602d5d9 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -6,6 +6,8 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
+use garage_db as db;
+
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
@@ -114,10 +116,6 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
type E = CounterEntry<T>;
type Filter = (DeletedFilter, Vec<Uuid>);
- fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
- // nothing for now
- }
-
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
if filter.0 == DeletedFilter::Any {
return true;
@@ -135,7 +133,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
pub struct IndexCounter<T: CounterSchema> {
this_node: Uuid,
- local_counter: sled::Tree,
+ local_counter: db::Tree,
propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
@@ -144,7 +142,7 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
- db: &sled::Db,
+ db: &db::Db,
) -> Arc<Self> {
let background = system.background.clone();
@@ -174,36 +172,36 @@ impl<T: CounterSchema> IndexCounter<T> {
this
}
- pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
+ pub fn count(
+ &self,
+ tx: &mut db::Transaction,
+ pk: &T::P,
+ sk: &T::S,
+ counts: &[(&str, i64)],
+ ) -> db::TxResult<(), Error> {
let tree_key = self.table.data.tree_key(pk, sk);
- let new_entry = self.local_counter.transaction(|tx| {
- let mut entry = match tx.get(&tree_key[..])? {
- Some(old_bytes) => {
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?
- }
- None => LocalCounterEntry {
- values: BTreeMap::new(),
- },
- };
-
- for (s, inc) in counts.iter() {
- let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
- ent.0 += 1;
- ent.1 += *inc;
- }
-
- let new_entry_bytes = rmp_to_vec_all_named(&entry)
- .map_err(Error::RmpEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- tx.insert(&tree_key[..], new_entry_bytes)?;
+ let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
+ Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(db::TxError::Abort)?,
+ None => LocalCounterEntry {
+ values: BTreeMap::new(),
+ },
+ };
+
+ for (s, inc) in counts.iter() {
+ let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+ ent.0 += 1;
+ ent.1 += *inc;
+ }
- Ok(entry)
- })?;
+ let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?;
+ tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
- if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
+ if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
error!(
"Could not propagate updated counter values, failed to send to channel: {}",
e
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 8b7cc08a..991fe66d 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use garage_db as db;
use garage_util::data::*;
use garage_table::crdt::*;
@@ -221,7 +222,12 @@ impl TableSchema for K2VItemTable {
type E = K2VItem;
type Filter = ItemFilter;
- fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
// 1. Count
let (old_entries, old_conflicts, old_values, old_bytes) = match old {
None => (0, 0, 0, 0),
@@ -239,7 +245,8 @@ impl TableSchema for K2VItemTable {
.map(|e| &e.partition.partition_key)
.unwrap_or_else(|| &new.unwrap().partition.partition_key);
- if let Err(e) = self.counter_table.count(
+ let counter_res = self.counter_table.count(
+ tx,
&count_pk,
count_sk,
&[
@@ -248,14 +255,23 @@ impl TableSchema for K2VItemTable {
(VALUES, new_values - old_values),
(BYTES, new_bytes - old_bytes),
],
- ) {
- error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
+ );
+ if let Err(e) = db::unabort(counter_res)? {
+ // This result can be returned by `counter_table.count()` for instance
+ // if messagepack serialization or deserialization fails at some step.
+ // Warn admin but ignore this error for now, that's all we can do.
+ error!(
+ "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
+ count_pk, count_sk, e
+ );
}
// 2. Notify
if let Some(new_ent) = new {
self.subscriptions.notify(new_ent);
}
+
+ Ok(())
}
#[allow(clippy::nonminimal_bool)]
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 7e61957a..25acb4b0 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -25,11 +25,15 @@ impl Migrate {
.open_tree("bucket:table")
.map_err(GarageError::from)?;
- for res in tree.iter() {
+ let mut old_buckets = vec![];
+ for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?;
let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
.map_err(GarageError::from)?;
+ old_buckets.push(bucket);
+ }
+ for bucket in old_buckets {
if let old_bucket::BucketState::Present(p) = bucket.state.get() {
self.migrate_buckets050_do_bucket(&bucket, p).await?;
}
diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index 9b3991bf..9589b4aa 100644
--- a/src/model/s3/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
+use garage_db as db;
+
use garage_util::data::*;
use garage_table::crdt::Crdt;
@@ -51,21 +53,22 @@ impl TableSchema for BlockRefTable {
type E = BlockRef;
type Filter = DeletedFilter;
- fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
- #[allow(clippy::or_fun_call)]
- let block = &old.or(new).unwrap().block;
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ let block = old.or(new).unwrap().block;
let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false);
let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false);
if is_after && !was_before {
- if let Err(e) = self.block_manager.block_incref(block) {
- warn!("block_incref failed for block {:?}: {}", block, e);
- }
+ self.block_manager.block_incref(tx, block)?;
}
if was_before && !is_after {
- if let Err(e) = self.block_manager.block_decref(block) {
- warn!("block_decref failed for block {:?}: {}", block, e);
- }
+ self.block_manager.block_decref(tx, block)?;
}
+ Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 3d9a89f7..62f5d8d9 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use garage_db as db;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -232,7 +234,12 @@ impl TableSchema for ObjectTable {
type E = Object;
type Filter = ObjectFilter;
- fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
let version_table = self.version_table.clone();
let old = old.cloned();
let new = new.cloned();
@@ -259,7 +266,8 @@ impl TableSchema for ObjectTable {
}
}
Ok(())
- })
+ });
+ Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index ad096772..881c245a 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
+use garage_db as db;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -137,7 +139,12 @@ impl TableSchema for VersionTable {
type E = Version;
type Filter = DeletedFilter;
- fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
let block_ref_table = self.block_ref_table.clone();
let old = old.cloned();
let new = new.cloned();
@@ -160,7 +167,9 @@ impl TableSchema for VersionTable {
}
}
Ok(())
- })
+ });
+
+ Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index ed1a213f..6de37cda 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.7.0", path = "../util" }
@@ -25,8 +26,6 @@ hexdump = "0.1"
tracing = "0.1.30"
rand = "0.8"
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/table/data.rs b/src/table/data.rs
index 5cb10066..3212e82b 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -3,12 +3,13 @@ use std::convert::TryInto;
use std::sync::Arc;
use serde_bytes::ByteBuf;
-use sled::{IVec, Transactional};
use tokio::sync::Notify;
+use garage_db as db;
+use garage_db::counted_tree_hack::CountedTree;
+
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System;
@@ -25,12 +26,12 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub instance: F,
pub replication: R,
- pub store: sled::Tree,
+ pub store: db::Tree,
- pub(crate) merkle_tree: sled::Tree,
- pub(crate) merkle_todo: sled::Tree,
+ pub(crate) merkle_tree: db::Tree,
+ pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
- pub(crate) gc_todo: SledCountedTree,
+ pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
}
@@ -40,7 +41,7 @@ where
F: TableSchema,
R: TableReplication,
{
- pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
+ pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
@@ -55,7 +56,7 @@ where
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
- let gc_todo = SledCountedTree::new(gc_todo);
+ let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
@@ -98,30 +99,30 @@ where
None => partition_hash.to_vec(),
Some(sk) => self.tree_key(partition_key, sk),
};
- let range = self.store.range(first_key..);
+ let range = self.store.range(first_key..)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
EnumerationOrder::Reverse => match start {
Some(sk) => {
let last_key = self.tree_key(partition_key, sk);
- let range = self.store.range(..=last_key).rev();
+ let range = self.store.range_rev(..=last_key)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
None => {
let mut last_key = partition_hash.to_vec();
let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
- let range = self.store.range(..last_key).rev();
+ let range = self.store.range_rev(..last_key)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
},
}
}
- fn read_range_aux(
+ fn read_range_aux<'a>(
&self,
partition_hash: Hash,
- range: impl Iterator<Item = sled::Result<(IVec, IVec)>>,
+ range: db::ValueIter<'a>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
@@ -139,7 +140,7 @@ where
}
};
if keep {
- ret.push(Arc::new(ByteBuf::from(value.as_ref())));
+ ret.push(Arc::new(ByteBuf::from(value)));
}
if ret.len() >= limit {
break;
@@ -183,12 +184,10 @@ where
tree_key: &[u8],
f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> {
- let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? {
+ let changed = self.store.db().transaction(|mut tx| {
+ let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
Some(old_bytes) => {
- let old_entry = self
- .decode_entry(&old_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = f(Some(old_entry.clone()));
(Some(old_entry), Some(old_bytes), new_entry)
}
@@ -204,24 +203,28 @@ where
// the associated Merkle tree entry.
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RmpEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ .map_err(db::TxError::Abort)?;
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
+ drop(old_bytes);
if value_changed || encoding_changed {
let new_bytes_hash = blake2sum(&new_bytes[..]);
- mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?;
- store.insert(tree_key.to_vec(), new_bytes)?;
- Ok(Some((old_entry, new_entry, new_bytes_hash)))
+ tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
+ tx.insert(&self.store, tree_key, new_bytes)?;
+
+ self.instance
+ .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
+
+ Ok(Some((new_entry, new_bytes_hash)))
} else {
Ok(None)
}
})?;
- if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ if let Some((new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone();
- self.instance.updated(old_entry.as_ref(), Some(&new_entry));
self.merkle_todo_notify.notify_one();
if is_tombstone {
// We are only responsible for GC'ing this item if we are the
@@ -244,22 +247,23 @@ where
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if cur_v == v {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(true);
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if cur_v == v => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
}
- }
- Ok(false)
- })?;
+ _ => Ok(false),
+ })?;
if removed {
self.metrics.internal_delete_counter.add(1);
-
- let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(&old_entry), None);
self.merkle_todo_notify.notify_one();
}
Ok(removed)
@@ -270,25 +274,26 @@ where
k: &[u8],
vhash: Hash,
) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if blake2sum(&cur_v[..]) == vhash {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(Some(cur_v));
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
}
- }
- Ok(None)
- })?;
+ _ => Ok(false),
+ })?;
- if let Some(old_v) = removed {
- let old_entry = self.decode_entry(&old_v[..])?;
- self.instance.updated(Some(&old_entry), None);
+ if removed {
+ self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one();
- Ok(true)
- } else {
- Ok(false)
}
+ Ok(removed)
}
// ---- Utility functions ----
@@ -315,7 +320,7 @@ where
}
}
- pub fn gc_todo_len(&self) -> usize {
- self.gc_todo.len()
+ pub fn gc_todo_len(&self) -> Result<usize, Error> {
+ Ok(self.gc_todo.len())
}
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 2a05b6ae..e7fbbcb0 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -12,9 +12,10 @@ use futures::select;
use futures_util::future::*;
use tokio::sync::watch;
+use garage_db::counted_tree_hack::CountedTree;
+
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_rpc::system::System;
@@ -100,18 +101,16 @@ where
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
- let mut entries = vec![];
- let mut excluded = vec![];
-
// List entries in the GC todo list
// These entries are put there when a tombstone is inserted in the table
// (see update_entry in data.rs)
- for entry_kv in self.data.gc_todo.iter() {
+ let mut candidates = vec![];
+ for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?;
- let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
+ let todo_entry = GcTodoEntry::parse(&k, &vhash);
if todo_entry.deletion_time() > now {
- if entries.is_empty() && excluded.is_empty() {
+ if candidates.is_empty() {
// If the earliest entry in the todo list shouldn't yet be processed,
// return a duration to wait in the loop
return Ok(Some(Duration::from_millis(
@@ -123,15 +122,23 @@ where
}
}
- let vhash = Hash::try_from(&vhash[..]).unwrap();
+ candidates.push(todo_entry);
+ if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE {
+ break;
+ }
+ }
+ let mut entries = vec![];
+ let mut excluded = vec![];
+ for mut todo_entry in candidates {
// Check if the tombstone is still the current value of the entry.
// If not, we don't actually want to GC it, and we will remove it
// from the gc_todo table later (below).
+ let vhash = todo_entry.value_hash;
todo_entry.value = self
.data
.store
- .get(&k[..])?
+ .get(&todo_entry.key[..])?
.filter(|v| blake2sum(&v[..]) == vhash)
.map(|v| v.to_vec());
@@ -353,17 +360,17 @@ impl GcTodoEntry {
}
/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
- pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self {
+ pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self {
Self {
- tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()),
- key: sled_k[8..].to_vec(),
- value_hash: Hash::try_from(sled_v).unwrap(),
+ tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()),
+ key: db_k[8..].to_vec(),
+ value_hash: Hash::try_from(db_v).unwrap(),
value: None,
}
}
/// Saves the GcTodoEntry in the gc_todo tree
- pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
+ pub(crate) fn save(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
@@ -373,9 +380,9 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
- pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
- let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
- &self.todo_table_key()[..],
+ pub(crate) fn remove_if_equal(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
+ gc_todo_tree.compare_and_swap::<_, _, &[u8]>(
+ &self.todo_table_key(),
Some(self.value_hash),
None,
)?;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 93bf7e47..7685b193 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -4,11 +4,10 @@ use std::time::Duration;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use sled::transaction::{
- ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
-};
use tokio::sync::watch;
+use garage_db as db;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
@@ -90,35 +89,35 @@ where
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
- if let Some(x) = self.data.merkle_todo.iter().next() {
- match x {
- Ok((key, valhash)) => {
- if let Err(e) = self.update_item(&key[..], &valhash[..]) {
- warn!(
- "({}) Error while updating Merkle tree item: {}",
- F::TABLE_NAME,
- e
- );
- }
- }
- Err(e) => {
- warn!(
- "({}) Error while iterating on Merkle todo tree: {}",
- F::TABLE_NAME,
- e
- );
- tokio::time::sleep(Duration::from_secs(10)).await;
+ match self.updater_loop_iter() {
+ Ok(true) => (),
+ Ok(false) => {
+ select! {
+ _ = self.data.merkle_todo_notify.notified().fuse() => {},
+ _ = must_exit.changed().fuse() => {},
}
}
- } else {
- select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
+ Err(e) => {
+ warn!(
+ "({}) Error while updating Merkle tree item: {}",
+ F::TABLE_NAME,
+ e
+ );
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
}
+ fn updater_loop_iter(&self) -> Result<bool, Error> {
+ if let Some((key, valhash)) = self.data.merkle_todo.first()? {
+ self.update_item(&key, &valhash)?;
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
let khash = blake2sum(k);
@@ -137,13 +136,16 @@ where
};
self.data
.merkle_tree
- .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
+ .db()
+ .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
- let deleted = self
- .data
- .merkle_todo
- .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
- .is_ok();
+ let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
+ let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
+ if remove {
+ tx.remove(&self.data.merkle_todo, k)?;
+ }
+ Ok(remove)
+ })?;
if !deleted {
debug!(
@@ -157,12 +159,12 @@ where
fn update_item_rec(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &[u8],
khash: &Hash,
key: &MerkleNodeKey,
new_vhash: Option<Hash>,
- ) -> ConflictableTransactionResult<Option<Hash>, Error> {
+ ) -> db::TxResult<Option<Hash>, Error> {
let i = key.prefix.len();
// Read node at current position (defined by the prefix stored in key)
@@ -203,7 +205,7 @@ where
}
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
x @ MerkleNode::Leaf(_, _) => {
- tx.remove(key_sub.encode())?;
+ tx.remove(&self.data.merkle_tree, key_sub.encode())?;
Some(x)
}
}
@@ -283,28 +285,27 @@ where
fn read_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
- ) -> ConflictableTransactionResult<MerkleNode, Error> {
- let ent = tx.get(k.encode())?;
- MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort)
+ ) -> db::TxResult<MerkleNode, Error> {
+ let ent = tx.get(&self.data.merkle_tree, k.encode())?;
+ MerkleNode::decode_opt(&ent).map_err(db::TxError::Abort)
}
fn put_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
v: &MerkleNode,
- ) -> ConflictableTransactionResult<Hash, Error> {
+ ) -> db::TxResult<Hash, Error> {
trace!("Put Merkle node: {:?} => {:?}", k, v);
if *v == MerkleNode::Empty {
- tx.remove(k.encode())?;
+ tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash)
} else {
- let vby = rmp_to_vec_all_named(v)
- .map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
+ let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
- tx.insert(k.encode(), vby)?;
+ tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash)
}
}
@@ -312,15 +313,15 @@ where
// Access a node in the Merkle tree, used by the sync protocol
pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
let ent = self.data.merkle_tree.get(k.encode())?;
- MerkleNode::decode_opt(ent)
+ MerkleNode::decode_opt(&ent)
}
- pub fn merkle_tree_len(&self) -> usize {
- self.data.merkle_tree.len()
+ pub fn merkle_tree_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_tree.len()?)
}
- pub fn todo_len(&self) -> usize {
- self.data.merkle_todo.len()
+ pub fn todo_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_todo.len()?)
}
}
@@ -347,7 +348,7 @@ impl MerkleNodeKey {
}
impl MerkleNode {
- fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> {
+ fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
match ent {
None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
diff --git a/src/table/metrics.rs b/src/table/metrics.rs
index 752a2a6d..3a1783e0 100644
--- a/src/table/metrics.rs
+++ b/src/table/metrics.rs
@@ -1,6 +1,7 @@
use opentelemetry::{global, metrics::*, KeyValue};
-use garage_util::sled_counter::SledCountedTree;
+use garage_db as db;
+use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
@@ -19,21 +20,19 @@ pub struct TableMetrics {
pub(crate) sync_items_received: Counter<u64>,
}
impl TableMetrics {
- pub fn new(
- table_name: &'static str,
- merkle_todo: sled::Tree,
- gc_todo: SledCountedTree,
- ) -> Self {
+ pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self {
let meter = global::meter(table_name);
TableMetrics {
_merkle_todo_len: meter
.u64_value_observer(
"table.merkle_updater_todo_queue_length",
move |observer| {
- observer.observe(
- merkle_todo.len() as u64,
- &[KeyValue::new("table_name", table_name)],
- )
+ if let Ok(v) = merkle_todo.len() {
+ observer.observe(
+ v as u64,
+ &[KeyValue::new("table_name", table_name)],
+ );
+ }
},
)
.with_description("Merkle tree updater TODO queue length")
@@ -45,7 +44,7 @@ impl TableMetrics {
observer.observe(
gc_todo.len() as u64,
&[KeyValue::new("table_name", table_name)],
- )
+ );
},
)
.with_description("Table garbage collector TODO queue length")
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 37327037..74f57798 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
+use garage_db as db;
use garage_util::data::*;
use crate::crdt::Crdt;
@@ -82,11 +83,19 @@ pub trait TableSchema: Send + Sync {
None
}
- // Updated triggers some stuff downstream, but it is not supposed to block or fail,
- // as the update itself is an unchangeable fact that will never go back
- // due to CRDT logic. Typically errors in propagation of info should be logged
- // to stderr.
- fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {}
+ /// Actions triggered by data changing in a table. If such actions
+ /// include updates to the local database that should be applied
+ /// atomically with the item update itself, a db transaction is
+ /// provided on which these changes should be done.
+ /// This function can return a DB error but that's all.
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction,
+ _old: Option<&Self::E>,
+ _new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ Ok(())
+ }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 08069ad0..4c83e991 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -258,9 +258,9 @@ where
while !*must_exit.borrow() {
let mut items = Vec::new();
- for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
+ for item in self.data.store.range(begin.to_vec()..end.to_vec())? {
let (key, value) = item?;
- items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
+ items.push((key.to_vec(), Arc::new(ByteBuf::from(value))));
if items.len() >= 1024 {
break;
@@ -603,8 +603,16 @@ impl SyncTodo {
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
- if data.store.range(begin..end).next().is_none() {
- continue;
+ match data.store.range(begin..end) {
+ Ok(mut iter) => {
+ if iter.next().is_none() {
+ continue;
+ }
+ }
+ Err(e) => {
+ warn!("DB error in add_full_sync: {}", e);
+ continue;
+ }
}
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 2a167604..3c211728 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -13,6 +13,8 @@ use opentelemetry::{
Context,
};
+use garage_db as db;
+
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -69,7 +71,7 @@ where
{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
- pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> {
+ pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 95cde531..5d073436 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -14,6 +14,8 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
+
blake2 = "0.9"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
@@ -22,8 +24,6 @@ tracing = "0.1.30"
rand = "0.8"
sha2 = "0.9"
-sled = "0.34"
-
chrono = "0.4"
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
diff --git a/src/util/config.rs b/src/util/config.rs
index 99ebce31..e8ef4fdd 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -64,14 +64,19 @@ pub struct Config {
#[serde(default)]
pub kubernetes_skip_crd: bool,
+ // -- DB
+ /// Database engine to use for metadata (options: sled, sqlite, lmdb)
+ #[serde(default = "default_db_engine")]
+ pub db_engine: String,
+
/// Sled cache size, in bytes
#[serde(default = "default_sled_cache_capacity")]
pub sled_cache_capacity: u64,
-
/// Sled flush interval in milliseconds
#[serde(default = "default_sled_flush_every_ms")]
pub sled_flush_every_ms: u64,
+ // -- APIs
/// Configuration for S3 api
pub s3_api: S3ApiConfig,
@@ -129,6 +134,10 @@ pub struct AdminConfig {
pub trace_sink: Option<String>,
}
+fn default_db_engine() -> String {
+ "sled".into()
+}
+
fn default_sled_cache_capacity() -> u64 {
128 * 1024 * 1024
}
diff --git a/src/util/error.rs b/src/util/error.rs
index 8734a0c8..9995c746 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -26,8 +26,8 @@ pub enum Error {
#[error(display = "Netapp error: {}", _0)]
Netapp(#[error(source)] netapp::error::Error),
- #[error(display = "Sled error: {}", _0)]
- Sled(#[error(source)] sled::Error),
+ #[error(display = "DB error: {}", _0)]
+ Db(#[error(source)] garage_db::Error),
#[error(display = "Messagepack encode error: {}", _0)]
RmpEncode(#[error(source)] rmp_serde::encode::Error),
@@ -78,11 +78,11 @@ impl Error {
}
}
-impl From<sled::transaction::TransactionError<Error>> for Error {
- fn from(e: sled::transaction::TransactionError<Error>) -> Error {
+impl From<garage_db::TxError<Error>> for Error {
+ fn from(e: garage_db::TxError<Error>) -> Error {
match e {
- sled::transaction::TransactionError::Abort(x) => x,
- sled::transaction::TransactionError::Storage(x) => Error::Sled(x),
+ garage_db::TxError::Abort(x) => x,
+ garage_db::TxError::Db(x) => Error::Db(x),
}
}
}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index d8ffdd0b..8ca6e310 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -11,7 +11,7 @@ pub mod error;
pub mod formater;
pub mod metrics;
pub mod persister;
-pub mod sled_counter;
+//pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;
diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs
deleted file mode 100644
index bc54cea0..00000000
--- a/src/util/sled_counter.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-use std::sync::{
- atomic::{AtomicUsize, Ordering},
- Arc,
-};
-
-use sled::{CompareAndSwapError, IVec, Iter, Result, Tree};
-
-#[derive(Clone)]
-pub struct SledCountedTree(Arc<SledCountedTreeInternal>);
-
-struct SledCountedTreeInternal {
- tree: Tree,
- len: AtomicUsize,
-}
-
-impl SledCountedTree {
- pub fn new(tree: Tree) -> Self {
- let len = tree.len();
- Self(Arc::new(SledCountedTreeInternal {
- tree,
- len: AtomicUsize::new(len),
- }))
- }
-
- pub fn len(&self) -> usize {
- self.0.len.load(Ordering::Relaxed)
- }
-
- pub fn is_empty(&self) -> bool {
- self.0.tree.is_empty()
- }
-
- pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
- self.0.tree.get(key)
- }
-
- pub fn iter(&self) -> Iter {
- self.0.tree.iter()
- }
-
- // ---- writing functions ----
-
- pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
- where
- K: AsRef<[u8]>,
- V: Into<IVec>,
- {
- let res = self.0.tree.insert(key, value);
- if res == Ok(None) {
- self.0.len.fetch_add(1, Ordering::Relaxed);
- }
- res
- }
-
- pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
- let res = self.0.tree.remove(key);
- if matches!(res, Ok(Some(_))) {
- self.0.len.fetch_sub(1, Ordering::Relaxed);
- }
- res
- }
-
- pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
- let res = self.0.tree.pop_min();
- if let Ok(Some(_)) = &res {
- self.0.len.fetch_sub(1, Ordering::Relaxed);
- };
- res
- }
-
- pub fn compare_and_swap<K, OV, NV>(
- &self,
- key: K,
- old: Option<OV>,
- new: Option<NV>,
- ) -> Result<std::result::Result<(), CompareAndSwapError>>
- where
- K: AsRef<[u8]>,
- OV: AsRef<[u8]>,
- NV: Into<IVec>,
- {
- let old_some = old.is_some();
- let new_some = new.is_some();
-
- let res = self.0.tree.compare_and_swap(key, old, new);
-
- if res == Ok(Ok(())) {
- match (old_some, new_some) {
- (false, true) => {
- self.0.len.fetch_add(1, Ordering::Relaxed);
- }
- (true, false) => {
- self.0.len.fetch_sub(1, Ordering::Relaxed);
- }
- _ => (),
- }
- }
- res
- }
-}