From 69f1d8fef23149e45189c296e0c0d23e040cbb0e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 17 Apr 2020 17:09:57 +0200 Subject: WIP TODOs: - ensure sync goes both way - finish sending blocks to other nodes when they need them before deleting --- src/server.rs | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 287b4386..591a7bf9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,10 +1,11 @@ -pub use futures_util::future::FutureExt; -use serde::Deserialize; use std::collections::HashMap; use std::io::{Read, Write}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; + +pub use futures_util::future::FutureExt; +use serde::Deserialize; use tokio::sync::watch; use crate::api_server; @@ -66,9 +67,11 @@ impl Garage { db: sled::Db, background: Arc, ) -> Arc { + println!("Initialize membership management system..."); let system = Arc::new(System::new(config.clone(), id, background.clone())); - let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone()).await; + println!("Initialize block manager..."); + let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone()); let data_rep_param = TableReplicationParams { replication_factor: system.config.data_replication_factor, @@ -84,6 +87,7 @@ impl Garage { timeout: DEFAULT_TIMEOUT, }; + println!("Initialize block_ref_table..."); let block_ref_table = Table::new( BlockRefTable { background: background.clone(), @@ -95,6 +99,8 @@ impl Garage { data_rep_param.clone(), ) .await; + + println!("Initialize version_table..."); let version_table = Table::new( VersionTable { background: background.clone(), @@ -106,6 +112,8 @@ impl Garage { meta_rep_param.clone(), ) .await; + + println!("Initialize object_table..."); let object_table = Table::new( ObjectTable { background: background.clone(), @@ -118,6 +126,7 @@ impl Garage { ) .await; + println!("Initialize Garage..."); let mut garage = Self { db, system: system.clone(), @@ -142,7 +151,13 @@ impl Garage { garage.block_ref_table.clone().rpc_handler(), ); - Arc::new(garage) + let garage = Arc::new(garage); + + println!("Start block manager background thread..."); + garage.block_manager.garage.swap(Some(garage.clone())); + garage.block_manager.clone().spawn_background_worker().await; + + garage } } @@ -206,20 +221,25 @@ async fn wait_from(mut chan: watch::Receiver) -> () { } pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { + println!("Loading configuration..."); let config = read_config(config_file).expect("Unable to read config file"); + let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID"); + println!("Node ID: {}", hex::encode(&id)); + + println!("Opening database..."); let mut db_path = config.metadata_dir.clone(); db_path.push("db"); let db = sled::open(db_path).expect("Unable to open DB"); - let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID"); - println!("Node ID: {}", hex::encode(&id)); - let (send_cancel, watch_cancel) = watch::channel(false); + println!("Initializing background runner..."); let background = BackgroundRunner::new(8, watch_cancel.clone()); + let garage = Garage::new(config, id, db, background.clone()).await; + println!("Initializing RPC and API servers..."); let rpc_server = rpc_server::run_rpc_server(garage.clone(), wait_from(watch_cancel.clone())); let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); -- cgit v1.2.3