diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 111 |
1 files changed, 68 insertions, 43 deletions
diff --git a/src/main.rs b/src/main.rs index 324f0d49..69fb2863 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,28 @@ -mod error; mod data; +mod error; mod proto; mod membership; mod table; +mod block; mod object_table; mod version_table; -mod block; -mod server; -mod rpc_server; -mod rpc_client; mod api_server; +mod rpc_client; +mod rpc_server; +mod server; use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use structopt::StructOpt; -use error::Error; -use rpc_client::RpcClient; use data::*; +use error::Error; use proto::*; +use rpc_client::RpcClient; #[derive(StructOpt, Debug)] #[structopt(name = "garage")] @@ -69,7 +69,6 @@ pub struct ConfigureOpt { n_tokens: u32, } - #[tokio::main] async fn main() { let opt = Opt::from_args(); @@ -77,12 +76,8 @@ async fn main() { let rpc_cli = RpcClient::new(); let resp = match opt.cmd { - Command::Server(server_opt) => { - server::run_server(server_opt.config_file).await - } - Command::Status => { - cmd_status(rpc_cli, opt.rpc_host).await - } + Command::Server(server_opt) => server::run_server(server_opt.config_file).await, + Command::Status => cmd_status(rpc_cli, opt.rpc_host).await, Command::Configure(configure_opt) => { cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await } @@ -94,28 +89,40 @@ async fn main() { } async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Error> { - let status = match rpc_cli.call(&rpc_host, - &Message::PullStatus, - DEFAULT_TIMEOUT).await? { + let status = match rpc_cli + .call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT) + .await? + { Message::AdvertiseNodesUp(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let config = match rpc_cli.call(&rpc_host, - &Message::PullConfig, - DEFAULT_TIMEOUT).await? { + let config = match rpc_cli + .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT) + .await? + { Message::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; println!("Healthy nodes:"); for adv in status.iter() { if let Some(cfg) = config.members.get(&adv.id) { - println!("{}\t{}\t{}\t{}", hex::encode(&adv.id), cfg.datacenter, cfg.n_tokens, adv.addr); + println!( + "{}\t{}\t{}\t{}", + hex::encode(&adv.id), + cfg.datacenter, + cfg.n_tokens, + adv.addr + ); } } let status_keys = status.iter().map(|x| x.id.clone()).collect::<HashSet<_>>(); - if config.members.iter().any(|(id, _)| !status_keys.contains(id)) { + if config + .members + .iter() + .any(|(id, _)| !status_keys.contains(id)) + { println!("\nFailed nodes:"); for (id, cfg) in config.members.iter() { if !status.iter().any(|x| x.id == *id) { @@ -124,7 +131,10 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro } } - if status.iter().any(|adv| !config.members.contains_key(&adv.id)) { + if status + .iter() + .any(|adv| !config.members.contains_key(&adv.id)) + { println!("\nUnconfigured nodes:"); for adv in status.iter() { if !config.members.contains_key(&adv.id) { @@ -136,12 +146,17 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro Ok(()) } -async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: ConfigureOpt) -> Result<(), Error> { - let status = match rpc_cli.call(&rpc_host, - &Message::PullStatus, - DEFAULT_TIMEOUT).await? { +async fn cmd_configure( + rpc_cli: RpcClient, + rpc_host: SocketAddr, + args: ConfigureOpt, +) -> Result<(), Error> { + let status = match rpc_cli + .call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT) + .await? + { Message::AdvertiseNodesUp(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; let mut candidates = vec![]; @@ -151,25 +166,35 @@ async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: Configure } } if candidates.len() != 1 { - return Err(Error::Message(format!("{} matching nodes", candidates.len()))); + return Err(Error::Message(format!( + "{} matching nodes", + candidates.len() + ))); } - let mut config = match rpc_cli.call(&rpc_host, - &Message::PullConfig, - DEFAULT_TIMEOUT).await? { + let mut config = match rpc_cli + .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT) + .await? + { Message::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - config.members.insert(candidates[0].clone(), - NetworkConfigEntry{ - datacenter: args.datacenter, - n_tokens: args.n_tokens, - }); + config.members.insert( + candidates[0].clone(), + NetworkConfigEntry { + datacenter: args.datacenter, + n_tokens: args.n_tokens, + }, + ); config.version += 1; - rpc_cli.call(&rpc_host, - &Message::AdvertiseConfig(config), - DEFAULT_TIMEOUT).await?; + rpc_cli + .call( + &rpc_host, + &Message::AdvertiseConfig(config), + DEFAULT_TIMEOUT, + ) + .await?; Ok(()) } |