diff options
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 168 |
1 files changed, 52 insertions, 116 deletions
diff --git a/src/server.rs b/src/server.rs index 3ea29105..de04615f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,79 +1,34 @@ -use std::io::{Read, Write}; -use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use futures_util::future::*; -use serde::Deserialize; use tokio::sync::watch; use crate::background::*; -use crate::data::*; +use crate::config::*; use crate::error::Error; -use crate::membership::System; -use crate::rpc_server::RpcServer; -use crate::table::*; -use crate::table_fullcopy::*; -use crate::table_sharded::*; - -use crate::block::*; -use crate::block_ref_table::*; -use crate::bucket_table::*; -use crate::object_table::*; -use crate::version_table::*; - -use crate::admin_rpc::*; -use crate::api_server; - -#[derive(Deserialize, Debug, Clone)] -pub struct Config { - pub metadata_dir: PathBuf, - pub data_dir: PathBuf, - - pub api_bind_addr: SocketAddr, - pub rpc_bind_addr: SocketAddr, - pub bootstrap_peers: Vec<SocketAddr>, +use crate::rpc::membership::System; +use crate::rpc::rpc_client::RpcHttpClient; +use crate::rpc::rpc_server::RpcServer; - #[serde(default = "default_max_concurrent_rpc_requests")] - pub max_concurrent_rpc_requests: usize, - - #[serde(default = "default_block_size")] - pub block_size: usize, - - #[serde(default = "default_replication_factor")] - pub meta_replication_factor: usize, - - #[serde(default = "default_epidemic_factor")] - pub meta_epidemic_factor: usize, +use crate::table::table_fullcopy::*; +use crate::table::table_sharded::*; +use crate::table::*; - #[serde(default = "default_replication_factor")] - pub data_replication_factor: usize, +use crate::store::block::*; +use crate::store::block_ref_table::*; +use crate::store::bucket_table::*; +use crate::store::object_table::*; +use crate::store::version_table::*; - pub rpc_tls: Option<TlsConfig>, -} +use crate::api::api_server; -fn default_max_concurrent_rpc_requests() -> usize { - 12 -} -fn default_block_size() -> usize { - 1048576 -} -fn default_replication_factor() -> usize { - 3 -} -fn default_epidemic_factor() -> usize { - 3 -} - -#[derive(Deserialize, Debug, Clone)] -pub struct TlsConfig { - pub ca_cert: String, - pub node_cert: String, - pub node_key: String, -} +use crate::admin_rpc::*; pub struct Garage { + pub config: Config, + pub db: sled::Db, pub background: Arc<BackgroundRunner>, pub system: Arc<System>, @@ -88,33 +43,46 @@ pub struct Garage { impl Garage { pub async fn new( config: Config, - id: UUID, db: sled::Db, background: Arc<BackgroundRunner>, rpc_server: &mut RpcServer, ) -> Arc<Self> { info!("Initialize membership management system..."); - let system = System::new(config.clone(), id, background.clone(), rpc_server); - - info!("Initialize block manager..."); - let block_manager = - BlockManager::new(&db, config.data_dir.clone(), system.clone(), rpc_server); + let rpc_http_client = Arc::new( + RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls) + .expect("Could not create RPC client"), + ); + let system = System::new( + config.metadata_dir.clone(), + rpc_http_client, + background.clone(), + rpc_server, + ); let data_rep_param = TableShardedReplication { - replication_factor: system.config.data_replication_factor, - write_quorum: (system.config.data_replication_factor + 1) / 2, + replication_factor: config.data_replication_factor, + write_quorum: (config.data_replication_factor + 1) / 2, read_quorum: 1, }; let meta_rep_param = TableShardedReplication { - replication_factor: system.config.meta_replication_factor, - write_quorum: (system.config.meta_replication_factor + 1) / 2, - read_quorum: (system.config.meta_replication_factor + 1) / 2, + replication_factor: config.meta_replication_factor, + write_quorum: (config.meta_replication_factor + 1) / 2, + read_quorum: (config.meta_replication_factor + 1) / 2, }; let control_rep_param = TableFullReplication::new( - system.config.meta_epidemic_factor, - (system.config.meta_epidemic_factor + 1) / 2, + config.meta_epidemic_factor, + (config.meta_epidemic_factor + 1) / 2, + ); + + info!("Initialize block manager..."); + let block_manager = BlockManager::new( + &db, + config.data_dir.clone(), + data_rep_param.clone(), + system.clone(), + rpc_server, ); info!("Initialize block_ref_table..."); @@ -172,6 +140,7 @@ impl Garage { info!("Initialize Garage..."); let garage = Arc::new(Self { + config, db, system: system.clone(), block_manager, @@ -193,40 +162,6 @@ impl Garage { } } -fn read_config(config_file: PathBuf) -> Result<Config, Error> { - let mut file = std::fs::OpenOptions::new() - .read(true) - .open(config_file.as_path())?; - - let mut config = String::new(); - file.read_to_string(&mut config)?; - - Ok(toml::from_str(&config)?) -} - -fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { - let mut id_file = metadata_dir.clone(); - id_file.push("node_id"); - if id_file.as_path().exists() { - let mut f = std::fs::File::open(id_file.as_path())?; - let mut d = vec![]; - f.read_to_end(&mut d)?; - if d.len() != 32 { - return Err(Error::Message(format!("Corrupt node_id file"))); - } - - let mut id = [0u8; 32]; - id.copy_from_slice(&d[..]); - Ok(id.into()) - } else { - let id = gen_uuid(); - - let mut f = std::fs::File::create(id_file.as_path())?; - f.write_all(id.as_slice())?; - Ok(id) - } -} - async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> { // Wait for the CTRL+C signal tokio::signal::ctrl_c() @@ -249,9 +184,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Loading configuration..."); let config = read_config(config_file).expect("Unable to read config file"); - let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID"); - info!("Node ID: {}", hex::encode(&id)); - info!("Opening database..."); let mut db_path = config.metadata_dir.clone(); db_path.push("db"); @@ -264,17 +196,21 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let (send_cancel, watch_cancel) = watch::channel(false); let background = BackgroundRunner::new(16, watch_cancel.clone()); - let garage = Garage::new(config, id, db, background.clone(), &mut rpc_server).await; + let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await; info!("Initializing RPC and API servers..."); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); futures::try_join!( - garage.system.clone().bootstrap().map(|rv| { - info!("Bootstrap done"); - Ok(rv) - }), + garage + .system + .clone() + .bootstrap(&garage.config.bootstrap_peers[..]) + .map(|rv| { + info!("Bootstrap done"); + Ok(rv) + }), run_rpc_server.map(|rv| { info!("RPC server exited"); rv |