diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 210 |
1 files changed, 127 insertions, 83 deletions
diff --git a/src/main.rs b/src/main.rs index 05b0a73a..922e873e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,119 +2,163 @@ mod error; mod data; mod proto; mod membership; -mod rpc; -mod api; +mod server; +mod rpc_server; +mod rpc_client; +mod api_server; -use std::io::{Read, Write}; -use std::sync::Arc; +use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use structopt::StructOpt; -use futures::channel::oneshot; -use serde::Deserialize; -use rand::Rng; -use data::UUID; use error::Error; -use membership::System; +use rpc_client::RpcClient; +use data::*; +use proto::*; #[derive(StructOpt, Debug)] #[structopt(name = "garage")] pub struct Opt { - #[structopt(short = "c", long = "config", default_value = "./config.toml")] - config_file: PathBuf, -} - -#[derive(Deserialize, Debug)] -pub struct Config { - datacenter: String, - - metadata_dir: PathBuf, - data_dir: PathBuf, + /// RPC connect to this host to execute client operations + #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] + rpc_host: SocketAddr, - api_port: u16, - rpc_port: u16, - - bootstrap_peers: Vec<SocketAddr>, + #[structopt(subcommand)] + cmd: Command, } -fn read_config(config_file: PathBuf) -> Result<Config, Error> { - let mut file = std::fs::OpenOptions::new() - .read(true) - .open(config_file.as_path())?; - - let mut config = String::new(); - file.read_to_string(&mut config)?; - - Ok(toml::from_str(&config)?) +#[derive(StructOpt, Debug)] +pub enum Command { + /// Run Garage server + #[structopt(name = "server")] + Server(ServerOpt), + + /// Get network status + #[structopt(name = "status")] + Status, + + /// Configure Garage node + #[structopt(name = "configure")] + Configure(ConfigureOpt), } -fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { - let mut id_file = metadata_dir.clone(); - id_file.push("node_id"); - if id_file.as_path().exists() { - let mut f = std::fs::File::open(id_file.as_path())?; - let mut d = vec![]; - f.read_to_end(&mut d)?; - if d.len() != 32 { - return Err(Error::Message(format!("Corrupt node_id file"))) - } - - let mut id = [0u8; 32]; - id.copy_from_slice(&d[..]); - Ok(id) - } else { - let id = rand::thread_rng().gen::<UUID>(); - - let mut f = std::fs::File::create(id_file.as_path())?; - f.write_all(&id[..])?; - Ok(id) - } +#[derive(StructOpt, Debug)] +pub struct ServerOpt { + /// Configuration file + #[structopt(short = "c", long = "config", default_value = "./config.toml")] + config_file: PathBuf, } -async fn shutdown_signal(chans: Vec<oneshot::Sender<()>>) { - // Wait for the CTRL+C signal - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); - for ch in chans { - ch.send(()).unwrap(); - } -} +#[derive(StructOpt, Debug)] +pub struct ConfigureOpt { + /// Node to configure (prefix of hexadecimal node id) + node_id: String, -async fn wait_from(chan: oneshot::Receiver<()>) -> () { - chan.await.unwrap() + /// Number of tokens + n_tokens: u32, } + #[tokio::main] async fn main() { let opt = Opt::from_args(); - let config = read_config(opt.config_file) - .expect("Unable to read config file"); - - let id = gen_node_id(&config.metadata_dir) - .expect("Unable to read or generate node ID"); - println!("Node ID: {}", hex::encode(id)); - - let sys = Arc::new(System::new(config, id)); - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); + let rpc_cli = RpcClient::new(); - let rpc_server = rpc::run_rpc_server(sys.clone(), wait_from(rx1)); - let api_server = api::run_api_server(sys.clone(), wait_from(rx2)); + let resp = match opt.cmd { + Command::Server(server_opt) => { + server::run_server(server_opt.config_file).await + } + Command::Status => { + cmd_status(rpc_cli, opt.rpc_host).await + } + Command::Configure(configure_opt) => { + cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await + } + }; - tokio::spawn(shutdown_signal(vec![tx1, tx2])); - tokio::spawn(sys.bootstrap()); + if let Err(e) = resp { + eprintln!("Error: {}", e); + } +} - let (e1, e2) = futures::join![rpc_server, api_server]; +async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Error> { + let status = match rpc_cli.call(&rpc_host, + &Message::PullStatus, + DEFAULT_TIMEOUT).await? { + Message::AdvertiseNodesUp(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + }; + let config = match rpc_cli.call(&rpc_host, + &Message::PullConfig, + DEFAULT_TIMEOUT).await? { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + }; + + println!("Healthy nodes:"); + for adv in status.iter() { + if let Some(cfg) = config.members.get(&adv.id) { + println!("{}\t{}\t{}\t{}", hex::encode(adv.id), adv.addr, adv.datacenter, cfg.n_tokens); + } + } - if let Err(e) = e1 { - eprintln!("RPC server error: {}", e) + let status_keys = status.iter().map(|x| x.id.clone()).collect::<HashSet<_>>(); + if config.members.iter().any(|(id, _)| !status_keys.contains(id)) { + println!("\nFailed nodes:"); + for (id, cfg) in config.members.iter() { + if !status.iter().any(|x| x.id == *id) { + println!("{}\t{}", hex::encode(id), cfg.n_tokens); + } + } } - if let Err(e) = e2 { - eprintln!("API server error: {}", e) + if status.iter().any(|adv| !config.members.contains_key(&adv.id)) { + println!("\nUnconfigured nodes:"); + for adv in status.iter() { + if !config.members.contains_key(&adv.id) { + println!("{}\t{}\t{}", hex::encode(adv.id), adv.addr, adv.datacenter); + } + } } + + Ok(()) } +async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: ConfigureOpt) -> Result<(), Error> { + let status = match rpc_cli.call(&rpc_host, + &Message::PullStatus, + DEFAULT_TIMEOUT).await? { + Message::AdvertiseNodesUp(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + }; + + let mut candidates = vec![]; + for adv in status.iter() { + if hex::encode(adv.id).starts_with(&args.node_id) { + candidates.push(adv.id.clone()); + } + } + if candidates.len() != 1 { + return Err(Error::Message(format!("{} matching nodes", candidates.len()))); + } + + let mut config = match rpc_cli.call(&rpc_host, + &Message::PullConfig, + DEFAULT_TIMEOUT).await? { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + }; + + config.members.insert(candidates[0].clone(), + NetworkConfigEntry{ + n_tokens: args.n_tokens, + }); + config.version += 1; + + rpc_cli.call(&rpc_host, + &Message::AdvertiseConfig(config), + DEFAULT_TIMEOUT).await?; + Ok(()) +} |