summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-02-02 15:17:23 +0100
committerAlex Auvolat <alex@adnab.me>2023-02-02 15:17:23 +0100
commit552fc7e5a0d623114901d4d8e8e29bfffa47e09c (patch)
tree324903c281474e15aea8e9d86b600c6ecb3999f0
parent19220178311ca80374f6f5ff4069c0adcaab932d (diff)
downloaddf-consul-552fc7e5a0d623114901d4d8e8e29bfffa47e09c.tar.gz
df-consul-552fc7e5a0d623114901d4d8e8e29bfffa47e09c.zip
Split into several files and make more APIs
-rw-r--r--Cargo.toml4
-rw-r--r--examples/health-service-1.json206
-rw-r--r--examples/test.rs59
-rw-r--r--src/catalog.rs230
-rw-r--r--src/kv.rs64
-rw-r--r--src/lib.rs305
-rw-r--r--src/locking.rs65
-rw-r--r--src/with_index.rs75
8 files changed, 748 insertions, 260 deletions
diff --git a/Cargo.toml b/Cargo.toml
index a36f28b..b5860aa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,8 @@ serde = { version = "1.0.149", features = ["derive"] }
log = "0.4"
bytes = "1"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-manual-roots" ] }
+tokio = { version = "1.23", default-features = false, features = [ "macros" ] }
+futures = "0.3.25"
[dev-dependencies]
-tokio = { version = "1.22", features = ["rt", "rt-multi-thread", "macros"] }
+tokio = { version = "1.23", features = ["rt", "rt-multi-thread", "macros"] }
diff --git a/examples/health-service-1.json b/examples/health-service-1.json
new file mode 100644
index 0000000..915b3a9
--- /dev/null
+++ b/examples/health-service-1.json
@@ -0,0 +1,206 @@
+[
+ {
+ "Node": {
+ "ID": "9ccf821c-5e3e-cbe0-3727-0b0e5df35412",
+ "Node": "celeri",
+ "Address": "10.83.1.3",
+ "Datacenter": "prod",
+ "TaggedAddresses": {
+ "lan": "10.83.1.3",
+ "lan_ipv4": "10.83.1.3",
+ "wan": "10.83.1.3",
+ "wan_ipv4": "10.83.1.3"
+ },
+ "Meta": {
+ "cname_target": "neptune.site.deuxfleurs.fr.",
+ "consul-network-segment": "",
+ "public_ipv4": "77.207.15.215",
+ "public_ipv6": "2001:910:1204:1::33",
+ "site": "neptune"
+ },
+ "CreateIndex": 28797576,
+ "ModifyIndex": 28797584
+ },
+ "Service": {
+ "ID": "_nomad-task-c7132dd2-cc89-cbb4-5e53-6bd7c8c33677-front-https-jitsi-https_port",
+ "Service": "https-jitsi",
+ "Tags": [
+ "jitsi",
+ "tricot jitsi.deuxfleurs.fr",
+ "d53-cname jitsi.deuxfleurs.fr"
+ ],
+ "Address": "10.83.1.3",
+ "TaggedAddresses": {
+ "lan_ipv4": {
+ "Address": "10.83.1.3",
+ "Port": 30140
+ },
+ "wan_ipv4": {
+ "Address": "10.83.1.3",
+ "Port": 30140
+ }
+ },
+ "Meta": {
+ "external-source": "nomad"
+ },
+ "Port": 30140,
+ "Weights": {
+ "Passing": 1,
+ "Warning": 1
+ },
+ "EnableTagOverride": false,
+ "Proxy": {
+ "Mode": "",
+ "MeshGateway": {},
+ "Expose": {}
+ },
+ "Connect": {},
+ "CreateIndex": 29922481,
+ "ModifyIndex": 29922481
+ },
+ "Checks": [
+ {
+ "Node": "celeri",
+ "CheckID": "serfHealth",
+ "Name": "Serf Health Status",
+ "Status": "critical",
+ "Notes": "",
+ "Output": "Agent not live or unreachable",
+ "ServiceID": "",
+ "ServiceName": "",
+ "ServiceTags": [],
+ "Type": "",
+ "Interval": "",
+ "Timeout": "",
+ "ExposedPort": 0,
+ "Definition": {},
+ "CreateIndex": 28797576,
+ "ModifyIndex": 32831331
+ },
+ {
+ "Node": "celeri",
+ "CheckID": "_nomad-check-1e8b81aa1790b51ffdf12117760d851bf00cb96f",
+ "Name": "service: \"https-jitsi\" check",
+ "Status": "passing",
+ "Notes": "",
+ "Output": "TCP connect 10.83.1.3:30140: Success",
+ "ServiceID": "_nomad-task-c7132dd2-cc89-cbb4-5e53-6bd7c8c33677-front-https-jitsi-https_port",
+ "ServiceName": "https-jitsi",
+ "ServiceTags": [
+ "jitsi",
+ "tricot jitsi.deuxfleurs.fr",
+ "d53-cname jitsi.deuxfleurs.fr"
+ ],
+ "Type": "tcp",
+ "Interval": "1m0s",
+ "Timeout": "1m0s",
+ "ExposedPort": 0,
+ "Definition": {},
+ "CreateIndex": 29922481,
+ "ModifyIndex": 29922617
+ }
+ ]
+ },
+ {
+ "Node": {
+ "ID": "cee1c827-ff32-6f38-a815-69038a961a80",
+ "Node": "dahlia",
+ "Address": "10.83.2.1",
+ "Datacenter": "prod",
+ "TaggedAddresses": {
+ "lan": "10.83.2.1",
+ "lan_ipv4": "10.83.2.1",
+ "wan": "10.83.2.1",
+ "wan_ipv4": "10.83.2.1"
+ },
+ "Meta": {
+ "cname_target": "orion.site.deuxfleurs.fr.",
+ "consul-network-segment": "",
+ "public_ipv4": "82.66.80.201",
+ "public_ipv6": "2a01:e0a:28f:5e60::11",
+ "site": "orion"
+ },
+ "CreateIndex": 31880680,
+ "ModifyIndex": 31880870
+ },
+ "Service": {
+ "ID": "_nomad-task-8214d936-aab2-35b4-d24e-8941a2d3b5b0-front-https-jitsi-https_port",
+ "Service": "https-jitsi",
+ "Tags": [
+ "jitsi",
+ "tricot jitsi.deuxfleurs.fr",
+ "d53-cname jitsi.deuxfleurs.fr"
+ ],
+ "Address": "10.83.2.1",
+ "TaggedAddresses": {
+ "lan_ipv4": {
+ "Address": "10.83.2.1",
+ "Port": 25753
+ },
+ "wan_ipv4": {
+ "Address": "10.83.2.1",
+ "Port": 25753
+ }
+ },
+ "Meta": {
+ "external-source": "nomad"
+ },
+ "Port": 25753,
+ "Weights": {
+ "Passing": 1,
+ "Warning": 1
+ },
+ "EnableTagOverride": false,
+ "Proxy": {
+ "Mode": "",
+ "MeshGateway": {},
+ "Expose": {}
+ },
+ "Connect": {},
+ "CreateIndex": 33002874,
+ "ModifyIndex": 33002874
+ },
+ "Checks": [
+ {
+ "Node": "dahlia",
+ "CheckID": "serfHealth",
+ "Name": "Serf Health Status",
+ "Status": "passing",
+ "Notes": "",
+ "Output": "Agent alive and reachable",
+ "ServiceID": "",
+ "ServiceName": "",
+ "ServiceTags": [],
+ "Type": "",
+ "Interval": "",
+ "Timeout": "",
+ "ExposedPort": 0,
+ "Definition": {},
+ "CreateIndex": 31880680,
+ "ModifyIndex": 31880680
+ },
+ {
+ "Node": "dahlia",
+ "CheckID": "_nomad-check-75542d8bde0dd9887ff8af10a02bf6a1f605695d",
+ "Name": "service: \"https-jitsi\" check",
+ "Status": "passing",
+ "Notes": "",
+ "Output": "TCP connect 10.83.2.1:25753: Success",
+ "ServiceID": "_nomad-task-8214d936-aab2-35b4-d24e-8941a2d3b5b0-front-https-jitsi-https_port",
+ "ServiceName": "https-jitsi",
+ "ServiceTags": [
+ "jitsi",
+ "tricot jitsi.deuxfleurs.fr",
+ "d53-cname jitsi.deuxfleurs.fr"
+ ],
+ "Type": "tcp",
+ "Interval": "1m0s",
+ "Timeout": "1m0s",
+ "ExposedPort": 0,
+ "Definition": {},
+ "CreateIndex": 33002874,
+ "ModifyIndex": 33002958
+ }
+ ]
+ }
+]
diff --git a/examples/test.rs b/examples/test.rs
index e7c34c8..ba29583 100644
--- a/examples/test.rs
+++ b/examples/test.rs
@@ -2,23 +2,52 @@ use df_consul::*;
#[tokio::main]
async fn main() {
- let config = ConsulConfig {
- addr: "http://localhost:8500".into(),
- ca_cert: None,
- tls_skip_verify: false,
- client_cert: None,
- client_key: None,
- };
+ let config = ConsulConfig {
+ addr: "http://localhost:8500".into(),
+ ca_cert: None,
+ tls_skip_verify: false,
+ client_cert: None,
+ client_key: None,
+ };
- let consul = Consul::new(config, "").unwrap();
+ let consul = Consul::new(config, "").unwrap();
- println!("== LIST NODES ==");
- let list_nodes = consul.list_nodes().await.unwrap();
- println!("{:?}", list_nodes);
+ println!("== LIST NODES ==");
+ let nodes = consul.catalog_node_list(None).await.unwrap();
+ println!("{:?}", nodes);
- println!("== CATALOG 1 ==");
- println!("{:?}", consul.watch_node("caribou", None).await.unwrap());
+ if let Some(node) = nodes.first() {
+ println!("== NODE {} ==", node.node);
+ println!("{:?}", consul.catalog_node(&node.node, None).await.unwrap());
+ }
- println!("== CATALOG 2 ==");
- println!("{:?}", consul.watch_node("cariacou", None).await.unwrap());
+ println!("== LIST SERVICES ==");
+ let services = consul.catalog_service_list(None).await.unwrap();
+ println!("{:?}", services);
+
+ if let Some(service) = services.keys().next() {
+ println!("== SERVICE NODES {} ==", service);
+ println!(
+ "{:?}",
+ consul.catalog_service_nodes(service, None).await.unwrap()
+ );
+
+ println!("== SERVICE HEALTH {} ==", service);
+ println!(
+ "{:?}",
+ consul
+ .health_service_instances(service, None)
+ .await
+ .unwrap()
+ );
+ }
+
+ println!("== WATCHING EVERYTHING ==");
+ let mut watch = consul.watch_all_service_health();
+ loop {
+ if watch.changed().await.is_err() {
+ break;
+ }
+ println!("\n{:?}", watch.borrow_and_update());
+ }
}
diff --git a/src/catalog.rs b/src/catalog.rs
new file mode 100644
index 0000000..952e99e
--- /dev/null
+++ b/src/catalog.rs
@@ -0,0 +1,230 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use futures::future::BoxFuture;
+use futures::stream::futures_unordered::FuturesUnordered;
+use futures::{FutureExt, StreamExt};
+use log::*;
+use serde::{Deserialize, Serialize};
+use tokio::select;
+use tokio::sync::watch;
+
+use crate::{Consul, WithIndex};
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulNode {
+ pub node: String,
+ pub address: String,
+ pub meta: HashMap<String, String>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulService {
+ pub service: String,
+ pub address: String,
+ pub port: u16,
+ pub tags: Vec<String>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulCatalogNode {
+ pub node: ConsulNode,
+ pub services: HashMap<String, ConsulService>,
+}
+
+pub type ConsulServiceList = HashMap<String, Vec<String>>;
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulServiceNode {
+ pub node: String,
+ pub address: String,
+ pub node_meta: HashMap<String, String>,
+ pub service_name: String,
+ pub service_tags: Vec<String>,
+ pub service_address: String,
+ pub service_port: u16,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulHealthServiceNode {
+ pub node: ConsulNode,
+ pub service: ConsulService,
+ pub checks: Vec<ConsulHealthCheck>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulHealthCheck {
+ pub node: String,
+ #[serde(rename = "CheckID")]
+ pub check_id: String,
+ pub name: String,
+ pub status: String,
+ pub output: String,
+ #[serde(rename = "Type")]
+ pub type_: String,
+}
+
+pub type AllServiceHealth = HashMap<String, Arc<[ConsulHealthServiceNode]>>;
+
+impl Consul {
+ pub async fn catalog_node_list(
+ &self,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<Vec<ConsulNode>>> {
+ self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index)
+ .await
+ }
+
+ pub async fn catalog_node(
+ &self,
+ host: &str,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<Option<ConsulCatalogNode>>> {
+ self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index)
+ .await
+ }
+
+ pub async fn catalog_service_list(
+ &self,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<ConsulServiceList>> {
+ self.get_with_index::<ConsulServiceList>(
+ format!("{}/v1/catalog/services", self.url),
+ last_index,
+ )
+ .await
+ }
+
+ pub async fn catalog_service_nodes(
+ &self,
+ service: &str,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<Vec<ConsulServiceNode>>> {
+ self.get_with_index(
+ format!("{}/v1/catalog/service/{}", self.url, service),
+ last_index,
+ )
+ .await
+ }
+
+ pub async fn health_service_instances(
+ &self,
+ service: &str,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<Vec<ConsulHealthServiceNode>>> {
+ self.get_with_index(
+ format!("{}/v1/health/service/{}", self.url, service),
+ last_index,
+ )
+ .await
+ }
+
+ pub fn watch_all_service_health(&self) -> watch::Receiver<AllServiceHealth> {
+ let (tx, rx) = watch::channel(HashMap::new());
+
+ tokio::spawn(do_watch_all_service_health(self.clone(), tx));
+
+ rx
+ }
+
+ async fn get_with_index<T: for<'de> Deserialize<'de>>(
+ &self,
+ mut url: String,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<T>> {
+ if let Some(i) = last_index {
+ if url.contains('?') {
+ write!(&mut url, "&index={}", i).unwrap();
+ } else {
+ write!(&mut url, "?index={}", i).unwrap();
+ }
+ }
+ debug!("GET {} as {}", url, std::any::type_name::<T>());
+
+ let http = self.client.get(&url).send().await?;
+
+ Ok(WithIndex::<T>::index_from(&http)?.value(http.json().await?))
+ }
+}
+
+async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServiceHealth>) {
+ let mut services = AllServiceHealth::new();
+ let mut service_watchers = FuturesUnordered::<BoxFuture<(String, Result<_>)>>::new();
+ let mut service_list: BoxFuture<Result<_>> = Box::pin(consul.catalog_service_list(None));
+
+ loop {
+ select! {
+ list_res = &mut service_list => {
+ match list_res {
+ Ok(list) => {
+ let list_index = list.index();
+ for service in list.into_inner().keys() {
+ if !services.contains_key(service) {
+ services.insert(service.to_string(), Arc::new([]));
+
+ let service = service.to_string();
+ service_watchers.push(Box::pin(async {
+ let res = consul.health_service_instances(&service, None).await;
+ (service, res)
+ }));
+ }
+ }
+ service_list = Box::pin(consul.catalog_service_list(Some(list_index)));
+ }
+ Err(e) => {
+ warn!("Error listing services: {}", e);
+ service_list = Box::pin(async {
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ consul.catalog_service_list(None).await
+ });
+ }
+ }
+ }
+ (service, watch_res) = service_watchers.next().then(some_or_pending) => {
+ match watch_res {
+ Ok(nodes) => {
+ let index = nodes.index();
+ services.insert(service.clone(), nodes.into_inner().into());
+
+ let consul = &consul;
+ service_watchers.push(Box::pin(async move {
+ let res = consul.health_service_instances(&service, Some(index)).await;
+ (service, res)
+ }));
+
+ if tx.send(services.clone()).is_err() {
+ break;
+ }
+ }
+ Err(e) => {
+ warn!("Error getting service {}: {}", service, e);
+ service_watchers.push(Box::pin(async {
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ let res = consul.health_service_instances(&service, None).await;
+ (service, res)
+ }));
+ }
+ }
+ }
+ _ = tx.closed() => {
+ break;
+ }
+ }
+ }
+}
+
+async fn some_or_pending<T>(value: Option<T>) -> T {
+ match value {
+ Some(v) => v,
+ None => futures::future::pending().await,
+ }
+}
diff --git a/src/kv.rs b/src/kv.rs
new file mode 100644
index 0000000..6c6f15a
--- /dev/null
+++ b/src/kv.rs
@@ -0,0 +1,64 @@
+use anyhow::{anyhow, Result};
+use bytes::Bytes;
+use log::*;
+use reqwest::StatusCode;
+use serde::{Deserialize, Serialize};
+
+use crate::Consul;
+
+impl Consul {
+ pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
+ debug!("kv_get {}", key);
+
+ let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
+ let http = self.client.get(&url).send().await?;
+ match http.status() {
+ StatusCode::OK => Ok(Some(http.bytes().await?)),
+ StatusCode::NOT_FOUND => Ok(None),
+ _ => Err(anyhow!(
+ "Consul request failed: {:?}",
+ http.error_for_status()
+ )),
+ }
+ }
+
+ pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
+ debug!("kv_get_json {}", key);
+
+ let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
+ let http = self.client.get(&url).send().await?;
+ match http.status() {
+ StatusCode::OK => Ok(Some(http.json().await?)),
+ StatusCode::NOT_FOUND => Ok(None),
+ _ => Err(anyhow!(
+ "Consul request failed: {:?}",
+ http.error_for_status()
+ )),
+ }
+ }
+
+ pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
+ debug!("kv_put {}", key);
+
+ let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
+
+ pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
+ debug!("kv_put_json {}", key);
+
+ let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
+ let http = self.client.put(&url).json(value).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
+
+ pub async fn kv_delete(&self, key: &str) -> Result<()> {
+ let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
+ let http = self.client.delete(&url).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 8a9ad53..6326fa0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,258 +1,75 @@
-use std::collections::HashMap;
+mod catalog;
+mod kv;
+mod locking;
+mod with_index;
+
use std::fs::File;
use std::io::Read;
-use anyhow::{anyhow, bail, Result};
-use bytes::Bytes;
-use log::*;
-use reqwest::StatusCode;
-use serde::{Deserialize, Serialize};
-
-pub struct ConsulConfig {
- pub addr: String,
- pub ca_cert: Option<String>,
- pub tls_skip_verify: bool,
- pub client_cert: Option<String>,
- pub client_key: Option<String>,
-}
-
-// ---- Watch and retrieve Consul catalog ----
-//
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulNode {
- #[serde(rename = "Node")]
- pub node: String,
- #[serde(rename = "Address")]
- pub address: String,
- #[serde(rename = "Meta")]
- pub meta: HashMap<String, String>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulServiceEntry {
- #[serde(rename = "Service")]
- pub service: String,
-
- #[serde(rename = "Address")]
- pub address: String,
-
- #[serde(rename = "Port")]
- pub port: u16,
-
- #[serde(rename = "Tags")]
- pub tags: Vec<String>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulNodeCatalog {
- #[serde(rename = "Node")]
- pub node: ConsulNode,
- #[serde(rename = "Services")]
- pub services: HashMap<String, ConsulServiceEntry>,
-}
+use anyhow::{bail, Result};
-// ---- Consul session management ----
+pub use with_index::WithIndex;
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulSessionRequest {
- #[serde(rename = "Name")]
- pub name: String,
-
- #[serde(rename = "Node")]
- pub node: Option<String>,
-
- #[serde(rename = "LockDelay")]
- pub lock_delay: Option<String>,
-
- #[serde(rename = "TTL")]
- pub ttl: Option<String>,
-
- #[serde(rename = "Behavior")]
- pub behavior: Option<String>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulSessionResponse {
- #[serde(rename = "ID")]
- pub id: String,
+pub struct ConsulConfig {
+ pub addr: String,
+ pub ca_cert: Option<String>,
+ pub tls_skip_verify: bool,
+ pub client_cert: Option<String>,
+ pub client_key: Option<String>,
}
#[derive(Clone)]
pub struct Consul {
- client: reqwest::Client,
+ client: reqwest::Client,
- url: String,
- kv_prefix: String,
+ url: String,
+ kv_prefix: String,
}
impl Consul {
- pub fn new(config: ConsulConfig, kv_prefix: &str) -> Result<Self> {
- let client = match (&config.client_cert, &config.client_key) {
- (Some(client_cert), Some(client_key)) => {
- let mut client_cert_buf = vec![];
- File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
-
- let mut client_key_buf = vec![];
- File::open(client_key)?.read_to_end(&mut client_key_buf)?;
-
- let identity = reqwest::Identity::from_pem(
- &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
- )?;
-
- if config.tls_skip_verify {
- reqwest::Client::builder()
- .use_rustls_tls()
- .danger_accept_invalid_certs(true)
- .identity(identity)
- .build()?
- } else if let Some(ca_cert) = &config.ca_cert {
- let mut ca_cert_buf = vec![];
- File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
-
- reqwest::Client::builder()
- .use_rustls_tls()
- .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
- .identity(identity)
- .build()?
- } else {
- reqwest::Client::builder()
- .use_rustls_tls()
- .identity(identity)
- .build()?
- }
- }
- (None, None) => reqwest::Client::new(),
- _ => bail!("Incomplete Consul TLS configuration parameters"),
- };
-
- Ok(Self {
- client,
- url: config.addr.trim_end_matches('/').to_string(),
- kv_prefix: kv_prefix.to_string(),
- })
- }
-
- pub async fn list_nodes(&self) -> Result<Vec<ConsulNode>> {
- debug!("list_nodes");
-
- let url = format!("{}/v1/catalog/nodes", self.url);
-
- let http = self.client.get(&url).send().await?;
- let resp: Vec<ConsulNode> = http.json().await?;
- Ok(resp)
- }
-
- pub async fn watch_node(
- &self,
- host: &str,
- idx: Option<usize>,
- ) -> Result<(Option<ConsulNodeCatalog>, usize)> {
- debug!("watch_node {} {:?}", host, idx);
-
- let url = match idx {
- Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
- None => format!("{}/v1/catalog/node/{}", self.url, host),
- };
-
- let http = self.client.get(&url).send().await?;
- let new_idx = match http.headers().get("X-Consul-Index") {
- Some(v) => v.to_str()?.parse::<usize>()?,
- None => bail!("X-Consul-Index header not found"),
- };
-
- let resp: Option<ConsulNodeCatalog> = http.json().await?;
- Ok((resp, new_idx))
- }
-
- // ---- KV get and put ----
-
- pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
- debug!("kv_get {}", key);
-
- let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
- let http = self.client.get(&url).send().await?;
- match http.status() {
- StatusCode::OK => Ok(Some(http.bytes().await?)),
- StatusCode::NOT_FOUND => Ok(None),
- _ => Err(anyhow!(
- "Consul request failed: {:?}",
- http.error_for_status()
- )),
- }
- }
-
- pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
- debug!("kv_get_json {}", key);
-
- let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
- let http = self.client.get(&url).send().await?;
- match http.status() {
- StatusCode::OK => Ok(Some(http.json().await?)),
- StatusCode::NOT_FOUND => Ok(None),
- _ => Err(anyhow!(
- "Consul request failed: {:?}",
- http.error_for_status()
- )),
- }
- }
-
- pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
- debug!("kv_put {}", key);
-
- let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
- let http = self.client.put(&url).body(bytes).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-
- pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
- debug!("kv_put_json {}", key);
-
- let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
- let http = self.client.put(&url).json(value).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-
- pub async fn kv_delete(&self, key: &str) -> Result<()> {
- let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
- let http = self.client.delete(&url).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-
- // ---- Locking ----
-
- pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
- debug!("create_session {:?}", req);
-
- let url = format!("{}/v1/session/create", self.url);
- let http = self.client.put(&url).json(req).send().await?;
- let resp: ConsulSessionResponse = http.json().await?;
- Ok(resp.id)
- }
-
- pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
- debug!("acquire {}", key);
-
- let url = format!(
- "{}/v1/kv/{}{}?acquire={}",
- self.url, self.kv_prefix, key, session
- );
- let http = self.client.put(&url).body(bytes).send().await?;
- let resp: bool = http.json().await?;
- Ok(resp)
- }
-
- pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
- debug!("release {}", key);
-
- let url = format!(
- "{}/v1/kv/{}{}?release={}",
- self.url, self.kv_prefix, key, session
- );
- let http = self.client.put(&url).body(bytes).send().await?;
- http.error_for_status()?;
- Ok(())
- }
+ pub fn new(config: ConsulConfig, kv_prefix: &str) -> Result<Self> {
+ let client = match (&config.client_cert, &config.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)?.read_to_end(&mut client_key_buf)?;
+
+ let identity = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ if config.tls_skip_verify {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .danger_accept_invalid_certs(true)
+ .identity(identity)
+ .build()?
+ } else if let Some(ca_cert) = &config.ca_cert {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
+
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
+ .identity(identity)
+ .build()?
+ } else {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .identity(identity)
+ .build()?
+ }
+ }
+ (None, None) => reqwest::Client::new(),
+ _ => bail!("Incomplete Consul TLS configuration parameters"),
+ };
+
+ Ok(Self {
+ client,
+ url: config.addr.trim_end_matches('/').to_string(),
+ kv_prefix: kv_prefix.to_string(),
+ })
+ }
}
diff --git a/src/locking.rs b/src/locking.rs
new file mode 100644
index 0000000..12e9ac0
--- /dev/null
+++ b/src/locking.rs
@@ -0,0 +1,65 @@
+use anyhow::Result;
+use bytes::Bytes;
+use log::*;
+use serde::{Deserialize, Serialize};
+
+use crate::Consul;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionRequest {
+ #[serde(rename = "Name")]
+ pub name: String,
+
+ #[serde(rename = "Node")]
+ pub node: Option<String>,
+
+ #[serde(rename = "LockDelay")]
+ pub lock_delay: Option<String>,
+
+ #[serde(rename = "TTL")]
+ pub ttl: Option<String>,
+
+ #[serde(rename = "Behavior")]
+ pub behavior: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionResponse {
+ #[serde(rename = "ID")]
+ pub id: String,
+}
+
+impl Consul {
+ pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
+ debug!("create_session {:?}", req);
+
+ let url = format!("{}/v1/session/create", self.url);
+ let http = self.client.put(&url).json(req).send().await?;
+ let resp: ConsulSessionResponse = http.json().await?;
+ Ok(resp.id)
+ }
+
+ pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
+ debug!("acquire {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?acquire={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ let resp: bool = http.json().await?;
+ Ok(resp)
+ }
+
+ pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
+ debug!("release {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?release={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
+}
diff --git a/src/with_index.rs b/src/with_index.rs
new file mode 100644
index 0000000..90e06be
--- /dev/null
+++ b/src/with_index.rs
@@ -0,0 +1,75 @@
+use std::fmt::{Debug, Display};
+
+use anyhow::{bail, Result};
+use reqwest::Response;
+
+pub struct WithIndex<T> {
+ value: T,
+ index: usize,
+}
+
+impl<T> WithIndex<T> {
+ pub fn index_from(resp: &Response) -> Result<WithIndexBuilder<T>> {
+ let index = match resp.headers().get("X-Consul-Index") {
+ Some(v) => v.to_str()?.parse::<usize>()?,
+ None => bail!("X-Consul-Index header not found"),
+ };
+ Ok(WithIndexBuilder {
+ index,
+ _phantom: Default::default(),
+ })
+ }
+
+ pub fn into_inner(self) -> T {
+ self.value
+ }
+
+ pub fn index(&self) -> usize {
+ self.index
+ }
+}
+
+impl<T> std::convert::AsRef<T> for WithIndex<T> {
+ fn as_ref(&self) -> &T {
+ &self.value
+ }
+}
+
+impl<T> std::borrow::Borrow<T> for WithIndex<T> {
+ fn borrow(&self) -> &T {
+ &self.value
+ }
+}
+
+impl<T> std::ops::Deref for WithIndex<T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ &self.value
+ }
+}
+
+impl<T: Debug> Debug for WithIndex<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ <T as Debug>::fmt(self, f)
+ }
+}
+
+impl<T: Display> Display for WithIndex<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ <T as Display>::fmt(self, f)
+ }
+}
+
+pub struct WithIndexBuilder<T> {
+ _phantom: std::marker::PhantomData<T>,
+ index: usize,
+}
+
+impl<T> WithIndexBuilder<T> {
+ pub fn value(self, value: T) -> WithIndex<T> {
+ WithIndex {
+ value,
+ index: self.index,
+ }
+ }
+}