From 5e986c443f014d96bd0c27113fcf4571f2e1e881 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Oct 2021 00:29:50 +0200 Subject: Discovery via consul --- src/rpc/system.rs | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) (limited to 'src/rpc/system.rs') diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 68edabdf..3a81bc3b 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -28,7 +28,7 @@ use garage_util::error::Error; use garage_util::persister::Persister; use garage_util::time::*; -use crate::consul::get_consul_nodes; +use crate::consul::*; use crate::ring::*; use crate::rpc_helper::*; @@ -80,6 +80,7 @@ pub struct System { system_endpoint: Arc>, rpc_listen_addr: SocketAddr, + rpc_public_addr: Option, bootstrap_peers: Vec<(NodeID, SocketAddr)>, consul_host: Option, consul_service_name: Option, @@ -199,6 +200,7 @@ impl System { system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, + rpc_public_addr: config.rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), consul_host: config.consul_host.clone(), consul_service_name: config.consul_service_name.clone(), @@ -224,6 +226,32 @@ impl System { // ---- INTERNALS ---- + async fn advertise_to_consul(self: Arc) -> Result<(), Error> { + let (consul_host, consul_service_name) = + match (&self.consul_host, &self.consul_service_name) { + (Some(ch), Some(csn)) => (ch, csn), + _ => return Ok(()), + }; + + let rpc_public_addr = match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("Not advertising to Consul because rpc_public_addr is not defined in config file."); + return Ok(()); + } + }; + + publish_consul_service( + consul_host, + consul_service_name, + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + .map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e))) + } + /// Save network configuration to disc async fn save_network_config(self: Arc) -> Result<(), Error> { let ring: Arc = self.ring.borrow().clone(); @@ -375,7 +403,7 @@ impl System { } } - async fn discovery_loop(&self, mut stop_signal: watch::Receiver) { + async fn discovery_loop(self: &Arc, mut stop_signal: watch::Receiver) { let consul_config = match (&self.consul_host, &self.consul_service_name) { (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), _ => None, @@ -419,6 +447,8 @@ impl System { } } + self.background.spawn(self.clone().advertise_to_consul()); + let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { _ = restart_at.fuse() => {}, -- cgit v1.2.3