diff options
Diffstat (limited to 'src/consul_actor.rs')
-rw-r--r-- | src/consul_actor.rs | 157 |
1 files changed, 80 insertions, 77 deletions
diff --git a/src/consul_actor.rs b/src/consul_actor.rs index 8b722b6..c099215 100644 --- a/src/consul_actor.rs +++ b/src/consul_actor.rs @@ -11,107 +11,110 @@ use crate::{consul, messages}; #[derive(Serialize, Deserialize, Debug)] pub enum DiplonatParameter { - tcp_port(HashSet<u16>), - udp_port(HashSet<u16>), + #[serde(rename = "tcp_port")] + TcpPort(HashSet<u16>), + #[serde(rename = "udp_port")] + UdpPort(HashSet<u16>), } #[derive(Serialize, Deserialize, Debug)] pub enum DiplonatConsul { - diplonat(Vec<DiplonatParameter>), + #[serde(rename = "diplonat")] + Diplonat(Vec<DiplonatParameter>), } pub struct ConsulActor { - pub rx_open_ports: watch::Receiver<messages::PublicExposedPorts>, + pub rx_open_ports: watch::Receiver<messages::PublicExposedPorts>, - consul: consul::Consul, - node: String, - retries: u32, - tx_open_ports: watch::Sender<messages::PublicExposedPorts>, + consul: consul::Consul, + node: String, + retries: u32, + tx_open_ports: watch::Sender<messages::PublicExposedPorts>, } fn retry_to_time(retries: u32, max_time: Duration) -> Duration { - // 1.2^x seems to be a good value to exponentially increase time at a good pace - // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5 - // minutes - return Duration::from_secs(cmp::min( - max_time.as_secs(), - 1.2f64.powf(retries as f64) as u64, - )); + // 1.2^x seems to be a good value to exponentially increase time at a good pace + // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5 + // minutes + return Duration::from_secs(cmp::min( + max_time.as_secs(), + 1.2f64.powf(retries as f64) as u64, + )); } fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> { - let mut r = Vec::new(); - - for (_, service_info) in &catalog.Services { - for tag in &service_info.Tags { - let diplo_conf: error::Result<DiplonatConsul> = from_str(tag); - match diplo_conf { - Ok(conf) => r.push(conf), - Err(e) => debug!("Failed to parse entry {}. {}", tag, e), - }; + let mut r = Vec::new(); + + for (_, service_info) in &catalog.services { + for tag in &service_info.tags { + let diplo_conf: error::Result<DiplonatConsul> = from_str(tag); + match diplo_conf { + Ok(conf) => r.push(conf), + Err(e) => debug!("Failed to parse entry {}. {}", tag, e), + }; + } } - } - return r; + return r; } fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts { - let mut op = messages::PublicExposedPorts { - tcp_ports: HashSet::new(), - udp_ports: HashSet::new(), - }; - - for conf in params { - let DiplonatConsul::diplonat(c) = conf; - for parameter in c { - match parameter { - DiplonatParameter::tcp_port(p) => op.tcp_ports.extend(p), - DiplonatParameter::udp_port(p) => op.udp_ports.extend(p), - }; + let mut op = messages::PublicExposedPorts { + tcp_ports: HashSet::new(), + udp_ports: HashSet::new(), + }; + + for conf in params { + let DiplonatConsul::Diplonat(c) = conf; + for parameter in c { + match parameter { + DiplonatParameter::TcpPort(p) => op.tcp_ports.extend(p), + DiplonatParameter::UdpPort(p) => op.udp_ports.extend(p), + }; + } } - } - return op; + return op; } impl ConsulActor { - pub fn new(config: &RuntimeConfigConsul, node: &str) -> Self { - let (tx, rx) = watch::channel(messages::PublicExposedPorts { - tcp_ports: HashSet::new(), - udp_ports: HashSet::new(), - }); - - return Self { - consul: consul::Consul::new(config), - rx_open_ports: rx, - tx_open_ports: tx, - node: node.to_string(), - retries: 0, - }; - } - - pub async fn listen(&mut self) -> Result<()> { - loop { - let catalog = match self.consul.watch_node(&self.node).await { - Ok(c) => c, - Err(e) => { - self.consul.watch_node_reset(); - self.retries = cmp::min(std::u32::MAX - 1, self.retries) + 1; - let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600)); - error!( - "Failed to query consul. Will retry in {}s. {}", - will_retry_in.as_secs(), - e - ); - sleep(will_retry_in).await; - continue; - } - }; - self.retries = 0; - let msg = to_open_ports(&to_parameters(&catalog)); - debug!("Extracted configuration: {:#?}", msg); + pub fn new(config: &RuntimeConfigConsul, node: &str) -> Self { + let (tx, rx) = watch::channel(messages::PublicExposedPorts { + tcp_ports: HashSet::new(), + udp_ports: HashSet::new(), + }); + + return Self { + consul: consul::Consul::new(config), + rx_open_ports: rx, + tx_open_ports: tx, + node: node.to_string(), + retries: 0, + }; + } - self.tx_open_ports.send(msg)?; + pub async fn listen(&mut self) -> Result<()> { + loop { + let catalog = match self.consul.watch_node(&self.node).await { + Ok(c) => c, + Err(e) => { + self.consul.watch_node_reset(); + self.retries = cmp::min(std::u32::MAX - 1, self.retries) + 1; + let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600)); + error!( + "Failed to query consul. Will retry in {}s. {}", + will_retry_in.as_secs(), + e + ); + sleep(will_retry_in).await; + continue; + } + }; + self.retries = 0; + let msg = to_open_ports(&to_parameters(&catalog)); + debug!("Extracted configuration: {:#?}", msg); + + self.tx_open_ports.send(msg)?; + } } - } } |