aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml3
-rw-r--r--src/garage/admin.rs45
-rw-r--r--src/garage/repair.rs50
-rw-r--r--src/garage/server.rs54
-rw-r--r--src/garage/tests/bucket.rs8
5 files changed, 112 insertions, 48 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 902f67f8..eb643160 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -21,6 +21,7 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_api = { version = "0.7.0", path = "../api" }
garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
@@ -36,8 +37,6 @@ rand = "0.8"
async-trait = "0.1.7"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index bc1f494a..c662aa00 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -660,11 +660,11 @@ impl AdminRpcHandler {
}
Ok(AdminRpc::Ok(ret))
} else {
- Ok(AdminRpc::Ok(self.gather_stats_local(opt)))
+ Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
}
}
- fn gather_stats_local(&self, opt: StatsOpt) -> String {
+ fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
let mut ret = String::new();
writeln!(
&mut ret,
@@ -672,6 +672,7 @@ impl AdminRpcHandler {
self.garage.system.garage_version(),
)
.unwrap();
+ writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather ring statistics
let ring = self.garage.system.ring.borrow().clone();
@@ -689,59 +690,71 @@ impl AdminRpcHandler {
writeln!(&mut ret, " {:?} {}", n, c).unwrap();
}
- self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.key_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.object_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.version_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt);
+ self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
if opt.detailed {
writeln!(
&mut ret,
" number of RC entries (~= number of blocks): {}",
- self.garage.block_manager.rc_len()
+ self.garage.block_manager.rc_len()?
)
.unwrap();
}
writeln!(
&mut ret,
" resync queue length: {}",
- self.garage.block_manager.resync_queue_len()
+ self.garage.block_manager.resync_queue_len()?
)
.unwrap();
writeln!(
&mut ret,
" blocks with resync errors: {}",
- self.garage.block_manager.resync_errors_len()
+ self.garage.block_manager.resync_errors_len()?
)
.unwrap();
- ret
+ Ok(ret)
}
- fn gather_table_stats<F, R>(&self, to: &mut String, t: &Arc<Table<F, R>>, opt: &StatsOpt)
+ fn gather_table_stats<F, R>(
+ &self,
+ to: &mut String,
+ t: &Arc<Table<F, R>>,
+ opt: &StatsOpt,
+ ) -> Result<(), Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap();
if opt.detailed {
- writeln!(to, " number of items: {}", t.data.store.len()).unwrap();
+ writeln!(
+ to,
+ " number of items: {}",
+ t.data.store.len().map_err(GarageError::from)?
+ )
+ .unwrap();
writeln!(
to,
" Merkle tree size: {}",
- t.merkle_updater.merkle_tree_len()
+ t.merkle_updater.merkle_tree_len()?
)
.unwrap();
}
writeln!(
to,
" Merkle updater todo queue length: {}",
- t.merkle_updater.todo_len()
+ t.merkle_updater.todo_len()?
)
.unwrap();
- writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
+ writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap();
+
+ Ok(())
}
}
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 830eac71..17e14b8b 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -64,13 +64,23 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
+ let mut i = 0;
- while let Some((item_key, item_bytes)) =
- self.garage.version_table.data.store.get_gt(&pos)?
- {
- pos = item_key.to_vec();
+ while !*must_exit.borrow() {
+ let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? {
+ Some((k, v)) => {
+ pos = k;
+ v
+ }
+ None => break,
+ };
+
+ i += 1;
+ if i % 1000 == 0 {
+ info!("repair_versions: {}", i);
+ }
- let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
+ let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if version.deleted.get() {
continue;
}
@@ -98,23 +108,30 @@ impl Repair {
))
.await?;
}
-
- if *must_exit.borrow() {
- break;
- }
}
+ info!("repair_versions: finished, done {}", i);
Ok(())
}
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
+ let mut i = 0;
- while let Some((item_key, item_bytes)) =
- self.garage.block_ref_table.data.store.get_gt(&pos)?
- {
- pos = item_key.to_vec();
+ while !*must_exit.borrow() {
+ let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? {
+ Some((k, v)) => {
+ pos = k;
+ v
+ }
+ None => break,
+ };
+
+ i += 1;
+ if i % 1000 == 0 {
+ info!("repair_block_ref: {}", i);
+ }
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if block_ref.deleted.get() {
continue;
}
@@ -139,11 +156,8 @@ impl Repair {
})
.await?;
}
-
- if *must_exit.borrow() {
- break;
- }
}
+ info!("repair_block_ref: finished, done {}", i);
Ok(())
}
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index b58ad286..697d3358 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -2,6 +2,8 @@ use std::path::PathBuf;
use tokio::sync::watch;
+use garage_db as db;
+
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
@@ -31,13 +33,51 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
- db_path.push("db");
- let db = sled::Config::default()
- .path(&db_path)
- .cache_capacity(config.sled_cache_capacity)
- .flush_every_ms(Some(config.sled_flush_every_ms))
- .open()
- .expect("Unable to open sled DB");
+ std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
+ let db = match config.db_engine.as_str() {
+ "sled" => {
+ db_path.push("db");
+ info!("Opening Sled database at: {}", db_path.display());
+ let db = db::sled_adapter::sled::Config::default()
+ .path(&db_path)
+ .cache_capacity(config.sled_cache_capacity)
+ .flush_every_ms(Some(config.sled_flush_every_ms))
+ .open()
+ .expect("Unable to open sled DB");
+ db::sled_adapter::SledDb::init(db)
+ }
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ db_path.push("db.sqlite");
+ info!("Opening Sqlite database at: {}", db_path.display());
+ let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+ .expect("Unable to open sqlite DB");
+ db::sqlite_adapter::SqliteDb::init(db)
+ }
+ "lmdb" | "heed" => {
+ db_path.push("db.lmdb");
+ info!("Opening LMDB database at: {}", db_path.display());
+ std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
+ let map_size = if u32::MAX as usize == usize::MAX {
+ warn!("LMDB is not recommended on 32-bit systems, database size will be limited");
+ 1usize << 30 // 1GB for 32-bit systems
+ } else {
+ 1usize << 40 // 1TB for 64-bit systems
+ };
+
+ let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
+ .max_dbs(100)
+ .map_size(map_size)
+ .open(&db_path)
+ .expect("Unable to open LMDB DB");
+ db::lmdb_adapter::LmdbDb::init(db)
+ }
+ e => {
+ return Err(Error::Message(format!(
+ "Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
+ e
+ )));
+ }
+ };
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs
index ff5cc8da..b32af068 100644
--- a/src/garage/tests/bucket.rs
+++ b/src/garage/tests/bucket.rs
@@ -29,8 +29,7 @@ async fn test_bucket_all() {
.unwrap()
.iter()
.filter(|x| x.name.as_ref().is_some())
- .find(|x| x.name.as_ref().unwrap() == "hello")
- .is_some());
+ .any(|x| x.name.as_ref().unwrap() == "hello"));
}
{
// Get its location
@@ -75,13 +74,12 @@ async fn test_bucket_all() {
{
// Check bucket is deleted with List buckets
let r = ctx.client.list_buckets().send().await.unwrap();
- assert!(r
+ assert!(!r
.buckets
.as_ref()
.unwrap()
.iter()
.filter(|x| x.name.as_ref().is_some())
- .find(|x| x.name.as_ref().unwrap() == "hello")
- .is_none());
+ .any(|x| x.name.as_ref().unwrap() == "hello"));
}
}