aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-03-18 20:17:54 +0100
committerAlex Auvolat <alex@adnab.me>2024-03-18 20:19:30 +0100
commit0038ca8a78f147b9c0ec07ef0121773aaf110dc9 (patch)
tree43f39f30c63a6affa62eeea62cfec674f217c2b4 /src
parent81191d2d92e58ff82ace0f4d82b275c157673ade (diff)
parent1a0bffae3491fae6af5a8d4defc5c6b84839e197 (diff)
downloadgarage-0038ca8a78f147b9c0ec07ef0121773aaf110dc9.tar.gz
garage-0038ca8a78f147b9c0ec07ef0121773aaf110dc9.zip
Merge branch 'main' into next-0.10
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs68
-rw-r--r--src/db/Cargo.toml6
-rw-r--r--src/db/lib.rs12
-rw-r--r--src/db/lmdb_adapter.rs10
-rw-r--r--src/db/open.rs10
-rw-r--r--src/db/sqlite_adapter.rs212
-rw-r--r--src/db/test.rs3
-rw-r--r--src/garage/admin/mod.rs39
-rw-r--r--src/garage/cli/cmd.rs3
-rw-r--r--src/garage/cli/structs.rs15
-rw-r--r--src/garage/server.rs2
-rw-r--r--src/garage/tests/common/garage.rs6
-rw-r--r--src/model/Cargo.toml1
-rw-r--r--src/model/garage.rs28
-rw-r--r--src/model/lib.rs1
-rw-r--r--src/model/snapshot.rs136
-rw-r--r--src/util/config.rs8
17 files changed, 401 insertions, 159 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 18fadf85..eeacf8b9 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -22,7 +22,7 @@ use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
use garage_db as db;
use garage_util::background::{vars, BackgroundRunner};
-use garage_util::config::DataDirEnum;
+use garage_util::config::Config;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@@ -84,6 +84,7 @@ pub struct BlockManager {
data_fsync: bool,
compression_level: Option<i32>,
+ disable_scrub: bool,
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
@@ -119,9 +120,7 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
db: &db::Db,
- data_dir: DataDirEnum,
- data_fsync: bool,
- compression_level: Option<i32>,
+ config: &Config,
replication: TableShardedReplication,
system: Arc<System>,
) -> Result<Arc<Self>, Error> {
@@ -131,11 +130,13 @@ impl BlockManager {
let data_layout = match data_layout_persister.load() {
Ok(mut layout) => {
layout
- .update(&data_dir)
+ .update(&config.data_dir)
.ok_or_message("invalid data_dir config")?;
layout
}
- Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
+ Err(_) => {
+ DataLayout::initialize(&config.data_dir).ok_or_message("invalid data_dir config")?
+ }
};
data_layout_persister
.save(&data_layout)
@@ -154,7 +155,7 @@ impl BlockManager {
.endpoint("garage_block/manager.rs/Rpc".to_string());
let metrics = BlockManagerMetrics::new(
- compression_level,
+ config.compression_level,
rc.rc.clone(),
resync.queue.clone(),
resync.errors.clone(),
@@ -166,8 +167,9 @@ impl BlockManager {
replication,
data_layout: ArcSwap::new(Arc::new(data_layout)),
data_layout_persister,
- data_fsync,
- compression_level,
+ data_fsync: config.data_fsync,
+ disable_scrub: config.disable_scrub,
+ compression_level: config.compression_level,
mutation_lock: vec![(); MUTEX_COUNT]
.iter()
.map(|_| Mutex::new(BlockManagerLocked()))
@@ -194,33 +196,37 @@ impl BlockManager {
}
// Spawn scrub worker
- let (scrub_tx, scrub_rx) = mpsc::channel(1);
- self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
- bg.spawn_worker(ScrubWorker::new(
- self.clone(),
- scrub_rx,
- self.scrub_persister.clone(),
- ));
+ if !self.disable_scrub {
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ bg.spawn_worker(ScrubWorker::new(
+ self.clone(),
+ scrub_rx,
+ self.scrub_persister.clone(),
+ ));
+ }
}
pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
self.resync.register_bg_vars(vars);
- vars.register_rw(
- &self.scrub_persister,
- "scrub-tranquility",
- |p| p.get_with(|x| x.tranquility),
- |p, tranquility| p.set_with(|x| x.tranquility = tranquility),
- );
- vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
- p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
- });
- vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
- p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
- });
- vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
- p.get_with(|x| x.corruptions_detected)
- });
+ if !self.disable_scrub {
+ vars.register_rw(
+ &self.scrub_persister,
+ "scrub-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ |p, tranquility| p.set_with(|x| x.tranquility = tranquility),
+ );
+ vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
+ p.get_with(|x| x.corruptions_detected)
+ });
+ }
}
/// Ask nodes that might have a (possibly compressed) block for it
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index a8f6d586..b88298ee 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -17,7 +17,9 @@ hexdump.workspace = true
tracing.workspace = true
heed = { workspace = true, optional = true }
-rusqlite = { workspace = true, optional = true }
+rusqlite = { workspace = true, optional = true, features = ["backup"] }
+r2d2 = { workspace = true, optional = true }
+r2d2_sqlite = { workspace = true, optional = true }
[dev-dependencies]
mktemp.workspace = true
@@ -26,4 +28,4 @@ mktemp.workspace = true
default = [ "lmdb", "sqlite" ]
bundled-libs = [ "rusqlite?/bundled" ]
lmdb = [ "heed" ]
-sqlite = [ "rusqlite" ]
+sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ]
diff --git a/src/db/lib.rs b/src/db/lib.rs
index ff511b5f..c8f9e13f 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -15,6 +15,7 @@ use core::ops::{Bound, RangeBounds};
use std::borrow::Cow;
use std::cell::Cell;
+use std::path::PathBuf;
use std::sync::Arc;
use err_derive::Error;
@@ -44,6 +45,12 @@ pub type TxValueIter<'a> = Box<dyn std::iter::Iterator<Item = TxOpResult<(Value,
#[error(display = "{}", _0)]
pub struct Error(pub Cow<'static, str>);
+impl From<std::io::Error> for Error {
+ fn from(e: std::io::Error) -> Error {
+ Error(format!("IO: {}", e).into())
+ }
+}
+
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Error)]
@@ -126,6 +133,10 @@ impl Db {
}
}
+ pub fn snapshot(&self, path: &PathBuf) -> Result<()> {
+ self.0.snapshot(path)
+ }
+
pub fn import(&self, other: &Db) -> Result<()> {
let existing_trees = self.list_trees()?;
if !existing_trees.is_empty() {
@@ -323,6 +334,7 @@ pub(crate) trait IDb: Send + Sync {
fn engine(&self) -> String;
fn open_tree(&self, name: &str) -> Result<usize>;
fn list_trees(&self) -> Result<Vec<String>>;
+ fn snapshot(&self, path: &PathBuf) -> Result<()>;
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
fn len(&self, tree: usize) -> Result<usize>;
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
index 5ce7d3e3..d5066664 100644
--- a/src/db/lmdb_adapter.rs
+++ b/src/db/lmdb_adapter.rs
@@ -3,6 +3,7 @@ use core::ptr::NonNull;
use std::collections::HashMap;
use std::convert::TryInto;
+use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
@@ -103,6 +104,15 @@ impl IDb for LmdbDb {
Ok(ret2)
}
+ fn snapshot(&self, to: &PathBuf) -> Result<()> {
+ std::fs::create_dir_all(to)?;
+ let mut path = to.clone();
+ path.push("data.mdb");
+ self.db
+ .copy_to_path(path, heed::CompactionOption::Disabled)?;
+ Ok(())
+ }
+
// ----
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
diff --git a/src/db/open.rs b/src/db/open.rs
index 03476a42..19bc96cc 100644
--- a/src/db/open.rs
+++ b/src/db/open.rs
@@ -68,14 +68,8 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
#[cfg(feature = "sqlite")]
Engine::Sqlite => {
info!("Opening Sqlite database at: {}", path.display());
- let db = crate::sqlite_adapter::rusqlite::Connection::open(&path)?;
- db.pragma_update(None, "journal_mode", "WAL")?;
- if opt.fsync {
- db.pragma_update(None, "synchronous", "NORMAL")?;
- } else {
- db.pragma_update(None, "synchronous", "OFF")?;
- }
- Ok(crate::sqlite_adapter::SqliteDb::init(db))
+ let manager = r2d2_sqlite::SqliteConnectionManager::file(path);
+ Ok(crate::sqlite_adapter::SqliteDb::new(manager, opt.fsync)?)
}
// ---- LMDB DB ----
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 6c556c97..a91b9011 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -1,12 +1,14 @@
use core::ops::Bound;
-use std::borrow::BorrowMut;
use std::marker::PhantomPinned;
+use std::path::PathBuf;
use std::pin::Pin;
use std::ptr::NonNull;
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::{Arc, Mutex, RwLock};
-use rusqlite::{params, Connection, Rows, Statement, Transaction};
+use r2d2::Pool;
+use r2d2_sqlite::SqliteConnectionManager;
+use rusqlite::{params, Rows, Statement, Transaction};
use crate::{
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
@@ -15,6 +17,8 @@ use crate::{
pub use rusqlite;
+type Connection = r2d2::PooledConnection<SqliteConnectionManager>;
+
// --- err
impl From<rusqlite::Error> for Error {
@@ -23,6 +27,12 @@ impl From<rusqlite::Error> for Error {
}
}
+impl From<r2d2::Error> for Error {
+ fn from(e: r2d2::Error) -> Error {
+ Error(format!("Sqlite: {}", e).into())
+ }
+}
+
impl From<rusqlite::Error> for TxOpError {
fn from(e: rusqlite::Error) -> TxOpError {
TxOpError(e.into())
@@ -31,35 +41,47 @@ impl From<rusqlite::Error> for TxOpError {
// -- db
-pub struct SqliteDb(Mutex<SqliteDbInner>);
-
-struct SqliteDbInner {
- db: Connection,
- trees: Vec<String>,
+pub struct SqliteDb {
+ db: Pool<SqliteConnectionManager>,
+ trees: RwLock<Vec<Arc<str>>>,
+ // All operations that might write on the DB must take this lock first.
+ // This emulates LMDB's approach where a single writer can be
+ // active at once.
+ write_lock: Mutex<()>,
}
impl SqliteDb {
- pub fn init(db: rusqlite::Connection) -> Db {
- let s = Self(Mutex::new(SqliteDbInner {
- db,
- trees: Vec::new(),
- }));
- Db(Arc::new(s))
+ pub fn new(manager: SqliteConnectionManager, sync_mode: bool) -> Result<Db> {
+ let manager = manager.with_init(move |db| {
+ db.pragma_update(None, "journal_mode", "WAL")?;
+ if sync_mode {
+ db.pragma_update(None, "synchronous", "NORMAL")?;
+ } else {
+ db.pragma_update(None, "synchronous", "OFF")?;
+ }
+ Ok(())
+ });
+ let s = Self {
+ db: Pool::builder().build(manager)?,
+ trees: RwLock::new(vec![]),
+ write_lock: Mutex::new(()),
+ };
+ Ok(Db(Arc::new(s)))
}
}
-impl SqliteDbInner {
- fn get_tree(&self, i: usize) -> Result<&'_ str> {
+impl SqliteDb {
+ fn get_tree(&self, i: usize) -> Result<Arc<str>> {
self.trees
+ .read()
+ .unwrap()
.get(i)
- .map(String::as_str)
+ .cloned()
.ok_or_else(|| Error("invalid tree id".into()))
}
- fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> {
- let mut stmt = self
- .db
- .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ fn internal_get(&self, db: &Connection, tree: &str, key: &[u8]) -> Result<Option<Value>> {
+ let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?;
match res_iter.next()? {
None => Ok(None),
@@ -75,13 +97,14 @@ impl IDb for SqliteDb {
fn open_tree(&self, name: &str) -> Result<usize> {
let name = format!("tree_{}", name.replace(':', "_COLON_"));
- let mut this = self.0.lock().unwrap();
+ let mut trees = self.trees.write().unwrap();
- if let Some(i) = this.trees.iter().position(|x| x == &name) {
+ if let Some(i) = trees.iter().position(|x| x.as_ref() == &name) {
Ok(i)
} else {
+ let db = self.db.get()?;
trace!("create table {}", name);
- this.db.execute(
+ db.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
k BLOB PRIMARY KEY,
@@ -93,8 +116,8 @@ impl IDb for SqliteDb {
)?;
trace!("table created: {}, unlocking", name);
- let i = this.trees.len();
- this.trees.push(name.to_string());
+ let i = trees.len();
+ trees.push(name.to_string().into_boxed_str().into());
Ok(i)
}
}
@@ -102,11 +125,8 @@ impl IDb for SqliteDb {
fn list_trees(&self) -> Result<Vec<String>> {
let mut trees = vec![];
- trace!("list_trees: lock db");
- let this = self.0.lock().unwrap();
- trace!("list_trees: lock acquired");
-
- let mut stmt = this.db.prepare(
+ let db = self.db.get()?;
+ let mut stmt = db.prepare(
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
)?;
let mut rows = stmt.query([])?;
@@ -119,24 +139,29 @@ impl IDb for SqliteDb {
Ok(trees)
}
+ fn snapshot(&self, to: &PathBuf) -> Result<()> {
+ fn progress(p: rusqlite::backup::Progress) {
+ let percent = (p.pagecount - p.remaining) * 100 / p.pagecount;
+ info!("Sqlite snapshot progres: {}%", percent);
+ }
+ self.db
+ .get()?
+ .backup(rusqlite::DatabaseName::Main, to, Some(progress))?;
+ Ok(())
+ }
+
// ----
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
- trace!("get {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("get {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
- this.internal_get(tree, key)
+ let tree = self.get_tree(tree)?;
+ self.internal_get(&self.db.get()?, &tree, key)
}
fn len(&self, tree: usize) -> Result<usize> {
- trace!("len {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("len {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
- let tree = this.get_tree(tree)?;
- let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
let mut res_iter = stmt.query([])?;
match res_iter.next()? {
None => Ok(0),
@@ -145,69 +170,60 @@ impl IDb for SqliteDb {
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
- trace!("insert {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("insert {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
+ let lock = self.write_lock.lock();
- let tree = this.get_tree(tree)?;
- let old_val = this.internal_get(tree, key)?;
+ let old_val = self.internal_get(&db, &tree, key)?;
let sql = match &old_val {
Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree),
None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree),
};
- let n = this.db.execute(&sql, params![key, value])?;
+ let n = db.execute(&sql, params![key, value])?;
assert_eq!(n, 1);
+ drop(lock);
Ok(old_val)
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
- trace!("remove {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("remove {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
+ let lock = self.write_lock.lock();
- let tree = this.get_tree(tree)?;
- let old_val = this.internal_get(tree, key)?;
+ let old_val = self.internal_get(&db, &tree, key)?;
if old_val.is_some() {
- let n = this
- .db
- .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ let n = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
assert_eq!(n, 1);
}
+ drop(lock);
Ok(old_val)
}
fn clear(&self, tree: usize) -> Result<()> {
- trace!("clear {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("clear {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
+ let lock = self.write_lock.lock();
+
+ db.execute(&format!("DELETE FROM {}", tree), [])?;
- let tree = this.get_tree(tree)?;
- this.db.execute(&format!("DELETE FROM {}", tree), [])?;
+ drop(lock);
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
- trace!("iter {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("iter {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
- DbValueIterator::make(this, &sql, [])
+ DbValueIterator::make(self.db.get()?, &sql, [])
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
- trace!("iter_rev {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("iter_rev {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
- DbValueIterator::make(this, &sql, [])
+ DbValueIterator::make(self.db.get()?, &sql, [])
}
fn range<'r>(
@@ -216,11 +232,7 @@ impl IDb for SqliteDb {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
- trace!("range {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("range {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
@@ -230,7 +242,7 @@ impl IDb for SqliteDb {
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref())
}
fn range_rev<'r>(
&self,
@@ -238,11 +250,7 @@ impl IDb for SqliteDb {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
- trace!("range_rev {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("range_rev {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
@@ -252,25 +260,20 @@ impl IDb for SqliteDb {
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref())
}
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
- trace!("transaction: lock db");
- let mut this = self.0.lock().unwrap();
- trace!("transaction: lock acquired");
-
- let this_mut_ref: &mut SqliteDbInner = this.borrow_mut();
+ let mut db = self.db.get().map_err(Error::from).map_err(TxError::Db)?;
+ let trees = self.trees.read().unwrap();
+ let lock = self.write_lock.lock();
+ trace!("trying transaction");
let mut tx = SqliteTx {
- tx: this_mut_ref
- .db
- .transaction()
- .map_err(Error::from)
- .map_err(TxError::Db)?,
- trees: &this_mut_ref.trees,
+ tx: db.transaction().map_err(Error::from).map_err(TxError::Db)?,
+ trees: &trees,
};
let res = match f.try_on(&mut tx) {
TxFnResult::Ok(on_commit) => {
@@ -290,7 +293,8 @@ impl IDb for SqliteDb {
};
trace!("transaction done");
- res
+ drop(lock);
+ return res;
}
}
@@ -298,12 +302,12 @@ impl IDb for SqliteDb {
struct SqliteTx<'a> {
tx: Transaction<'a>,
- trees: &'a [String],
+ trees: &'a [Arc<str>],
}
impl<'a> SqliteTx<'a> {
fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> {
- self.trees.get(i).map(String::as_ref).ok_or_else(|| {
+ self.trees.get(i).map(Arc::as_ref).ok_or_else(|| {
TxOpError(Error(
"invalid tree id (it might have been openned after the transaction started)".into(),
))
@@ -423,18 +427,14 @@ impl<'a> ITx for SqliteTx<'a> {
// therefore quite some unsafe code (it is a self-referential struct)
struct DbValueIterator<'a> {
- db: MutexGuard<'a, SqliteDbInner>,
+ db: Connection,
stmt: Option<Statement<'a>>,
iter: Option<Rows<'a>>,
_pin: PhantomPinned,
}
impl<'a> DbValueIterator<'a> {
- fn make<P: rusqlite::Params>(
- db: MutexGuard<'a, SqliteDbInner>,
- sql: &str,
- args: P,
- ) -> Result<ValueIter<'a>> {
+ fn make<P: rusqlite::Params>(db: Connection, sql: &str, args: P) -> Result<ValueIter<'a>> {
let res = DbValueIterator {
db,
stmt: None,
@@ -446,7 +446,7 @@ impl<'a> DbValueIterator<'a> {
// This unsafe allows us to bypass lifetime checks
let db = unsafe { NonNull::from(&boxed.db).as_ref() };
- let stmt = db.db.prepare(sql)?;
+ let stmt = db.prepare(sql)?;
let mut_ref = Pin::as_mut(&mut boxed);
// This unsafe allows us to write in a field of the pinned struct
diff --git a/src/db/test.rs b/src/db/test.rs
index 3add89fb..adb429e7 100644
--- a/src/db/test.rs
+++ b/src/db/test.rs
@@ -144,6 +144,7 @@ fn test_lmdb_db() {
fn test_sqlite_db() {
use crate::sqlite_adapter::SqliteDb;
- let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap());
+ let manager = r2d2_sqlite::SqliteConnectionManager::memory();
+ let db = SqliteDb::new(manager, false).unwrap();
test_suite(db);
}
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index 300e1211..e2468143 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -44,6 +44,7 @@ pub enum AdminRpc {
Stats(StatsOpt),
Worker(WorkerOperation),
BlockOperation(BlockOperation),
+ MetaOperation(MetaOperation),
// Replies
Ok(String),
@@ -465,6 +466,43 @@ impl AdminRpcHandler {
)]))
}
}
+
+ // ================ META DB COMMANDS ====================
+
+ async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
+ match mo {
+ MetaOperation::Snapshot { all: true } => {
+ let to = self.garage.system.cluster_layout().all_nodes().to_vec();
+
+ let resps = futures::future::join_all(to.iter().map(|to| async move {
+ let to = (*to).into();
+ self.endpoint
+ .call(
+ &to,
+ AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
+ PRIO_NORMAL,
+ )
+ .await
+ }))
+ .await;
+
+ let mut ret = vec![];
+ for (to, resp) in to.iter().zip(resps.iter()) {
+ let res_str = match resp {
+ Ok(_) => "ok".to_string(),
+ Err(e) => format!("error: {}", e),
+ };
+ ret.push(format!("{:?}\t{}", to, res_str));
+ }
+
+ Ok(AdminRpc::Ok(format_table_to_string(ret)))
+ }
+ MetaOperation::Snapshot { all: false } => {
+ garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
+ Ok(AdminRpc::Ok("Snapshot has been saved.".into()))
+ }
+ }
+ }
}
#[async_trait]
@@ -481,6 +519,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
+ AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 7440457f..a84061a7 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -41,6 +41,9 @@ pub async fn cli_command_dispatch(
Command::Block(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
}
+ Command::Meta(mo) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
+ }
_ => unreachable!(),
}
}
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index a5e2e6e8..1f572a9a 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -52,6 +52,10 @@ pub enum Command {
#[structopt(name = "block", version = garage_version())]
Block(BlockOperation),
+ /// Operations on the metadata db
+ #[structopt(name = "meta", version = garage_version())]
+ Meta(MetaOperation),
+
/// Convert metadata db between database engine formats
#[structopt(name = "convert-db", version = garage_version())]
ConvertDb(convert_db::ConvertDbOpt),
@@ -611,3 +615,14 @@ pub enum BlockOperation {
blocks: Vec<String>,
},
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+pub enum MetaOperation {
+ /// Save a snapshot of the metadata db file
+ #[structopt(name = "snapshot", version = garage_version())]
+ Snapshot {
+ /// Run on all nodes instead of only local node
+ #[structopt(long = "all")]
+ all: bool,
+ },
+}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 6323f957..65bf34db 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -51,7 +51,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone());
info!("Spawning Garage workers...");
- garage.spawn_workers(&background);
+ garage.spawn_workers(&background)?;
if config.admin.trace_sink.is_some() {
info!("Initialize tracing...");
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index f1c1efc8..db23d316 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -42,6 +42,10 @@ impl Instance {
.ok()
.unwrap_or_else(|| env::temp_dir().join(format!("garage-integ-test-{}", port)));
+ let db_engine = env::var("GARAGE_TEST_INTEGRATION_DB_ENGINE")
+ .ok()
+ .unwrap_or_else(|| "lmdb".into());
+
// Clean test runtime directory
if path.exists() {
fs::remove_dir_all(&path).expect("Could not clean test runtime directory");
@@ -52,7 +56,7 @@ impl Instance {
r#"
metadata_dir = "{path}/meta"
data_dir = "{path}/data"
-db_engine = "lmdb"
+db_engine = "{db_engine}"
replication_factor = 1
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index a6bcfbe7..1e1ce0f7 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -29,6 +29,7 @@ err-derive.workspace = true
hex.workspace = true
http.workspace = true
base64.workspace = true
+parse_duration.workspace = true
tracing.workspace = true
rand.workspace = true
zstd.workspace = true
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 8987c594..4405d22d 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -170,14 +170,7 @@ impl Garage {
};
info!("Initialize block manager...");
- let block_manager = BlockManager::new(
- &db,
- config.data_dir.clone(),
- config.data_fsync,
- config.compression_level,
- data_rep_param,
- system.clone(),
- )?;
+ let block_manager = BlockManager::new(&db, &config, data_rep_param, system.clone())?;
block_manager.register_bg_vars(&mut bg_vars);
// ---- admin tables ----
@@ -278,7 +271,7 @@ impl Garage {
}))
}
- pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) -> Result<(), Error> {
self.block_manager.spawn_workers(bg);
self.bucket_table.spawn_workers(bg);
@@ -299,6 +292,23 @@ impl Garage {
#[cfg(feature = "k2v")]
self.k2v.spawn_workers(bg);
+
+ if let Some(itv) = self.config.metadata_auto_snapshot_interval.as_deref() {
+ let interval = parse_duration::parse(itv)
+ .ok_or_message("Invalid `metadata_auto_snapshot_interval`")?;
+ if interval < std::time::Duration::from_secs(600) {
+ return Err(Error::Message(
+ "metadata_auto_snapshot_interval too small or negative".into(),
+ ));
+ }
+
+ bg.spawn_worker(crate::snapshot::AutoSnapshotWorker::new(
+ self.clone(),
+ interval,
+ ));
+ }
+
+ Ok(())
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 2166105f..1939a7a9 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -15,3 +15,4 @@ pub mod s3;
pub mod garage;
pub mod helper;
+pub mod snapshot;
diff --git a/src/model/snapshot.rs b/src/model/snapshot.rs
new file mode 100644
index 00000000..36f9ec7d
--- /dev/null
+++ b/src/model/snapshot.rs
@@ -0,0 +1,136 @@
+use std::fs;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use rand::prelude::*;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::*;
+
+use crate::garage::Garage;
+
+// The two most recent snapshots are kept
+const KEEP_SNAPSHOTS: usize = 2;
+
+static SNAPSHOT_MUTEX: Mutex<()> = Mutex::new(());
+
+// ================ snapshotting logic =====================
+
+/// Run snashot_metadata in a blocking thread and async await on it
+pub async fn async_snapshot_metadata(garage: &Arc<Garage>) -> Result<(), Error> {
+ let garage = garage.clone();
+ let worker = tokio::task::spawn_blocking(move || snapshot_metadata(&garage));
+ worker.await.unwrap()?;
+ Ok(())
+}
+
+/// Take a snapshot of the metadata database, and erase older
+/// snapshots if necessary.
+/// This is not an async function, it should be spawned on a thread pool
+pub fn snapshot_metadata(garage: &Garage) -> Result<(), Error> {
+ let lock = match SNAPSHOT_MUTEX.try_lock() {
+ Ok(lock) => lock,
+ Err(_) => {
+ return Err(Error::Message(
+ "Cannot acquire lock, another snapshot might be in progress".into(),
+ ))
+ }
+ };
+
+ let mut snapshots_dir = garage.config.metadata_dir.clone();
+ snapshots_dir.push("snapshots");
+ fs::create_dir_all(&snapshots_dir)?;
+
+ let mut new_path = snapshots_dir.clone();
+ new_path.push(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true));
+
+ info!("Snapshotting metadata db to {}", new_path.display());
+ garage.db.snapshot(&new_path)?;
+ info!("Metadata db snapshot finished");
+
+ if let Err(e) = cleanup_snapshots(&snapshots_dir) {
+ error!("Failed to do cleanup in snapshots directory: {}", e);
+ }
+
+ drop(lock);
+
+ Ok(())
+}
+
+fn cleanup_snapshots(snapshots_dir: &PathBuf) -> Result<(), Error> {
+ let mut snapshots =
+ fs::read_dir(&snapshots_dir)?.collect::<Result<Vec<fs::DirEntry>, std::io::Error>>()?;
+
+ snapshots.retain(|x| x.file_name().len() > 8);
+ snapshots.sort_by_key(|x| x.file_name());
+
+ for to_delete in snapshots.iter().rev().skip(KEEP_SNAPSHOTS) {
+ let path = snapshots_dir.join(to_delete.path());
+ if to_delete.metadata()?.file_type().is_dir() {
+ for file in fs::read_dir(&path)? {
+ let file = file?;
+ if file.metadata()?.is_file() {
+ fs::remove_file(path.join(file.path()))?;
+ }
+ }
+ std::fs::remove_dir(&path)?;
+ } else {
+ std::fs::remove_file(&path)?;
+ }
+ }
+ Ok(())
+}
+
+// ================ auto snapshot worker =====================
+
+pub struct AutoSnapshotWorker {
+ garage: Arc<Garage>,
+ next_snapshot: Instant,
+ snapshot_interval: Duration,
+}
+
+impl AutoSnapshotWorker {
+ pub(crate) fn new(garage: Arc<Garage>, snapshot_interval: Duration) -> Self {
+ Self {
+ garage,
+ snapshot_interval,
+ next_snapshot: Instant::now() + (snapshot_interval / 2),
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for AutoSnapshotWorker {
+ fn name(&self) -> String {
+ "Metadata snapshot worker".into()
+ }
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ freeform: vec![format!(
+ "Next snapshot: {}",
+ (chrono::Utc::now() + (self.next_snapshot - Instant::now())).to_rfc3339()
+ )],
+ ..Default::default()
+ }
+ }
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if Instant::now() < self.next_snapshot {
+ return Ok(WorkerState::Idle);
+ }
+
+ async_snapshot_metadata(&self.garage).await?;
+
+ let rand_factor = 1f32 + thread_rng().gen::<f32>() / 5f32;
+ self.next_snapshot = Instant::now() + self.snapshot_interval.mul_f32(rand_factor);
+
+ Ok(WorkerState::Idle)
+ }
+ async fn wait_for_work(&mut self) -> WorkerState {
+ tokio::time::sleep_until(self.next_snapshot.into()).await;
+ WorkerState::Busy
+ }
+}
diff --git a/src/util/config.rs b/src/util/config.rs
index e243c813..c5a24f76 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -23,6 +23,14 @@ pub struct Config {
#[serde(default)]
pub data_fsync: bool,
+ /// Disable automatic scrubbing of the data directory
+ #[serde(default)]
+ pub disable_scrub: bool,
+
+ /// Automatic snapshot interval for metadata
+ #[serde(default)]
+ pub metadata_auto_snapshot_interval: Option<String>,
+
/// Size of data blocks to save to disk
#[serde(
deserialize_with = "deserialize_capacity",