diff options
Diffstat (limited to 'src/garage/server.rs')
-rw-r--r-- | src/garage/server.rs | 87 |
1 files changed, 34 insertions, 53 deletions
diff --git a/src/garage/server.rs b/src/garage/server.rs index 36f7de5c..0edf3e2d 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -1,7 +1,5 @@ use std::path::PathBuf; -use std::sync::Arc; -use futures_util::future::*; use tokio::sync::watch; use garage_util::background::*; @@ -10,21 +8,10 @@ use garage_util::error::Error; use garage_api::run_api_server; use garage_model::garage::Garage; -use garage_rpc::rpc_server::RpcServer; use garage_web::run_web_server; use crate::admin_rpc::*; -async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> { - // Wait for the CTRL+C signal - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); - info!("Received CTRL+C, shutting down."); - send_cancel.send(true)?; - Ok(()) -} - async fn wait_from(mut chan: watch::Receiver<bool>) { while !*chan.borrow() { if chan.changed().await.is_err() { @@ -47,52 +34,46 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .open() .expect("Unable to open sled DB"); - info!("Initialize RPC server..."); - let mut rpc_server = RpcServer::new(config.rpc_bind_addr, config.rpc_tls.clone()); - info!("Initializing background runner..."); - let (send_cancel, watch_cancel) = watch::channel(false); + let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), db, background, &mut rpc_server); - let bootstrap = garage.system.clone().bootstrap( - config.bootstrap_peers, - config.consul_host, - config.consul_service_name, - ); + let garage = Garage::new(config.clone(), db, background); + + let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Crate admin RPC handler..."); - AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server); - - info!("Initializing RPC and API servers..."); - let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); - let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone())); - let web_server = run_web_server(garage, wait_from(watch_cancel.clone())); - - futures::try_join!( - bootstrap.map(|()| { - info!("Bootstrap done"); - Ok(()) - }), - run_rpc_server.map(|rv| { - info!("RPC server exited"); - rv - }), - api_server.map(|rv| { - info!("API server exited"); - rv - }), - web_server.map(|rv| { - info!("Web server exited"); - rv - }), - await_background_done.map(|rv| { - info!("Background runner exited: {:?}", rv); - Ok(()) - }), - shutdown_signal(send_cancel), - )?; + AdminRpcHandler::new(garage.clone()); + + info!("Initializing API server..."); + let api_server = tokio::spawn(run_api_server( + garage.clone(), + wait_from(watch_cancel.clone()), + )); + + info!("Initializing web server..."); + let web_server = tokio::spawn(run_web_server( + garage.clone(), + wait_from(watch_cancel.clone()), + )); + + // Stuff runs + + // When a cancel signal is sent, stuff stops + if let Err(e) = api_server.await? { + warn!("API server exited with error: {}", e); + } + if let Err(e) = web_server.await? { + warn!("Web server exited with error: {}", e); + } + + // Remove RPC handlers for system to break reference cycles + garage.system.netapp.drop_all_handlers(); + + // Await for last parts to end + run_system.await?; + await_background_done.await?; info!("Cleaning up..."); |