diff options
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 18 | ||||
-rw-r--r-- | src/garage/admin_rpc.rs | 80 | ||||
-rw-r--r-- | src/garage/cli.rs | 168 | ||||
-rw-r--r-- | src/garage/main.rs | 71 | ||||
-rw-r--r-- | src/garage/server.rs | 87 |
5 files changed, 206 insertions, 218 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 09ed3e1e..3023cb79 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage" -version = "0.3.0" +version = "0.4.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,12 +14,12 @@ path = "main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_api = { version = "0.3.0", path = "../api" } -garage_model = { version = "0.3.0", path = "../model" } -garage_rpc = { version = "0.3.0", path = "../rpc" } -garage_table = { version = "0.3.0", path = "../table" } -garage_util = { version = "0.3.0", path = "../util" } -garage_web = { version = "0.3.0", path = "../web" } +garage_api = { version = "0.4.0", path = "../api" } +garage_model = { version = "0.4.0", path = "../model" } +garage_rpc = { version = "0.4.0", path = "../rpc" } +garage_table = { version = "0.4.0", path = "../table" } +garage_util = { version = "0.4.0", path = "../util" } +garage_web = { version = "0.4.0", path = "../web" } bytes = "1.0" git-version = "0.3.4" @@ -27,6 +27,8 @@ hex = "0.4" log = "0.4" pretty_env_logger = "0.4" rand = "0.8" +async-trait = "0.1.7" +sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } sled = "0.34" @@ -38,3 +40,5 @@ toml = "0.5" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } + +netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index fe5a9d88..b9e57c40 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::fmt::Write; use std::sync::Arc; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use garage_util::error::Error; @@ -10,8 +11,7 @@ use garage_table::crdt::Crdt; use garage_table::replication::*; use garage_table::*; -use garage_rpc::rpc_client::*; -use garage_rpc::rpc_server::*; +use garage_rpc::*; use garage_model::bucket_table::*; use garage_model::garage::Garage; @@ -19,10 +19,8 @@ use garage_model::key_table::*; use crate::cli::*; use crate::repair::Repair; -use crate::*; -pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub const ADMIN_RPC_PATH: &str = "_admin"; +pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; #[derive(Debug, Serialize, Deserialize)] pub enum AdminRpc { @@ -33,41 +31,31 @@ pub enum AdminRpc { // Replies Ok(String), + Error(String), BucketList(Vec<String>), BucketInfo(Bucket), KeyList(Vec<(String, String)>), KeyInfo(Key), } -impl RpcMessage for AdminRpc {} +impl Message for AdminRpc { + type Response = AdminRpc; +} pub struct AdminRpcHandler { garage: Arc<Garage>, - rpc_client: Arc<RpcClient<AdminRpc>>, + endpoint: Arc<Endpoint<AdminRpc, Self>>, } impl AdminRpcHandler { pub fn new(garage: Arc<Garage>) -> Arc<Self> { - let rpc_client = garage.system.clone().rpc_client::<AdminRpc>(ADMIN_RPC_PATH); - Arc::new(Self { garage, rpc_client }) - } - - pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) { - rpc_server.add_handler::<AdminRpc, _, _>(ADMIN_RPC_PATH.to_string(), move |msg, _addr| { - let self2 = self.clone(); - async move { - match msg { - AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, - AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await, - AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, - AdminRpc::Stats(opt) => self2.handle_stats(opt).await, - _ => Err(Error::BadRpc("Invalid RPC".to_string())), - } - } - }); + let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); + let admin = Arc::new(Self { garage, endpoint }); + admin.endpoint.set_handler(admin.clone()); + admin } - async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result<AdminRpc, Error> { + async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> { match cmd { BucketOperation::List => { let bucket_names = self @@ -187,7 +175,7 @@ impl AdminRpcHandler { } } - async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result<AdminRpc, Error> { + async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> { match cmd { KeyOperation::List => { let key_ids = self @@ -210,13 +198,13 @@ impl AdminRpcHandler { Ok(AdminRpc::KeyInfo(key)) } KeyOperation::New(query) => { - let key = Key::new(query.name); + let key = Key::new(query.name.clone()); self.garage.key_table.insert(&key).await?; Ok(AdminRpc::KeyInfo(key)) } KeyOperation::Rename(query) => { let mut key = self.get_existing_key(&query.key_pattern).await?; - key.name.update(query.new_name); + key.name.update(query.new_name.clone()); self.garage.key_table.insert(&key).await?; Ok(AdminRpc::KeyInfo(key)) } @@ -353,17 +341,18 @@ impl AdminRpcHandler { let mut failures = vec![]; let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { + let node = NodeID::from_slice(node.as_slice()).unwrap(); if self - .rpc_client + .endpoint .call( - *node, - AdminRpc::LaunchRepair(opt_to_send.clone()), - ADMIN_RPC_TIMEOUT, + &node, + &AdminRpc::LaunchRepair(opt_to_send.clone()), + PRIO_NORMAL, ) .await .is_err() { - failures.push(*node); + failures.push(node); } } if failures.is_empty() { @@ -397,14 +386,16 @@ impl AdminRpcHandler { let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { + let node = NodeID::from_slice(node.as_slice()).unwrap(); + let mut opt = opt.clone(); opt.all_nodes = false; writeln!(&mut ret, "\n======================").unwrap(); writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); match self - .rpc_client - .call(*node, AdminRpc::Stats(opt), ADMIN_RPC_TIMEOUT) + .endpoint + .call(&node, &AdminRpc::Stats(opt), PRIO_NORMAL) .await { Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), @@ -495,4 +486,23 @@ impl AdminRpcHandler { .unwrap(); writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); } + + async fn handle_rpc(self: &Arc<Self>, msg: &AdminRpc) -> Result<AdminRpc, Error> { + match msg { + AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, + AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, + AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, + AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, + _ => Err(Error::BadRpc("Invalid RPC".to_string())), + } + } +} + +#[async_trait] +impl EndpointHandler<AdminRpc> for AdminRpcHandler { + async fn handle(self: &Arc<Self>, message: &AdminRpc, _from: NodeID) -> AdminRpc { + self.handle_rpc(message) + .await + .unwrap_or_else(|e| AdminRpc::Error(format!("{}", e))) + } } diff --git a/src/garage/cli.rs b/src/garage/cli.rs index f9e67fac..91ec5ab2 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -1,6 +1,5 @@ -use std::cmp::max; -use std::collections::HashSet; -use std::net::SocketAddr; +//use std::cmp::max; +//use std::collections::HashSet; use std::path::PathBuf; use serde::{Deserialize, Serialize}; @@ -8,11 +7,11 @@ use structopt::StructOpt; use garage_util::data::Uuid; use garage_util::error::Error; -use garage_util::time::*; +//use garage_util::time::*; -use garage_rpc::membership::*; use garage_rpc::ring::*; -use garage_rpc::rpc_client::*; +use garage_rpc::system::*; +use garage_rpc::*; use garage_model::bucket_table::*; use garage_model::key_table::*; @@ -298,54 +297,65 @@ pub struct StatsOpt { pub async fn cli_cmd( cmd: Command, - membership_rpc_cli: RpcAddrClient<Message>, - admin_rpc_cli: RpcAddrClient<AdminRpc>, - rpc_host: SocketAddr, + system_rpc_endpoint: &Endpoint<SystemRpc, ()>, + admin_rpc_endpoint: &Endpoint<AdminRpc, ()>, + rpc_host: NodeID, ) -> Result<(), Error> { match cmd { - Command::Status => cmd_status(membership_rpc_cli, rpc_host).await, + Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await, Command::Node(NodeOperation::Configure(configure_opt)) => { - cmd_configure(membership_rpc_cli, rpc_host, configure_opt).await + cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await } Command::Node(NodeOperation::Remove(remove_opt)) => { - cmd_remove(membership_rpc_cli, rpc_host, remove_opt).await + cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await } Command::Bucket(bo) => { - cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::BucketOperation(bo)).await + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await } - Command::Key(ko) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::KeyOperation(ko)).await, - Command::Repair(ro) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::LaunchRepair(ro)).await, - Command::Stats(so) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::Stats(so)).await, + Command::Key(ko) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await + } + Command::Repair(ro) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await + } + Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, _ => unreachable!(), } } -pub async fn cmd_status( - rpc_cli: RpcAddrClient<Message>, - rpc_host: SocketAddr, -) -> Result<(), Error> { +pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await? { - Message::AdvertiseNodesUp(nodes) => nodes, + SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; let config = match rpc_cli - .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await? { - Message::AdvertiseConfig(cfg) => cfg, + SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; + println!("STATUS:"); + for node in status { + println!("{:?}", node); + } + println!("CONFIG: (v{})", config.version); + for (id, node) in config.members { + println!("{} {:?}", hex::encode(id.as_slice()), node); + } + + /* TODO let (hostname_len, addr_len, tag_len, zone_len) = status .iter() - .map(|adv| (adv, config.members.get(&adv.id))) - .map(|(adv, cfg)| { + .map(|(id, addr, _)| (addr, config.members.get(&adv.id))) + .map(|(addr, cfg)| { ( - adv.state_info.hostname.len(), - adv.addr.to_string().len(), + 8, + addr.to_string().len(), cfg.map(|c| c.tag.len()).unwrap_or(0), cfg.map(|c| c.zone.len()).unwrap_or(0), ) @@ -355,13 +365,13 @@ pub async fn cmd_status( }); println!("Healthy nodes:"); - for adv in status.iter().filter(|x| x.is_up) { + for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) { if let Some(cfg) = config.members.get(&adv.id) { println!( "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}", - id = adv.id, - host = adv.state_info.hostname, - addr = adv.addr, + id = id, + host = "", + addr = addr, tag = cfg.tag, zone = cfg.zone, capacity = cfg.capacity_string(), @@ -373,36 +383,36 @@ pub async fn cmd_status( } else { println!( "{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED", - id = adv.id, - h = adv.state_info.hostname, - addr = adv.addr, - h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()), - a_pad = " ".repeat(addr_len - adv.addr.to_string().len()), + id = id, + h = "", + addr = addr, + h_pad = " ".repeat(hostname_len - "".len()), + a_pad = " ".repeat(addr_len - addr.to_string().len()), ); } } - let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>(); - let failure_case_1 = status.iter().any(|x| !x.is_up); + let status_keys = status.iter().map(|(id, _, _)| id).collect::<HashSet<_>>(); + let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up); let failure_case_2 = config .members .iter() .any(|(id, _)| !status_keys.contains(id)); if failure_case_1 || failure_case_2 { println!("\nFailed nodes:"); - for adv in status.iter().filter(|x| !x.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { + for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) { + if let Some(cfg) = config.members.get(&id) { println!( "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago", - id=adv.id, - host=adv.state_info.hostname, - addr=adv.addr, + id=id, + host="", + addr=addr, tag=cfg.tag, zone=cfg.zone, capacity=cfg.capacity_string(), - last_seen=(now_msec() - adv.last_seen) / 1000, - h_pad=" ".repeat(hostname_len - adv.state_info.hostname.len()), - a_pad=" ".repeat(addr_len - adv.addr.to_string().len()), + last_seen=(now_msec() - 0) / 1000, + h_pad=" ".repeat(hostname_len - "".len()), + a_pad=" ".repeat(addr_len - addr.to_string().len()), t_pad=" ".repeat(tag_len - cfg.tag.len()), z_pad=" ".repeat(zone_len - cfg.zone.len()), ); @@ -411,12 +421,12 @@ pub async fn cmd_status( let (tag_len, zone_len) = config .members .iter() - .filter(|(&id, _)| !status.iter().any(|x| x.id == id)) + .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id)) .map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len())) .fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz))); for (id, cfg) in config.members.iter() { - if !status.iter().any(|x| x.id == *id) { + if !status.iter().any(|(xid, _, _)| xid == *id) { println!( "{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen", id = id, @@ -429,6 +439,7 @@ pub async fn cmd_status( } } } + */ Ok(()) } @@ -455,25 +466,30 @@ pub fn find_matching_node( } pub async fn cmd_configure( - rpc_cli: RpcAddrClient<Message>, - rpc_host: SocketAddr, + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, args: ConfigureNodeOpt, ) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await? { - Message::AdvertiseNodesUp(nodes) => nodes, + SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let added_node = find_matching_node(status.iter().map(|x| x.id), &args.node_id)?; + let added_node = find_matching_node( + status + .iter() + .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()), + &args.node_id, + )?; let mut config = match rpc_cli - .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await? { - Message::AdvertiseConfig(cfg) => cfg, + SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; @@ -527,25 +543,21 @@ pub async fn cmd_configure( config.version += 1; rpc_cli - .call( - &rpc_host, - &Message::AdvertiseConfig(config), - ADMIN_RPC_TIMEOUT, - ) - .await??; + .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) + .await?; Ok(()) } pub async fn cmd_remove( - rpc_cli: RpcAddrClient<Message>, - rpc_host: SocketAddr, + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, args: RemoveNodeOpt, ) -> Result<(), Error> { let mut config = match rpc_cli - .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await?? + .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) + .await? { - Message::AdvertiseConfig(cfg) => cfg, + SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; @@ -562,21 +574,17 @@ pub async fn cmd_remove( config.version += 1; rpc_cli - .call( - &rpc_host, - &Message::AdvertiseConfig(config), - ADMIN_RPC_TIMEOUT, - ) - .await??; + .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) + .await?; Ok(()) } pub async fn cmd_admin( - rpc_cli: RpcAddrClient<AdminRpc>, - rpc_host: SocketAddr, + rpc_cli: &Endpoint<AdminRpc, ()>, + rpc_host: NodeID, args: AdminRpc, ) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? { + match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? { AdminRpc::Ok(msg) => { println!("{}", msg); } diff --git a/src/garage/main.rs b/src/garage/main.rs index 66828cba..7fe791b8 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -10,16 +10,16 @@ mod repair; mod server; use std::net::SocketAddr; -use std::sync::Arc; -use std::time::Duration; use structopt::StructOpt; -use garage_util::config::TlsConfig; +use netapp::util::parse_peer_addr; +use netapp::NetworkKey; + use garage_util::error::Error; -use garage_rpc::membership::*; -use garage_rpc::rpc_client::*; +use garage_rpc::system::*; +use garage_rpc::*; use admin_rpc::*; use cli::*; @@ -27,16 +27,14 @@ use cli::*; #[derive(StructOpt, Debug)] #[structopt(name = "garage")] struct Opt { - /// RPC connect to this host to execute client operations - #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901", parse(try_from_str = parse_address))] - pub rpc_host: SocketAddr, + /// Host to connect to for admin operations, in the format: + /// <public-key>@<ip>:<port> + #[structopt(short = "h", long = "rpc-host")] + pub rpc_host: Option<String>, - #[structopt(long = "ca-cert")] - pub ca_cert: Option<String>, - #[structopt(long = "client-cert")] - pub client_cert: Option<String>, - #[structopt(long = "client-key")] - pub client_key: Option<String>, + /// RPC secret network key for admin operations + #[structopt(short = "s", long = "rpc-secret")] + pub rpc_secret: Option<String>, #[structopt(subcommand)] cmd: Command, @@ -66,33 +64,20 @@ async fn main() { } async fn cli_command(opt: Opt) -> Result<(), Error> { - let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) { - (Some(ca_cert), Some(client_cert), Some(client_key)) => Some(TlsConfig { - ca_cert, - node_cert: client_cert, - node_key: client_key, - }), - (None, None, None) => None, - _ => { - warn!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS."); - None - } - }; - - let rpc_http_cli = - Arc::new(RpcHttpClient::new(8, &tls_config).expect("Could not create RPC client")); - let membership_rpc_cli = - RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string()); - let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string()); - - cli_cmd(opt.cmd, membership_rpc_cli, admin_rpc_cli, opt.rpc_host).await -} - -fn parse_address(address: &str) -> Result<SocketAddr, String> { - use std::net::ToSocketAddrs; - address - .to_socket_addrs() - .map_err(|_| format!("Could not resolve {}", address))? - .next() - .ok_or_else(|| format!("Could not resolve {}", address)) + let net_key_hex_str = &opt.rpc_secret.expect("No RPC secret provided"); + let network_key = NetworkKey::from_slice( + &hex::decode(net_key_hex_str).expect("Invalid RPC secret key (bad hex)")[..], + ) + .expect("Invalid RPC secret provided (wrong length)"); + let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair(); + + let netapp = NetApp::new(network_key, sk); + let (id, addr) = + parse_peer_addr(&opt.rpc_host.expect("No RPC host provided")).expect("Invalid RPC host"); + netapp.clone().try_connect(addr, id).await?; + + let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into()); + let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into()); + + cli_cmd(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await } diff --git a/src/garage/server.rs b/src/garage/server.rs index 36f7de5c..0edf3e2d 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -1,7 +1,5 @@ use std::path::PathBuf; -use std::sync::Arc; -use futures_util::future::*; use tokio::sync::watch; use garage_util::background::*; @@ -10,21 +8,10 @@ use garage_util::error::Error; use garage_api::run_api_server; use garage_model::garage::Garage; -use garage_rpc::rpc_server::RpcServer; use garage_web::run_web_server; use crate::admin_rpc::*; -async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> { - // Wait for the CTRL+C signal - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); - info!("Received CTRL+C, shutting down."); - send_cancel.send(true)?; - Ok(()) -} - async fn wait_from(mut chan: watch::Receiver<bool>) { while !*chan.borrow() { if chan.changed().await.is_err() { @@ -47,52 +34,46 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .open() .expect("Unable to open sled DB"); - info!("Initialize RPC server..."); - let mut rpc_server = RpcServer::new(config.rpc_bind_addr, config.rpc_tls.clone()); - info!("Initializing background runner..."); - let (send_cancel, watch_cancel) = watch::channel(false); + let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), db, background, &mut rpc_server); - let bootstrap = garage.system.clone().bootstrap( - config.bootstrap_peers, - config.consul_host, - config.consul_service_name, - ); + let garage = Garage::new(config.clone(), db, background); + + let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Crate admin RPC handler..."); - AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server); - - info!("Initializing RPC and API servers..."); - let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); - let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone())); - let web_server = run_web_server(garage, wait_from(watch_cancel.clone())); - - futures::try_join!( - bootstrap.map(|()| { - info!("Bootstrap done"); - Ok(()) - }), - run_rpc_server.map(|rv| { - info!("RPC server exited"); - rv - }), - api_server.map(|rv| { - info!("API server exited"); - rv - }), - web_server.map(|rv| { - info!("Web server exited"); - rv - }), - await_background_done.map(|rv| { - info!("Background runner exited: {:?}", rv); - Ok(()) - }), - shutdown_signal(send_cancel), - )?; + AdminRpcHandler::new(garage.clone()); + + info!("Initializing API server..."); + let api_server = tokio::spawn(run_api_server( + garage.clone(), + wait_from(watch_cancel.clone()), + )); + + info!("Initializing web server..."); + let web_server = tokio::spawn(run_web_server( + garage.clone(), + wait_from(watch_cancel.clone()), + )); + + // Stuff runs + + // When a cancel signal is sent, stuff stops + if let Err(e) = api_server.await? { + warn!("API server exited with error: {}", e); + } + if let Err(e) = web_server.await? { + warn!("Web server exited with error: {}", e); + } + + // Remove RPC handlers for system to break reference cycles + garage.system.netapp.drop_all_handlers(); + + // Await for last parts to end + run_system.await?; + await_background_done.await?; info!("Cleaning up..."); |