diff options
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 3 | ||||
-rw-r--r-- | src/garage/admin.rs | 45 | ||||
-rw-r--r-- | src/garage/repair.rs | 50 | ||||
-rw-r--r-- | src/garage/server.rs | 54 | ||||
-rw-r--r-- | src/garage/tests/bucket.rs | 8 |
5 files changed, 112 insertions, 48 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 902f67f8..eb643160 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -21,6 +21,7 @@ path = "tests/lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_api = { version = "0.7.0", path = "../api" } garage_model = { version = "0.7.0", path = "../model" } garage_rpc = { version = "0.7.0", path = "../rpc" } @@ -36,8 +37,6 @@ rand = "0.8" async-trait = "0.1.7" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index bc1f494a..c662aa00 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -660,11 +660,11 @@ impl AdminRpcHandler { } Ok(AdminRpc::Ok(ret)) } else { - Ok(AdminRpc::Ok(self.gather_stats_local(opt))) + Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) } } - fn gather_stats_local(&self, opt: StatsOpt) -> String { + fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> { let mut ret = String::new(); writeln!( &mut ret, @@ -672,6 +672,7 @@ impl AdminRpcHandler { self.garage.system.garage_version(), ) .unwrap(); + writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); // Gather ring statistics let ring = self.garage.system.ring.borrow().clone(); @@ -689,59 +690,71 @@ impl AdminRpcHandler { writeln!(&mut ret, " {:?} {}", n, c).unwrap(); } - self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.key_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.object_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.version_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?; writeln!(&mut ret, "\nBlock manager stats:").unwrap(); if opt.detailed { writeln!( &mut ret, " number of RC entries (~= number of blocks): {}", - self.garage.block_manager.rc_len() + self.garage.block_manager.rc_len()? ) .unwrap(); } writeln!( &mut ret, " resync queue length: {}", - self.garage.block_manager.resync_queue_len() + self.garage.block_manager.resync_queue_len()? ) .unwrap(); writeln!( &mut ret, " blocks with resync errors: {}", - self.garage.block_manager.resync_errors_len() + self.garage.block_manager.resync_errors_len()? ) .unwrap(); - ret + Ok(ret) } - fn gather_table_stats<F, R>(&self, to: &mut String, t: &Arc<Table<F, R>>, opt: &StatsOpt) + fn gather_table_stats<F, R>( + &self, + to: &mut String, + t: &Arc<Table<F, R>>, + opt: &StatsOpt, + ) -> Result<(), Error> where F: TableSchema + 'static, R: TableReplication + 'static, { writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap(); if opt.detailed { - writeln!(to, " number of items: {}", t.data.store.len()).unwrap(); + writeln!( + to, + " number of items: {}", + t.data.store.len().map_err(GarageError::from)? + ) + .unwrap(); writeln!( to, " Merkle tree size: {}", - t.merkle_updater.merkle_tree_len() + t.merkle_updater.merkle_tree_len()? ) .unwrap(); } writeln!( to, " Merkle updater todo queue length: {}", - t.merkle_updater.todo_len() + t.merkle_updater.todo_len()? ) .unwrap(); - writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); + writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap(); + + Ok(()) } } diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 830eac71..17e14b8b 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -64,13 +64,23 @@ impl Repair { async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { let mut pos = vec![]; + let mut i = 0; - while let Some((item_key, item_bytes)) = - self.garage.version_table.data.store.get_gt(&pos)? - { - pos = item_key.to_vec(); + while !*must_exit.borrow() { + let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? { + Some((k, v)) => { + pos = k; + v + } + None => break, + }; + + i += 1; + if i % 1000 == 0 { + info!("repair_versions: {}", i); + } - let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; + let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; if version.deleted.get() { continue; } @@ -98,23 +108,30 @@ impl Repair { )) .await?; } - - if *must_exit.borrow() { - break; - } } + info!("repair_versions: finished, done {}", i); Ok(()) } async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { let mut pos = vec![]; + let mut i = 0; - while let Some((item_key, item_bytes)) = - self.garage.block_ref_table.data.store.get_gt(&pos)? - { - pos = item_key.to_vec(); + while !*must_exit.borrow() { + let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? { + Some((k, v)) => { + pos = k; + v + } + None => break, + }; + + i += 1; + if i % 1000 == 0 { + info!("repair_block_ref: {}", i); + } - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; if block_ref.deleted.get() { continue; } @@ -139,11 +156,8 @@ impl Repair { }) .await?; } - - if *must_exit.borrow() { - break; - } } + info!("repair_block_ref: finished, done {}", i); Ok(()) } } diff --git a/src/garage/server.rs b/src/garage/server.rs index b58ad286..697d3358 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -2,6 +2,8 @@ use std::path::PathBuf; use tokio::sync::watch; +use garage_db as db; + use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; @@ -31,13 +33,51 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Opening database..."); let mut db_path = config.metadata_dir.clone(); - db_path.push("db"); - let db = sled::Config::default() - .path(&db_path) - .cache_capacity(config.sled_cache_capacity) - .flush_every_ms(Some(config.sled_flush_every_ms)) - .open() - .expect("Unable to open sled DB"); + std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); + let db = match config.db_engine.as_str() { + "sled" => { + db_path.push("db"); + info!("Opening Sled database at: {}", db_path.display()); + let db = db::sled_adapter::sled::Config::default() + .path(&db_path) + .cache_capacity(config.sled_cache_capacity) + .flush_every_ms(Some(config.sled_flush_every_ms)) + .open() + .expect("Unable to open sled DB"); + db::sled_adapter::SledDb::init(db) + } + "sqlite" | "sqlite3" | "rusqlite" => { + db_path.push("db.sqlite"); + info!("Opening Sqlite database at: {}", db_path.display()); + let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) + .expect("Unable to open sqlite DB"); + db::sqlite_adapter::SqliteDb::init(db) + } + "lmdb" | "heed" => { + db_path.push("db.lmdb"); + info!("Opening LMDB database at: {}", db_path.display()); + std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); + let map_size = if u32::MAX as usize == usize::MAX { + warn!("LMDB is not recommended on 32-bit systems, database size will be limited"); + 1usize << 30 // 1GB for 32-bit systems + } else { + 1usize << 40 // 1TB for 64-bit systems + }; + + let db = db::lmdb_adapter::heed::EnvOpenOptions::new() + .max_dbs(100) + .map_size(map_size) + .open(&db_path) + .expect("Unable to open LMDB DB"); + db::lmdb_adapter::LmdbDb::init(db) + } + e => { + return Err(Error::Message(format!( + "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", + e + ))); + } + }; info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs index ff5cc8da..b32af068 100644 --- a/src/garage/tests/bucket.rs +++ b/src/garage/tests/bucket.rs @@ -29,8 +29,7 @@ async fn test_bucket_all() { .unwrap() .iter() .filter(|x| x.name.as_ref().is_some()) - .find(|x| x.name.as_ref().unwrap() == "hello") - .is_some()); + .any(|x| x.name.as_ref().unwrap() == "hello")); } { // Get its location @@ -75,13 +74,12 @@ async fn test_bucket_all() { { // Check bucket is deleted with List buckets let r = ctx.client.list_buckets().send().await.unwrap(); - assert!(r + assert!(!r .buckets .as_ref() .unwrap() .iter() .filter(|x| x.name.as_ref().is_some()) - .find(|x| x.name.as_ref().unwrap() == "hello") - .is_none()); + .any(|x| x.name.as_ref().unwrap() == "hello")); } } |