aboutsummaryrefslogtreecommitdiff
path: root/src/consul_actor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/consul_actor.rs')
-rw-r--r--src/consul_actor.rs157
1 files changed, 80 insertions, 77 deletions
diff --git a/src/consul_actor.rs b/src/consul_actor.rs
index 8b722b6..c099215 100644
--- a/src/consul_actor.rs
+++ b/src/consul_actor.rs
@@ -11,107 +11,110 @@ use crate::{consul, messages};
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatParameter {
- tcp_port(HashSet<u16>),
- udp_port(HashSet<u16>),
+ #[serde(rename = "tcp_port")]
+ TcpPort(HashSet<u16>),
+ #[serde(rename = "udp_port")]
+ UdpPort(HashSet<u16>),
}
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatConsul {
- diplonat(Vec<DiplonatParameter>),
+ #[serde(rename = "diplonat")]
+ Diplonat(Vec<DiplonatParameter>),
}
pub struct ConsulActor {
- pub rx_open_ports: watch::Receiver<messages::PublicExposedPorts>,
+ pub rx_open_ports: watch::Receiver<messages::PublicExposedPorts>,
- consul: consul::Consul,
- node: String,
- retries: u32,
- tx_open_ports: watch::Sender<messages::PublicExposedPorts>,
+ consul: consul::Consul,
+ node: String,
+ retries: u32,
+ tx_open_ports: watch::Sender<messages::PublicExposedPorts>,
}
fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
- // 1.2^x seems to be a good value to exponentially increase time at a good pace
- // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5
- // minutes
- return Duration::from_secs(cmp::min(
- max_time.as_secs(),
- 1.2f64.powf(retries as f64) as u64,
- ));
+ // 1.2^x seems to be a good value to exponentially increase time at a good pace
+ // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5
+ // minutes
+ return Duration::from_secs(cmp::min(
+ max_time.as_secs(),
+ 1.2f64.powf(retries as f64) as u64,
+ ));
}
fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
- let mut r = Vec::new();
-
- for (_, service_info) in &catalog.Services {
- for tag in &service_info.Tags {
- let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
- match diplo_conf {
- Ok(conf) => r.push(conf),
- Err(e) => debug!("Failed to parse entry {}. {}", tag, e),
- };
+ let mut r = Vec::new();
+
+ for (_, service_info) in &catalog.services {
+ for tag in &service_info.tags {
+ let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
+ match diplo_conf {
+ Ok(conf) => r.push(conf),
+ Err(e) => debug!("Failed to parse entry {}. {}", tag, e),
+ };
+ }
}
- }
- return r;
+ return r;
}
fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
- let mut op = messages::PublicExposedPorts {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- };
-
- for conf in params {
- let DiplonatConsul::diplonat(c) = conf;
- for parameter in c {
- match parameter {
- DiplonatParameter::tcp_port(p) => op.tcp_ports.extend(p),
- DiplonatParameter::udp_port(p) => op.udp_ports.extend(p),
- };
+ let mut op = messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ };
+
+ for conf in params {
+ let DiplonatConsul::Diplonat(c) = conf;
+ for parameter in c {
+ match parameter {
+ DiplonatParameter::TcpPort(p) => op.tcp_ports.extend(p),
+ DiplonatParameter::UdpPort(p) => op.udp_ports.extend(p),
+ };
+ }
}
- }
- return op;
+ return op;
}
impl ConsulActor {
- pub fn new(config: &RuntimeConfigConsul, node: &str) -> Self {
- let (tx, rx) = watch::channel(messages::PublicExposedPorts {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- });
-
- return Self {
- consul: consul::Consul::new(config),
- rx_open_ports: rx,
- tx_open_ports: tx,
- node: node.to_string(),
- retries: 0,
- };
- }
-
- pub async fn listen(&mut self) -> Result<()> {
- loop {
- let catalog = match self.consul.watch_node(&self.node).await {
- Ok(c) => c,
- Err(e) => {
- self.consul.watch_node_reset();
- self.retries = cmp::min(std::u32::MAX - 1, self.retries) + 1;
- let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600));
- error!(
- "Failed to query consul. Will retry in {}s. {}",
- will_retry_in.as_secs(),
- e
- );
- sleep(will_retry_in).await;
- continue;
- }
- };
- self.retries = 0;
- let msg = to_open_ports(&to_parameters(&catalog));
- debug!("Extracted configuration: {:#?}", msg);
+ pub fn new(config: &RuntimeConfigConsul, node: &str) -> Self {
+ let (tx, rx) = watch::channel(messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ });
+
+ return Self {
+ consul: consul::Consul::new(config),
+ rx_open_ports: rx,
+ tx_open_ports: tx,
+ node: node.to_string(),
+ retries: 0,
+ };
+ }
- self.tx_open_ports.send(msg)?;
+ pub async fn listen(&mut self) -> Result<()> {
+ loop {
+ let catalog = match self.consul.watch_node(&self.node).await {
+ Ok(c) => c,
+ Err(e) => {
+ self.consul.watch_node_reset();
+ self.retries = cmp::min(std::u32::MAX - 1, self.retries) + 1;
+ let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600));
+ error!(
+ "Failed to query consul. Will retry in {}s. {}",
+ will_retry_in.as_secs(),
+ e
+ );
+ sleep(will_retry_in).await;
+ continue;
+ }
+ };
+ self.retries = 0;
+ let msg = to_open_ports(&to_parameters(&catalog));
+ debug!("Extracted configuration: {:#?}", msg);
+
+ self.tx_open_ports.send(msg)?;
+ }
}
- }
}