aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--src/db/Cargo.toml1
-rw-r--r--src/db/bin/convert.rs8
-rw-r--r--src/db/lib.rs8
-rw-r--r--src/db/sqlite_adapter.rs80
-rw-r--r--src/garage/server.rs35
-rw-r--r--src/table/merkle.rs9
-rw-r--r--src/util/config.rs11
8 files changed, 119 insertions, 34 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 73879369..c43aa81f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1017,6 +1017,7 @@ dependencies = [
"clap 3.1.18",
"err-derive 0.3.1",
"hexdump",
+ "log",
"mktemp",
"rusqlite",
"sled",
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 22abc0b9..36b96229 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -19,6 +19,7 @@ required-features = ["cli"]
[dependencies]
err-derive = "0.3"
hexdump = "0.1"
+log = "0.4"
sled = "0.34"
rusqlite = "0.27"
diff --git a/src/db/bin/convert.rs b/src/db/bin/convert.rs
index 8c4f0ddc..7525bcc9 100644
--- a/src/db/bin/convert.rs
+++ b/src/db/bin/convert.rs
@@ -2,7 +2,7 @@ use std::path::PathBuf;
use garage_db::*;
-use clap::{Parser};
+use clap::Parser;
/// K2V command line interface
#[derive(Parser, Debug)]
@@ -41,12 +41,10 @@ fn do_conversion(args: Args) -> Result<()> {
fn open_db(path: PathBuf, engine: String) -> Result<Db> {
match engine.as_str() {
"sled" => {
- let db = sled_adapter::sled::Config::default()
- .path(&path)
- .open()?;
+ let db = sled_adapter::sled::Config::default().path(&path).open()?;
Ok(sled_adapter::SledDb::init(db))
}
- "sqlite" | "rusqlite" => {
+ "sqlite" | "sqlite3" | "rusqlite" => {
let db = sqlite_adapter::rusqlite::Connection::open(&path)?;
Ok(sqlite_adapter::SqliteDb::init(db))
}
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 49ec0765..1b31df43 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -180,7 +180,13 @@ impl Db {
pub fn import(&self, other: &Db) -> Result<()> {
let existing_trees = self.list_trees()?;
if !existing_trees.is_empty() {
- return Err(Error(format!("destination database already contains data: {:?}", existing_trees).into()));
+ return Err(Error(
+ format!(
+ "destination database already contains data: {:?}",
+ existing_trees
+ )
+ .into(),
+ ));
}
let tree_names = other.list_trees()?;
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 386eb951..49c07562 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -5,6 +5,8 @@ use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
+use log::trace;
+
use rusqlite::{params, Connection, Rows, Statement, Transaction};
use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter};
@@ -53,13 +55,17 @@ impl SqliteDb {
impl IDb for SqliteDb {
fn open_tree(&self, name: &str) -> Result<usize> {
- let name = format!("tree_{}", name.replace(":", "_COLON_"));
+ let name = format!("tree_{}", name.replace(':', "_COLON_"));
let mut trees = self.trees.write().unwrap();
if let Some(i) = trees.iter().position(|x| x == &name) {
Ok(i)
} else {
- self.db.lock().unwrap().execute(
+ trace!("open tree {}: lock db", name);
+ let db = self.db.lock().unwrap();
+ trace!("create table {}", name);
+
+ db.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
k BLOB PRIMARY KEY,
@@ -69,6 +75,8 @@ impl IDb for SqliteDb {
),
[],
)?;
+ trace!("table created: {}", name);
+
let i = trees.len();
trees.push(name.to_string());
Ok(i)
@@ -77,7 +85,11 @@ impl IDb for SqliteDb {
fn list_trees(&self) -> Result<Vec<String>> {
let mut trees = vec![];
+
+ trace!("list_trees: lock db");
let db = self.db.lock().unwrap();
+ trace!("list_trees: lock acquired");
+
let mut stmt = db.prepare(
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
)?;
@@ -94,7 +106,11 @@ impl IDb for SqliteDb {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?;
+
+ trace!("get: lock db");
let db = self.db.lock().unwrap();
+ trace!("get: lock acquired");
+
let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?;
match res_iter.next()? {
@@ -105,14 +121,22 @@ impl IDb for SqliteDb {
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
+
+ trace!("remove: lock db");
let db = self.db.lock().unwrap();
+ trace!("remove: lock acquired");
+
let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
Ok(res > 0)
}
fn len(&self, tree: usize) -> Result<usize> {
let tree = self.get_tree(tree)?;
+
+ trace!("len: lock db");
let db = self.db.lock().unwrap();
+ trace!("len: lock acquired");
+
let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
let mut res_iter = stmt.query([])?;
match res_iter.next()? {
@@ -123,7 +147,11 @@ impl IDb for SqliteDb {
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?;
+
+ trace!("insert: lock db");
let db = self.db.lock().unwrap();
+ trace!("insert: lock acquired");
+
db.execute(
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
params![key, value],
@@ -134,13 +162,23 @@ impl IDb for SqliteDb {
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
- DbValueIterator::make(self.db.lock().unwrap(), &sql, [])
+
+ trace!("iter {}: lock db", tree);
+ let db = self.db.lock().unwrap();
+ trace!("iter {}: lock acquired", tree);
+
+ DbValueIterator::make(db, &sql, [])
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
- DbValueIterator::make(self.db.lock().unwrap(), &sql, [])
+
+ trace!("iter_rev {}: lock db", tree);
+ let db = self.db.lock().unwrap();
+ trace!("iter_rev {}: lock acquired", tree);
+
+ DbValueIterator::make(db, &sql, [])
}
fn range<'r>(
@@ -158,11 +196,12 @@ impl IDb for SqliteDb {
.iter()
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(
- self.db.lock().unwrap(),
- &sql,
- params.as_ref(),
- )
+
+ trace!("range {}: lock db", tree);
+ let db = self.db.lock().unwrap();
+ trace!("range {}: lock acquired", tree);
+
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref())
}
fn range_rev<'r>(
&self,
@@ -179,23 +218,28 @@ impl IDb for SqliteDb {
.iter()
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(
- self.db.lock().unwrap(),
- &sql,
- params.as_ref(),
- )
+
+ trace!("range_rev {}: lock db", tree);
+ let db = self.db.lock().unwrap();
+ trace!("range_rev {}: lock acquired", tree);
+
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref())
}
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
let trees = self.trees.read().unwrap();
+
+ trace!("transaction: lock db");
let mut db = self.db.lock().unwrap();
+ trace!("transaction: lock acquired");
+
let tx = SqliteTx {
tx: db.transaction()?,
trees: trees.as_ref(),
};
- match f.try_on(&tx) {
+ let res = match f.try_on(&tx) {
TxFnResult::Ok => {
tx.tx.commit()?;
Ok(())
@@ -210,7 +254,10 @@ impl IDb for SqliteDb {
"(this message will be discarded)".into(),
)))
}
- }
+ };
+
+ trace!("transaction done");
+ res
}
}
@@ -337,6 +384,7 @@ impl<'a> DbValueIterator<'a> {
impl<'a> Drop for DbValueIterator<'a> {
fn drop(&mut self) {
+ trace!("drop iter");
drop(self.iter.take());
drop(self.stmt.take());
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index b102067e..bd34456d 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -32,15 +32,32 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
let config = read_config(config_file).expect("Unable to read config file");
info!("Opening database...");
- let mut db_path = config.metadata_dir.clone();
- db_path.push("db");
- let db = db::sled_adapter::sled::Config::default()
- .path(&db_path)
- .cache_capacity(config.sled_cache_capacity)
- .flush_every_ms(Some(config.sled_flush_every_ms))
- .open()
- .expect("Unable to open sled DB");
- let db = db::sled_adapter::SledDb::init(db);
+ let db = match config.db_engine.as_str() {
+ "sled" => {
+ let mut db_path = config.metadata_dir.clone();
+ db_path.push("db");
+ let db = db::sled_adapter::sled::Config::default()
+ .path(&db_path)
+ .cache_capacity(config.sled_cache_capacity)
+ .flush_every_ms(Some(config.sled_flush_every_ms))
+ .open()
+ .expect("Unable to open sled DB");
+ db::sled_adapter::SledDb::init(db)
+ }
+ "sqlite" => {
+ let mut db_path = config.metadata_dir.clone();
+ db_path.push("db.sqlite");
+ let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+ .expect("Unable to open sqlite DB");
+ db::sqlite_adapter::SqliteDb::init(db)
+ }
+ e => {
+ return Err(Error::Message(format!(
+ "Unsupported DB engine: {} (options: sled, sqlite)",
+ e
+ )));
+ }
+ };
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index f7dca97b..48d2c5dd 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -110,9 +110,14 @@ where
}
fn updater_loop_iter(&self) -> Result<bool, Error> {
- if let Some(x) = self.data.merkle_todo.iter()?.next() {
+ // TODO undo this iter hack
+ let mut iter = self.data.merkle_todo.iter()?;
+ if let Some(x) = iter.next() {
let (key, valhash) = x?;
- self.update_item(&key[..], &valhash[..])?;
+ let key = key.to_vec();
+ let valhash = valhash.to_vec();
+ drop(iter);
+ self.update_item(&key, &valhash)?;
Ok(true)
} else {
Ok(false)
diff --git a/src/util/config.rs b/src/util/config.rs
index 99ebce31..3b37adbb 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -64,14 +64,19 @@ pub struct Config {
#[serde(default)]
pub kubernetes_skip_crd: bool,
+ // -- DB
+ /// Database engine to use for metadata (options: sled, sqlite)
+ #[serde(default = "default_db_engine")]
+ pub db_engine: String,
+
/// Sled cache size, in bytes
#[serde(default = "default_sled_cache_capacity")]
pub sled_cache_capacity: u64,
-
/// Sled flush interval in milliseconds
#[serde(default = "default_sled_flush_every_ms")]
pub sled_flush_every_ms: u64,
+ // -- APIs
/// Configuration for S3 api
pub s3_api: S3ApiConfig,
@@ -129,6 +134,10 @@ pub struct AdminConfig {
pub trace_sink: Option<String>,
}
+fn default_db_engine() -> String {
+ "sled".into()
+}
+
fn default_sled_cache_capacity() -> u64 {
128 * 1024 * 1024
}