summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs261
1 files changed, 261 insertions, 0 deletions
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..77820b8
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,261 @@
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::Read;
+
+use anyhow::{anyhow, bail, Result};
+use bytes::Bytes;
+use log::*;
+use reqwest::StatusCode;
+use serde::{Deserialize, Serialize};
+
+pub struct ConsulConfig {
+ pub addr: String,
+ pub ca_cert: Option<String>,
+ pub tls_skip_verify: bool,
+ pub client_cert: Option<String>,
+ pub client_key: Option<String>,
+}
+
+// ---- Watch and retrieve Consul catalog ----
+//
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulNode {
+ #[serde(rename = "Node")]
+ pub node: String,
+ #[serde(rename = "Address")]
+ pub address: String,
+ #[serde(rename = "Meta")]
+ pub meta: HashMap<String, String>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulServiceEntry {
+ #[serde(rename = "Service")]
+ pub service: String,
+
+ #[serde(rename = "Address")]
+ pub address: String,
+
+ #[serde(rename = "Port")]
+ pub port: u16,
+
+ #[serde(rename = "Tags")]
+ pub tags: Vec<String>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulNodeCatalog {
+ #[serde(rename = "Node")]
+ pub node: ConsulNode,
+ #[serde(rename = "Services")]
+ pub services: HashMap<String, ConsulServiceEntry>,
+}
+
+// ---- Consul session management ----
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionRequest {
+ #[serde(rename = "Name")]
+ pub name: String,
+
+ #[serde(rename = "Node")]
+ pub node: Option<String>,
+
+ #[serde(rename = "LockDelay")]
+ pub lock_delay: Option<String>,
+
+ #[serde(rename = "TTL")]
+ pub ttl: Option<String>,
+
+ #[serde(rename = "Behavior")]
+ pub behavior: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionResponse {
+ #[serde(rename = "ID")]
+ pub id: String,
+}
+
+#[derive(Clone)]
+pub struct Consul {
+ client: reqwest::Client,
+
+ url: String,
+ kv_prefix: String,
+
+ pub local_node: String,
+}
+
+impl Consul {
+ pub fn new(config: ConsulConfig, kv_prefix: &str, local_node: &str) -> Result<Self> {
+ let client = match (&config.client_cert, &config.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)?.read_to_end(&mut client_key_buf)?;
+
+ let identity = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ if config.tls_skip_verify {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .danger_accept_invalid_certs(true)
+ .identity(identity)
+ .build()?
+ } else if let Some(ca_cert) = &config.ca_cert {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
+
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
+ .identity(identity)
+ .build()?
+ } else {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .identity(identity)
+ .build()?
+ }
+ }
+ (None, None) => reqwest::Client::new(),
+ _ => bail!("Incomplete Consul TLS configuration parameters"),
+ };
+
+ Ok(Self {
+ client,
+ url: config.addr.trim_end_matches('/').to_string(),
+ kv_prefix: kv_prefix.to_string(),
+ local_node: local_node.into(),
+ })
+ }
+
+ pub async fn list_nodes(&self) -> Result<Vec<ConsulNode>> {
+ debug!("list_nodes");
+
+ let url = format!("{}/v1/catalog/nodes", self.url);
+
+ let http = self.client.get(&url).send().await?;
+ let resp: Vec<ConsulNode> = http.json().await?;
+ Ok(resp)
+ }
+
+ pub async fn watch_node(
+ &self,
+ host: &str,
+ idx: Option<usize>,
+ ) -> Result<(ConsulNodeCatalog, usize)> {
+ debug!("watch_node {} {:?}", host, idx);
+
+ let url = match idx {
+ Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
+ None => format!("{}/v1/catalog/node/{}", self.url, host),
+ };
+
+ let http = self.client.get(&url).send().await?;
+ let new_idx = match http.headers().get("X-Consul-Index") {
+ Some(v) => v.to_str()?.parse::<usize>()?,
+ None => bail!("X-Consul-Index header not found"),
+ };
+
+ let resp: ConsulNodeCatalog = http.json().await?;
+ Ok((resp, new_idx))
+ }
+
+ // ---- KV get and put ----
+
+ pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
+ debug!("kv_get {}", key);
+
+ let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
+ let http = self.client.get(&url).send().await?;
+ match http.status() {
+ StatusCode::OK => Ok(Some(http.bytes().await?)),
+ StatusCode::NOT_FOUND => Ok(None),
+ _ => Err(anyhow!(
+ "Consul request failed: {:?}",
+ http.error_for_status()
+ )),
+ }
+ }
+
+ pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
+ debug!("kv_get_json {}", key);
+
+ let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
+ let http = self.client.get(&url).send().await?;
+ match http.status() {
+ StatusCode::OK => Ok(Some(http.json().await?)),
+ StatusCode::NOT_FOUND => Ok(None),
+ _ => Err(anyhow!(
+ "Consul request failed: {:?}",
+ http.error_for_status()
+ )),
+ }
+ }
+
+ pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
+ debug!("kv_put {}", key);
+
+ let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
+
+ pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
+ debug!("kv_put_json {}", key);
+
+ let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
+ let http = self.client.put(&url).json(value).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
+
+ pub async fn kv_delete(&self, key: &str) -> Result<()> {
+ let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
+ let http = self.client.delete(&url).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
+
+ // ---- Locking ----
+
+ pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
+ debug!("create_session {:?}", req);
+
+ let url = format!("{}/v1/session/create", self.url);
+ let http = self.client.put(&url).json(req).send().await?;
+ let resp: ConsulSessionResponse = http.json().await?;
+ Ok(resp.id)
+ }
+
+ pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
+ debug!("acquire {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?acquire={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ let resp: bool = http.json().await?;
+ Ok(resp)
+ }
+
+ pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
+ debug!("release {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?release={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
+}