diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-11 18:51:11 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-11 18:51:11 +0200 |
commit | 53289b69e5037700689665b4edf20f2382ff15f6 (patch) | |
tree | e9920e1dce29e94bfddc3812b44ee2519ba14bed /src/server.rs | |
parent | 4a2624b76afff714a70ee7a9e4ffd97c54c7ecc4 (diff) | |
download | garage-53289b69e5037700689665b4edf20f2382ff15f6.tar.gz garage-53289b69e5037700689665b4edf20f2382ff15f6.zip |
Background task runner that replaces tokio::spawn
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 99 |
1 files changed, 64 insertions, 35 deletions
diff --git a/src/server.rs b/src/server.rs index b62c18cc..1f926bf0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,13 +1,15 @@ -use futures::channel::oneshot; +use futures_util::future::FutureExt; use serde::Deserialize; use std::collections::HashMap; use std::io::{Read, Write}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use tokio::sync::watch; use tokio::sync::{Mutex, RwLock}; use crate::api_server; +use crate::background::*; use crate::data::*; use crate::error::Error; use crate::membership::System; @@ -15,10 +17,31 @@ use crate::proto::*; use crate::rpc_server; use crate::table::*; +#[derive(Deserialize, Debug)] +pub struct Config { + pub metadata_dir: PathBuf, + pub data_dir: PathBuf, + + pub api_port: u16, + pub rpc_port: u16, + + pub bootstrap_peers: Vec<SocketAddr>, + + #[serde(default = "default_block_size")] + pub block_size: usize, + + #[serde(default = "default_meta_replication_factor")] + pub meta_replication_factor: usize, + + #[serde(default = "default_data_replication_factor")] + pub data_replication_factor: usize, +} + pub struct Garage { pub db: sled::Db, pub system: Arc<System>, pub fs_lock: Mutex<()>, + pub background: Arc<BackgroundRunner>, pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>, @@ -28,8 +51,13 @@ pub struct Garage { } impl Garage { - pub async fn new(config: Config, id: UUID, db: sled::Db) -> Arc<Self> { - let system = Arc::new(System::new(config, id)); + pub async fn new( + config: Config, + id: UUID, + db: sled::Db, + background: Arc<BackgroundRunner>, + ) -> Arc<Self> { + let system = Arc::new(System::new(config, id, background.clone())); let meta_rep_param = TableReplicationParams { replication_factor: system.config.meta_replication_factor, @@ -56,6 +84,14 @@ impl Garage { "version".to_string(), meta_rep_param.clone(), )); + + let data_rep_param = TableReplicationParams { + replication_factor: system.config.data_replication_factor, + write_quorum: (system.config.data_replication_factor + 1) / 2, + read_quorum: 1, + timeout: DEFAULT_TIMEOUT, + }; + let block_ref_table = Arc::new(Table::new( BlockRefTable { garage: RwLock::new(None), @@ -63,13 +99,14 @@ impl Garage { system.clone(), &db, "block_ref".to_string(), - meta_rep_param.clone(), + data_rep_param.clone(), )); let mut garage = Self { db, system: system.clone(), fs_lock: Mutex::new(()), + background, table_rpc_handlers: HashMap::new(), object_table, version_table, @@ -105,22 +142,8 @@ fn default_block_size() -> usize { fn default_meta_replication_factor() -> usize { 3 } - -#[derive(Deserialize, Debug)] -pub struct Config { - pub metadata_dir: PathBuf, - pub data_dir: PathBuf, - - pub api_port: u16, - pub rpc_port: u16, - - pub bootstrap_peers: Vec<SocketAddr>, - - #[serde(default = "default_block_size")] - pub block_size: usize, - - #[serde(default = "default_meta_replication_factor")] - pub meta_replication_factor: usize, +fn default_data_replication_factor() -> usize { + 3 } fn read_config(config_file: PathBuf) -> Result<Config, Error> { @@ -157,19 +180,22 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { } } -async fn shutdown_signal(chans: Vec<oneshot::Sender<()>>) { +async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> { // Wait for the CTRL+C signal tokio::signal::ctrl_c() .await .expect("failed to install CTRL+C signal handler"); println!("Received CTRL+C, shutting down."); - for ch in chans { - ch.send(()).unwrap(); - } + send_cancel.broadcast(true)?; + Ok(()) } -async fn wait_from(chan: oneshot::Receiver<()>) -> () { - chan.await.unwrap() +async fn wait_from(mut chan: watch::Receiver<bool>) -> () { + while let Some(exit_now) = chan.recv().await { + if exit_now { + return; + } + } } pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { @@ -182,17 +208,20 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID"); println!("Node ID: {}", hex::encode(&id)); - let garage = Garage::new(config, id, db).await; - - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); + let (send_cancel, watch_cancel) = watch::channel(false); - let rpc_server = rpc_server::run_rpc_server(garage.clone(), wait_from(rx1)); - let api_server = api_server::run_api_server(garage.clone(), wait_from(rx2)); + let background = BackgroundRunner::new(8, watch_cancel.clone()); + let garage = Garage::new(config, id, db, background.clone()).await; - tokio::spawn(shutdown_signal(vec![tx1, tx2])); - tokio::spawn(garage.system.clone().bootstrap()); + let rpc_server = rpc_server::run_rpc_server(garage.clone(), wait_from(watch_cancel.clone())); + let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); - futures::try_join!(rpc_server, api_server)?; + futures::try_join!( + garage.system.clone().bootstrap().map(Ok), + rpc_server, + api_server, + background.run().map(Ok), + shutdown_signal(send_cancel), + )?; Ok(()) } |