aboutsummaryrefslogtreecommitdiff
path: root/src/consul.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/consul.rs')
-rw-r--r--src/consul.rs261
1 files changed, 0 insertions, 261 deletions
diff --git a/src/consul.rs b/src/consul.rs
deleted file mode 100644
index 13b99d8..0000000
--- a/src/consul.rs
+++ /dev/null
@@ -1,261 +0,0 @@
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::Read;
-
-use anyhow::{bail, Result};
-use bytes::Bytes;
-use log::*;
-use reqwest::StatusCode;
-use serde::{Deserialize, Serialize};
-
-pub struct ConsulConfig {
- pub addr: String,
- pub ca_cert: Option<String>,
- pub tls_skip_verify: bool,
- pub client_cert: Option<String>,
- pub client_key: Option<String>,
-}
-
-// ---- Watch and retrieve Consul catalog ----
-//
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulNode {
- #[serde(rename = "Node")]
- pub node: String,
- #[serde(rename = "Address")]
- pub address: String,
- #[serde(rename = "Meta")]
- pub meta: HashMap<String, String>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulServiceEntry {
- #[serde(rename = "Service")]
- pub service: String,
-
- #[serde(rename = "Address")]
- pub address: String,
-
- #[serde(rename = "Port")]
- pub port: u16,
-
- #[serde(rename = "Tags")]
- pub tags: Vec<String>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulNodeCatalog {
- #[serde(rename = "Node")]
- pub node: ConsulNode,
- #[serde(rename = "Services")]
- pub services: HashMap<String, ConsulServiceEntry>,
-}
-
-// ---- Consul session management ----
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulSessionRequest {
- #[serde(rename = "Name")]
- pub name: String,
-
- #[serde(rename = "Node")]
- pub node: Option<String>,
-
- #[serde(rename = "LockDelay")]
- pub lock_delay: Option<String>,
-
- #[serde(rename = "TTL")]
- pub ttl: Option<String>,
-
- #[serde(rename = "Behavior")]
- pub behavior: Option<String>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulSessionResponse {
- #[serde(rename = "ID")]
- pub id: String,
-}
-
-#[derive(Clone)]
-pub struct Consul {
- client: reqwest::Client,
-
- url: String,
- kv_prefix: String,
-
- pub local_node: String,
-}
-
-impl Consul {
- pub fn new(config: ConsulConfig, kv_prefix: &str, local_node: &str) -> Result<Self> {
- let client = match (&config.client_cert, &config.client_key) {
- (Some(client_cert), Some(client_key)) => {
- let mut client_cert_buf = vec![];
- File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
-
- let mut client_key_buf = vec![];
- File::open(client_key)?.read_to_end(&mut client_key_buf)?;
-
- let identity = reqwest::Identity::from_pem(
- &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
- )?;
-
- if config.tls_skip_verify {
- reqwest::Client::builder()
- .use_rustls_tls()
- .danger_accept_invalid_certs(true)
- .identity(identity)
- .build()?
- } else if let Some(ca_cert) = &config.ca_cert {
- let mut ca_cert_buf = vec![];
- File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
-
- reqwest::Client::builder()
- .use_rustls_tls()
- .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
- .identity(identity)
- .build()?
- } else {
- reqwest::Client::builder()
- .use_rustls_tls()
- .identity(identity)
- .build()?
- }
- }
- (None, None) => reqwest::Client::new(),
- _ => bail!("Incomplete Consul TLS configuration parameters"),
- };
-
- Ok(Self {
- client,
- url: config.addr.trim_end_matches('/').to_string(),
- kv_prefix: kv_prefix.to_string(),
- local_node: local_node.into(),
- })
- }
-
- pub async fn list_nodes(&self) -> Result<Vec<ConsulNode>> {
- debug!("list_nodes");
-
- let url = format!("{}/v1/catalog/nodes", self.url);
-
- let http = self.client.get(&url).send().await?;
- let resp: Vec<ConsulNode> = http.json().await?;
- Ok(resp)
- }
-
- pub async fn watch_node(
- &self,
- host: &str,
- idx: Option<usize>,
- ) -> Result<(ConsulNodeCatalog, usize)> {
- debug!("watch_node {} {:?}", host, idx);
-
- let url = match idx {
- Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
- None => format!("{}/v1/catalog/node/{}", self.url, host),
- };
-
- let http = self.client.get(&url).send().await?;
- let new_idx = match http.headers().get("X-Consul-Index") {
- Some(v) => v.to_str()?.parse::<usize>()?,
- None => bail!("X-Consul-Index header not found"),
- };
-
- let resp: ConsulNodeCatalog = http.json().await?;
- Ok((resp, new_idx))
- }
-
- // ---- KV get and put ----
-
- pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
- debug!("kv_get {}", key);
-
- let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
- let http = self.client.get(&url).send().await?;
- match http.status() {
- StatusCode::OK => Ok(Some(http.bytes().await?)),
- StatusCode::NOT_FOUND => Ok(None),
- _ => Err(anyhow!(
- "Consul request failed: {:?}",
- http.error_for_status()
- )),
- }
- }
-
- pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
- debug!("kv_get_json {}", key);
-
- let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
- let http = self.client.get(&url).send().await?;
- match http.status() {
- StatusCode::OK => Ok(Some(http.json().await?)),
- StatusCode::NOT_FOUND => Ok(None),
- _ => Err(anyhow!(
- "Consul request failed: {:?}",
- http.error_for_status()
- )),
- }
- }
-
- pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
- debug!("kv_put {}", key);
-
- let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
- let http = self.client.put(&url).body(bytes).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-
- pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
- debug!("kv_put_json {}", key);
-
- let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
- let http = self.client.put(&url).json(value).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-
- pub async fn kv_delete(&self, key: &str) -> Result<()> {
- let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
- let http = self.client.delete(&url).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-
- // ---- Locking ----
-
- pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
- debug!("create_session {:?}", req);
-
- let url = format!("{}/v1/session/create", self.url);
- let http = self.client.put(&url).json(req).send().await?;
- let resp: ConsulSessionResponse = http.json().await?;
- Ok(resp.id)
- }
-
- pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
- debug!("acquire {}", key);
-
- let url = format!(
- "{}/v1/kv/{}{}?acquire={}",
- self.url, self.kv_prefix, key, session
- );
- let http = self.client.put(&url).body(bytes).send().await?;
- let resp: bool = http.json().await?;
- Ok(resp)
- }
-
- pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
- debug!("release {}", key);
-
- let url = format!(
- "{}/v1/kv/{}{}?release={}",
- self.url, self.kv_prefix, key, session
- );
- let http = self.client.put(&url).body(bytes).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-}