aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml18
-rw-r--r--src/garage/admin_rpc.rs80
-rw-r--r--src/garage/cli.rs168
-rw-r--r--src/garage/main.rs71
-rw-r--r--src/garage/server.rs87
5 files changed, 206 insertions, 218 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 09ed3e1e..3023cb79 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage"
-version = "0.3.0"
+version = "0.4.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,12 +14,12 @@ path = "main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api = { version = "0.3.0", path = "../api" }
-garage_model = { version = "0.3.0", path = "../model" }
-garage_rpc = { version = "0.3.0", path = "../rpc" }
-garage_table = { version = "0.3.0", path = "../table" }
-garage_util = { version = "0.3.0", path = "../util" }
-garage_web = { version = "0.3.0", path = "../web" }
+garage_api = { version = "0.4.0", path = "../api" }
+garage_model = { version = "0.4.0", path = "../model" }
+garage_rpc = { version = "0.4.0", path = "../rpc" }
+garage_table = { version = "0.4.0", path = "../table" }
+garage_util = { version = "0.4.0", path = "../util" }
+garage_web = { version = "0.4.0", path = "../web" }
bytes = "1.0"
git-version = "0.3.4"
@@ -27,6 +27,8 @@ hex = "0.4"
log = "0.4"
pretty_env_logger = "0.4"
rand = "0.8"
+async-trait = "0.1.7"
+sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
sled = "0.34"
@@ -38,3 +40,5 @@ toml = "0.5"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+
+netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index fe5a9d88..b9e57c40 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_util::error::Error;
@@ -10,8 +11,7 @@ use garage_table::crdt::Crdt;
use garage_table::replication::*;
use garage_table::*;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
@@ -19,10 +19,8 @@ use garage_model::key_table::*;
use crate::cli::*;
use crate::repair::Repair;
-use crate::*;
-pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-pub const ADMIN_RPC_PATH: &str = "_admin";
+pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRpc {
@@ -33,41 +31,31 @@ pub enum AdminRpc {
// Replies
Ok(String),
+ Error(String),
BucketList(Vec<String>),
BucketInfo(Bucket),
KeyList(Vec<(String, String)>),
KeyInfo(Key),
}
-impl RpcMessage for AdminRpc {}
+impl Message for AdminRpc {
+ type Response = AdminRpc;
+}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
- rpc_client: Arc<RpcClient<AdminRpc>>,
+ endpoint: Arc<Endpoint<AdminRpc, Self>>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>) -> Arc<Self> {
- let rpc_client = garage.system.clone().rpc_client::<AdminRpc>(ADMIN_RPC_PATH);
- Arc::new(Self { garage, rpc_client })
- }
-
- pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
- rpc_server.add_handler::<AdminRpc, _, _>(ADMIN_RPC_PATH.to_string(), move |msg, _addr| {
- let self2 = self.clone();
- async move {
- match msg {
- AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
- AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
- AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt).await,
- AdminRpc::Stats(opt) => self2.handle_stats(opt).await,
- _ => Err(Error::BadRpc("Invalid RPC".to_string())),
- }
- }
- });
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self { garage, endpoint });
+ admin.endpoint.set_handler(admin.clone());
+ admin
}
- async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result<AdminRpc, Error> {
+ async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::List => {
let bucket_names = self
@@ -187,7 +175,7 @@ impl AdminRpcHandler {
}
}
- async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result<AdminRpc, Error> {
+ async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => {
let key_ids = self
@@ -210,13 +198,13 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyInfo(key))
}
KeyOperation::New(query) => {
- let key = Key::new(query.name);
+ let key = Key::new(query.name.clone());
self.garage.key_table.insert(&key).await?;
Ok(AdminRpc::KeyInfo(key))
}
KeyOperation::Rename(query) => {
let mut key = self.get_existing_key(&query.key_pattern).await?;
- key.name.update(query.new_name);
+ key.name.update(query.new_name.clone());
self.garage.key_table.insert(&key).await?;
Ok(AdminRpc::KeyInfo(key))
}
@@ -353,17 +341,18 @@ impl AdminRpcHandler {
let mut failures = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
+ let node = NodeID::from_slice(node.as_slice()).unwrap();
if self
- .rpc_client
+ .endpoint
.call(
- *node,
- AdminRpc::LaunchRepair(opt_to_send.clone()),
- ADMIN_RPC_TIMEOUT,
+ &node,
+ &AdminRpc::LaunchRepair(opt_to_send.clone()),
+ PRIO_NORMAL,
)
.await
.is_err()
{
- failures.push(*node);
+ failures.push(node);
}
}
if failures.is_empty() {
@@ -397,14 +386,16 @@ impl AdminRpcHandler {
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
+ let node = NodeID::from_slice(node.as_slice()).unwrap();
+
let mut opt = opt.clone();
opt.all_nodes = false;
writeln!(&mut ret, "\n======================").unwrap();
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
match self
- .rpc_client
- .call(*node, AdminRpc::Stats(opt), ADMIN_RPC_TIMEOUT)
+ .endpoint
+ .call(&node, &AdminRpc::Stats(opt), PRIO_NORMAL)
.await
{
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
@@ -495,4 +486,23 @@ impl AdminRpcHandler {
.unwrap();
writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
}
+
+ async fn handle_rpc(self: &Arc<Self>, msg: &AdminRpc) -> Result<AdminRpc, Error> {
+ match msg {
+ AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
+ AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
+ AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
+ AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
+ _ => Err(Error::BadRpc("Invalid RPC".to_string())),
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminRpcHandler {
+ async fn handle(self: &Arc<Self>, message: &AdminRpc, _from: NodeID) -> AdminRpc {
+ self.handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| AdminRpc::Error(format!("{}", e)))
+ }
}
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
index f9e67fac..91ec5ab2 100644
--- a/src/garage/cli.rs
+++ b/src/garage/cli.rs
@@ -1,6 +1,5 @@
-use std::cmp::max;
-use std::collections::HashSet;
-use std::net::SocketAddr;
+//use std::cmp::max;
+//use std::collections::HashSet;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
@@ -8,11 +7,11 @@ use structopt::StructOpt;
use garage_util::data::Uuid;
use garage_util::error::Error;
-use garage_util::time::*;
+//use garage_util::time::*;
-use garage_rpc::membership::*;
use garage_rpc::ring::*;
-use garage_rpc::rpc_client::*;
+use garage_rpc::system::*;
+use garage_rpc::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
@@ -298,54 +297,65 @@ pub struct StatsOpt {
pub async fn cli_cmd(
cmd: Command,
- membership_rpc_cli: RpcAddrClient<Message>,
- admin_rpc_cli: RpcAddrClient<AdminRpc>,
- rpc_host: SocketAddr,
+ system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
+ admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
+ rpc_host: NodeID,
) -> Result<(), Error> {
match cmd {
- Command::Status => cmd_status(membership_rpc_cli, rpc_host).await,
+ Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await,
Command::Node(NodeOperation::Configure(configure_opt)) => {
- cmd_configure(membership_rpc_cli, rpc_host, configure_opt).await
+ cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
}
Command::Node(NodeOperation::Remove(remove_opt)) => {
- cmd_remove(membership_rpc_cli, rpc_host, remove_opt).await
+ cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await
}
Command::Bucket(bo) => {
- cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::BucketOperation(bo)).await
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
}
- Command::Key(ko) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::KeyOperation(ko)).await,
- Command::Repair(ro) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::LaunchRepair(ro)).await,
- Command::Stats(so) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::Stats(so)).await,
+ Command::Key(ko) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await
+ }
+ Command::Repair(ro) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
+ }
+ Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
_ => unreachable!(),
}
}
-pub async fn cmd_status(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
-) -> Result<(), Error> {
+pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseNodesUp(nodes) => nodes,
+ SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseConfig(cfg) => cfg,
+ SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
+ println!("STATUS:");
+ for node in status {
+ println!("{:?}", node);
+ }
+ println!("CONFIG: (v{})", config.version);
+ for (id, node) in config.members {
+ println!("{} {:?}", hex::encode(id.as_slice()), node);
+ }
+
+ /* TODO
let (hostname_len, addr_len, tag_len, zone_len) = status
.iter()
- .map(|adv| (adv, config.members.get(&adv.id)))
- .map(|(adv, cfg)| {
+ .map(|(id, addr, _)| (addr, config.members.get(&adv.id)))
+ .map(|(addr, cfg)| {
(
- adv.state_info.hostname.len(),
- adv.addr.to_string().len(),
+ 8,
+ addr.to_string().len(),
cfg.map(|c| c.tag.len()).unwrap_or(0),
cfg.map(|c| c.zone.len()).unwrap_or(0),
)
@@ -355,13 +365,13 @@ pub async fn cmd_status(
});
println!("Healthy nodes:");
- for adv in status.iter().filter(|x| x.is_up) {
+ for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
println!(
"{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}",
- id = adv.id,
- host = adv.state_info.hostname,
- addr = adv.addr,
+ id = id,
+ host = "",
+ addr = addr,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
@@ -373,36 +383,36 @@ pub async fn cmd_status(
} else {
println!(
"{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED",
- id = adv.id,
- h = adv.state_info.hostname,
- addr = adv.addr,
- h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()),
- a_pad = " ".repeat(addr_len - adv.addr.to_string().len()),
+ id = id,
+ h = "",
+ addr = addr,
+ h_pad = " ".repeat(hostname_len - "".len()),
+ a_pad = " ".repeat(addr_len - addr.to_string().len()),
);
}
}
- let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>();
- let failure_case_1 = status.iter().any(|x| !x.is_up);
+ let status_keys = status.iter().map(|(id, _, _)| id).collect::<HashSet<_>>();
+ let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up);
let failure_case_2 = config
.members
.iter()
.any(|(id, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\nFailed nodes:");
- for adv in status.iter().filter(|x| !x.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
+ for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) {
+ if let Some(cfg) = config.members.get(&id) {
println!(
"{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago",
- id=adv.id,
- host=adv.state_info.hostname,
- addr=adv.addr,
+ id=id,
+ host="",
+ addr=addr,
tag=cfg.tag,
zone=cfg.zone,
capacity=cfg.capacity_string(),
- last_seen=(now_msec() - adv.last_seen) / 1000,
- h_pad=" ".repeat(hostname_len - adv.state_info.hostname.len()),
- a_pad=" ".repeat(addr_len - adv.addr.to_string().len()),
+ last_seen=(now_msec() - 0) / 1000,
+ h_pad=" ".repeat(hostname_len - "".len()),
+ a_pad=" ".repeat(addr_len - addr.to_string().len()),
t_pad=" ".repeat(tag_len - cfg.tag.len()),
z_pad=" ".repeat(zone_len - cfg.zone.len()),
);
@@ -411,12 +421,12 @@ pub async fn cmd_status(
let (tag_len, zone_len) = config
.members
.iter()
- .filter(|(&id, _)| !status.iter().any(|x| x.id == id))
+ .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id))
.map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len()))
.fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz)));
for (id, cfg) in config.members.iter() {
- if !status.iter().any(|x| x.id == *id) {
+ if !status.iter().any(|(xid, _, _)| xid == *id) {
println!(
"{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen",
id = id,
@@ -429,6 +439,7 @@ pub async fn cmd_status(
}
}
}
+ */
Ok(())
}
@@ -455,25 +466,30 @@ pub fn find_matching_node(
}
pub async fn cmd_configure(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
args: ConfigureNodeOpt,
) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseNodesUp(nodes) => nodes,
+ SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- let added_node = find_matching_node(status.iter().map(|x| x.id), &args.node_id)?;
+ let added_node = find_matching_node(
+ status
+ .iter()
+ .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()),
+ &args.node_id,
+ )?;
let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseConfig(cfg) => cfg,
+ SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
@@ -527,25 +543,21 @@ pub async fn cmd_configure(
config.version += 1;
rpc_cli
- .call(
- &rpc_host,
- &Message::AdvertiseConfig(config),
- ADMIN_RPC_TIMEOUT,
- )
- .await??;
+ .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
+ .await?;
Ok(())
}
pub async fn cmd_remove(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
args: RemoveNodeOpt,
) -> Result<(), Error> {
let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseConfig(cfg) => cfg,
+ SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
@@ -562,21 +574,17 @@ pub async fn cmd_remove(
config.version += 1;
rpc_cli
- .call(
- &rpc_host,
- &Message::AdvertiseConfig(config),
- ADMIN_RPC_TIMEOUT,
- )
- .await??;
+ .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
+ .await?;
Ok(())
}
pub async fn cmd_admin(
- rpc_cli: RpcAddrClient<AdminRpc>,
- rpc_host: SocketAddr,
+ rpc_cli: &Endpoint<AdminRpc, ()>,
+ rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), Error> {
- match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? {
+ match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 66828cba..7fe791b8 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -10,16 +10,16 @@ mod repair;
mod server;
use std::net::SocketAddr;
-use std::sync::Arc;
-use std::time::Duration;
use structopt::StructOpt;
-use garage_util::config::TlsConfig;
+use netapp::util::parse_peer_addr;
+use netapp::NetworkKey;
+
use garage_util::error::Error;
-use garage_rpc::membership::*;
-use garage_rpc::rpc_client::*;
+use garage_rpc::system::*;
+use garage_rpc::*;
use admin_rpc::*;
use cli::*;
@@ -27,16 +27,14 @@ use cli::*;
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
struct Opt {
- /// RPC connect to this host to execute client operations
- #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901", parse(try_from_str = parse_address))]
- pub rpc_host: SocketAddr,
+ /// Host to connect to for admin operations, in the format:
+ /// <public-key>@<ip>:<port>
+ #[structopt(short = "h", long = "rpc-host")]
+ pub rpc_host: Option<String>,
- #[structopt(long = "ca-cert")]
- pub ca_cert: Option<String>,
- #[structopt(long = "client-cert")]
- pub client_cert: Option<String>,
- #[structopt(long = "client-key")]
- pub client_key: Option<String>,
+ /// RPC secret network key for admin operations
+ #[structopt(short = "s", long = "rpc-secret")]
+ pub rpc_secret: Option<String>,
#[structopt(subcommand)]
cmd: Command,
@@ -66,33 +64,20 @@ async fn main() {
}
async fn cli_command(opt: Opt) -> Result<(), Error> {
- let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) {
- (Some(ca_cert), Some(client_cert), Some(client_key)) => Some(TlsConfig {
- ca_cert,
- node_cert: client_cert,
- node_key: client_key,
- }),
- (None, None, None) => None,
- _ => {
- warn!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS.");
- None
- }
- };
-
- let rpc_http_cli =
- Arc::new(RpcHttpClient::new(8, &tls_config).expect("Could not create RPC client"));
- let membership_rpc_cli =
- RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string());
- let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string());
-
- cli_cmd(opt.cmd, membership_rpc_cli, admin_rpc_cli, opt.rpc_host).await
-}
-
-fn parse_address(address: &str) -> Result<SocketAddr, String> {
- use std::net::ToSocketAddrs;
- address
- .to_socket_addrs()
- .map_err(|_| format!("Could not resolve {}", address))?
- .next()
- .ok_or_else(|| format!("Could not resolve {}", address))
+ let net_key_hex_str = &opt.rpc_secret.expect("No RPC secret provided");
+ let network_key = NetworkKey::from_slice(
+ &hex::decode(net_key_hex_str).expect("Invalid RPC secret key (bad hex)")[..],
+ )
+ .expect("Invalid RPC secret provided (wrong length)");
+ let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
+
+ let netapp = NetApp::new(network_key, sk);
+ let (id, addr) =
+ parse_peer_addr(&opt.rpc_host.expect("No RPC host provided")).expect("Invalid RPC host");
+ netapp.clone().try_connect(addr, id).await?;
+
+ let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
+ let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
+
+ cli_cmd(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 36f7de5c..0edf3e2d 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -1,7 +1,5 @@
use std::path::PathBuf;
-use std::sync::Arc;
-use futures_util::future::*;
use tokio::sync::watch;
use garage_util::background::*;
@@ -10,21 +8,10 @@ use garage_util::error::Error;
use garage_api::run_api_server;
use garage_model::garage::Garage;
-use garage_rpc::rpc_server::RpcServer;
use garage_web::run_web_server;
use crate::admin_rpc::*;
-async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> {
- // Wait for the CTRL+C signal
- tokio::signal::ctrl_c()
- .await
- .expect("failed to install CTRL+C signal handler");
- info!("Received CTRL+C, shutting down.");
- send_cancel.send(true)?;
- Ok(())
-}
-
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
if chan.changed().await.is_err() {
@@ -47,52 +34,46 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.open()
.expect("Unable to open sled DB");
- info!("Initialize RPC server...");
- let mut rpc_server = RpcServer::new(config.rpc_bind_addr, config.rpc_tls.clone());
-
info!("Initializing background runner...");
- let (send_cancel, watch_cancel) = watch::channel(false);
+ let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
- let bootstrap = garage.system.clone().bootstrap(
- config.bootstrap_peers,
- config.consul_host,
- config.consul_service_name,
- );
+ let garage = Garage::new(config.clone(), db, background);
+
+ let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Crate admin RPC handler...");
- AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);
-
- info!("Initializing RPC and API servers...");
- let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
- let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
- let web_server = run_web_server(garage, wait_from(watch_cancel.clone()));
-
- futures::try_join!(
- bootstrap.map(|()| {
- info!("Bootstrap done");
- Ok(())
- }),
- run_rpc_server.map(|rv| {
- info!("RPC server exited");
- rv
- }),
- api_server.map(|rv| {
- info!("API server exited");
- rv
- }),
- web_server.map(|rv| {
- info!("Web server exited");
- rv
- }),
- await_background_done.map(|rv| {
- info!("Background runner exited: {:?}", rv);
- Ok(())
- }),
- shutdown_signal(send_cancel),
- )?;
+ AdminRpcHandler::new(garage.clone());
+
+ info!("Initializing API server...");
+ let api_server = tokio::spawn(run_api_server(
+ garage.clone(),
+ wait_from(watch_cancel.clone()),
+ ));
+
+ info!("Initializing web server...");
+ let web_server = tokio::spawn(run_web_server(
+ garage.clone(),
+ wait_from(watch_cancel.clone()),
+ ));
+
+ // Stuff runs
+
+ // When a cancel signal is sent, stuff stops
+ if let Err(e) = api_server.await? {
+ warn!("API server exited with error: {}", e);
+ }
+ if let Err(e) = web_server.await? {
+ warn!("Web server exited with error: {}", e);
+ }
+
+ // Remove RPC handlers for system to break reference cycles
+ garage.system.netapp.drop_all_handlers();
+
+ // Await for last parts to end
+ run_system.await?;
+ await_background_done.await?;
info!("Cleaning up...");