author     Alex Auvolat <alex@adnab.me>   2021-10-15 11:05:09 +0200
committer  Alex Auvolat <alex@adnab.me>   2021-10-22 16:55:24 +0200
commit     1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch)
tree       d6437f105a630fa197b67446b5c3b2902335c34a /src
parent     4067797d0142ee7860aff8da95d65820d6cc0889 (diff)
Improvements to CLI and various fixes for netapp version
Discovery via Consul, persist peer list to file
Diffstat (limited to 'src')
-rw-r--r--  src/api/Cargo.toml                     2
-rw-r--r--  src/api/error.rs                       8
-rw-r--r--  src/garage/admin_rpc.rs               46
-rw-r--r--  src/garage/cli.rs                    235
-rw-r--r--  src/garage/main.rs                     3
-rw-r--r--  src/garage/server.rs                   8
-rw-r--r--  src/model/block.rs                    30
-rw-r--r--  src/model/garage.rs                    6
-rw-r--r--  src/rpc/Cargo.toml                     4
-rw-r--r--  src/rpc/consul.rs                    125
-rw-r--r--  src/rpc/ring.rs                       36
-rw-r--r--  src/rpc/rpc_helper.rs                 53
-rw-r--r--  src/rpc/system.rs                    277
-rw-r--r--  src/table/gc.rs                       29
-rw-r--r--  src/table/replication/fullcopy.rs      7
-rw-r--r--  src/table/replication/parameters.rs    5
-rw-r--r--  src/table/replication/sharded.rs       5
-rw-r--r--  src/table/sync.rs                     36
-rw-r--r--  src/table/table.rs                    35
-rw-r--r--  src/util/config.rs                    26
-rw-r--r--  src/util/data.rs                      12
-rw-r--r--  src/util/error.rs                     82
-rw-r--r--  src/web/Cargo.toml                     2
-rw-r--r--  src/web/error.rs                       4
24 files changed, 654 insertions(+), 422 deletions(-)
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index b46652df..ebbe7c0d 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -35,7 +35,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
http = "0.2"
httpdate = "0.3"
http-range = "0.1"
-hyper = "0.14"
+hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
percent-encoding = "2.1.0"
roxmltree = "0.14"
serde = { version = "1.0", features = ["derive"] }
diff --git a/src/api/error.rs b/src/api/error.rs
index 7d97366e..35fa404f 100644
--- a/src/api/error.rs
+++ b/src/api/error.rs
@@ -82,7 +82,9 @@ impl Error {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
Error::Forbidden(_) => StatusCode::FORBIDDEN,
- Error::InternalError(GarageError::Rpc(_)) => StatusCode::SERVICE_UNAVAILABLE,
+ Error::InternalError(
+ GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_),
+ ) => StatusCode::SERVICE_UNAVAILABLE,
Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
@@ -95,7 +97,9 @@ impl Error {
Error::NotFound => "NoSuchKey",
Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
- Error::InternalError(GarageError::Rpc(_)) => "ServiceUnavailable",
+ Error::InternalError(
+ GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_),
+ ) => "ServiceUnavailable",
Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError",
_ => "InvalidRequest",
}
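
With the RpcError enum gone (see the src/util/error.rs hunk near the end of this patch), the old catch-all GarageError::Rpc(_) arm becomes the three variants that signal an unreachable or failing peer, so S3 clients still receive 503 rather than 500 for cluster-side trouble. A minimal sketch of the mapping, with stand-in enums rather than Garage's real types:

    use http::StatusCode; // the `http` crate, already a dependency of src/api

    // Stand-in for garage_util::error::Error after this commit.
    enum GarageError {
        Timeout,
        RemoteError(String),
        TooManyErrors(Vec<String>),
        Message(String),
    }

    fn http_status_code(e: &GarageError) -> StatusCode {
        match e {
            // Peer-related failures: the request may succeed on retry.
            GarageError::Timeout
            | GarageError::RemoteError(_)
            | GarageError::TooManyErrors(_) => StatusCode::SERVICE_UNAVAILABLE,
            // Anything else is a genuine internal error.
            _ => StatusCode::INTERNAL_SERVER_ERROR,
        }
    }

    fn main() {
        assert_eq!(http_status_code(&GarageError::Timeout).as_u16(), 503);
        assert_eq!(http_status_code(&GarageError::Message("oops".into())).as_u16(), 500);
    }
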
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index b9e57c40..339d5bdb 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -31,15 +31,14 @@ pub enum AdminRpc {
// Replies
Ok(String),
- Error(String),
BucketList(Vec<String>),
BucketInfo(Bucket),
KeyList(Vec<(String, String)>),
KeyInfo(Key),
}
-impl Message for AdminRpc {
- type Response = AdminRpc;
+impl Rpc for AdminRpc {
+ type Response = Result<AdminRpc, Error>;
}
pub struct AdminRpcHandler {
@@ -341,17 +340,20 @@ impl AdminRpcHandler {
let mut failures = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
- let node = NodeID::from_slice(node.as_slice()).unwrap();
- if self
+ let node = (*node).into();
+ let resp = self
.endpoint
.call(
&node,
&AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL,
)
- .await
- .is_err()
- {
+ .await;
+ let is_err = match resp {
+ Ok(Ok(_)) => false,
+ _ => true,
+ };
+ if is_err {
failures.push(node);
}
}
@@ -386,17 +388,17 @@ impl AdminRpcHandler {
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
- let node = NodeID::from_slice(node.as_slice()).unwrap();
-
let mut opt = opt.clone();
opt.all_nodes = false;
writeln!(&mut ret, "\n======================").unwrap();
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
+
+ let node_id = (*node).into();
match self
.endpoint
- .call(&node, &AdminRpc::Stats(opt), PRIO_NORMAL)
- .await
+ .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL)
+ .await?
{
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
@@ -486,9 +488,16 @@ impl AdminRpcHandler {
.unwrap();
writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
}
+}
- async fn handle_rpc(self: &Arc<Self>, msg: &AdminRpc) -> Result<AdminRpc, Error> {
- match msg {
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminRpcHandler {
+ async fn handle(
+ self: &Arc<Self>,
+ message: &AdminRpc,
+ _from: NodeID,
+ ) -> Result<AdminRpc, Error> {
+ match message {
AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
@@ -497,12 +506,3 @@ impl AdminRpcHandler {
}
}
}
-
-#[async_trait]
-impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(self: &Arc<Self>, message: &AdminRpc, _from: NodeID) -> AdminRpc {
- self.handle_rpc(message)
- .await
- .unwrap_or_else(|e| AdminRpc::Error(format!("{}", e)))
- }
-}
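
The AdminRpc::Error(String) variant can go away because the response type itself is now Result<AdminRpc, Error>: the handler implements EndpointHandler directly and lets `?` carry failures back to the remote caller, instead of stringifying them into a reply variant via the old handle_rpc/unwrap_or_else shim. A hedged sketch of the pattern; MyRpc, MyError and the simplified trait below are stand-ins, not netapp's real API:

    use std::sync::Arc;
    use async_trait::async_trait; // already a Garage dependency

    #[derive(Debug)]
    enum MyRpc {
        Ping,
        Ok(String),
    }

    #[derive(Debug)]
    struct MyError(String);

    // Simplified stand-in for netapp's EndpointHandler trait.
    #[async_trait]
    trait Handler<M>: Send + Sync {
        async fn handle(self: &Arc<Self>, message: &M) -> Result<MyRpc, MyError>;
    }

    struct AdminHandler;

    #[async_trait]
    impl Handler<MyRpc> for AdminHandler {
        // Errors are returned as values: no catch-all Error variant in the
        // RPC enum, and no unwrap_or_else wrapper around the logic.
        async fn handle(self: &Arc<Self>, message: &MyRpc) -> Result<MyRpc, MyError> {
            match message {
                MyRpc::Ping => Ok(MyRpc::Ok("pong".to_string())),
                _ => Err(MyError("Unexpected RPC message".to_string())),
            }
        }
    }
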
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
index 91ec5ab2..940a5a85 100644
--- a/src/garage/cli.rs
+++ b/src/garage/cli.rs
@@ -1,5 +1,4 @@
-//use std::cmp::max;
-//use std::collections::HashSet;
+use std::collections::HashSet;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
@@ -7,7 +6,7 @@ use structopt::StructOpt;
use garage_util::data::Uuid;
use garage_util::error::Error;
-//use garage_util::time::*;
+use garage_util::time::*;
use garage_rpc::ring::*;
use garage_rpc::system::*;
@@ -58,6 +57,10 @@ pub struct ServerOpt {
#[derive(StructOpt, Debug)]
pub enum NodeOperation {
+ /// Connect to a Garage node that is currently isolated from the system
+ #[structopt(name = "connect")]
+ Connect(ConnectNodeOpt),
+
/// Configure Garage node
#[structopt(name = "configure")]
Configure(ConfigureNodeOpt),
@@ -68,6 +71,13 @@ pub enum NodeOperation {
}
#[derive(StructOpt, Debug)]
+pub struct ConnectNodeOpt {
+ /// Node public key and address, in the format:
+ /// `<public key hexadecimal>@<ip or hostname>:<port>`
+ node: String,
+}
+
+#[derive(StructOpt, Debug)]
pub struct ConfigureNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
node_id: String,
@@ -303,6 +313,9 @@ pub async fn cli_cmd(
) -> Result<(), Error> {
match cmd {
Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await,
+ Command::Node(NodeOperation::Connect(connect_opt)) => {
+ cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await
+ }
Command::Node(NodeOperation::Configure(configure_opt)) => {
cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
}
@@ -326,142 +339,96 @@ pub async fn cli_cmd(
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- println!("STATUS:");
- for node in status {
- println!("{:?}", node);
- }
- println!("CONFIG: (v{})", config.version);
- for (id, node) in config.members {
- println!("{} {:?}", hex::encode(id.as_slice()), node);
- }
-
- /* TODO
- let (hostname_len, addr_len, tag_len, zone_len) = status
- .iter()
- .map(|(id, addr, _)| (addr, config.members.get(&adv.id)))
- .map(|(addr, cfg)| {
- (
- 8,
- addr.to_string().len(),
- cfg.map(|c| c.tag.len()).unwrap_or(0),
- cfg.map(|c| c.zone.len()).unwrap_or(0),
- )
- })
- .fold((0, 0, 0, 0), |(h, a, t, z), (mh, ma, mt, mz)| {
- (max(h, mh), max(a, ma), max(t, mt), max(z, mz))
- });
-
println!("Healthy nodes:");
- for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) {
+ let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()];
+ for adv in status.iter().filter(|adv| adv.is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
- println!(
- "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}",
- id = id,
- host = "",
- addr = addr,
+ healthy_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}",
+ id = adv.id,
+ host = adv.status.hostname,
+ addr = adv.addr,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
- h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()),
- a_pad = " ".repeat(addr_len - adv.addr.to_string().len()),
- t_pad = " ".repeat(tag_len - cfg.tag.len()),
- z_pad = " ".repeat(zone_len - cfg.zone.len()),
- );
+ ));
} else {
- println!(
- "{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED",
- id = id,
- h = "",
- addr = addr,
- h_pad = " ".repeat(hostname_len - "".len()),
- a_pad = " ".repeat(addr_len - addr.to_string().len()),
- );
+ healthy_nodes.push(format!(
+ "{id:?}\t{h}\t{addr}\tUNCONFIGURED/REMOVED",
+ id = adv.id,
+ h = adv.status.hostname,
+ addr = adv.addr,
+ ));
}
}
+ format_table(healthy_nodes);
- let status_keys = status.iter().map(|(id, _, _)| id).collect::<HashSet<_>>();
- let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up);
+ let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
+ let failure_case_1 = status.iter().any(|adv| !adv.is_up);
let failure_case_2 = config
.members
.iter()
.any(|(id, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\nFailed nodes:");
- for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) {
- if let Some(cfg) = config.members.get(&id) {
- println!(
- "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago",
- id=id,
- host="",
- addr=addr,
- tag=cfg.tag,
- zone=cfg.zone,
- capacity=cfg.capacity_string(),
- last_seen=(now_msec() - 0) / 1000,
- h_pad=" ".repeat(hostname_len - "".len()),
- a_pad=" ".repeat(addr_len - addr.to_string().len()),
- t_pad=" ".repeat(tag_len - cfg.tag.len()),
- z_pad=" ".repeat(zone_len - cfg.zone.len()),
- );
+ let mut failed_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()];
+ for adv in status.iter().filter(|adv| !adv.is_up) {
+ if let Some(cfg) = config.members.get(&adv.id) {
+ failed_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago",
+ id = adv.id,
+ host = adv.status.hostname,
+ addr = adv.addr,
+ tag = cfg.tag,
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ last_seen = (now_msec() - 0) / 1000,
+ ));
}
}
- let (tag_len, zone_len) = config
- .members
- .iter()
- .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id))
- .map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len()))
- .fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz)));
-
for (id, cfg) in config.members.iter() {
- if !status.iter().any(|(xid, _, _)| xid == *id) {
- println!(
- "{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen",
+ if !status.iter().any(|adv| adv.id == *id) {
+ failed_nodes.push(format!(
+ "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen",
id = id,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
- t_pad = " ".repeat(tag_len - cfg.tag.len()),
- z_pad = " ".repeat(zone_len - cfg.zone.len()),
- );
+ ));
}
}
+ format_table(failed_nodes);
}
- */
Ok(())
}
-pub fn find_matching_node(
- cand: impl std::iter::Iterator<Item = Uuid>,
- pattern: &str,
-) -> Result<Uuid, Error> {
- let mut candidates = vec![];
- for c in cand {
- if hex::encode(&c).starts_with(&pattern) {
- candidates.push(c);
+pub async fn cmd_connect(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ args: ConnectNodeOpt,
+) -> Result<(), Error> {
+ match rpc_cli.call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL).await?? {
+ SystemRpc::Ok => {
+ println!("Success.");
+ Ok(())
+ }
+ r => {
+ Err(Error::BadRpc(format!("Unexpected response: {:?}", r)))
}
- }
- if candidates.len() != 1 {
- Err(Error::Message(format!(
- "{} nodes match '{}'",
- candidates.len(),
- pattern,
- )))
- } else {
- Ok(candidates[0])
}
}
@@ -472,22 +439,17 @@ pub async fn cmd_configure(
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- let added_node = find_matching_node(
- status
- .iter()
- .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()),
- &args.node_id,
- )?;
+ let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
let mut config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@@ -544,7 +506,7 @@ pub async fn cmd_configure(
rpc_cli
.call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await?;
+ .await??;
Ok(())
}
@@ -555,7 +517,7 @@ pub async fn cmd_remove(
) -> Result<(), Error> {
let mut config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await?
+ .await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@@ -575,7 +537,7 @@ pub async fn cmd_remove(
rpc_cli
.call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await?;
+ .await??;
Ok(())
}
@@ -584,7 +546,7 @@ pub async fn cmd_admin(
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), Error> {
- match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? {
+ match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
@@ -613,6 +575,8 @@ pub async fn cmd_admin(
Ok(())
}
+// --- Utility functions ----
+
fn print_key_info(key: &Key) {
println!("Key name: {}", key.name.get());
println!("Key ID: {}", key.key_id);
@@ -640,3 +604,54 @@ fn print_bucket_info(bucket: &Bucket) {
}
};
}
+
+fn format_table(data: Vec<String>) {
+ let data = data
+ .iter()
+ .map(|s| s.split('\t').collect::<Vec<_>>())
+ .collect::<Vec<_>>();
+
+ let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max);
+ let mut column_size = vec![0; columns];
+
+ let mut out = String::new();
+
+ for row in data.iter() {
+ for (i, col) in row.iter().enumerate() {
+ column_size[i] = std::cmp::max(column_size[i], col.chars().count());
+ }
+ }
+
+ for row in data.iter() {
+ for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) {
+ out.push_str(col);
+ (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' '));
+ }
+ out.push_str(&row[row.len() - 1]);
+ out.push('\n');
+ }
+
+ print!("{}", out);
+}
+
+pub fn find_matching_node(
+ cand: impl std::iter::Iterator<Item = Uuid>,
+ pattern: &str,
+) -> Result<Uuid, Error> {
+ let mut candidates = vec![];
+ for c in cand {
+ if hex::encode(&c).starts_with(&pattern) {
+ candidates.push(c);
+ }
+ }
+ if candidates.len() != 1 {
+ Err(Error::Message(format!(
+ "{} nodes match '{}'",
+ candidates.len(),
+ pattern,
+ )))
+ } else {
+ Ok(candidates[0])
+ }
+}
+
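
The new format_table helper added at the end of cli.rs takes '\t'-separated rows, conventionally with a header as the first row, and pads every column to its widest cell, counting chars rather than bytes so that multi-byte UTF-8 does not skew widths. A usage sketch with made-up node data:

    fn main() {
        let rows = vec![
            "ID\tHostname\tZone".to_string(),
            "4fe6b5…\tmars\tpar1".to_string(),
            "00ec72…\tphobos\tdc-lyon".to_string(),
        ];
        format_table(rows);
        // Prints, with two spaces between padded columns:
        // ID       Hostname  Zone
        // 4fe6b5…  mars      par1
        // 00ec72…  phobos    dc-lyon
    }
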
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 7fe791b8..543860ca 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -9,8 +9,6 @@ mod cli;
mod repair;
mod server;
-use std::net::SocketAddr;
-
use structopt::StructOpt;
use netapp::util::parse_peer_addr;
@@ -43,6 +41,7 @@ struct Opt {
#[tokio::main]
async fn main() {
pretty_env_logger::init();
+ sodiumoxide::init().expect("Unable to init sodiumoxide");
let opt = Opt::from_args();
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 0edf3e2d..cd92d157 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -71,8 +71,14 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
// Remove RPC handlers for system to break reference cycles
garage.system.netapp.drop_all_handlers();
- // Await for last parts to end
+ // Wait for the netapp RPC system to end
run_system.await?;
+
+ // Break last reference cycles so that stuff can terminate properly
+ garage.break_reference_cycles();
+ drop(garage);
+
+ // Wait for all background tasks to end
await_background_done.await?;
info!("Cleaning up...");
diff --git a/src/model/block.rs b/src/model/block.rs
index 5574b7f6..a1dcf776 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -38,7 +38,6 @@ const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc {
Ok,
- Error(String),
/// Message to ask for a block of data, by hash
GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
@@ -61,8 +60,8 @@ pub struct PutBlockMessage {
pub data: Vec<u8>,
}
-impl Message for BlockRpc {
- type Response = BlockRpc;
+impl Rpc for BlockRpc {
+ type Response = Result<BlockRpc, Error>;
}
/// The block manager, handling block exchange between nodes, and block storage on local node
@@ -117,15 +116,6 @@ impl BlockManager {
block_manager
}
- async fn handle_rpc(self: Arc<Self>, msg: &BlockRpc) -> Result<BlockRpc, Error> {
- match msg {
- BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
- BlockRpc::GetBlock(h) => self.read_block(h).await,
- BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
- _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
- }
- }
-
pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
// launches only one worker with current value of BACKGROUND_WORKERS
@@ -532,11 +522,17 @@ impl BlockManager {
#[async_trait]
impl EndpointHandler<BlockRpc> for BlockManager {
- async fn handle(self: &Arc<Self>, message: &BlockRpc, _from: NodeID) -> BlockRpc {
- self.clone()
- .handle_rpc(message)
- .await
- .unwrap_or_else(|e| BlockRpc::Error(format!("{}", e)))
+ async fn handle(
+ self: &Arc<Self>,
+ message: &BlockRpc,
+ _from: NodeID,
+ ) -> Result<BlockRpc, Error> {
+ match message {
+ BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
+ BlockRpc::GetBlock(h) => self.read_block(h).await,
+ BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
+ _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
+ }
}
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index d4ea6f55..482c4df7 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -61,6 +61,7 @@ impl Garage {
background.clone(),
replication_mode.replication_factor(),
config.rpc_bind_addr,
+ config.rpc_public_addr,
config.bootstrap_peers.clone(),
config.consul_host.clone(),
config.consul_service_name.clone(),
@@ -162,4 +163,9 @@ impl Garage {
garage
}
+
+ /// Use this for shutdown
+ pub fn break_reference_cycles(&self) {
+ self.block_manager.garage.swap(None);
+ }
}
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index ef03898a..f0ac6570 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -15,8 +15,6 @@ path = "lib.rs"
[dependencies]
garage_util = { version = "0.4.0", path = "../util" }
-garage_rpc_021 = { package = "garage_rpc", version = "0.2.1" }
-
arc-swap = "1.0"
bytes = "1.0"
gethostname = "0.2"
@@ -36,5 +34,5 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] }
netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-hyper = "0.14"
+hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index 63051a6b..82bf99ba 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -1,24 +1,31 @@
+use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use hyper::client::Client;
use hyper::StatusCode;
use hyper::{Body, Method, Request};
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
+
+use netapp::NodeID;
use garage_util::error::Error;
-#[derive(Deserialize, Clone)]
-struct ConsulEntry {
- #[serde(alias = "Address")]
+// ---- READING FROM CONSUL CATALOG ----
+
+#[derive(Deserialize, Clone, Debug)]
+struct ConsulQueryEntry {
+ #[serde(rename = "Address")]
address: String,
- #[serde(alias = "ServicePort")]
+ #[serde(rename = "ServicePort")]
service_port: u16,
+ #[serde(rename = "NodeMeta")]
+ node_meta: HashMap<String, String>,
}
pub async fn get_consul_nodes(
consul_host: &str,
consul_service_name: &str,
-) -> Result<Vec<SocketAddr>, Error> {
+) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
let url = format!(
"http://{}/v1/catalog/service/{}",
consul_host, consul_service_name
@@ -36,17 +43,111 @@ pub async fn get_consul_nodes(
}
let body = hyper::body::to_bytes(resp.into_body()).await?;
- let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?;
+ let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?;
let mut ret = vec![];
for ent in entries {
- let ip = ent
- .address
- .parse::<IpAddr>()
- .map_err(|e| Error::Message(format!("Could not parse IP address: {}", e)))?;
- ret.push(SocketAddr::new(ip, ent.service_port));
+ let ip = ent.address.parse::<IpAddr>().ok();
+ let pubkey = ent
+ .node_meta
+ .get("pubkey")
+ .map(|k| hex::decode(&k).ok())
+ .flatten()
+ .map(|k| NodeID::from_slice(&k[..]))
+ .flatten();
+ if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
+ ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
+ } else {
+ warn!(
+ "Could not process node spec from Consul: {:?} (invalid IP or public key)",
+ ent
+ );
+ }
}
debug!("Got nodes from Consul: {:?}", ret);
Ok(ret)
}
+
+// ---- PUBLISHING TO CONSUL CATALOG ----
+
+#[derive(Serialize, Clone, Debug)]
+struct ConsulPublishEntry {
+ #[serde(rename = "Node")]
+ node: String,
+ #[serde(rename = "Address")]
+ address: IpAddr,
+ #[serde(rename = "NodeMeta")]
+ node_meta: HashMap<String, String>,
+ #[serde(rename = "Service")]
+ service: ConsulPublishService,
+}
+
+#[derive(Serialize, Clone, Debug)]
+struct ConsulPublishService {
+ #[serde(rename = "ID")]
+ service_id: String,
+ #[serde(rename = "Service")]
+ service_name: String,
+ #[serde(rename = "Tags")]
+ tags: Vec<String>,
+ #[serde(rename = "Address")]
+ address: IpAddr,
+ #[serde(rename = "Port")]
+ port: u16,
+}
+
+pub async fn publish_consul_service(
+ consul_host: &str,
+ consul_service_name: &str,
+ node_id: NodeID,
+ hostname: &str,
+ rpc_public_addr: SocketAddr,
+) -> Result<(), Error> {
+ let node = format!("garage:{}", hex::encode(&node_id[..8]));
+
+ let advertisement = ConsulPublishEntry {
+ node: node.clone(),
+ address: rpc_public_addr.ip(),
+ node_meta: [
+ ("pubkey".to_string(), hex::encode(node_id)),
+ ("hostname".to_string(), hostname.to_string()),
+ ]
+ .iter()
+ .cloned()
+ .collect(),
+ service: ConsulPublishService {
+ service_id: node.clone(),
+ service_name: consul_service_name.to_string(),
+ tags: vec!["advertised-by-garage".into(), hostname.into()],
+ address: rpc_public_addr.ip(),
+ port: rpc_public_addr.port(),
+ },
+ };
+
+ let url = format!("http://{}/v1/catalog/register", consul_host);
+ let req_body = serde_json::to_string(&advertisement)?;
+ debug!("Request body for consul adv: {}", req_body);
+
+ let req = Request::builder()
+ .uri(url)
+ .method(Method::PUT)
+ .body(Body::from(req_body))?;
+
+ let client = Client::new();
+
+ let resp = client.request(req).await?;
+ debug!("Response of advertising to Consul: {:?}", resp);
+ let resp_code = resp.status();
+ debug!(
+ "{}",
+ std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?)
+ .unwrap_or("<invalid utf8>")
+ );
+
+ if resp_code != StatusCode::OK {
+ return Err(Error::Message(format!("HTTP error {}", resp_code)));
+ }
+
+ Ok(())
+}
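
publish_consul_service registers a synthetic catalog node named garage:<first 8 bytes of the node id>, carrying the node's public key and hostname in NodeMeta; get_consul_nodes above reads the pubkey back out of NodeMeta, so only entries published with such a meta field can be used for discovery. A sketch of the JSON shape PUT to /v1/catalog/register; all concrete values (service name, addresses, key) are illustrative:

    use serde_json::json;

    fn main() {
        let payload = json!({
            "Node": "garage:1b450c4b493dfcb2",
            "Address": "10.0.0.1",
            "NodeMeta": {
                "pubkey": "<node public key, 64 hex chars>",
                "hostname": "mars"
            },
            "Service": {
                "ID": "garage:1b450c4b493dfcb2",
                "Service": "garage-rpc",
                "Tags": ["advertised-by-garage", "mars"],
                "Address": "10.0.0.1",
                "Port": 3901
            }
        });
        println!("{}", serde_json::to_string_pretty(&payload).unwrap());
    }
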
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 7cbab762..3cb0d233 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -3,8 +3,6 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
-use netapp::NodeID;
-
use serde::{Deserialize, Serialize};
use garage_util::data::*;
@@ -40,31 +38,6 @@ impl NetworkConfig {
version: 0,
}
}
-
- pub(crate) fn migrate_from_021(old: garage_rpc_021::ring::NetworkConfig) -> Self {
- let members = old
- .members
- .into_iter()
- .map(|(id, conf)| {
- (
- Hash::try_from(id.as_slice()).unwrap(),
- NetworkConfigEntry {
- zone: conf.datacenter,
- capacity: if conf.capacity == 0 {
- None
- } else {
- Some(conf.capacity)
- },
- tag: conf.tag,
- },
- )
- })
- .collect();
- Self {
- members,
- version: old.version,
- }
- }
}
/// The overall configuration of one (possibly remote) node
@@ -100,7 +73,7 @@ pub struct Ring {
pub config: NetworkConfig,
// Internal order of nodes used to make a more compact representation of the ring
- nodes: Vec<NodeID>,
+ nodes: Vec<Uuid>,
// The list of entries in the ring
ring: Vec<RingEntry>,
@@ -262,11 +235,6 @@ impl Ring {
})
.collect::<Vec<_>>();
- let nodes = nodes
- .iter()
- .map(|id| NodeID::from_slice(id.as_slice()).unwrap())
- .collect::<Vec<_>>();
-
Self {
replication_factor,
config,
@@ -298,7 +266,7 @@ impl Ring {
}
/// Walk the ring to find the n servers in which data should be replicated
- pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<NodeID> {
+ pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> {
if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!");
return vec![];
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index c9458ee6..9f735ab4 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -8,13 +8,14 @@ use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
-pub use netapp::endpoint::{Endpoint, EndpointHandler, Message};
+pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::proto::*;
pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner;
-use garage_util::error::{Error, RpcError};
+use garage_util::error::Error;
+use garage_util::data::Uuid;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -66,46 +67,47 @@ pub struct RpcHelper {
}
impl RpcHelper {
- pub async fn call<M, H>(
+ pub async fn call<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
- to: NodeID,
+ to: Uuid,
msg: M,
strat: RequestStrategy,
- ) -> Result<M::Response, Error>
+ ) -> Result<S, Error>
where
- M: Message,
+ M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
self.call_arc(endpoint, to, Arc::new(msg), strat).await
}
- pub async fn call_arc<M, H>(
+ pub async fn call_arc<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
- to: NodeID,
+ to: Uuid,
msg: Arc<M>,
strat: RequestStrategy,
- ) -> Result<M::Response, Error>
+ ) -> Result<S, Error>
where
- M: Message,
+ M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
+ let node_id = to.into();
select! {
- res = endpoint.call(&to, &msg, strat.rs_priority) => Ok(res?),
- _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Rpc(RpcError::Timeout)),
+ res = endpoint.call(&node_id, &msg, strat.rs_priority) => Ok(res??),
+ _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Timeout),
}
}
- pub async fn call_many<M, H>(
+ pub async fn call_many<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
- to: &[NodeID],
+ to: &[Uuid],
msg: M,
strat: RequestStrategy,
- ) -> Vec<(NodeID, Result<M::Response, Error>)>
+ ) -> Vec<(Uuid, Result<S, Error>)>
where
- M: Message,
+ M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
let msg = Arc::new(msg);
@@ -120,37 +122,38 @@ impl RpcHelper {
.collect::<Vec<_>>()
}
- pub async fn broadcast<M, H>(
+ pub async fn broadcast<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
msg: M,
strat: RequestStrategy,
- ) -> Vec<(NodeID, Result<M::Response, Error>)>
+ ) -> Vec<(Uuid, Result<S, Error>)>
where
- M: Message,
+ M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
let to = self
.fullmesh
.get_peer_list()
.iter()
- .map(|p| p.id)
+ .map(|p| p.id.into())
.collect::<Vec<_>>();
self.call_many(endpoint, &to[..], msg, strat).await
}
/// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
/// strategy could not be respected due to too many errors
- pub async fn try_call_many<M, H>(
+ pub async fn try_call_many<M, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
- to: &[NodeID],
+ to: &[Uuid],
msg: M,
strategy: RequestStrategy,
- ) -> Result<Vec<M::Response>, Error>
+ ) -> Result<Vec<S>, Error>
where
- M: Message + 'static,
+ M: Rpc<Response = Result<S, Error>> + 'static,
H: EndpointHandler<M> + 'static,
+ S: Send,
{
let msg = Arc::new(msg);
let mut resp_stream = to
@@ -200,7 +203,7 @@ impl RpcHelper {
Ok(results)
} else {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::from(RpcError::TooManyErrors(errors)))
+ Err(Error::TooManyErrors(errors))
}
}
}
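
Since every Rpc response type is now Result<S, Error>, a raw endpoint.call yields a nested Result: the outer layer is netapp's transport outcome (was the call delivered and answered?), the inner one is whatever the remote handler returned. That is why the CLI code in this patch uses `.await??`, and why RpcHelper::call flattens both layers (`Ok(res??)`) before handing the value to internal callers. A stand-in sketch of the two layers:

    type TransportError = String; // stand-in for a netapp delivery failure
    type RemoteError = String;    // stand-in for the remote handler's Error

    // Shape of a raw endpoint.call(...) after this commit.
    async fn call() -> Result<Result<u32, RemoteError>, TransportError> {
        Ok(Ok(42))
    }

    async fn caller() -> Result<u32, String> {
        // First `?`: did the message get through? Second `?`: did the remote
        // handler succeed?
        let answer = call().await??;
        Ok(answer)
    }
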
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 7ccec945..b95cff58 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -1,8 +1,9 @@
//! Module containing structs related to membership management
+use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
use std::time::Duration;
use arc_swap::ArcSwap;
@@ -14,21 +15,24 @@ use sodiumoxide::crypto::sign::ed25519;
use tokio::sync::watch;
use tokio::sync::Mutex;
-use netapp::endpoint::{Endpoint, EndpointHandler, Message};
+use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::proto::*;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
+use netapp::util::parse_and_resolve_peer_addr;
use garage_util::background::BackgroundRunner;
+use garage_util::data::Uuid;
use garage_util::error::Error;
use garage_util::persister::Persister;
-//use garage_util::time::*;
+use garage_util::time::*;
-//use crate::consul::get_consul_nodes;
+use crate::consul::*;
use crate::ring::*;
-use crate::rpc_helper::{RequestStrategy, RpcHelper};
+use crate::rpc_helper::*;
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
+const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
/// RPC endpoint used for calls related to membership
@@ -39,33 +43,35 @@ pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
pub enum SystemRpc {
/// Response to successful advertisements
Ok,
- /// Error response
- Error(String),
+ /// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
+ Connect(String),
/// Ask other node its config. Answered with AdvertiseConfig
PullConfig,
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
- AdvertiseStatus(StateInfo),
+ AdvertiseStatus(NodeStatus),
/// Advertisement of the network config. Sent spontaneously or in response to PullConfig
AdvertiseConfig(NetworkConfig),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
- ReturnKnownNodes(Vec<(NodeID, SocketAddr, bool)>),
+ ReturnKnownNodes(Vec<KnownNodeInfo>),
}
-impl Message for SystemRpc {
- type Response = SystemRpc;
+impl Rpc for SystemRpc {
+ type Response = Result<SystemRpc, Error>;
}
/// This node's membership manager
pub struct System {
/// The id of this node
- pub id: NodeID,
+ pub id: Uuid,
persist_config: Persister<NetworkConfig>,
+ persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
- state_info: ArcSwap<StateInfo>,
+ local_status: ArcSwap<NodeStatus>,
+ node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
pub netapp: Arc<NetApp>,
fullmesh: Arc<FullMeshPeeringStrategy>,
@@ -74,6 +80,7 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
+ rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>,
consul_service_name: Option<String>,
@@ -88,7 +95,7 @@ pub struct System {
}
#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct StateInfo {
+pub struct NodeStatus {
/// Hostname of the node
pub hostname: String,
/// Replication factor configured on the node
@@ -97,26 +104,34 @@ pub struct StateInfo {
pub config_version: u64,
}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct KnownNodeInfo {
+ pub id: Uuid,
+ pub addr: SocketAddr,
+ pub is_up: bool,
+ pub status: NodeStatus,
+}
+
fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
- let mut id_file = metadata_dir.to_path_buf();
- id_file.push("node_id");
- if id_file.as_path().exists() {
- let mut f = std::fs::File::open(id_file.as_path())?;
+ let mut key_file = metadata_dir.to_path_buf();
+ key_file.push("node_key");
+ if key_file.as_path().exists() {
+ let mut f = std::fs::File::open(key_file.as_path())?;
let mut d = vec![];
f.read_to_end(&mut d)?;
if d.len() != 64 {
- return Err(Error::Message("Corrupt node_id file".to_string()));
+ return Err(Error::Message("Corrupt node_key file".to_string()));
}
let mut key = [0u8; 64];
key.copy_from_slice(&d[..]);
Ok(NodeKey::from_slice(&key[..]).unwrap())
} else {
- let (key, _) = ed25519::gen_keypair();
+ let (_, key) = ed25519::gen_keypair();
- let mut f = std::fs::File::create(id_file.as_path())?;
+ let mut f = std::fs::File::create(key_file.as_path())?;
f.write_all(&key[..])?;
- Ok(NodeKey::from_slice(&key[..]).unwrap())
+ Ok(key)
}
}
@@ -128,6 +143,7 @@ impl System {
background: Arc<BackgroundRunner>,
replication_factor: usize,
rpc_listen_addr: SocketAddr,
+ rpc_public_address: Option<SocketAddr>,
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>,
consul_service_name: Option<String>,
@@ -136,29 +152,20 @@ impl System {
info!("Node public key: {}", hex::encode(&node_key.public_key()));
let persist_config = Persister::new(&metadata_dir, "network_config");
+ let persist_peer_list = Persister::new(&metadata_dir, "peer_list");
let net_config = match persist_config.load() {
Ok(x) => x,
Err(e) => {
- match Persister::<garage_rpc_021::ring::NetworkConfig>::new(
- &metadata_dir,
- "network_config",
- )
- .load()
- {
- Ok(old_config) => NetworkConfig::migrate_from_021(old_config),
- Err(e2) => {
- info!(
- "No valid previous network configuration stored ({}, {}), starting fresh.",
- e, e2
- );
- NetworkConfig::new()
- }
- }
+ info!(
+ "No valid previous network configuration stored ({}), starting fresh.",
+ e
+ );
+ NetworkConfig::new()
}
};
- let state_info = StateInfo {
+ let local_status = NodeStatus {
hostname: gethostname::gethostname()
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
@@ -169,15 +176,27 @@ impl System {
let ring = Ring::new(net_config, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
+ if let Some(addr) = rpc_public_address {
+ println!("{}@{}", hex::encode(&node_key.public_key()), addr);
+ } else {
+ println!("{}", hex::encode(&node_key.public_key()));
+ }
+
let netapp = NetApp::new(network_key, node_key);
- let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers.clone());
+ let fullmesh = FullMeshPeeringStrategy::new(
+ netapp.clone(),
+ bootstrap_peers.clone(),
+ rpc_public_address,
+ );
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
let sys = Arc::new(System {
- id: netapp.id.clone(),
+ id: netapp.id.into(),
persist_config,
- state_info: ArcSwap::new(Arc::new(state_info)),
+ persist_peer_list,
+ local_status: ArcSwap::new(Arc::new(local_status)),
+ node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
rpc: RpcHelper {
@@ -187,6 +206,7 @@ impl System {
system_endpoint,
replication_factor,
rpc_listen_addr,
+ rpc_public_addr: rpc_public_address,
bootstrap_peers,
consul_host,
consul_service_name,
@@ -206,11 +226,38 @@ impl System {
.listen(self.rpc_listen_addr, None, must_exit.clone()),
self.fullmesh.clone().run(must_exit.clone()),
self.discovery_loop(must_exit.clone()),
+ self.status_exchange_loop(must_exit.clone()),
);
}
// ---- INTERNALS ----
+ async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
+ let (consul_host, consul_service_name) =
+ match (&self.consul_host, &self.consul_service_name) {
+ (Some(ch), Some(csn)) => (ch, csn),
+ _ => return Ok(()),
+ };
+
+ let rpc_public_addr = match self.rpc_public_addr {
+ Some(addr) => addr,
+ None => {
+ warn!("Not advertising to Consul because rpc_public_addr is not defined in config file.");
+ return Ok(());
+ }
+ };
+
+ publish_consul_service(
+ consul_host,
+ consul_service_name,
+ self.netapp.id,
+ &self.local_status.load_full().hostname,
+ rpc_public_addr,
+ )
+ .await
+ .map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e)))
+ }
+
/// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
@@ -221,12 +268,27 @@ impl System {
Ok(())
}
- fn update_state_info(&self) {
- let mut new_si: StateInfo = self.state_info.load().as_ref().clone();
+ fn update_local_status(&self) {
+ let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
let ring = self.ring.borrow();
new_si.config_version = ring.config.version;
- self.state_info.swap(Arc::new(new_si));
+ self.local_status.swap(Arc::new(new_si));
+ }
+
+ async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> {
+ let (pubkey, addrs) = parse_and_resolve_peer_addr(node)
+ .ok_or_else(|| Error::Message(format!("Unable to parse or resolve node specification: {}", node)))?;
+ let mut errors = vec![];
+ for ip in addrs.iter() {
+ match self.netapp.clone().try_connect(*ip, pubkey).await {
+ Ok(()) => return Ok(SystemRpc::Ok),
+ Err(e) => {
+ errors.push((*ip, e));
+ }
+ }
+ }
+ return Err(Error::Message(format!("Could not connect to specified peers. Errors: {:?}", errors)));
}
fn handle_pull_config(&self) -> SystemRpc {
@@ -234,6 +296,58 @@ impl System {
SystemRpc::AdvertiseConfig(ring.config.clone())
}
+ fn handle_get_known_nodes(&self) -> SystemRpc {
+ let node_status = self.node_status.read().unwrap();
+ let known_nodes =
+ self.fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|n| KnownNodeInfo {
+ id: n.id.into(),
+ addr: n.addr,
+ is_up: n.is_up(),
+ status: node_status.get(&n.id.into()).cloned().map(|(_, st)| st).unwrap_or(
+ NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ config_version: 0,
+ },
+ ),
+ })
+ .collect::<Vec<_>>();
+ SystemRpc::ReturnKnownNodes(known_nodes)
+ }
+
+ async fn handle_advertise_status(
+ self: &Arc<Self>,
+ from: Uuid,
+ info: &NodeStatus,
+ ) -> Result<SystemRpc, Error> {
+ let local_info = self.local_status.load();
+
+ if local_info.replication_factor < info.replication_factor {
+ error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs",
+ info.replication_factor,
+ local_info.replication_factor);
+ std::process::exit(1);
+ }
+
+ if info.config_version > local_info.config_version {
+ let self2 = self.clone();
+ self.background.spawn_cancellable(async move {
+ self2.pull_config(from).await;
+ Ok(())
+ });
+ }
+
+ self.node_status
+ .write()
+ .unwrap()
+ .insert(from, (now_msec(), info.clone()));
+
+ Ok(SystemRpc::Ok)
+ }
+
async fn handle_advertise_config(
self: Arc<Self>,
adv: &NetworkConfig,
@@ -265,13 +379,32 @@ impl System {
Ok(SystemRpc::Ok)
}
- async fn discovery_loop(&self, mut stop_signal: watch::Receiver<bool>) {
- /* TODO
+ async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
+ while !*stop_signal.borrow() {
+ let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL);
+
+ self.update_local_status();
+ let local_status: NodeStatus = self.local_status.load().as_ref().clone();
+ self.rpc
+ .broadcast(
+ &self.system_endpoint,
+ SystemRpc::AdvertiseStatus(local_status),
+ RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT),
+ )
+ .await;
+
+ select! {
+ _ = restart_at.fuse() => {},
+ _ = stop_signal.changed().fuse() => {},
+ }
+ }
+ }
+
+ async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
let consul_config = match (&self.consul_host, &self.consul_service_name) {
(Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
_ => None,
};
- */
while !*stop_signal.borrow() {
let not_configured = self.ring.borrow().config.members.is_empty();
@@ -286,34 +419,42 @@ impl System {
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
- let ping_list = self.bootstrap_peers.clone();
+ let mut ping_list = self.bootstrap_peers.clone();
- /*
- *TODO bring this back: persisted list of peers
- if let Ok(peers) = self.persist_status.load_async().await {
- ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
+ // Add peer list from list stored on disk
+ if let Ok(peers) = self.persist_peer_list.load_async().await {
+ ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
- */
- /*
- * TODO bring this back: get peers from consul
+ // Fetch peer list from Consul
if let Some((consul_host, consul_service_name)) = &consul_config {
match get_consul_nodes(consul_host, consul_service_name).await {
Ok(node_list) => {
- ping_list.extend(node_list.iter().map(|a| (*a, None)));
+ ping_list.extend(node_list);
}
Err(e) => {
warn!("Could not retrieve node list from Consul: {}", e);
}
}
}
- */
for (node_id, node_addr) in ping_list {
tokio::spawn(self.netapp.clone().try_connect(node_addr, node_id));
}
}
+ let peer_list = self
+ .fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|n| (n.id.into(), n.addr))
+ .collect::<Vec<_>>();
+ if let Err(e) = self.persist_peer_list.save_async(&peer_list).await {
+ warn!("Could not save peer list to file: {}", e);
+ }
+
+ self.background.spawn(self.clone().advertise_to_consul());
+
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
_ = restart_at.fuse() => {},
@@ -322,7 +463,7 @@ impl System {
}
}
- async fn pull_config(self: Arc<Self>, peer: NodeID) {
+ async fn pull_config(self: Arc<Self>, peer: Uuid) {
let resp = self
.rpc
.call(
@@ -340,24 +481,14 @@ impl System {
#[async_trait]
impl EndpointHandler<SystemRpc> for System {
- async fn handle(self: &Arc<Self>, msg: &SystemRpc, _from: NodeID) -> SystemRpc {
- let resp = match msg {
+ async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
+ match msg {
+ SystemRpc::Connect(node) => self.handle_connect(node).await,
SystemRpc::PullConfig => Ok(self.handle_pull_config()),
+ SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(&adv).await,
- SystemRpc::GetKnownNodes => {
- let known_nodes = self
- .fullmesh
- .get_peer_list()
- .iter()
- .map(|n| (n.id, n.addr, n.is_up()))
- .collect::<Vec<_>>();
- Ok(SystemRpc::ReturnKnownNodes(known_nodes))
- }
+ SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
- };
- match resp {
- Ok(r) => r,
- Err(e) => SystemRpc::Error(format!("{}", e)),
}
}
}
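
Among the renames in this file, the one-character change in gen_node_key is a genuine bug fix: sodiumoxide's ed25519::gen_keypair() returns (PublicKey, SecretKey), and the NodeKey persisted to the node_key file is the 64-byte secret key, so the old `let (key, _)` kept the wrong half, whose 32 bytes would not even pass the length check on reload. A quick check of that claim, assuming only the sodiumoxide crate:

    use sodiumoxide::crypto::sign::ed25519;

    fn main() {
        sodiumoxide::init().expect("Unable to init sodiumoxide");
        let (public, secret) = ed25519::gen_keypair();
        assert_eq!(public[..].len(), 32); // what the old code mistakenly saved
        assert_eq!(secret[..].len(), 64); // what node_key must contain
    }
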
diff --git a/src/table/gc.rs b/src/table/gc.rs
index c03648ef..9b3d60ff 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -36,11 +36,10 @@ enum GcRpc {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
Ok,
- Error(String),
}
-impl Message for GcRpc {
- type Response = GcRpc;
+impl Rpc for GcRpc {
+ type Response = Result<GcRpc, Error>;
}
impl<F, R> TableGc<F, R>
@@ -168,7 +167,7 @@ where
async fn try_send_and_delete(
&self,
- nodes: Vec<NodeID>,
+ nodes: Vec<Uuid>,
items: Vec<(ByteBuf, Hash, ByteBuf)>,
) -> Result<(), Error> {
let n_items = items.len();
@@ -224,8 +223,15 @@ where
.compare_and_swap::<_, _, Vec<u8>>(key, Some(vhash), None)?;
Ok(())
}
+}
- async fn handle_rpc(&self, message: &GcRpc) -> Result<GcRpc, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
self.data.update_many(items)?;
@@ -242,16 +248,3 @@ where
}
}
}
-
-#[async_trait]
-impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> GcRpc {
- self.handle_rpc(message)
- .await
- .unwrap_or_else(|e| GcRpc::Error(format!("{}", e)))
- }
-}
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index b41c5360..ae6851fb 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
use garage_rpc::ring::*;
use garage_rpc::system::System;
-use garage_rpc::NodeID;
use garage_util::data::*;
use crate::replication::*;
@@ -20,19 +19,19 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
- fn read_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
+ fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
fn read_quorum(&self) -> usize {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
+ fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow();
ring.config
.members
.keys()
- .map(|id| NodeID::from_slice(id.as_slice()).unwrap())
+ .cloned()
.collect::<Vec<_>>()
}
fn write_quorum(&self) -> usize {
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 7fdfce67..3740d947 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,5 +1,4 @@
use garage_rpc::ring::*;
-use garage_rpc::NodeID;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
@@ -8,12 +7,12 @@ pub trait TableReplication: Send + Sync {
// To understand various replication methods
/// Which nodes to send read requests to
- fn read_nodes(&self, hash: &Hash) -> Vec<NodeID>;
+ fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read successful
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<NodeID>;
+ fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a write successful
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index ffe686a5..75043a17 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
use garage_rpc::ring::*;
use garage_rpc::system::System;
-use garage_rpc::NodeID;
use garage_util::data::*;
use crate::replication::*;
@@ -26,7 +25,7 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
- fn read_nodes(&self, hash: &Hash) -> Vec<NodeID> {
+ fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow();
ring.get_nodes(&hash, self.replication_factor)
}
@@ -34,7 +33,7 @@ impl TableReplication for TableShardedReplication {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<NodeID> {
+ fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow();
ring.get_nodes(&hash, self.replication_factor)
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index c5db0987..4fcdc528 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -45,11 +45,10 @@ pub(crate) enum SyncRpc {
Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
Ok,
- Error(String),
}
-impl Message for SyncRpc {
- type Response = SyncRpc;
+impl Rpc for SyncRpc {
+ type Response = Result<SyncRpc, Error>;
}
struct SyncTodo {
@@ -305,7 +304,7 @@ where
async fn offload_items(
self: &Arc<Self>,
items: &[(Vec<u8>, Arc<ByteBuf>)],
- nodes: &[NodeID],
+ nodes: &[Uuid],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
@@ -354,7 +353,7 @@ where
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
- who: NodeID,
+ who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -480,7 +479,7 @@ where
Ok(())
}
- async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.data.name,
@@ -513,9 +512,17 @@ where
)))
}
}
+}
+
+// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
@@ -535,19 +542,6 @@ where
}
}
-#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc {
- self.handle_rpc(message)
- .await
- .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e)))
- }
-}
-
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,
diff --git a/src/table/table.rs b/src/table/table.rs
index ad263343..e1357471 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -34,7 +34,6 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
#[derive(Serialize, Deserialize)]
pub(crate) enum TableRpc<F: TableSchema> {
Ok,
- Error(String),
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
@@ -45,8 +44,8 @@ pub(crate) enum TableRpc<F: TableSchema> {
Update(Vec<Arc<ByteBuf>>),
}
-impl<F: TableSchema> Message for TableRpc<F> {
- type Response = TableRpc<F>;
+impl<F: TableSchema> Rpc for TableRpc<F> {
+ type Response = Result<TableRpc<F>, Error>;
}
impl<F, R> Table<F, R>
@@ -277,7 +276,7 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
- async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> {
+ async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.system
.rpc
@@ -292,10 +291,19 @@ where
.await?;
Ok(())
}
+}
- // ====== RPC HANDLER =====
- //
- async fn handle_rpc(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(
+ self: &Arc<Self>,
+ msg: &TableRpc<F>,
+ _from: NodeID,
+ ) -> Result<TableRpc<F>, Error> {
match msg {
TableRpc::ReadEntry(key, sort_key) => {
let value = self.data.read_entry(key, sort_key)?;
@@ -313,16 +321,3 @@ where
}
}
}
-
-#[async_trait]
-impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- async fn handle(self: &Arc<Self>, msg: &TableRpc<F>, _from: NodeID) -> TableRpc<F> {
- self.handle_rpc(msg)
- .await
- .unwrap_or_else(|e| TableRpc::<F>::Error(format!("{}", e)))
- }
-}
diff --git a/src/util/config.rs b/src/util/config.rs
index ee153dfa..08ece5b7 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -7,6 +7,7 @@ use serde::de::Error as SerdeError;
use serde::{de, Deserialize};
use netapp::NodeID;
+use netapp::util::parse_and_resolve_peer_addr;
use crate::error::Error;
@@ -34,11 +35,13 @@ pub struct Config {
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
+ /// Public IP address of this node
+ pub rpc_public_addr: Option<SocketAddr>,
/// Bootstrap peers RPC address
#[serde(deserialize_with = "deserialize_vec_addr")]
pub bootstrap_peers: Vec<(NodeID, SocketAddr)>,
- /// Consule host to connect to to discover more peers
+ /// Consul host to connect to in order to discover more peers
pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>,
@@ -111,26 +114,13 @@ fn deserialize_vec_addr<'de, D>(deserializer: D) -> Result<Vec<(NodeID, SocketAd
where
D: de::Deserializer<'de>,
{
- use std::net::ToSocketAddrs;
-
let mut ret = vec![];
for peer in <Vec<&str>>::deserialize(deserializer)? {
- let delim = peer
- .find('@')
- .ok_or_else(|| D::Error::custom("Invalid bootstrap peer: public key not specified"))?;
- let (key, host) = peer.split_at(delim);
- let pubkey = NodeID::from_slice(&hex::decode(&key).map_err(D::Error::custom)?)
- .ok_or_else(|| D::Error::custom("Invalid bootstrap peer public key"))?;
- let hosts = host[1..]
- .to_socket_addrs()
- .map_err(D::Error::custom)?
- .collect::<Vec<_>>();
- if hosts.is_empty() {
- return Err(D::Error::custom(format!("Error resolving {}", &host[1..])));
- }
- for host in hosts {
- ret.push((pubkey.clone(), host));
+ let (pubkey, addrs) = parse_and_resolve_peer_addr(peer)
+ .ok_or_else(|| D::Error::custom(format!("Unable to parse or resolve peer: {}", peer)))?;
+ for ip in addrs {
+ ret.push((pubkey.clone(), ip));
}
}
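
The hand-rolled parsing deleted here moves into netapp as parse_and_resolve_peer_addr, now shared by the config loader, the CLI, and SystemRpc::Connect. A std-only re-sketch of what it does with a `<hex pubkey>@<host>:<port>` string, mirroring the removed code; the Option return shape is an assumption of the sketch, and the hex crate (already used throughout Garage) is assumed:

    use std::net::{SocketAddr, ToSocketAddrs};

    fn parse_peer(peer: &str) -> Option<(Vec<u8>, Vec<SocketAddr>)> {
        // Split "<hex pubkey>@<host>:<port>" at the '@'.
        let (key, host) = peer.split_at(peer.find('@')?);
        let pubkey = hex::decode(key).ok()?;
        if pubkey.len() != 32 {
            return None; // ed25519 public keys are 32 bytes
        }
        // One entry per address the hostname resolves to.
        let addrs: Vec<SocketAddr> = host[1..].to_socket_addrs().ok()?.collect();
        if addrs.is_empty() {
            None
        } else {
            Some((pubkey, addrs))
        }
    }
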
diff --git a/src/util/data.rs b/src/util/data.rs
index 6df51cd0..d4fe0009 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -87,6 +87,18 @@ impl FixedBytes32 {
}
}
+impl From<netapp::NodeID> for FixedBytes32 {
+ fn from(node_id: netapp::NodeID) -> FixedBytes32 {
+ FixedBytes32::try_from(node_id.as_ref()).unwrap()
+ }
+}
+
+impl Into<netapp::NodeID> for FixedBytes32 {
+ fn into(self) -> netapp::NodeID {
+ netapp::NodeID::from_slice(self.as_slice()).unwrap()
+ }
+}
+
/// A 32 bytes UUID
pub type Uuid = FixedBytes32;
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
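
Both netapp::NodeID (an ed25519 public key) and FixedBytes32 are exactly 32 bytes, which is why the unwraps in the conversions above can never fail; they let the rest of the codebase use Uuid for node identity and convert only at the netapp boundary. A small usage sketch, assuming both crates in scope:

    use garage_util::data::Uuid;
    use netapp::NodeID;

    fn main() {
        // Any 32-byte value is accepted by NodeID::from_slice.
        let node_id = NodeID::from_slice(&[42u8; 32]).unwrap();
        let uuid: Uuid = node_id.into(); // NodeID -> Uuid
        let back: NodeID = uuid.into();  // Uuid -> NodeID
        assert_eq!(back.as_ref(), &[42u8; 32][..]);
    }
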
diff --git a/src/util/error.rs b/src/util/error.rs
index 804a0d4d..390327f1 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -1,34 +1,12 @@
//! Module containing error types used in Garage
-use err_derive::Error;
-use hyper::StatusCode;
+use std::fmt;
use std::io;
-use crate::data::*;
-
-/// RPC related errors
-#[derive(Debug, Error)]
-pub enum RpcError {
- #[error(display = "Node is down: {:?}.", _0)]
- NodeDown(Uuid),
-
- #[error(display = "Timeout")]
- Timeout,
-
- #[error(display = "HTTP error: {}", _0)]
- Http(#[error(source)] http::Error),
-
- #[error(display = "Hyper error: {}", _0)]
- Hyper(#[error(source)] hyper::Error),
+use err_derive::Error;
- #[error(display = "Messagepack encode error: {}", _0)]
- RmpEncode(#[error(source)] rmp_serde::encode::Error),
+use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
- #[error(display = "Messagepack decode error: {}", _0)]
- RmpDecode(#[error(source)] rmp_serde::decode::Error),
-
- #[error(display = "Too many errors: {:?}", _0)]
- TooManyErrors(Vec<String>),
-}
+use crate::data::*;
/// Regroup all Garage errors
#[derive(Debug, Error)]
@@ -63,11 +41,14 @@ pub enum Error {
#[error(display = "Tokio join error: {}", _0)]
TokioJoin(#[error(source)] tokio::task::JoinError),
- #[error(display = "RPC call error: {}", _0)]
- Rpc(#[error(source)] RpcError),
+ #[error(display = "Remote error: {}", _0)]
+ RemoteError(String),
+
+ #[error(display = "Timeout")]
+ Timeout,
- #[error(display = "Remote error: {} (status code {})", _0, _1)]
- RemoteError(String, StatusCode),
+ #[error(display = "Too many errors: {:?}", _0)]
+ TooManyErrors(Vec<String>),
#[error(display = "Bad RPC: {}", _0)]
BadRpc(String),
@@ -99,3 +80,44 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
Error::Message("MPSC send error".to_string())
}
}
+
+// Custom serialization for our error type, for use in RPC.
+// Errors are serialized as a string of their Display representation.
+// Upon deserialization, they all become a RemoteError with the
+// given representation.
+
+impl Serialize for Error {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ serializer.serialize_str(&format!("{}", self))
+ }
+}
+
+impl<'de> Deserialize<'de> for Error {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ deserializer.deserialize_string(ErrorVisitor)
+ }
+}
+
+struct ErrorVisitor;
+
+impl<'de> Visitor<'de> for ErrorVisitor {
+ type Value = Error;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ write!(formatter, "a string that represents an error value")
+ }
+
+ fn visit_str<E>(self, error_msg: &str) -> Result<Self::Value, E> {
+ Ok(Error::RemoteError(error_msg.to_string()))
+ }
+
+ fn visit_string<E>(self, error_msg: String) -> Result<Self::Value, E> {
+ Ok(Error::RemoteError(error_msg))
+ }
+}
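
The serde impls above mean errors cross the RPC boundary as their Display string only: whatever the original variant, the receiving side always gets Error::RemoteError, so remote failures stay distinguishable from local ones at the cost of losing the variant. A round-trip sketch; serde_json is used purely as a convenient codec for the demonstration, any serde format behaves the same:

    use garage_util::error::Error;

    fn main() {
        let e = Error::Timeout;
        let encoded = serde_json::to_string(&e).unwrap();
        assert_eq!(encoded, "\"Timeout\"");
        match serde_json::from_str::<Error>(&encoded).unwrap() {
            Error::RemoteError(msg) => assert_eq!(msg, "Timeout"),
            _ => unreachable!("every deserialized error is a RemoteError"),
        }
    }
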
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index 99b4481b..f5b40370 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -26,4 +26,4 @@ percent-encoding = "2.1.0"
futures = "0.3"
http = "0.2"
-hyper = "0.14"
+hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
diff --git a/src/web/error.rs b/src/web/error.rs
index 08717ce1..5ac27914 100644
--- a/src/web/error.rs
+++ b/src/web/error.rs
@@ -38,7 +38,9 @@ impl Error {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
Error::ApiError(e) => e.http_status_code(),
- Error::InternalError(GarageError::Rpc(_)) => StatusCode::SERVICE_UNAVAILABLE,
+ Error::InternalError(
+ GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_),
+ ) => StatusCode::SERVICE_UNAVAILABLE,
Error::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::BAD_REQUEST,
}