aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--src/rpc/system.rs70
1 files changed, 32 insertions, 38 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 2c6f14fd..8f753b7f 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -21,7 +21,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
@@ -50,8 +49,6 @@ pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
-pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret";
-
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum SystemRpc {
@@ -110,9 +107,6 @@ pub struct System {
pub ring: watch::Receiver<Arc<Ring>>,
update_ring: Mutex<watch::Sender<Arc<Ring>>>,
- /// The job runner of this node
- pub background: Arc<BackgroundRunner>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
}
@@ -232,7 +226,6 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
- background: Arc<BackgroundRunner>,
replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
@@ -354,7 +347,6 @@ impl System {
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
- background.clone(),
ring.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
@@ -372,7 +364,6 @@ impl System {
ring,
update_ring: Mutex::new(update_ring),
- background,
metadata_dir: config.metadata_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
@@ -444,17 +435,14 @@ impl System {
))
})?;
let mut errors = vec![];
- for ip in addrs.iter() {
- match self
- .netapp
- .clone()
- .try_connect(*ip, pubkey)
- .await
- .err_context(CONNECT_ERROR_MESSAGE)
- {
+ for addr in addrs.iter() {
+ match self.netapp.clone().try_connect(*addr, pubkey).await {
Ok(()) => return Ok(()),
Err(e) => {
- errors.push((*ip, e));
+ errors.push((
+ *addr,
+ Error::Message(connect_error_message(*addr, pubkey, e)),
+ ));
}
}
}
@@ -578,7 +566,7 @@ impl System {
}
/// Save network configuration to disc
- async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
+ async fn save_cluster_layout(&self) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
self.persist_cluster_layout
.save_async(&ring.layout)
@@ -630,11 +618,7 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{
- let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2.pull_cluster_layout(from).await;
- Ok(())
- });
+ tokio::spawn(self.clone().pull_cluster_layout(from));
}
self.node_status
@@ -676,18 +660,21 @@ impl System {
drop(update_ring);
let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2
+ tokio::spawn(async move {
+ if let Err(e) = self2
.rpc
.broadcast(
&self2.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
- .await?;
- Ok(())
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
+ }
});
- self.background.spawn(self.clone().save_cluster_layout());
+
+ self.save_cluster_layout().await?;
}
Ok(SystemRpc::Ok)
@@ -773,12 +760,12 @@ impl System {
}
for (node_id, node_addr) in ping_list {
- tokio::spawn(
- self.netapp
- .clone()
- .try_connect(node_addr, node_id)
- .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)),
- );
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await {
+ error!("{}", connect_error_message(node_addr, node_id, e));
+ }
+ });
}
}
@@ -787,11 +774,10 @@ impl System {
}
#[cfg(feature = "consul-discovery")]
- self.background.spawn(self.clone().advertise_to_consul());
+ background::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
- self.background
- .spawn(self.clone().advertise_to_kubernetes());
+ background::spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
@@ -881,3 +867,11 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
ret
}
+
+fn connect_error_message(
+ addr: SocketAddr,
+ pubkey: ed25519::PublicKey,
+ e: netapp::error::Error,
+) -> String {
+ format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e)
+}