aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--src/rpc/system.rs277
1 files changed, 204 insertions, 73 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 7ccec945..b95cff58 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -1,8 +1,9 @@
//! Module containing structs related to membership management
+use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
use std::time::Duration;
use arc_swap::ArcSwap;
@@ -14,21 +15,24 @@ use sodiumoxide::crypto::sign::ed25519;
use tokio::sync::watch;
use tokio::sync::Mutex;
-use netapp::endpoint::{Endpoint, EndpointHandler, Message};
+use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::proto::*;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
+use netapp::util::parse_and_resolve_peer_addr;
use garage_util::background::BackgroundRunner;
+use garage_util::data::Uuid;
use garage_util::error::Error;
use garage_util::persister::Persister;
-//use garage_util::time::*;
+use garage_util::time::*;
-//use crate::consul::get_consul_nodes;
+use crate::consul::*;
use crate::ring::*;
-use crate::rpc_helper::{RequestStrategy, RpcHelper};
+use crate::rpc_helper::*;
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
+const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
/// RPC endpoint used for calls related to membership
@@ -39,33 +43,35 @@ pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
pub enum SystemRpc {
/// Response to successfull advertisements
Ok,
- /// Error response
- Error(String),
+ /// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
+ Connect(String),
/// Ask other node its config. Answered with AdvertiseConfig
PullConfig,
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
- AdvertiseStatus(StateInfo),
+ AdvertiseStatus(NodeStatus),
/// Advertisement of nodes config. Sent spontanously or in response to PullConfig
AdvertiseConfig(NetworkConfig),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
- ReturnKnownNodes(Vec<(NodeID, SocketAddr, bool)>),
+ ReturnKnownNodes(Vec<KnownNodeInfo>),
}
-impl Message for SystemRpc {
- type Response = SystemRpc;
+impl Rpc for SystemRpc {
+ type Response = Result<SystemRpc, Error>;
}
/// This node's membership manager
pub struct System {
/// The id of this node
- pub id: NodeID,
+ pub id: Uuid,
persist_config: Persister<NetworkConfig>,
+ persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
- state_info: ArcSwap<StateInfo>,
+ local_status: ArcSwap<NodeStatus>,
+ node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
pub netapp: Arc<NetApp>,
fullmesh: Arc<FullMeshPeeringStrategy>,
@@ -74,6 +80,7 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
+ rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>,
consul_service_name: Option<String>,
@@ -88,7 +95,7 @@ pub struct System {
}
#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct StateInfo {
+pub struct NodeStatus {
/// Hostname of the node
pub hostname: String,
/// Replication factor configured on the node
@@ -97,26 +104,34 @@ pub struct StateInfo {
pub config_version: u64,
}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct KnownNodeInfo {
+ pub id: Uuid,
+ pub addr: SocketAddr,
+ pub is_up: bool,
+ pub status: NodeStatus,
+}
+
fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
- let mut id_file = metadata_dir.to_path_buf();
- id_file.push("node_id");
- if id_file.as_path().exists() {
- let mut f = std::fs::File::open(id_file.as_path())?;
+ let mut key_file = metadata_dir.to_path_buf();
+ key_file.push("node_key");
+ if key_file.as_path().exists() {
+ let mut f = std::fs::File::open(key_file.as_path())?;
let mut d = vec![];
f.read_to_end(&mut d)?;
if d.len() != 64 {
- return Err(Error::Message("Corrupt node_id file".to_string()));
+ return Err(Error::Message("Corrupt node_key file".to_string()));
}
let mut key = [0u8; 64];
key.copy_from_slice(&d[..]);
Ok(NodeKey::from_slice(&key[..]).unwrap())
} else {
- let (key, _) = ed25519::gen_keypair();
+ let (_, key) = ed25519::gen_keypair();
- let mut f = std::fs::File::create(id_file.as_path())?;
+ let mut f = std::fs::File::create(key_file.as_path())?;
f.write_all(&key[..])?;
- Ok(NodeKey::from_slice(&key[..]).unwrap())
+ Ok(key)
}
}
@@ -128,6 +143,7 @@ impl System {
background: Arc<BackgroundRunner>,
replication_factor: usize,
rpc_listen_addr: SocketAddr,
+ rpc_public_address: Option<SocketAddr>,
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>,
consul_service_name: Option<String>,
@@ -136,29 +152,20 @@ impl System {
info!("Node public key: {}", hex::encode(&node_key.public_key()));
let persist_config = Persister::new(&metadata_dir, "network_config");
+ let persist_peer_list = Persister::new(&metadata_dir, "peer_list");
let net_config = match persist_config.load() {
Ok(x) => x,
Err(e) => {
- match Persister::<garage_rpc_021::ring::NetworkConfig>::new(
- &metadata_dir,
- "network_config",
- )
- .load()
- {
- Ok(old_config) => NetworkConfig::migrate_from_021(old_config),
- Err(e2) => {
- info!(
- "No valid previous network configuration stored ({}, {}), starting fresh.",
- e, e2
- );
- NetworkConfig::new()
- }
- }
+ info!(
+ "No valid previous network configuration stored ({}), starting fresh.",
+ e
+ );
+ NetworkConfig::new()
}
};
- let state_info = StateInfo {
+ let local_status = NodeStatus {
hostname: gethostname::gethostname()
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
@@ -169,15 +176,27 @@ impl System {
let ring = Ring::new(net_config, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
+ if let Some(addr) = rpc_public_address {
+ println!("{}@{}", hex::encode(&node_key.public_key()), addr);
+ } else {
+ println!("{}", hex::encode(&node_key.public_key()));
+ }
+
let netapp = NetApp::new(network_key, node_key);
- let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers.clone());
+ let fullmesh = FullMeshPeeringStrategy::new(
+ netapp.clone(),
+ bootstrap_peers.clone(),
+ rpc_public_address,
+ );
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
let sys = Arc::new(System {
- id: netapp.id.clone(),
+ id: netapp.id.into(),
persist_config,
- state_info: ArcSwap::new(Arc::new(state_info)),
+ persist_peer_list,
+ local_status: ArcSwap::new(Arc::new(local_status)),
+ node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
rpc: RpcHelper {
@@ -187,6 +206,7 @@ impl System {
system_endpoint,
replication_factor,
rpc_listen_addr,
+ rpc_public_addr: rpc_public_address,
bootstrap_peers,
consul_host,
consul_service_name,
@@ -206,11 +226,38 @@ impl System {
.listen(self.rpc_listen_addr, None, must_exit.clone()),
self.fullmesh.clone().run(must_exit.clone()),
self.discovery_loop(must_exit.clone()),
+ self.status_exchange_loop(must_exit.clone()),
);
}
// ---- INTERNALS ----
+ async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
+ let (consul_host, consul_service_name) =
+ match (&self.consul_host, &self.consul_service_name) {
+ (Some(ch), Some(csn)) => (ch, csn),
+ _ => return Ok(()),
+ };
+
+ let rpc_public_addr = match self.rpc_public_addr {
+ Some(addr) => addr,
+ None => {
+ warn!("Not advertising to Consul because rpc_public_addr is not defined in config file.");
+ return Ok(());
+ }
+ };
+
+ publish_consul_service(
+ consul_host,
+ consul_service_name,
+ self.netapp.id,
+ &self.local_status.load_full().hostname,
+ rpc_public_addr,
+ )
+ .await
+ .map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e)))
+ }
+
/// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
@@ -221,12 +268,27 @@ impl System {
Ok(())
}
- fn update_state_info(&self) {
- let mut new_si: StateInfo = self.state_info.load().as_ref().clone();
+ fn update_local_status(&self) {
+ let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
let ring = self.ring.borrow();
new_si.config_version = ring.config.version;
- self.state_info.swap(Arc::new(new_si));
+ self.local_status.swap(Arc::new(new_si));
+ }
+
+ async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> {
+ let (pubkey, addrs) = parse_and_resolve_peer_addr(node)
+ .ok_or_else(|| Error::Message(format!("Unable to parse or resolve node specification: {}", node)))?;
+ let mut errors = vec![];
+ for ip in addrs.iter() {
+ match self.netapp.clone().try_connect(*ip, pubkey).await {
+ Ok(()) => return Ok(SystemRpc::Ok),
+ Err(e) => {
+ errors.push((*ip, e));
+ }
+ }
+ }
+ return Err(Error::Message(format!("Could not connect to specified peers. Errors: {:?}", errors)));
}
fn handle_pull_config(&self) -> SystemRpc {
@@ -234,6 +296,58 @@ impl System {
SystemRpc::AdvertiseConfig(ring.config.clone())
}
+ fn handle_get_known_nodes(&self) -> SystemRpc {
+ let node_status = self.node_status.read().unwrap();
+ let known_nodes =
+ self.fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|n| KnownNodeInfo {
+ id: n.id.into(),
+ addr: n.addr,
+ is_up: n.is_up(),
+ status: node_status.get(&n.id.into()).cloned().map(|(_, st)| st).unwrap_or(
+ NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ config_version: 0,
+ },
+ ),
+ })
+ .collect::<Vec<_>>();
+ SystemRpc::ReturnKnownNodes(known_nodes)
+ }
+
+ async fn handle_advertise_status(
+ self: &Arc<Self>,
+ from: Uuid,
+ info: &NodeStatus,
+ ) -> Result<SystemRpc, Error> {
+ let local_info = self.local_status.load();
+
+ if local_info.replication_factor < info.replication_factor {
+ error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs",
+ info.replication_factor,
+ local_info.replication_factor);
+ std::process::exit(1);
+ }
+
+ if info.config_version > local_info.config_version {
+ let self2 = self.clone();
+ self.background.spawn_cancellable(async move {
+ self2.pull_config(from).await;
+ Ok(())
+ });
+ }
+
+ self.node_status
+ .write()
+ .unwrap()
+ .insert(from, (now_msec(), info.clone()));
+
+ Ok(SystemRpc::Ok)
+ }
+
async fn handle_advertise_config(
self: Arc<Self>,
adv: &NetworkConfig,
@@ -265,13 +379,32 @@ impl System {
Ok(SystemRpc::Ok)
}
- async fn discovery_loop(&self, mut stop_signal: watch::Receiver<bool>) {
- /* TODO
+ async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
+ while !*stop_signal.borrow() {
+ let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL);
+
+ self.update_local_status();
+ let local_status: NodeStatus = self.local_status.load().as_ref().clone();
+ self.rpc
+ .broadcast(
+ &self.system_endpoint,
+ SystemRpc::AdvertiseStatus(local_status),
+ RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT),
+ )
+ .await;
+
+ select! {
+ _ = restart_at.fuse() => {},
+ _ = stop_signal.changed().fuse() => {},
+ }
+ }
+ }
+
+ async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
let consul_config = match (&self.consul_host, &self.consul_service_name) {
(Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
_ => None,
};
- */
while !*stop_signal.borrow() {
let not_configured = self.ring.borrow().config.members.is_empty();
@@ -286,34 +419,42 @@ impl System {
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
- let ping_list = self.bootstrap_peers.clone();
+ let mut ping_list = self.bootstrap_peers.clone();
- /*
- *TODO bring this back: persisted list of peers
- if let Ok(peers) = self.persist_status.load_async().await {
- ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
+ // Add peer list from list stored on disk
+ if let Ok(peers) = self.persist_peer_list.load_async().await {
+ ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
- */
- /*
- * TODO bring this back: get peers from consul
+ // Fetch peer list from Consul
if let Some((consul_host, consul_service_name)) = &consul_config {
match get_consul_nodes(consul_host, consul_service_name).await {
Ok(node_list) => {
- ping_list.extend(node_list.iter().map(|a| (*a, None)));
+ ping_list.extend(node_list);
}
Err(e) => {
warn!("Could not retrieve node list from Consul: {}", e);
}
}
}
- */
for (node_id, node_addr) in ping_list {
tokio::spawn(self.netapp.clone().try_connect(node_addr, node_id));
}
}
+ let peer_list = self
+ .fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|n| (n.id.into(), n.addr))
+ .collect::<Vec<_>>();
+ if let Err(e) = self.persist_peer_list.save_async(&peer_list).await {
+ warn!("Could not save peer list to file: {}", e);
+ }
+
+ self.background.spawn(self.clone().advertise_to_consul());
+
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
_ = restart_at.fuse() => {},
@@ -322,7 +463,7 @@ impl System {
}
}
- async fn pull_config(self: Arc<Self>, peer: NodeID) {
+ async fn pull_config(self: Arc<Self>, peer: Uuid) {
let resp = self
.rpc
.call(
@@ -340,24 +481,14 @@ impl System {
#[async_trait]
impl EndpointHandler<SystemRpc> for System {
- async fn handle(self: &Arc<Self>, msg: &SystemRpc, _from: NodeID) -> SystemRpc {
- let resp = match msg {
+ async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
+ match msg {
+ SystemRpc::Connect(node) => self.handle_connect(node).await,
SystemRpc::PullConfig => Ok(self.handle_pull_config()),
+ SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(&adv).await,
- SystemRpc::GetKnownNodes => {
- let known_nodes = self
- .fullmesh
- .get_peer_list()
- .iter()
- .map(|n| (n.id, n.addr, n.is_up()))
- .collect::<Vec<_>>();
- Ok(SystemRpc::ReturnKnownNodes(known_nodes))
- }
+ SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
- };
- match resp {
- Ok(r) => r,
- Err(e) => SystemRpc::Error(format!("{}", e)),
}
}
}