diff options
-rw-r--r-- | src/block_ref_table.rs | 15 | ||||
-rw-r--r-- | src/object_table.rs | 12 | ||||
-rw-r--r-- | src/server.rs | 63 | ||||
-rw-r--r-- | src/version_table.rs | 13 |
4 files changed, 53 insertions, 50 deletions
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index f3a14d81..3e5fb0a1 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -1,11 +1,11 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::sync::RwLock; use crate::data::*; -use crate::server::Garage; use crate::table::*; +use crate::background::*; +use crate::block::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { @@ -35,7 +35,8 @@ impl Entry<Hash, UUID> for BlockRef { } pub struct BlockRefTable { - pub garage: RwLock<Option<Arc<Garage>>>, + pub background: Arc<BackgroundRunner>, + pub block_manager: Arc<BlockManager>, } #[async_trait] @@ -45,19 +46,17 @@ impl TableFormat for BlockRefTable { type E = BlockRef; async fn updated(&self, old: Option<Self::E>, new: Self::E) { - let garage = self.garage.read().await.as_ref().cloned().unwrap(); - let was_before = old.map(|x| !x.deleted).unwrap_or(false); let is_after = !new.deleted; if is_after && !was_before { - if let Err(e) = garage.block_manager.block_incref(&new.block) { + if let Err(e) = self.block_manager.block_incref(&new.block) { eprintln!("Failed to incref block {:?}: {}", &new.block, e); } } if was_before && !is_after { - if let Err(e) = garage + if let Err(e) = self .block_manager - .block_decref(&new.block, &garage.background) + .block_decref(&new.block, &self.background) { eprintln!("Failed to decref block {:?}: {}", &new.block, e); } diff --git a/src/object_table.rs b/src/object_table.rs index 63dabf20..8ce49565 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -1,11 +1,10 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::sync::RwLock; use crate::data::*; -use crate::server::Garage; use crate::table::*; +use crate::background::BackgroundRunner; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -88,7 +87,8 @@ impl Entry<String, String> for Object { } pub struct ObjectTable { - pub garage: RwLock<Option<Arc<Garage>>>, + pub background: Arc<BackgroundRunner>, + pub version_table: Arc<Table<VersionTable>>, } #[async_trait] @@ -98,8 +98,8 @@ impl TableFormat for ObjectTable { type E = Object; async fn updated(&self, old: Option<Self::E>, new: Self::E) { - let garage = self.garage.read().await.as_ref().cloned().unwrap(); - garage.clone().background.spawn(async move { + let version_table = self.version_table.clone(); + self.background.spawn(async move { // Propagate deletion of old versions if let Some(old_v) = old { for v in old_v.versions.iter() { @@ -115,7 +115,7 @@ impl TableFormat for ObjectTable { bucket: old_v.bucket.clone(), key: old_v.key.clone(), }; - garage.version_table.insert(&deleted_version).await?; + version_table.insert(&deleted_version).await?; } } } diff --git a/src/server.rs b/src/server.rs index e38e8580..29a2dbcb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,7 +6,6 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::watch; -use tokio::sync::RwLock; use crate::api_server; use crate::background::*; @@ -36,12 +35,21 @@ pub struct Config { #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, + + pub tls: TlsConfig, +} + +#[derive(Deserialize, Debug)] +pub struct TlsConfig { + pub ca_cert: Option<String>, + pub node_cert: Option<String>, + pub node_key: Option<String>, } pub struct Garage { pub db: sled::Db, pub system: Arc<System>, - pub block_manager: BlockManager, + pub block_manager: Arc<BlockManager>, pub background: Arc<BackgroundRunner>, pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>, @@ -58,10 +66,17 @@ impl Garage { db: sled::Db, background: Arc<BackgroundRunner>, ) -> Arc<Self> { - let block_manager = BlockManager::new(&db, config.data_dir.clone()); + let block_manager = Arc::new(BlockManager::new(&db, config.data_dir.clone())); let system = Arc::new(System::new(config, id, background.clone())); + let data_rep_param = TableReplicationParams { + replication_factor: system.config.data_replication_factor, + write_quorum: (system.config.data_replication_factor + 1) / 2, + read_quorum: 1, + timeout: DEFAULT_TIMEOUT, + }; + let meta_rep_param = TableReplicationParams { replication_factor: system.config.meta_replication_factor, write_quorum: (system.config.meta_replication_factor + 1) / 2, @@ -69,42 +84,38 @@ impl Garage { timeout: DEFAULT_TIMEOUT, }; - let object_table = Arc::new(Table::new( - ObjectTable { - garage: RwLock::new(None), + let block_ref_table = Arc::new(Table::new( + BlockRefTable { + background: background.clone(), + block_manager: block_manager.clone(), }, system.clone(), &db, - "object".to_string(), - meta_rep_param.clone(), + "block_ref".to_string(), + data_rep_param.clone(), )); let version_table = Arc::new(Table::new( VersionTable { - garage: RwLock::new(None), + background: background.clone(), + block_ref_table: block_ref_table.clone(), }, system.clone(), &db, "version".to_string(), meta_rep_param.clone(), )); - - let data_rep_param = TableReplicationParams { - replication_factor: system.config.data_replication_factor, - write_quorum: (system.config.data_replication_factor + 1) / 2, - read_quorum: 1, - timeout: DEFAULT_TIMEOUT, - }; - - let block_ref_table = Arc::new(Table::new( - BlockRefTable { - garage: RwLock::new(None), + let object_table = Arc::new(Table::new( + ObjectTable { + background: background.clone(), + version_table: version_table.clone(), }, system.clone(), &db, - "block_ref".to_string(), - data_rep_param.clone(), + "object".to_string(), + meta_rep_param.clone(), )); + let mut garage = Self { db, system: system.clone(), @@ -129,13 +140,7 @@ impl Garage { garage.block_ref_table.clone().rpc_handler(), ); - let garage = Arc::new(garage); - - *garage.object_table.instance.garage.write().await = Some(garage.clone()); - *garage.version_table.instance.garage.write().await = Some(garage.clone()); - *garage.block_ref_table.instance.garage.write().await = Some(garage.clone()); - - garage + Arc::new(garage) } } diff --git a/src/version_table.rs b/src/version_table.rs index 9ea0551e..cb70c645 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -1,11 +1,10 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::sync::RwLock; use crate::data::*; -use crate::server::Garage; use crate::table::*; +use crate::background::BackgroundRunner; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Version { @@ -54,7 +53,8 @@ impl Entry<Hash, EmptySortKey> for Version { } pub struct VersionTable { - pub garage: RwLock<Option<Arc<Garage>>>, + pub background: Arc<BackgroundRunner>, + pub block_ref_table: Arc<Table<BlockRefTable>>, } #[async_trait] @@ -64,8 +64,8 @@ impl TableFormat for VersionTable { type E = Version; async fn updated(&self, old: Option<Self::E>, new: Self::E) { - let garage = self.garage.read().await.as_ref().cloned().unwrap(); - garage.clone().background.spawn(async move { + let block_ref_table = self.block_ref_table.clone(); + self.background.spawn(async move { // Propagate deletion of version blocks if let Some(old_v) = old { if new.deleted && !old_v.deleted { @@ -78,8 +78,7 @@ impl TableFormat for VersionTable { deleted: true, }) .collect::<Vec<_>>(); - garage - .block_ref_table + block_ref_table .insert_many(&deleted_block_refs[..]) .await?; } |