diff options
Diffstat (limited to 'src/garage/server.rs')
-rw-r--r-- | src/garage/server.rs | 63 |
1 files changed, 20 insertions, 43 deletions
diff --git a/src/garage/server.rs b/src/garage/server.rs index 2e109f8b..c45a69b8 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -21,13 +21,13 @@ async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> .await .expect("failed to install CTRL+C signal handler"); info!("Received CTRL+C, shutting down."); - send_cancel.broadcast(true)?; + send_cancel.send(true)?; Ok(()) } async fn wait_from(mut chan: watch::Receiver<bool>) -> () { - while let Some(exit_now) = chan.recv().await { - if exit_now { + while !*chan.borrow() { + if chan.changed().await.is_err() { return; } } @@ -40,37 +40,22 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Opening database..."); let mut db_path = config.metadata_dir.clone(); db_path.push("db"); - let db = match sled::open(&db_path) { - Ok(db) => db, - Err(e) => { - warn!("Old DB could not be openned ({}), attempting migration.", e); - let old = old_sled::open(&db_path).expect("Unable to open old DB for migration"); - let mut new_path = config.metadata_dir.clone(); - new_path.push("db2"); - let new = sled::open(&new_path).expect("Unable to open new DB for migration"); - new.import(old.export()); - if old.checksum().expect("unable to compute old db checksum") - != new.checksum().expect("unable to compute new db checksum") - { - panic!("db checksums don't match after migration"); - } - drop(new); - drop(old); - std::fs::remove_dir_all(&db_path).expect("Cannot remove old DB folder"); - std::fs::rename(new_path, &db_path) - .expect("Cannot move new DB folder to correct place"); - sled::open(db_path).expect("Unable to open new DB after migration") - } - }; + let db = sled::open(&db_path).expect("Unable to open sled DB"); info!("Initialize RPC server..."); let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone()); info!("Initializing background runner..."); let (send_cancel, watch_cancel) = watch::channel(false); - let background = BackgroundRunner::new(16, watch_cancel.clone()); + let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); - let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await; + info!("Initializing Garage main data store..."); + let garage = Garage::new(config.clone(), db, background, &mut rpc_server); + let bootstrap = garage.system.clone().bootstrap( + &config.bootstrap_peers[..], + config.consul_host, + config.consul_service_name, + ); info!("Crate admin RPC handler..."); AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server); @@ -78,21 +63,13 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing RPC and API servers..."); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); - let web_server = web_server::run_web_server(garage.clone(), wait_from(watch_cancel.clone())); + let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone())); futures::try_join!( - garage - .system - .clone() - .bootstrap( - &garage.config.bootstrap_peers[..], - garage.config.consul_host.clone(), - garage.config.consul_service_name.clone() - ) - .map(|rv| { - info!("Bootstrap done"); - Ok(rv) - }), + bootstrap.map(|rv| { + info!("Bootstrap done"); + Ok(rv) + }), run_rpc_server.map(|rv| { info!("RPC server exited"); rv @@ -105,9 +82,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Web server exited"); rv }), - background.run().map(|rv| { - info!("Background runner exited"); - Ok(rv) + await_background_done.map(|rv| { + info!("Background runner exited: {:?}", rv); + Ok(()) }), shutdown_signal(send_cancel), )?; |