diff options
Diffstat (limited to 'src/consul_actor.rs')
-rw-r--r-- | src/consul_actor.rs | 44 |
1 files changed, 40 insertions, 4 deletions
diff --git a/src/consul_actor.rs b/src/consul_actor.rs index 0f09851..c87874d 100644 --- a/src/consul_actor.rs +++ b/src/consul_actor.rs @@ -1,15 +1,51 @@ use crate::consul::Consul; use tokio::sync::watch; +use tokio::time::delay_for; +use crate::messages; +use anyhow::Result; +use std::cmp; +use std::time::Duration; +use log::*; pub struct ConsulActor { - consul: Consul - out: + pub rx_open_ports: watch::Receiver<messages::OpenPorts>, + + consul: Consul, + node: String, + retries: u32, + tx_open_ports: watch::Sender<messages::OpenPorts> +} + +fn retry_to_time(retries: u32, max_time: Duration) -> Duration { + // 1.2^x seems to be a good value to exponentially increase time at a good pace + // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5 minutes + return Duration::from_secs(cmp::max(max_time.as_secs(), 1.2f64.powf(retries as f64) as u64)) } impl ConsulActor { - fn new(url: &str) -> Self { + fn new(url: &str, node: &str) -> Self { + let (tx, rx) = watch::channel(messages::OpenPorts{ports: Vec::new() }); return Self { - consul: Consul::new(url); + consul: Consul::new(url), + rx_open_ports: rx, + tx_open_ports: tx, + node: node.to_string(), + retries: 0, }; } + + async fn listen(&mut self) -> Result<()> { + loop { + let catalog = match self.consul.watch_node(&self.node).await { + Ok(c) => c, + Err(e) => { + self.retries = cmp::min(u32::MAX - 1, self.retries) + 1; + let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600)); + error!("Failed to query consul. Will retry in {}s. {}", will_retry_in.as_secs(), e); + delay_for(will_retry_in).await; + continue; + } + }; + } + } } |