From f41583e1b731574b4bb13a20d4b3fd9fe3a899f5 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 18 Apr 2020 19:21:34 +0200 Subject: Massive RPC refactoring --- src/server.rs | 45 +++++++++++++++++---------------------------- 1 file changed, 17 insertions(+), 28 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 591a7bf9..57faea21 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::io::{Read, Write}; use std::net::SocketAddr; use std::path::PathBuf; @@ -15,7 +14,7 @@ use crate::data::*; use crate::error::Error; use crate::membership::System; use crate::proto::*; -use crate::rpc_server; +use crate::rpc_server::RpcServer; use crate::table::*; #[derive(Deserialize, Debug, Clone)] @@ -53,8 +52,6 @@ pub struct Garage { pub system: Arc, pub block_manager: Arc, - pub table_rpc_handlers: HashMap>, - pub object_table: Arc>, pub version_table: Arc>, pub block_ref_table: Arc>, @@ -66,12 +63,14 @@ impl Garage { id: UUID, db: sled::Db, background: Arc, + rpc_server: &mut RpcServer, ) -> Arc { println!("Initialize membership management system..."); - let system = Arc::new(System::new(config.clone(), id, background.clone())); + let system = System::new(config.clone(), id, background.clone(), rpc_server); println!("Initialize block manager..."); - let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone()); + let block_manager = + BlockManager::new(&db, config.data_dir.clone(), system.clone(), rpc_server); let data_rep_param = TableReplicationParams { replication_factor: system.config.data_replication_factor, @@ -97,6 +96,7 @@ impl Garage { &db, "block_ref".to_string(), data_rep_param.clone(), + rpc_server, ) .await; @@ -110,6 +110,7 @@ impl Garage { &db, "version".to_string(), meta_rep_param.clone(), + rpc_server, ) .await; @@ -123,35 +124,20 @@ impl Garage { &db, "object".to_string(), meta_rep_param.clone(), + rpc_server, ) .await; println!("Initialize Garage..."); - let mut garage = Self { + let garage = Arc::new(Self { db, system: system.clone(), block_manager, background, - table_rpc_handlers: HashMap::new(), object_table, version_table, block_ref_table, - }; - - garage.table_rpc_handlers.insert( - garage.object_table.name.clone(), - garage.object_table.clone().rpc_handler(), - ); - garage.table_rpc_handlers.insert( - garage.version_table.name.clone(), - garage.version_table.clone().rpc_handler(), - ); - garage.table_rpc_handlers.insert( - garage.block_ref_table.name.clone(), - garage.block_ref_table.clone().rpc_handler(), - ); - - let garage = Arc::new(garage); + }); println!("Start block manager background thread..."); garage.block_manager.garage.swap(Some(garage.clone())); @@ -232,20 +218,23 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { db_path.push("db"); let db = sled::open(db_path).expect("Unable to open DB"); - let (send_cancel, watch_cancel) = watch::channel(false); + println!("Initialize RPC server..."); + let rpc_bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], config.rpc_port).into(); + let mut rpc_server = RpcServer::new(rpc_bind_addr, config.rpc_tls.clone()); println!("Initializing background runner..."); + let (send_cancel, watch_cancel) = watch::channel(false); let background = BackgroundRunner::new(8, watch_cancel.clone()); - let garage = Garage::new(config, id, db, background.clone()).await; + let garage = Garage::new(config, id, db, background.clone(), &mut rpc_server).await; println!("Initializing RPC and API servers..."); - let rpc_server = rpc_server::run_rpc_server(garage.clone(), wait_from(watch_cancel.clone())); + let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); futures::try_join!( garage.system.clone().bootstrap().map(Ok), - rpc_server, + run_rpc_server, api_server, background.run().map(Ok), shutdown_signal(send_cancel), -- cgit v1.2.3