commit     0038ca8a78f147b9c0ec07ef0121773aaf110dc9
tree       43f39f30c63a6affa62eeea62cfec674f217c2b4 /src
parent     81191d2d92e58ff82ace0f4d82b275c157673ade
parent     1a0bffae3491fae6af5a8d4defc5c6b84839e197
author     Alex Auvolat <alex@adnab.me>  2024-03-18 20:17:54 +0100
committer  Alex Auvolat <alex@adnab.me>  2024-03-18 20:19:30 +0100
Merge branch 'main' into next-0.10
Diffstat (limited to 'src')
-rw-r--r--  src/block/manager.rs               |  68
-rw-r--r--  src/db/Cargo.toml                  |   6
-rw-r--r--  src/db/lib.rs                      |  12
-rw-r--r--  src/db/lmdb_adapter.rs             |  10
-rw-r--r--  src/db/open.rs                     |  10
-rw-r--r--  src/db/sqlite_adapter.rs           | 212
-rw-r--r--  src/db/test.rs                     |   3
-rw-r--r--  src/garage/admin/mod.rs            |  39
-rw-r--r--  src/garage/cli/cmd.rs              |   3
-rw-r--r--  src/garage/cli/structs.rs          |  15
-rw-r--r--  src/garage/server.rs               |   2
-rw-r--r--  src/garage/tests/common/garage.rs  |   6
-rw-r--r--  src/model/Cargo.toml               |   1
-rw-r--r--  src/model/garage.rs                |  28
-rw-r--r--  src/model/lib.rs                   |   1
-rw-r--r--  src/model/snapshot.rs              | 136
-rw-r--r--  src/util/config.rs                 |   8
17 files changed, 401 insertions, 159 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 18fadf85..eeacf8b9 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -22,7 +22,7 @@ use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
 use garage_db as db;
 
 use garage_util::background::{vars, BackgroundRunner};
-use garage_util::config::DataDirEnum;
+use garage_util::config::Config;
 use garage_util::data::*;
 use garage_util::error::*;
 use garage_util::metrics::RecordDuration;
@@ -84,6 +84,7 @@ pub struct BlockManager {
 
 	data_fsync: bool,
 	compression_level: Option<i32>,
+	disable_scrub: bool,
 
 	mutation_lock: Vec<Mutex<BlockManagerLocked>>,
 
@@ -119,9 +120,7 @@ struct BlockManagerLocked();
 impl BlockManager {
 	pub fn new(
 		db: &db::Db,
-		data_dir: DataDirEnum,
-		data_fsync: bool,
-		compression_level: Option<i32>,
+		config: &Config,
 		replication: TableShardedReplication,
 		system: Arc<System>,
 	) -> Result<Arc<Self>, Error> {
@@ -131,11 +130,13 @@ impl BlockManager {
 		let data_layout = match data_layout_persister.load() {
 			Ok(mut layout) => {
 				layout
-					.update(&data_dir)
+					.update(&config.data_dir)
 					.ok_or_message("invalid data_dir config")?;
 				layout
 			}
-			Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
+			Err(_) => {
+				DataLayout::initialize(&config.data_dir).ok_or_message("invalid data_dir config")?
+			}
 		};
 		data_layout_persister
 			.save(&data_layout)
@@ -154,7 +155,7 @@ impl BlockManager {
 			.endpoint("garage_block/manager.rs/Rpc".to_string());
 
 		let metrics = BlockManagerMetrics::new(
-			compression_level,
+			config.compression_level,
 			rc.rc.clone(),
 			resync.queue.clone(),
 			resync.errors.clone(),
@@ -166,8 +167,9 @@ impl BlockManager {
 			replication,
 			data_layout: ArcSwap::new(Arc::new(data_layout)),
 			data_layout_persister,
-			data_fsync,
-			compression_level,
+			data_fsync: config.data_fsync,
+			disable_scrub: config.disable_scrub,
+			compression_level: config.compression_level,
 			mutation_lock: vec![(); MUTEX_COUNT]
 				.iter()
 				.map(|_| Mutex::new(BlockManagerLocked()))
@@ -194,33 +196,37 @@ impl BlockManager {
 		}
 
 		// Spawn scrub worker
-		let (scrub_tx, scrub_rx) = mpsc::channel(1);
-		self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
-		bg.spawn_worker(ScrubWorker::new(
-			self.clone(),
-			scrub_rx,
-			self.scrub_persister.clone(),
-		));
+		if !self.disable_scrub {
+			let (scrub_tx, scrub_rx) = mpsc::channel(1);
+			self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+			bg.spawn_worker(ScrubWorker::new(
+				self.clone(),
+				scrub_rx,
+				self.scrub_persister.clone(),
+			));
+		}
 	}
 
 	pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
 		self.resync.register_bg_vars(vars);
 
-		vars.register_rw(
-			&self.scrub_persister,
-			"scrub-tranquility",
-			|p| p.get_with(|x| x.tranquility),
-			|p, tranquility| p.set_with(|x| x.tranquility = tranquility),
-		);
-		vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
-			p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
-		});
-		vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
-			p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
-		});
-		vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
-			p.get_with(|x| x.corruptions_detected)
-		});
+		if !self.disable_scrub {
+			vars.register_rw(
+				&self.scrub_persister,
+				"scrub-tranquility",
+				|p| p.get_with(|x| x.tranquility),
+				|p, tranquility| p.set_with(|x| x.tranquility = tranquility),
+			);
+			vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
+				p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
+			});
+			vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
+				p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
+			});
+			vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
+				p.get_with(|x| x.corruptions_detected)
+			});
+		}
 	}
 
 	/// Ask nodes that might have a (possibly compressed) block for it
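Because both the scrub worker and its background variables are now gated on disable_scrub, the scrub tuning knobs are only reachable when scrubbing is enabled. An illustrative CLI session (command names assumed from Garage's existing worker-variable interface, not from this patch):

    # With scrubbing enabled (the default), the tranquility knob is registered:
    garage worker set scrub-tranquility 2

    # With disable_scrub = true in the config, ScrubWorker is never spawned
    # and the scrub-* variables above are not registered at all.
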
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index a8f6d586..b88298ee 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -17,7 +17,9 @@ hexdump.workspace = true
 tracing.workspace = true
 
 heed = { workspace = true, optional = true }
-rusqlite = { workspace = true, optional = true }
+rusqlite = { workspace = true, optional = true, features = ["backup"] }
+r2d2 = { workspace = true, optional = true }
+r2d2_sqlite = { workspace = true, optional = true }
 
 [dev-dependencies]
 mktemp.workspace = true
@@ -26,4 +28,4 @@ mktemp.workspace = true
 default = [ "lmdb", "sqlite" ]
 bundled-libs = [ "rusqlite?/bundled" ]
 lmdb = [ "heed" ]
-sqlite = [ "rusqlite" ]
+sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ]
diff --git a/src/db/lib.rs b/src/db/lib.rs
index ff511b5f..c8f9e13f 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -15,6 +15,7 @@ use core::ops::{Bound, RangeBounds};
 use std::borrow::Cow;
 use std::cell::Cell;
+use std::path::PathBuf;
 use std::sync::Arc;
 
 use err_derive::Error;
@@ -44,6 +45,12 @@ pub type TxValueIter<'a> = Box<dyn std::iter::Iterator<Item = TxOpResult<(Value,
 #[error(display = "{}", _0)]
 pub struct Error(pub Cow<'static, str>);
 
+impl From<std::io::Error> for Error {
+	fn from(e: std::io::Error) -> Error {
+		Error(format!("IO: {}", e).into())
+	}
+}
+
 pub type Result<T> = std::result::Result<T, Error>;
 
 #[derive(Debug, Error)]
@@ -126,6 +133,10 @@ impl Db {
 		}
 	}
 
+	pub fn snapshot(&self, path: &PathBuf) -> Result<()> {
+		self.0.snapshot(path)
+	}
+
 	pub fn import(&self, other: &Db) -> Result<()> {
 		let existing_trees = self.list_trees()?;
 		if !existing_trees.is_empty() {
@@ -323,6 +334,7 @@ pub(crate) trait IDb: Send + Sync {
 	fn engine(&self) -> String;
 	fn open_tree(&self, name: &str) -> Result<usize>;
 	fn list_trees(&self) -> Result<Vec<String>>;
+	fn snapshot(&self, path: &PathBuf) -> Result<()>;
 
 	fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
 	fn len(&self, tree: usize) -> Result<usize>;
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
index 5ce7d3e3..d5066664 100644
--- a/src/db/lmdb_adapter.rs
+++ b/src/db/lmdb_adapter.rs
@@ -3,6 +3,7 @@ use core::ptr::NonNull;
 
 use std::collections::HashMap;
 use std::convert::TryInto;
+use std::path::PathBuf;
 use std::pin::Pin;
 use std::sync::{Arc, RwLock};
 
@@ -103,6 +104,15 @@ impl IDb for LmdbDb {
 		Ok(ret2)
 	}
 
+	fn snapshot(&self, to: &PathBuf) -> Result<()> {
+		std::fs::create_dir_all(to)?;
+		let mut path = to.clone();
+		path.push("data.mdb");
+		self.db
+			.copy_to_path(path, heed::CompactionOption::Disabled)?;
+		Ok(())
+	}
+
 	// ----
 
 	fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
diff --git a/src/db/open.rs b/src/db/open.rs
index 03476a42..19bc96cc 100644
--- a/src/db/open.rs
+++ b/src/db/open.rs
@@ -68,14 +68,8 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
 		#[cfg(feature = "sqlite")]
 		Engine::Sqlite => {
 			info!("Opening Sqlite database at: {}", path.display());
-			let db = crate::sqlite_adapter::rusqlite::Connection::open(&path)?;
-			db.pragma_update(None, "journal_mode", "WAL")?;
-			if opt.fsync {
-				db.pragma_update(None, "synchronous", "NORMAL")?;
-			} else {
-				db.pragma_update(None, "synchronous", "OFF")?;
-			}
-			Ok(crate::sqlite_adapter::SqliteDb::init(db))
+			let manager = r2d2_sqlite::SqliteConnectionManager::file(path);
+			Ok(crate::sqlite_adapter::SqliteDb::new(manager, opt.fsync)?)
 		}
 
 		// ---- LMDB DB ----
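At the db layer, Db::snapshot is a thin wrapper over the new engine-specific IDb::snapshot implementations (LMDB copies data.mdb into the target directory; SQLite uses rusqlite's online backup API, enabled by the new "backup" feature). A minimal sketch of a call site; the destination path here is hypothetical, not from the patch:

    use std::path::PathBuf;

    // Snapshot an open metadata db to a fresh location on disk.
    fn snapshot_example(db: &garage_db::Db) -> garage_db::Result<()> {
        let dest = PathBuf::from("/tmp/garage-meta-snapshot");
        db.snapshot(&dest)
    }
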
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 6c556c97..a91b9011 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -1,12 +1,14 @@
 use core::ops::Bound;
 
-use std::borrow::BorrowMut;
 use std::marker::PhantomPinned;
+use std::path::PathBuf;
 use std::pin::Pin;
 use std::ptr::NonNull;
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::{Arc, Mutex, RwLock};
 
-use rusqlite::{params, Connection, Rows, Statement, Transaction};
+use r2d2::Pool;
+use r2d2_sqlite::SqliteConnectionManager;
+use rusqlite::{params, Rows, Statement, Transaction};
 
 use crate::{
 	Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
@@ -15,6 +17,8 @@ use crate::{
 
 pub use rusqlite;
 
+type Connection = r2d2::PooledConnection<SqliteConnectionManager>;
+
 // --- err
 
 impl From<rusqlite::Error> for Error {
@@ -23,6 +27,12 @@ impl From<rusqlite::Error> for Error {
 	}
 }
 
+impl From<r2d2::Error> for Error {
+	fn from(e: r2d2::Error) -> Error {
+		Error(format!("Sqlite: {}", e).into())
+	}
+}
+
 impl From<rusqlite::Error> for TxOpError {
 	fn from(e: rusqlite::Error) -> TxOpError {
 		TxOpError(e.into())
@@ -31,35 +41,47 @@ impl From<rusqlite::Error> for TxOpError {
 
 // -- db
 
-pub struct SqliteDb(Mutex<SqliteDbInner>);
-
-struct SqliteDbInner {
-	db: Connection,
-	trees: Vec<String>,
+pub struct SqliteDb {
+	db: Pool<SqliteConnectionManager>,
+	trees: RwLock<Vec<Arc<str>>>,
+	// All operations that might write on the DB must take this lock first.
+	// This emulates LMDB's approach where a single writer can be
+	// active at once.
+	write_lock: Mutex<()>,
 }
 
 impl SqliteDb {
-	pub fn init(db: rusqlite::Connection) -> Db {
-		let s = Self(Mutex::new(SqliteDbInner {
-			db,
-			trees: Vec::new(),
-		}));
-		Db(Arc::new(s))
+	pub fn new(manager: SqliteConnectionManager, sync_mode: bool) -> Result<Db> {
+		let manager = manager.with_init(move |db| {
+			db.pragma_update(None, "journal_mode", "WAL")?;
+			if sync_mode {
+				db.pragma_update(None, "synchronous", "NORMAL")?;
+			} else {
+				db.pragma_update(None, "synchronous", "OFF")?;
+			}
+			Ok(())
+		});
+		let s = Self {
+			db: Pool::builder().build(manager)?,
+			trees: RwLock::new(vec![]),
+			write_lock: Mutex::new(()),
+		};
+		Ok(Db(Arc::new(s)))
 	}
 }
 
-impl SqliteDbInner {
-	fn get_tree(&self, i: usize) -> Result<&'_ str> {
+impl SqliteDb {
+	fn get_tree(&self, i: usize) -> Result<Arc<str>> {
 		self.trees
+			.read()
+			.unwrap()
 			.get(i)
-			.map(String::as_str)
+			.cloned()
 			.ok_or_else(|| Error("invalid tree id".into()))
 	}
 
-	fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> {
-		let mut stmt = self
-			.db
-			.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+	fn internal_get(&self, db: &Connection, tree: &str, key: &[u8]) -> Result<Option<Value>> {
+		let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
 		let mut res_iter = stmt.query([key])?;
 		match res_iter.next()? {
 			None => Ok(None),
@@ -75,13 +97,14 @@ impl IDb for SqliteDb {
 	fn open_tree(&self, name: &str) -> Result<usize> {
 		let name = format!("tree_{}", name.replace(':', "_COLON_"));
 
-		let mut this = self.0.lock().unwrap();
+		let mut trees = self.trees.write().unwrap();
 
-		if let Some(i) = this.trees.iter().position(|x| x == &name) {
+		if let Some(i) = trees.iter().position(|x| x.as_ref() == &name) {
 			Ok(i)
 		} else {
+			let db = self.db.get()?;
 			trace!("create table {}", name);
-			this.db.execute(
+			db.execute(
 				&format!(
 					"CREATE TABLE IF NOT EXISTS {} (
						k BLOB PRIMARY KEY,
						v BLOB
					)",
 					name
 				),
 				[],
 			)?;
 			trace!("table created: {}, unlocking", name);
 
-			let i = this.trees.len();
-			this.trees.push(name.to_string());
+			let i = trees.len();
+			trees.push(name.to_string().into_boxed_str().into());
 			Ok(i)
 		}
 	}
@@ -102,11 +125,8 @@ impl IDb for SqliteDb {
 	fn list_trees(&self) -> Result<Vec<String>> {
 		let mut trees = vec![];
 
-		trace!("list_trees: lock db");
-		let this = self.0.lock().unwrap();
-		trace!("list_trees: lock acquired");
-
-		let mut stmt = this.db.prepare(
+		let db = self.db.get()?;
+		let mut stmt = db.prepare(
 			"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
 		)?;
 		let mut rows = stmt.query([])?;
@@ -119,24 +139,29 @@ impl IDb for SqliteDb {
 		Ok(trees)
 	}
 
+	fn snapshot(&self, to: &PathBuf) -> Result<()> {
+		fn progress(p: rusqlite::backup::Progress) {
+			let percent = (p.pagecount - p.remaining) * 100 / p.pagecount;
+			info!("Sqlite snapshot progress: {}%", percent);
+		}
+		self.db
+			.get()?
+			.backup(rusqlite::DatabaseName::Main, to, Some(progress))?;
+		Ok(())
+	}
+
 	// ----
 
 	fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
-		trace!("get {}: lock db", tree);
-		let this = self.0.lock().unwrap();
-		trace!("get {}: lock acquired", tree);
-
-		let tree = this.get_tree(tree)?;
-		this.internal_get(tree, key)
+		let tree = self.get_tree(tree)?;
+		self.internal_get(&self.db.get()?, &tree, key)
 	}
 
 	fn len(&self, tree: usize) -> Result<usize> {
-		trace!("len {}: lock db", tree);
-		let this = self.0.lock().unwrap();
-		trace!("len {}: lock acquired", tree);
+		let tree = self.get_tree(tree)?;
+		let db = self.db.get()?;
 
-		let tree = this.get_tree(tree)?;
-		let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+		let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
 		let mut res_iter = stmt.query([])?;
 		match res_iter.next()? {
			None => Ok(0),
@@ -145,69 +170,60 @@ impl IDb for SqliteDb {
 	}
 
 	fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
-		trace!("insert {}: lock db", tree);
-		let this = self.0.lock().unwrap();
-		trace!("insert {}: lock acquired", tree);
+		let tree = self.get_tree(tree)?;
+		let db = self.db.get()?;
+		let lock = self.write_lock.lock();
 
-		let tree = this.get_tree(tree)?;
-		let old_val = this.internal_get(tree, key)?;
+		let old_val = self.internal_get(&db, &tree, key)?;
 
 		let sql = match &old_val {
 			Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree),
 			None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree),
 		};
-		let n = this.db.execute(&sql, params![key, value])?;
+		let n = db.execute(&sql, params![key, value])?;
 		assert_eq!(n, 1);
 
+		drop(lock);
 		Ok(old_val)
 	}
 
 	fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
-		trace!("remove {}: lock db", tree);
-		let this = self.0.lock().unwrap();
-		trace!("remove {}: lock acquired", tree);
+		let tree = self.get_tree(tree)?;
+		let db = self.db.get()?;
+		let lock = self.write_lock.lock();
 
-		let tree = this.get_tree(tree)?;
-		let old_val = this.internal_get(tree, key)?;
+		let old_val = self.internal_get(&db, &tree, key)?;
 
 		if old_val.is_some() {
-			let n = this
-				.db
-				.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+			let n = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
 			assert_eq!(n, 1);
 		}
 
+		drop(lock);
 		Ok(old_val)
 	}
 
 	fn clear(&self, tree: usize) -> Result<()> {
-		trace!("clear {}: lock db", tree);
-		let this = self.0.lock().unwrap();
-		trace!("clear {}: lock acquired", tree);
+		let tree = self.get_tree(tree)?;
+		let db = self.db.get()?;
+		let lock = self.write_lock.lock();
+
+		db.execute(&format!("DELETE FROM {}", tree), [])?;
 
-		let tree = this.get_tree(tree)?;
-		this.db.execute(&format!("DELETE FROM {}", tree), [])?;
+		drop(lock);
 		Ok(())
 	}
 
 	fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
-		trace!("iter {}: lock db", tree);
-		let this = self.0.lock().unwrap();
-		trace!("iter {}: lock acquired", tree);
-
-		let tree = this.get_tree(tree)?;
+		let tree = self.get_tree(tree)?;
 		let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
-		DbValueIterator::make(this, &sql, [])
+		DbValueIterator::make(self.db.get()?, &sql, [])
 	}
 
 	fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
-		trace!("iter_rev {}: lock db", tree);
-		let this = self.0.lock().unwrap();
-		trace!("iter_rev {}: lock acquired", tree);
-
-		let tree = this.get_tree(tree)?;
+		let tree = self.get_tree(tree)?;
 		let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
-		DbValueIterator::make(this, &sql, [])
+		DbValueIterator::make(self.db.get()?, &sql, [])
 	}
 
 	fn range<'r>(
@@ -216,11 +232,7 @@ impl IDb for SqliteDb {
 		low: Bound<&'r [u8]>,
 		high: Bound<&'r [u8]>,
 	) -> Result<ValueIter<'_>> {
-		trace!("range {}: lock db", tree);
-		let this = self.0.lock().unwrap();
-		trace!("range {}: lock acquired", tree);
-
-		let tree = this.get_tree(tree)?;
+		let tree = self.get_tree(tree)?;
 
 		let (bounds_sql, params) = bounds_sql(low, high);
 		let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
@@ -230,7 +242,7 @@ impl IDb for SqliteDb {
 			.map(|x| x as &dyn rusqlite::ToSql)
 			.collect::<Vec<_>>();
 
-		DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+		DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref())
 	}
 
 	fn range_rev<'r>(
 		&self,
@@ -238,11 +250,7 @@ impl IDb for SqliteDb {
 		tree: usize,
 		low: Bound<&'r [u8]>,
		high: Bound<&'r [u8]>,
 	) -> Result<ValueIter<'_>> {
-		trace!("range_rev {}: lock db", tree);
-		let this = self.0.lock().unwrap();
-		trace!("range_rev {}: lock acquired", tree);
-
-		let tree = this.get_tree(tree)?;
+		let tree = self.get_tree(tree)?;
 
 		let (bounds_sql, params) = bounds_sql(low, high);
 		let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
@@ -252,25 +260,20 @@ impl IDb for SqliteDb {
 			.map(|x| x as &dyn rusqlite::ToSql)
 			.collect::<Vec<_>>();
 
-		DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+		DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref())
 	}
 
 	// ----
 
 	fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
-		trace!("transaction: lock db");
-		let mut this = self.0.lock().unwrap();
-		trace!("transaction: lock acquired");
-
-		let this_mut_ref: &mut SqliteDbInner = this.borrow_mut();
+		let mut db = self.db.get().map_err(Error::from).map_err(TxError::Db)?;
+		let trees = self.trees.read().unwrap();
+		let lock = self.write_lock.lock();
 
+		trace!("trying transaction");
 		let mut tx = SqliteTx {
-			tx: this_mut_ref
-				.db
-				.transaction()
-				.map_err(Error::from)
-				.map_err(TxError::Db)?,
-			trees: &this_mut_ref.trees,
+			tx: db.transaction().map_err(Error::from).map_err(TxError::Db)?,
+			trees: &trees,
 		};
 		let res = match f.try_on(&mut tx) {
 			TxFnResult::Ok(on_commit) => {
@@ -290,7 +293,8 @@ impl IDb for SqliteDb {
 		};
 
 		trace!("transaction done");
-		res
+		drop(lock);
+		return res;
 	}
 }
 
@@ -298,12 +302,12 @@
 
 struct SqliteTx<'a> {
 	tx: Transaction<'a>,
-	trees: &'a [String],
+	trees: &'a [Arc<str>],
 }
 
 impl<'a> SqliteTx<'a> {
 	fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> {
-		self.trees.get(i).map(String::as_ref).ok_or_else(|| {
+		self.trees.get(i).map(Arc::as_ref).ok_or_else(|| {
 			TxOpError(Error(
 				"invalid tree id (it might have been opened after the transaction started)".into(),
 			))
@@ -423,18 +427,14 @@ impl<'a> ITx for SqliteTx<'a> {
 // therefore quite some unsafe code (it is a self-referential struct)
 
 struct DbValueIterator<'a> {
-	db: MutexGuard<'a, SqliteDbInner>,
+	db: Connection,
 	stmt: Option<Statement<'a>>,
 	iter: Option<Rows<'a>>,
 	_pin: PhantomPinned,
 }
 
 impl<'a> DbValueIterator<'a> {
-	fn make<P: rusqlite::Params>(
-		db: MutexGuard<'a, SqliteDbInner>,
-		sql: &str,
-		args: P,
-	) -> Result<ValueIter<'a>> {
+	fn make<P: rusqlite::Params>(db: Connection, sql: &str, args: P) -> Result<ValueIter<'a>> {
 		let res = DbValueIterator {
 			db,
 			stmt: None,
@@ -446,7 +446,7 @@ impl<'a> DbValueIterator<'a> {
 
 		// This unsafe allows us to bypass lifetime checks
 		let db = unsafe { NonNull::from(&boxed.db).as_ref() };
-		let stmt = db.db.prepare(sql)?;
+		let stmt = db.prepare(sql)?;
 
 		let mut_ref = Pin::as_mut(&mut boxed);
 		// This unsafe allows us to write in a field of the pinned struct
diff --git a/src/db/test.rs b/src/db/test.rs
index 3add89fb..adb429e7 100644
--- a/src/db/test.rs
+++ b/src/db/test.rs
@@ -144,6 +144,7 @@ fn test_lmdb_db() {
 fn test_sqlite_db() {
 	use crate::sqlite_adapter::SqliteDb;
 
-	let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap());
+	let manager = r2d2_sqlite::SqliteConnectionManager::memory();
+	let db = SqliteDb::new(manager, false).unwrap();
 	test_suite(db);
 }
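The adapter rewrite replaces the single Mutex<SqliteDbInner> (which serialized every read and write, hence all the removed lock-tracing) with an r2d2 connection pool plus one write_lock mutex: WAL-mode readers proceed concurrently while writers stay single-file, as the struct comment notes. A standalone sketch of the same pattern, not code from this patch (the db path is hypothetical):

    use std::sync::Mutex;
    use r2d2_sqlite::SqliteConnectionManager;

    fn pool_sketch() -> Result<(), Box<dyn std::error::Error>> {
        // WAL mode is set per pooled connection, as SqliteDb::new does via with_init().
        let manager = SqliteConnectionManager::file("/tmp/pool-sketch.db")
            .with_init(|c| c.pragma_update(None, "journal_mode", "WAL"));
        let pool = r2d2::Pool::builder().build(manager)?;
        let write_lock = Mutex::new(());

        // Setup, like open_tree above: grab any pooled connection directly.
        pool.get()?
            .execute("CREATE TABLE IF NOT EXISTS t (k BLOB PRIMARY KEY, v BLOB)", [])?;

        // Writes: take the mutex first, mimicking LMDB's single-writer model.
        let _guard = write_lock.lock().unwrap();
        pool.get()?.execute(
            "INSERT OR REPLACE INTO t (k, v) VALUES (?1, ?2)",
            rusqlite::params![&b"key"[..], &b"value"[..]],
        )?;
        Ok(())
    }
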
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index 300e1211..e2468143 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -44,6 +44,7 @@ pub enum AdminRpc {
 	Stats(StatsOpt),
 	Worker(WorkerOperation),
 	BlockOperation(BlockOperation),
+	MetaOperation(MetaOperation),
 
 	// Replies
 	Ok(String),
@@ -465,6 +466,43 @@ impl AdminRpcHandler {
 			)]))
 		}
 	}
 
+	// ================ META DB COMMANDS ====================
+
+	async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
+		match mo {
+			MetaOperation::Snapshot { all: true } => {
+				let to = self.garage.system.cluster_layout().all_nodes().to_vec();
+
+				let resps = futures::future::join_all(to.iter().map(|to| async move {
+					let to = (*to).into();
+					self.endpoint
+						.call(
+							&to,
+							AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
+							PRIO_NORMAL,
+						)
+						.await
+				}))
+				.await;
+
+				let mut ret = vec![];
+				for (to, resp) in to.iter().zip(resps.iter()) {
+					let res_str = match resp {
+						Ok(_) => "ok".to_string(),
+						Err(e) => format!("error: {}", e),
+					};
+					ret.push(format!("{:?}\t{}", to, res_str));
+				}
+
+				Ok(AdminRpc::Ok(format_table_to_string(ret)))
+			}
+			MetaOperation::Snapshot { all: false } => {
+				garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
+				Ok(AdminRpc::Ok("Snapshot has been saved.".into()))
+			}
+		}
+	}
 }
 
 #[async_trait]
@@ -481,6 +519,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
 			AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
 			AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
 			AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
+			AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
 			m => Err(GarageError::unexpected_rpc_message(m).into()),
 		}
 	}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 7440457f..a84061a7 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -41,6 +41,9 @@ pub async fn cli_command_dispatch(
 		Command::Block(bo) => {
 			cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
 		}
+		Command::Meta(mo) => {
+			cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
+		}
 		_ => unreachable!(),
 	}
 }
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index a5e2e6e8..1f572a9a 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -52,6 +52,10 @@ pub enum Command {
 	#[structopt(name = "block", version = garage_version())]
 	Block(BlockOperation),
 
+	/// Operations on the metadata db
+	#[structopt(name = "meta", version = garage_version())]
+	Meta(MetaOperation),
+
 	/// Convert metadata db between database engine formats
 	#[structopt(name = "convert-db", version = garage_version())]
 	ConvertDb(convert_db::ConvertDbOpt),
@@ -611,3 +615,14 @@ pub enum BlockOperation {
 		blocks: Vec<String>,
 	},
 }
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+pub enum MetaOperation {
+	/// Save a snapshot of the metadata db file
+	#[structopt(name = "snapshot", version = garage_version())]
+	Snapshot {
+		/// Run on all nodes instead of only local node
+		#[structopt(long = "all")]
+		all: bool,
+	},
+}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 6323f957..65bf34db 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -51,7 +51,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
 	let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone());
 
 	info!("Spawning Garage workers...");
-	garage.spawn_workers(&background);
+	garage.spawn_workers(&background)?;
 
 	if config.admin.trace_sink.is_some() {
 		info!("Initialize tracing...");
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index f1c1efc8..db23d316 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -42,6 +42,10 @@ impl Instance {
 			.ok()
 			.unwrap_or_else(|| env::temp_dir().join(format!("garage-integ-test-{}", port)));
 
+		let db_engine = env::var("GARAGE_TEST_INTEGRATION_DB_ENGINE")
+			.ok()
+			.unwrap_or_else(|| "lmdb".into());
+
 		// Clean test runtime directory
 		if path.exists() {
 			fs::remove_dir_all(&path).expect("Could not clean test runtime directory");
@@ -52,7 +56,7 @@ impl Instance {
 			r#"
 metadata_dir = "{path}/meta"
 data_dir = "{path}/data"
-db_engine = "lmdb"
+db_engine = "{db_engine}"
 
 replication_factor = 1
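Wired through cli_command_dispatch and the AdminRpc handler above, the new subcommand is invoked like this from a shell (illustrative session, not part of the patch):

    # Snapshot the metadata db of the local node only:
    garage meta snapshot

    # Ask every node in the cluster layout to snapshot its own metadata db;
    # the reply is a table of per-node ok/error statuses:
    garage meta snapshot --all
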
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index a6bcfbe7..1e1ce0f7 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -29,6 +29,7 @@ err-derive.workspace = true
 hex.workspace = true
 http.workspace = true
 base64.workspace = true
+parse_duration.workspace = true
 tracing.workspace = true
 rand.workspace = true
 zstd.workspace = true
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 8987c594..4405d22d 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -170,14 +170,7 @@ impl Garage {
 		};
 
 		info!("Initialize block manager...");
-		let block_manager = BlockManager::new(
-			&db,
-			config.data_dir.clone(),
-			config.data_fsync,
-			config.compression_level,
-			data_rep_param,
-			system.clone(),
-		)?;
+		let block_manager = BlockManager::new(&db, &config, data_rep_param, system.clone())?;
 		block_manager.register_bg_vars(&mut bg_vars);
 
 		// ---- admin tables ----
@@ -278,7 +271,7 @@ impl Garage {
 		}))
 	}
 
-	pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+	pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) -> Result<(), Error> {
 		self.block_manager.spawn_workers(bg);
 
 		self.bucket_table.spawn_workers(bg);
@@ -299,6 +292,23 @@ impl Garage {
 
 		#[cfg(feature = "k2v")]
 		self.k2v.spawn_workers(bg);
+
+		if let Some(itv) = self.config.metadata_auto_snapshot_interval.as_deref() {
+			let interval = parse_duration::parse(itv)
+				.ok_or_message("Invalid `metadata_auto_snapshot_interval`")?;
+			if interval < std::time::Duration::from_secs(600) {
+				return Err(Error::Message(
+					"metadata_auto_snapshot_interval too small or negative".into(),
+				));
+			}
+
+			bg.spawn_worker(crate::snapshot::AutoSnapshotWorker::new(
+				self.clone(),
+				interval,
+			));
+		}
+
+		Ok(())
 	}
 
 	pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 2166105f..1939a7a9 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -15,3 +15,4 @@ pub mod s3;
 
 pub mod garage;
 pub mod helper;
+pub mod snapshot;
diff --git a/src/model/snapshot.rs b/src/model/snapshot.rs
new file mode 100644
index 00000000..36f9ec7d
--- /dev/null
+++ b/src/model/snapshot.rs
@@ -0,0 +1,136 @@
+use std::fs;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use rand::prelude::*;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::*;
+
+use crate::garage::Garage;
+
+// The two most recent snapshots are kept
+const KEEP_SNAPSHOTS: usize = 2;
+
+static SNAPSHOT_MUTEX: Mutex<()> = Mutex::new(());
+
+// ================ snapshotting logic =====================
+
+/// Run snapshot_metadata in a blocking thread and async await on it
+pub async fn async_snapshot_metadata(garage: &Arc<Garage>) -> Result<(), Error> {
+	let garage = garage.clone();
+	let worker = tokio::task::spawn_blocking(move || snapshot_metadata(&garage));
+	worker.await.unwrap()?;
+	Ok(())
+}
+
+/// Take a snapshot of the metadata database, and erase older
+/// snapshots if necessary.
+/// This is not an async function, it should be spawned on a thread pool
+pub fn snapshot_metadata(garage: &Garage) -> Result<(), Error> {
+	let lock = match SNAPSHOT_MUTEX.try_lock() {
+		Ok(lock) => lock,
+		Err(_) => {
+			return Err(Error::Message(
+				"Cannot acquire lock, another snapshot might be in progress".into(),
+			))
+		}
+	};
+
+	let mut snapshots_dir = garage.config.metadata_dir.clone();
+	snapshots_dir.push("snapshots");
+	fs::create_dir_all(&snapshots_dir)?;
+
+	let mut new_path = snapshots_dir.clone();
+	new_path.push(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true));
+
+	info!("Snapshotting metadata db to {}", new_path.display());
+	garage.db.snapshot(&new_path)?;
+	info!("Metadata db snapshot finished");
+
+	if let Err(e) = cleanup_snapshots(&snapshots_dir) {
+		error!("Failed to do cleanup in snapshots directory: {}", e);
+	}
+
+	drop(lock);
+
+	Ok(())
+}
+
+fn cleanup_snapshots(snapshots_dir: &PathBuf) -> Result<(), Error> {
+	let mut snapshots =
+		fs::read_dir(&snapshots_dir)?.collect::<Result<Vec<fs::DirEntry>, std::io::Error>>()?;
+
+	snapshots.retain(|x| x.file_name().len() > 8);
+	snapshots.sort_by_key(|x| x.file_name());
+
+	for to_delete in snapshots.iter().rev().skip(KEEP_SNAPSHOTS) {
+		let path = snapshots_dir.join(to_delete.path());
+		if to_delete.metadata()?.file_type().is_dir() {
+			for file in fs::read_dir(&path)? {
+				let file = file?;
+				if file.metadata()?.is_file() {
+					fs::remove_file(path.join(file.path()))?;
+				}
+			}
+
+			std::fs::remove_dir(&path)?;
+		} else {
+			std::fs::remove_file(&path)?;
+		}
+	}
+	Ok(())
+}
+
+// ================ auto snapshot worker =====================
+
+pub struct AutoSnapshotWorker {
+	garage: Arc<Garage>,
+	next_snapshot: Instant,
+	snapshot_interval: Duration,
+}
+
+impl AutoSnapshotWorker {
+	pub(crate) fn new(garage: Arc<Garage>, snapshot_interval: Duration) -> Self {
+		Self {
+			garage,
+			snapshot_interval,
+			next_snapshot: Instant::now() + (snapshot_interval / 2),
+		}
+	}
+}
+
+#[async_trait]
+impl Worker for AutoSnapshotWorker {
+	fn name(&self) -> String {
+		"Metadata snapshot worker".into()
+	}
+	fn status(&self) -> WorkerStatus {
+		WorkerStatus {
+			freeform: vec![format!(
+				"Next snapshot: {}",
+				(chrono::Utc::now() + (self.next_snapshot - Instant::now())).to_rfc3339()
+			)],
+			..Default::default()
+		}
+	}
+	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+		if Instant::now() < self.next_snapshot {
+			return Ok(WorkerState::Idle);
+		}
+
+		async_snapshot_metadata(&self.garage).await?;
+
+		let rand_factor = 1f32 + thread_rng().gen::<f32>() / 5f32;
+		self.next_snapshot = Instant::now() + self.snapshot_interval.mul_f32(rand_factor);
+
+		Ok(WorkerState::Idle)
+	}
+	async fn wait_for_work(&mut self) -> WorkerState {
+		tokio::time::sleep_until(self.next_snapshot.into()).await;
+		WorkerState::Busy
+	}
+}
diff --git a/src/util/config.rs b/src/util/config.rs
index e243c813..c5a24f76 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -23,6 +23,14 @@ pub struct Config {
 	#[serde(default)]
 	pub data_fsync: bool,
 
+	/// Disable automatic scrubbing of the data directory
+	#[serde(default)]
+	pub disable_scrub: bool,
+
+	/// Automatic snapshot interval for metadata
+	#[serde(default)]
+	pub metadata_auto_snapshot_interval: Option<String>,
+
 	/// Size of data blocks to save to disk
 	#[serde(
 		deserialize_with = "deserialize_capacity",
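Taken together, the two new options are set in the Garage config file. A sketch with illustrative values (paths hypothetical; the keys are the ones added to Config above):

    metadata_dir = "/var/lib/garage/meta"
    data_dir = "/var/lib/garage/data"

    # Never spawn the scrub worker (and skip registering the scrub-* bg vars).
    disable_scrub = true

    # Parsed with parse_duration; spawn_workers rejects anything under 600 seconds.
    # Snapshots land in <metadata_dir>/snapshots, the two most recent are kept,
    # and each run is jittered to interval x (1.0 to 1.2) so nodes do not all
    # snapshot at once.
    metadata_auto_snapshot_interval = "6h"
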