aboutsummaryrefslogtreecommitdiff
path: root/src/garage/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/server.rs')
-rw-r--r--src/garage/server.rs87
1 files changed, 87 insertions, 0 deletions
diff --git a/src/garage/server.rs b/src/garage/server.rs
new file mode 100644
index 00000000..52d03464
--- /dev/null
+++ b/src/garage/server.rs
@@ -0,0 +1,87 @@
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use futures_util::future::*;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::config::*;
+use garage_util::error::Error;
+
+use garage_api::api_server;
+use garage_core::garage::Garage;
+use garage_rpc::rpc_server::RpcServer;
+
+use crate::admin_rpc::*;
+
+async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> {
+ // Wait for the CTRL+C signal
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to install CTRL+C signal handler");
+ info!("Received CTRL+C, shutting down.");
+ send_cancel.broadcast(true)?;
+ Ok(())
+}
+
+async fn wait_from(mut chan: watch::Receiver<bool>) -> () {
+ while let Some(exit_now) = chan.recv().await {
+ if exit_now {
+ return;
+ }
+ }
+}
+
+pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
+ info!("Loading configuration...");
+ let config = read_config(config_file).expect("Unable to read config file");
+
+ info!("Opening database...");
+ let mut db_path = config.metadata_dir.clone();
+ db_path.push("db");
+ let db = sled::open(db_path).expect("Unable to open DB");
+
+ info!("Initialize RPC server...");
+ let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone());
+
+ info!("Initializing background runner...");
+ let (send_cancel, watch_cancel) = watch::channel(false);
+ let background = BackgroundRunner::new(16, watch_cancel.clone());
+
+ let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await;
+
+ info!("Crate admin RPC handler...");
+ AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);
+
+ info!("Initializing RPC and API servers...");
+ let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
+ let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
+
+ futures::try_join!(
+ garage
+ .system
+ .clone()
+ .bootstrap(&garage.config.bootstrap_peers[..])
+ .map(|rv| {
+ info!("Bootstrap done");
+ Ok(rv)
+ }),
+ run_rpc_server.map(|rv| {
+ info!("RPC server exited");
+ rv
+ }),
+ api_server.map(|rv| {
+ info!("API server exited");
+ rv
+ }),
+ background.run().map(|rv| {
+ info!("Background runner exited");
+ Ok(rv)
+ }),
+ shutdown_signal(send_cancel),
+ )?;
+
+ info!("Cleaning up...");
+
+ Ok(())
+}