aboutsummaryrefslogtreecommitdiff
path: root/src/consul.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/consul.rs')
-rw-r--r--src/consul.rs117
1 files changed, 63 insertions, 54 deletions
diff --git a/src/consul.rs b/src/consul.rs
index c7ac2b6..7f695b2 100644
--- a/src/consul.rs
+++ b/src/consul.rs
@@ -7,71 +7,80 @@ use crate::config::RuntimeConfigConsul;
#[derive(Serialize, Deserialize, Debug)]
pub struct ServiceEntry {
- pub Tags: Vec<String>,
+ #[serde(rename = "Tags")]
+ pub tags: Vec<String>,
}
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Default)]
pub struct CatalogNode {
- pub Services: HashMap<String, ServiceEntry>,
+ #[serde(rename = "Services")]
+ pub services: HashMap<String, ServiceEntry>,
}
pub struct Consul {
- client: reqwest::Client,
- url: String,
- idx: Option<u64>,
+ client: reqwest::Client,
+ url: String,
+ idx: Option<u64>,
}
impl Consul {
- pub fn new(config: &RuntimeConfigConsul) -> Self {
- let client = if let Some((ca, skip_verify, ident)) = config.tls.clone() {
- if skip_verify {
- reqwest::Client::builder()
- .use_rustls_tls()
- .danger_accept_invalid_certs(true)
- .identity(ident)
- .build()
- .expect("Unable to build reqwest client")
- } else if let Some(ca) = ca {
- reqwest::Client::builder()
- .use_rustls_tls()
- .add_root_certificate(ca)
- .identity(ident)
- .build()
- .expect("Unable to build reqwest client")
- } else {
- reqwest::Client::builder()
- .use_rustls_tls()
- .identity(ident)
- .build()
- .expect("Unable to build reqwest client")
- }
- } else {
- reqwest::Client::new()
- };
- return Self {
- client,
- url: config.url.clone(),
- idx: None,
- };
- }
+ pub fn new(config: &RuntimeConfigConsul) -> Self {
+ let client = if let Some((ca, skip_verify, ident)) = config.tls.clone() {
+ if skip_verify {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .danger_accept_invalid_certs(true)
+ .identity(ident)
+ .build()
+ .expect("Unable to build reqwest client")
+ } else if let Some(ca) = ca {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .add_root_certificate(ca)
+ .identity(ident)
+ .build()
+ .expect("Unable to build reqwest client")
+ } else {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .identity(ident)
+ .build()
+ .expect("Unable to build reqwest client")
+ }
+ } else {
+ reqwest::Client::new()
+ };
+ return Self {
+ client,
+ url: config.url.clone(),
+ idx: None,
+ };
+ }
+
+ pub fn watch_node_reset(&mut self) -> () {
+ self.idx = None;
+ }
- pub fn watch_node_reset(&mut self) -> () {
- self.idx = None;
- }
+ pub async fn watch_node(&mut self, host: &str) -> Result<CatalogNode> {
+ let url = match self.idx {
+ Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
+ None => format!("{}/v1/catalog/node/{}", self.url, host),
+ };
- pub async fn watch_node(&mut self, host: &str) -> Result<CatalogNode> {
- let url = match self.idx {
- Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
- None => format!("{}/v1/catalog/node/{}", self.url, host),
- };
+ let http = self.client.get(&url).send().await?;
+ self.idx = match http.headers().get("X-Consul-Index") {
+ Some(v) => Some(v.to_str()?.parse::<u64>()?),
+ None => return Err(anyhow!("X-Consul-Index header not found")),
+ };
- let http = self.client.get(&url).send().await?;
- self.idx = match http.headers().get("X-Consul-Index") {
- Some(v) => Some(v.to_str()?.parse::<u64>()?),
- None => return Err(anyhow!("X-Consul-Index header not found")),
- };
+ let resp: Option<CatalogNode> = http.json().await?;
+ return Ok(resp.unwrap_or_default());
+ }
- let resp: CatalogNode = http.json().await?;
- return Ok(resp);
- }
+ pub async fn kv_put(&self, key: &str, bytes: Vec<u8>) -> Result<()> {
+ let url = format!("{}/v1/kv/{}", self.url, key);
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
}