aboutsummaryrefslogtreecommitdiff
path: root/src/model/garage.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/garage.rs')
-rw-r--r--src/model/garage.rs203
1 files changed, 175 insertions, 28 deletions
diff --git a/src/model/garage.rs b/src/model/garage.rs
index abdb920a..75012952 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -2,8 +2,11 @@ use std::sync::Arc;
use netapp::NetworkKey;
+use garage_db as db;
+
use garage_util::background::*;
use garage_util::config::*;
+use garage_util::error::*;
use garage_rpc::system::System;
@@ -13,13 +16,18 @@ use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::block_ref_table::*;
+use crate::s3::block_ref_table::*;
+use crate::s3::object_table::*;
+use crate::s3::version_table::*;
+
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::helper;
+use crate::index_counter::*;
use crate::key_table::*;
-use crate::object_table::*;
-use crate::version_table::*;
+
+#[cfg(feature = "k2v")]
+use crate::k2v::{item_table::*, poll::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
@@ -27,7 +35,7 @@ pub struct Garage {
pub config: Config,
/// The local database
- pub db: sled::Db,
+ pub db: db::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>,
/// The membership manager
@@ -35,21 +43,118 @@ pub struct Garage {
/// The block manager
pub block_manager: Arc<BlockManager>,
- /// Table containing informations about buckets
+ /// Table containing buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
- /// Table containing informations about bucket aliases
+ /// Table containing bucket aliases
pub bucket_alias_table: Arc<Table<BucketAliasTable, TableFullReplication>>,
- /// Table containing informations about api keys
+ /// Table containing api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
+ /// Table containing S3 objects
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+ /// Counting table containing object counters
+ pub object_counter_table: Arc<IndexCounter<Object>>,
+ /// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ /// Table containing S3 block references (not blocks themselves)
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+
+ #[cfg(feature = "k2v")]
+ pub k2v: GarageK2V,
+}
+
+#[cfg(feature = "k2v")]
+pub struct GarageK2V {
+ /// Table containing K2V items
+ pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ /// Indexing table containing K2V item counters
+ pub counter_table: Arc<IndexCounter<K2VItem>>,
+ /// K2V RPC handler
+ pub rpc: Arc<K2VRpcHandler>,
}
impl Garage {
/// Create and run garage
- pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
+ // Create meta dir and data dir if they don't exist already
+ std::fs::create_dir_all(&config.metadata_dir)
+ .ok_or_message("Unable to create Garage metadata directory")?;
+ std::fs::create_dir_all(&config.data_dir)
+ .ok_or_message("Unable to create Garage data directory")?;
+
+ info!("Opening database...");
+ let mut db_path = config.metadata_dir.clone();
+ let db = match config.db_engine.as_str() {
+ // ---- Sled DB ----
+ #[cfg(feature = "sled")]
+ "sled" => {
+ db_path.push("db");
+ info!("Opening Sled database at: {}", db_path.display());
+ let db = db::sled_adapter::sled::Config::default()
+ .path(&db_path)
+ .cache_capacity(config.sled_cache_capacity)
+ .flush_every_ms(Some(config.sled_flush_every_ms))
+ .open()
+ .expect("Unable to open sled DB");
+ db::sled_adapter::SledDb::init(db)
+ }
+ #[cfg(not(feature = "sled"))]
+ "sled" => return Err(Error::Message("sled db not available in this build".into())),
+ // ---- Sqlite DB ----
+ #[cfg(feature = "sqlite")]
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ db_path.push("db.sqlite");
+ info!("Opening Sqlite database at: {}", db_path.display());
+ let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+ .expect("Unable to open sqlite DB");
+ db::sqlite_adapter::SqliteDb::init(db)
+ }
+ #[cfg(not(feature = "sqlite"))]
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ return Err(Error::Message(
+ "sqlite db not available in this build".into(),
+ ))
+ }
+ // ---- LMDB DB ----
+ #[cfg(feature = "lmdb")]
+ "lmdb" | "heed" => {
+ db_path.push("db.lmdb");
+ info!("Opening LMDB database at: {}", db_path.display());
+ std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
+ let map_size = garage_db::lmdb_adapter::recommended_map_size();
+
+ use db::lmdb_adapter::heed;
+ let mut env_builder = heed::EnvOpenOptions::new();
+ env_builder.max_dbs(100);
+ env_builder.max_readers(500);
+ env_builder.map_size(map_size);
+ unsafe {
+ env_builder.flag(heed::flags::Flags::MdbNoSync);
+ env_builder.flag(heed::flags::Flags::MdbNoMetaSync);
+ }
+ let db = env_builder.open(&db_path).expect("Unable to open LMDB DB");
+ db::lmdb_adapter::LmdbDb::init(db)
+ }
+ #[cfg(not(feature = "lmdb"))]
+ "lmdb" | "heed" => return Err(Error::Message("lmdb db not available in this build".into())),
+ // ---- Unavailable DB engine ----
+ e => {
+ return Err(Error::Message(format!(
+ "Unsupported DB engine: {} (options: {})",
+ e,
+ vec![
+ #[cfg(feature = "sled")]
+ "sled",
+ #[cfg(feature = "sqlite")]
+ "sqlite",
+ #[cfg(feature = "lmdb")]
+ "lmdb",
+ ]
+ .join(", ")
+ )));
+ }
+ };
+
let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
)
@@ -64,7 +169,7 @@ impl Garage {
background.clone(),
replication_mode.replication_factor(),
&config,
- );
+ )?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
@@ -90,11 +195,25 @@ impl Garage {
&db,
config.data_dir.clone(),
config.compression_level,
- config.block_manager_background_tranquility,
data_rep_param,
system.clone(),
);
+ // ---- admin tables ----
+ info!("Initialize bucket_table...");
+ let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
+
+ info!("Initialize bucket_alias_table...");
+ let bucket_alias_table = Table::new(
+ BucketAliasTable,
+ control_rep_param.clone(),
+ system.clone(),
+ &db,
+ );
+ info!("Initialize key_table_table...");
+ let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
+
+ // ---- S3 tables ----
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
@@ -116,34 +235,28 @@ impl Garage {
&db,
);
+ info!("Initialize object counter table...");
+ let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
+
info!("Initialize object_table...");
+ #[allow(clippy::redundant_clone)]
let object_table = Table::new(
ObjectTable {
background: background.clone(),
version_table: version_table.clone(),
+ object_counter_table: object_counter_table.clone(),
},
- meta_rep_param,
- system.clone(),
- &db,
- );
-
- info!("Initialize bucket_table...");
- let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
-
- info!("Initialize bucket_alias_table...");
- let bucket_alias_table = Table::new(
- BucketAliasTable,
- control_rep_param.clone(),
+ meta_rep_param.clone(),
system.clone(),
&db,
);
- info!("Initialize key_table_table...");
- let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
-
- info!("Initialize Garage...");
+ // ---- K2V ----
+ #[cfg(feature = "k2v")]
+ let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
- Arc::new(Self {
+ // -- done --
+ Ok(Arc::new(Self {
config,
db,
background,
@@ -153,12 +266,46 @@ impl Garage {
bucket_alias_table,
key_table,
object_table,
+ object_counter_table,
version_table,
block_ref_table,
- })
+ #[cfg(feature = "k2v")]
+ k2v,
+ }))
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
helper::bucket::BucketHelper(self)
}
+
+ pub fn key_helper(&self) -> helper::key::KeyHelper {
+ helper::key::KeyHelper(self)
+ }
+}
+
+#[cfg(feature = "k2v")]
+impl GarageK2V {
+ fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
+ info!("Initialize K2V counter table...");
+ let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
+ info!("Initialize K2V subscription manager...");
+ let subscriptions = Arc::new(SubscriptionManager::new());
+ info!("Initialize K2V item table...");
+ let item_table = Table::new(
+ K2VItemTable {
+ counter_table: counter_table.clone(),
+ subscriptions: subscriptions.clone(),
+ },
+ meta_rep_param,
+ system.clone(),
+ db,
+ );
+ let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
+
+ Self {
+ item_table,
+ counter_table,
+ rpc,
+ }
+ }
}