aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/rpc_helper.rs4
-rw-r--r--src/rpc/system.rs165
2 files changed, 113 insertions, 56 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 9f735ab4..8c7cc681 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -14,8 +14,8 @@ pub use netapp::proto::*;
pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner;
-use garage_util::error::Error;
use garage_util::data::Uuid;
+use garage_util::error::Error;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -203,7 +203,7 @@ impl RpcHelper {
Ok(results)
} else {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::TooManyErrors(errors))
+ Err(Error::Quorum(quorum, results.len(), to.len(), errors))
}
}
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index b95cff58..f5ce817f 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -2,9 +2,9 @@
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::SocketAddr;
-use std::path::{Path, PathBuf};
+use std::path::Path;
use std::sync::{Arc, RwLock};
-use std::time::Duration;
+use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
use async_trait::async_trait;
@@ -18,12 +18,13 @@ use tokio::sync::Mutex;
use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::proto::*;
-use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use netapp::util::parse_and_resolve_peer_addr;
+use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
+use garage_util::config::Config;
use garage_util::data::Uuid;
-use garage_util::error::Error;
+use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*;
@@ -38,6 +39,8 @@ const PING_TIMEOUT: Duration = Duration::from_secs(2);
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
+pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret";
+
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum SystemRpc {
@@ -109,10 +112,27 @@ pub struct KnownNodeInfo {
pub id: Uuid,
pub addr: SocketAddr,
pub is_up: bool,
+ pub last_seen_secs_ago: Option<u64>,
pub status: NodeStatus,
}
-fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
+pub fn read_node_id(metadata_dir: &Path) -> Result<NodeID, Error> {
+ let mut pubkey_file = metadata_dir.to_path_buf();
+ pubkey_file.push("node_key.pub");
+
+ let mut f = std::fs::File::open(pubkey_file.as_path())?;
+ let mut d = vec![];
+ f.read_to_end(&mut d)?;
+ if d.len() != 32 {
+ return Err(Error::Message("Corrupt node_key.pub file".to_string()));
+ }
+
+ let mut key = [0u8; 32];
+ key.copy_from_slice(&d[..]);
+ Ok(NodeID::from_slice(&key[..]).unwrap())
+}
+
+pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
let mut key_file = metadata_dir.to_path_buf();
key_file.push("node_key");
if key_file.as_path().exists() {
@@ -127,10 +147,30 @@ fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
key.copy_from_slice(&d[..]);
Ok(NodeKey::from_slice(&key[..]).unwrap())
} else {
- let (_, key) = ed25519::gen_keypair();
+ if !metadata_dir.exists() {
+ info!("Metadata directory does not exist, creating it.");
+ std::fs::create_dir(&metadata_dir)?;
+ }
+
+ info!("Generating new node key pair.");
+ let (pubkey, key) = ed25519::gen_keypair();
+
+ {
+ use std::os::unix::fs::PermissionsExt;
+ let mut f = std::fs::File::create(key_file.as_path())?;
+ let mut perm = f.metadata()?.permissions();
+ perm.set_mode(0o600);
+ std::fs::set_permissions(key_file.as_path(), perm)?;
+ f.write_all(&key[..])?;
+ }
+
+ {
+ let mut pubkey_file = metadata_dir.to_path_buf();
+ pubkey_file.push("node_key.pub");
+ let mut f2 = std::fs::File::create(pubkey_file.as_path())?;
+ f2.write_all(&pubkey[..])?;
+ }
- let mut f = std::fs::File::create(key_file.as_path())?;
- f.write_all(&key[..])?;
Ok(key)
}
}
@@ -139,20 +179,16 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
- metadata_dir: PathBuf,
background: Arc<BackgroundRunner>,
replication_factor: usize,
- rpc_listen_addr: SocketAddr,
- rpc_public_address: Option<SocketAddr>,
- bootstrap_peers: Vec<(NodeID, SocketAddr)>,
- consul_host: Option<String>,
- consul_service_name: Option<String>,
+ config: &Config,
) -> Arc<Self> {
- let node_key = gen_node_key(&metadata_dir).expect("Unable to read or generate node ID");
+ let node_key =
+ gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!("Node public key: {}", hex::encode(&node_key.public_key()));
- let persist_config = Persister::new(&metadata_dir, "network_config");
- let persist_peer_list = Persister::new(&metadata_dir, "peer_list");
+ let persist_config = Persister::new(&config.metadata_dir, "network_config");
+ let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
let net_config = match persist_config.load() {
Ok(x) => x,
@@ -169,14 +205,14 @@ impl System {
hostname: gethostname::gethostname()
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
- replication_factor: replication_factor,
+ replication_factor,
config_version: net_config.version,
};
let ring = Ring::new(net_config, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
- if let Some(addr) = rpc_public_address {
+ if let Some(addr) = config.rpc_public_addr {
println!("{}@{}", hex::encode(&node_key.public_key()), addr);
} else {
println!("{}", hex::encode(&node_key.public_key()));
@@ -185,8 +221,8 @@ impl System {
let netapp = NetApp::new(network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(
netapp.clone(),
- bootstrap_peers.clone(),
- rpc_public_address,
+ config.bootstrap_peers.clone(),
+ config.rpc_public_addr,
);
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
@@ -200,19 +236,19 @@ impl System {
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
rpc: RpcHelper {
- fullmesh: fullmesh.clone(),
+ fullmesh,
background: background.clone(),
},
system_endpoint,
replication_factor,
- rpc_listen_addr,
- rpc_public_addr: rpc_public_address,
- bootstrap_peers,
- consul_host,
- consul_service_name,
+ rpc_listen_addr: config.rpc_bind_addr,
+ rpc_public_addr: config.rpc_public_addr,
+ bootstrap_peers: config.bootstrap_peers.clone(),
+ consul_host: config.consul_host.clone(),
+ consul_service_name: config.consul_service_name.clone(),
ring,
update_ring: Mutex::new(update_ring),
- background: background.clone(),
+ background,
});
sys.system_endpoint.set_handler(sys.clone());
sys
@@ -255,7 +291,7 @@ impl System {
rpc_public_addr,
)
.await
- .map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e)))
+ .err_context("Error while publishing Consul service")
}
/// Save network configuration to disc
@@ -277,18 +313,31 @@ impl System {
}
async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> {
- let (pubkey, addrs) = parse_and_resolve_peer_addr(node)
- .ok_or_else(|| Error::Message(format!("Unable to parse or resolve node specification: {}", node)))?;
+ let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
+ Error::Message(format!(
+ "Unable to parse or resolve node specification: {}",
+ node
+ ))
+ })?;
let mut errors = vec![];
for ip in addrs.iter() {
- match self.netapp.clone().try_connect(*ip, pubkey).await {
+ match self
+ .netapp
+ .clone()
+ .try_connect(*ip, pubkey)
+ .await
+ .err_context(CONNECT_ERROR_MESSAGE)
+ {
Ok(()) => return Ok(SystemRpc::Ok),
Err(e) => {
errors.push((*ip, e));
}
}
}
- return Err(Error::Message(format!("Could not connect to specified peers. Errors: {:?}", errors)));
+ return Err(Error::Message(format!(
+ "Could not connect to specified peers. Errors: {:?}",
+ errors
+ )));
}
fn handle_pull_config(&self) -> SystemRpc {
@@ -298,23 +347,26 @@ impl System {
fn handle_get_known_nodes(&self) -> SystemRpc {
let node_status = self.node_status.read().unwrap();
- let known_nodes =
- self.fullmesh
- .get_peer_list()
- .iter()
- .map(|n| KnownNodeInfo {
- id: n.id.into(),
- addr: n.addr,
- is_up: n.is_up(),
- status: node_status.get(&n.id.into()).cloned().map(|(_, st)| st).unwrap_or(
- NodeStatus {
- hostname: "?".to_string(),
- replication_factor: 0,
- config_version: 0,
- },
- ),
- })
- .collect::<Vec<_>>();
+ let known_nodes = self
+ .fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|n| KnownNodeInfo {
+ id: n.id.into(),
+ addr: n.addr,
+ is_up: n.is_up(),
+ last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()),
+ status: node_status
+ .get(&n.id.into())
+ .cloned()
+ .map(|(_, st)| st)
+ .unwrap_or(NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ config_version: 0,
+ }),
+ })
+ .collect::<Vec<_>>();
SystemRpc::ReturnKnownNodes(known_nodes)
}
@@ -361,14 +413,14 @@ impl System {
drop(update_ring);
let self2 = self.clone();
- let adv2 = adv.clone();
+ let adv = adv.clone();
self.background.spawn_cancellable(async move {
self2
.rpc
.broadcast(
&self2.system_endpoint,
- SystemRpc::AdvertiseConfig(adv2),
- RequestStrategy::with_priority(PRIO_NORMAL),
+ SystemRpc::AdvertiseConfig(adv),
+ RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
Ok(())
@@ -439,7 +491,12 @@ impl System {
}
for (node_id, node_addr) in ping_list {
- tokio::spawn(self.netapp.clone().try_connect(node_addr, node_id));
+ tokio::spawn(
+ self.netapp
+ .clone()
+ .try_connect(node_addr, node_id)
+ .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)),
+ );
}
}