aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-10 22:01:48 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-10 22:01:48 +0200
commit3477864142ed09c36abea1111937b829fb41c8a4 (patch)
treed95221e66b9c014af7f4dba61ae4ff113c0e409a /src/main.rs
parentd66c0d6833ddbeb61e34ee222dde92a5363bda1f (diff)
downloadgarage-3477864142ed09c36abea1111937b829fb41c8a4.tar.gz
garage-3477864142ed09c36abea1111937b829fb41c8a4.zip
Fix the Sync issue. Details:
So the HTTP client future of Hyper is not Sync, thus the stream that read blocks wasn't either. However Hyper's default Body type requires a stream to be Sync for wrap_stream. Solution: reimplement a custom HTTP body type.
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs111
1 files changed, 68 insertions, 43 deletions
diff --git a/src/main.rs b/src/main.rs
index 324f0d49..69fb2863 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,28 +1,28 @@
-mod error;
mod data;
+mod error;
mod proto;
mod membership;
mod table;
+mod block;
mod object_table;
mod version_table;
-mod block;
-mod server;
-mod rpc_server;
-mod rpc_client;
mod api_server;
+mod rpc_client;
+mod rpc_server;
+mod server;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::path::PathBuf;
use structopt::StructOpt;
-use error::Error;
-use rpc_client::RpcClient;
use data::*;
+use error::Error;
use proto::*;
+use rpc_client::RpcClient;
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
@@ -69,7 +69,6 @@ pub struct ConfigureOpt {
n_tokens: u32,
}
-
#[tokio::main]
async fn main() {
let opt = Opt::from_args();
@@ -77,12 +76,8 @@ async fn main() {
let rpc_cli = RpcClient::new();
let resp = match opt.cmd {
- Command::Server(server_opt) => {
- server::run_server(server_opt.config_file).await
- }
- Command::Status => {
- cmd_status(rpc_cli, opt.rpc_host).await
- }
+ Command::Server(server_opt) => server::run_server(server_opt.config_file).await,
+ Command::Status => cmd_status(rpc_cli, opt.rpc_host).await,
Command::Configure(configure_opt) => {
cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await
}
@@ -94,28 +89,40 @@ async fn main() {
}
async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Error> {
- let status = match rpc_cli.call(&rpc_host,
- &Message::PullStatus,
- DEFAULT_TIMEOUT).await? {
+ let status = match rpc_cli
+ .call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
+ .await?
+ {
Message::AdvertiseNodesUp(nodes) => nodes,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp)))
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- let config = match rpc_cli.call(&rpc_host,
- &Message::PullConfig,
- DEFAULT_TIMEOUT).await? {
+ let config = match rpc_cli
+ .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
+ .await?
+ {
Message::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp)))
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
println!("Healthy nodes:");
for adv in status.iter() {
if let Some(cfg) = config.members.get(&adv.id) {
- println!("{}\t{}\t{}\t{}", hex::encode(&adv.id), cfg.datacenter, cfg.n_tokens, adv.addr);
+ println!(
+ "{}\t{}\t{}\t{}",
+ hex::encode(&adv.id),
+ cfg.datacenter,
+ cfg.n_tokens,
+ adv.addr
+ );
}
}
let status_keys = status.iter().map(|x| x.id.clone()).collect::<HashSet<_>>();
- if config.members.iter().any(|(id, _)| !status_keys.contains(id)) {
+ if config
+ .members
+ .iter()
+ .any(|(id, _)| !status_keys.contains(id))
+ {
println!("\nFailed nodes:");
for (id, cfg) in config.members.iter() {
if !status.iter().any(|x| x.id == *id) {
@@ -124,7 +131,10 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro
}
}
- if status.iter().any(|adv| !config.members.contains_key(&adv.id)) {
+ if status
+ .iter()
+ .any(|adv| !config.members.contains_key(&adv.id))
+ {
println!("\nUnconfigured nodes:");
for adv in status.iter() {
if !config.members.contains_key(&adv.id) {
@@ -136,12 +146,17 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro
Ok(())
}
-async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: ConfigureOpt) -> Result<(), Error> {
- let status = match rpc_cli.call(&rpc_host,
- &Message::PullStatus,
- DEFAULT_TIMEOUT).await? {
+async fn cmd_configure(
+ rpc_cli: RpcClient,
+ rpc_host: SocketAddr,
+ args: ConfigureOpt,
+) -> Result<(), Error> {
+ let status = match rpc_cli
+ .call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
+ .await?
+ {
Message::AdvertiseNodesUp(nodes) => nodes,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp)))
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let mut candidates = vec![];
@@ -151,25 +166,35 @@ async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: Configure
}
}
if candidates.len() != 1 {
- return Err(Error::Message(format!("{} matching nodes", candidates.len())));
+ return Err(Error::Message(format!(
+ "{} matching nodes",
+ candidates.len()
+ )));
}
- let mut config = match rpc_cli.call(&rpc_host,
- &Message::PullConfig,
- DEFAULT_TIMEOUT).await? {
+ let mut config = match rpc_cli
+ .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
+ .await?
+ {
Message::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp)))
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- config.members.insert(candidates[0].clone(),
- NetworkConfigEntry{
- datacenter: args.datacenter,
- n_tokens: args.n_tokens,
- });
+ config.members.insert(
+ candidates[0].clone(),
+ NetworkConfigEntry {
+ datacenter: args.datacenter,
+ n_tokens: args.n_tokens,
+ },
+ );
config.version += 1;
- rpc_cli.call(&rpc_host,
- &Message::AdvertiseConfig(config),
- DEFAULT_TIMEOUT).await?;
+ rpc_cli
+ .call(
+ &rpc_host,
+ &Message::AdvertiseConfig(config),
+ DEFAULT_TIMEOUT,
+ )
+ .await?;
Ok(())
}