From 552fc7e5a0d623114901d4d8e8e29bfffa47e09c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 2 Feb 2023 15:17:23 +0100 Subject: Split into several files and make more APIs --- src/catalog.rs | 230 ++++++++++++++++++++++++++++++++++++++++ src/kv.rs | 64 ++++++++++++ src/lib.rs | 305 +++++++++++------------------------------------------- src/locking.rs | 65 ++++++++++++ src/with_index.rs | 75 ++++++++++++++ 5 files changed, 495 insertions(+), 244 deletions(-) create mode 100644 src/catalog.rs create mode 100644 src/kv.rs create mode 100644 src/locking.rs create mode 100644 src/with_index.rs (limited to 'src') diff --git a/src/catalog.rs b/src/catalog.rs new file mode 100644 index 0000000..952e99e --- /dev/null +++ b/src/catalog.rs @@ -0,0 +1,230 @@ +use std::collections::HashMap; +use std::fmt::Write; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use futures::future::BoxFuture; +use futures::stream::futures_unordered::FuturesUnordered; +use futures::{FutureExt, StreamExt}; +use log::*; +use serde::{Deserialize, Serialize}; +use tokio::select; +use tokio::sync::watch; + +use crate::{Consul, WithIndex}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulNode { + pub node: String, + pub address: String, + pub meta: HashMap, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulService { + pub service: String, + pub address: String, + pub port: u16, + pub tags: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulCatalogNode { + pub node: ConsulNode, + pub services: HashMap, +} + +pub type ConsulServiceList = HashMap>; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulServiceNode { + pub node: String, + pub address: String, + pub node_meta: HashMap, + pub service_name: String, + pub service_tags: Vec, + pub service_address: String, + pub service_port: u16, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulHealthServiceNode { + pub node: ConsulNode, + pub service: ConsulService, + pub checks: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulHealthCheck { + pub node: String, + #[serde(rename = "CheckID")] + pub check_id: String, + pub name: String, + pub status: String, + pub output: String, + #[serde(rename = "Type")] + pub type_: String, +} + +pub type AllServiceHealth = HashMap>; + +impl Consul { + pub async fn catalog_node_list( + &self, + last_index: Option, + ) -> Result>> { + self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index) + .await + } + + pub async fn catalog_node( + &self, + host: &str, + last_index: Option, + ) -> Result>> { + self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index) + .await + } + + pub async fn catalog_service_list( + &self, + last_index: Option, + ) -> Result> { + self.get_with_index::( + format!("{}/v1/catalog/services", self.url), + last_index, + ) + .await + } + + pub async fn catalog_service_nodes( + &self, + service: &str, + last_index: Option, + ) -> Result>> { + self.get_with_index( + format!("{}/v1/catalog/service/{}", self.url, service), + last_index, + ) + .await + } + + pub async fn health_service_instances( + &self, + service: &str, + last_index: Option, + ) -> Result>> { + self.get_with_index( + format!("{}/v1/health/service/{}", self.url, service), + last_index, + ) + .await + } + + pub fn watch_all_service_health(&self) -> watch::Receiver { + let (tx, rx) = watch::channel(HashMap::new()); + + tokio::spawn(do_watch_all_service_health(self.clone(), tx)); + + rx + } + + async fn get_with_index Deserialize<'de>>( + &self, + mut url: String, + last_index: Option, + ) -> Result> { + if let Some(i) = last_index { + if url.contains('?') { + write!(&mut url, "&index={}", i).unwrap(); + } else { + write!(&mut url, "?index={}", i).unwrap(); + } + } + debug!("GET {} as {}", url, std::any::type_name::()); + + let http = self.client.get(&url).send().await?; + + Ok(WithIndex::::index_from(&http)?.value(http.json().await?)) + } +} + +async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender) { + let mut services = AllServiceHealth::new(); + let mut service_watchers = FuturesUnordered::)>>::new(); + let mut service_list: BoxFuture> = Box::pin(consul.catalog_service_list(None)); + + loop { + select! { + list_res = &mut service_list => { + match list_res { + Ok(list) => { + let list_index = list.index(); + for service in list.into_inner().keys() { + if !services.contains_key(service) { + services.insert(service.to_string(), Arc::new([])); + + let service = service.to_string(); + service_watchers.push(Box::pin(async { + let res = consul.health_service_instances(&service, None).await; + (service, res) + })); + } + } + service_list = Box::pin(consul.catalog_service_list(Some(list_index))); + } + Err(e) => { + warn!("Error listing services: {}", e); + service_list = Box::pin(async { + tokio::time::sleep(Duration::from_secs(30)).await; + consul.catalog_service_list(None).await + }); + } + } + } + (service, watch_res) = service_watchers.next().then(some_or_pending) => { + match watch_res { + Ok(nodes) => { + let index = nodes.index(); + services.insert(service.clone(), nodes.into_inner().into()); + + let consul = &consul; + service_watchers.push(Box::pin(async move { + let res = consul.health_service_instances(&service, Some(index)).await; + (service, res) + })); + + if tx.send(services.clone()).is_err() { + break; + } + } + Err(e) => { + warn!("Error getting service {}: {}", service, e); + service_watchers.push(Box::pin(async { + tokio::time::sleep(Duration::from_secs(30)).await; + let res = consul.health_service_instances(&service, None).await; + (service, res) + })); + } + } + } + _ = tx.closed() => { + break; + } + } + } +} + +async fn some_or_pending(value: Option) -> T { + match value { + Some(v) => v, + None => futures::future::pending().await, + } +} diff --git a/src/kv.rs b/src/kv.rs new file mode 100644 index 0000000..6c6f15a --- /dev/null +++ b/src/kv.rs @@ -0,0 +1,64 @@ +use anyhow::{anyhow, Result}; +use bytes::Bytes; +use log::*; +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; + +use crate::Consul; + +impl Consul { + pub async fn kv_get(&self, key: &str) -> Result> { + debug!("kv_get {}", key); + + let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); + let http = self.client.get(&url).send().await?; + match http.status() { + StatusCode::OK => Ok(Some(http.bytes().await?)), + StatusCode::NOT_FOUND => Ok(None), + _ => Err(anyhow!( + "Consul request failed: {:?}", + http.error_for_status() + )), + } + } + + pub async fn kv_get_json Deserialize<'de>>(&self, key: &str) -> Result> { + debug!("kv_get_json {}", key); + + let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); + let http = self.client.get(&url).send().await?; + match http.status() { + StatusCode::OK => Ok(Some(http.json().await?)), + StatusCode::NOT_FOUND => Ok(None), + _ => Err(anyhow!( + "Consul request failed: {:?}", + http.error_for_status() + )), + } + } + + pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { + debug!("kv_put {}", key); + + let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); + let http = self.client.put(&url).body(bytes).send().await?; + http.error_for_status()?; + Ok(()) + } + + pub async fn kv_put_json(&self, key: &str, value: &T) -> Result<()> { + debug!("kv_put_json {}", key); + + let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); + let http = self.client.put(&url).json(value).send().await?; + http.error_for_status()?; + Ok(()) + } + + pub async fn kv_delete(&self, key: &str) -> Result<()> { + let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); + let http = self.client.delete(&url).send().await?; + http.error_for_status()?; + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 8a9ad53..6326fa0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,258 +1,75 @@ -use std::collections::HashMap; +mod catalog; +mod kv; +mod locking; +mod with_index; + use std::fs::File; use std::io::Read; -use anyhow::{anyhow, bail, Result}; -use bytes::Bytes; -use log::*; -use reqwest::StatusCode; -use serde::{Deserialize, Serialize}; - -pub struct ConsulConfig { - pub addr: String, - pub ca_cert: Option, - pub tls_skip_verify: bool, - pub client_cert: Option, - pub client_key: Option, -} - -// ---- Watch and retrieve Consul catalog ---- -// -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulNode { - #[serde(rename = "Node")] - pub node: String, - #[serde(rename = "Address")] - pub address: String, - #[serde(rename = "Meta")] - pub meta: HashMap, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulServiceEntry { - #[serde(rename = "Service")] - pub service: String, - - #[serde(rename = "Address")] - pub address: String, - - #[serde(rename = "Port")] - pub port: u16, - - #[serde(rename = "Tags")] - pub tags: Vec, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulNodeCatalog { - #[serde(rename = "Node")] - pub node: ConsulNode, - #[serde(rename = "Services")] - pub services: HashMap, -} +use anyhow::{bail, Result}; -// ---- Consul session management ---- +pub use with_index::WithIndex; -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulSessionRequest { - #[serde(rename = "Name")] - pub name: String, - - #[serde(rename = "Node")] - pub node: Option, - - #[serde(rename = "LockDelay")] - pub lock_delay: Option, - - #[serde(rename = "TTL")] - pub ttl: Option, - - #[serde(rename = "Behavior")] - pub behavior: Option, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulSessionResponse { - #[serde(rename = "ID")] - pub id: String, +pub struct ConsulConfig { + pub addr: String, + pub ca_cert: Option, + pub tls_skip_verify: bool, + pub client_cert: Option, + pub client_key: Option, } #[derive(Clone)] pub struct Consul { - client: reqwest::Client, + client: reqwest::Client, - url: String, - kv_prefix: String, + url: String, + kv_prefix: String, } impl Consul { - pub fn new(config: ConsulConfig, kv_prefix: &str) -> Result { - let client = match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - - let mut client_key_buf = vec![]; - File::open(client_key)?.read_to_end(&mut client_key_buf)?; - - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; - - if config.tls_skip_verify { - reqwest::Client::builder() - .use_rustls_tls() - .danger_accept_invalid_certs(true) - .identity(identity) - .build()? - } else if let Some(ca_cert) = &config.ca_cert { - let mut ca_cert_buf = vec![]; - File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - - reqwest::Client::builder() - .use_rustls_tls() - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) - .identity(identity) - .build()? - } else { - reqwest::Client::builder() - .use_rustls_tls() - .identity(identity) - .build()? - } - } - (None, None) => reqwest::Client::new(), - _ => bail!("Incomplete Consul TLS configuration parameters"), - }; - - Ok(Self { - client, - url: config.addr.trim_end_matches('/').to_string(), - kv_prefix: kv_prefix.to_string(), - }) - } - - pub async fn list_nodes(&self) -> Result> { - debug!("list_nodes"); - - let url = format!("{}/v1/catalog/nodes", self.url); - - let http = self.client.get(&url).send().await?; - let resp: Vec = http.json().await?; - Ok(resp) - } - - pub async fn watch_node( - &self, - host: &str, - idx: Option, - ) -> Result<(Option, usize)> { - debug!("watch_node {} {:?}", host, idx); - - let url = match idx { - Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i), - None => format!("{}/v1/catalog/node/{}", self.url, host), - }; - - let http = self.client.get(&url).send().await?; - let new_idx = match http.headers().get("X-Consul-Index") { - Some(v) => v.to_str()?.parse::()?, - None => bail!("X-Consul-Index header not found"), - }; - - let resp: Option = http.json().await?; - Ok((resp, new_idx)) - } - - // ---- KV get and put ---- - - pub async fn kv_get(&self, key: &str) -> Result> { - debug!("kv_get {}", key); - - let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); - let http = self.client.get(&url).send().await?; - match http.status() { - StatusCode::OK => Ok(Some(http.bytes().await?)), - StatusCode::NOT_FOUND => Ok(None), - _ => Err(anyhow!( - "Consul request failed: {:?}", - http.error_for_status() - )), - } - } - - pub async fn kv_get_json Deserialize<'de>>(&self, key: &str) -> Result> { - debug!("kv_get_json {}", key); - - let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); - let http = self.client.get(&url).send().await?; - match http.status() { - StatusCode::OK => Ok(Some(http.json().await?)), - StatusCode::NOT_FOUND => Ok(None), - _ => Err(anyhow!( - "Consul request failed: {:?}", - http.error_for_status() - )), - } - } - - pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { - debug!("kv_put {}", key); - - let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); - let http = self.client.put(&url).body(bytes).send().await?; - http.error_for_status()?; - Ok(()) - } - - pub async fn kv_put_json(&self, key: &str, value: &T) -> Result<()> { - debug!("kv_put_json {}", key); - - let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); - let http = self.client.put(&url).json(value).send().await?; - http.error_for_status()?; - Ok(()) - } - - pub async fn kv_delete(&self, key: &str) -> Result<()> { - let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); - let http = self.client.delete(&url).send().await?; - http.error_for_status()?; - Ok(()) - } - - // ---- Locking ---- - - pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result { - debug!("create_session {:?}", req); - - let url = format!("{}/v1/session/create", self.url); - let http = self.client.put(&url).json(req).send().await?; - let resp: ConsulSessionResponse = http.json().await?; - Ok(resp.id) - } - - pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result { - debug!("acquire {}", key); - - let url = format!( - "{}/v1/kv/{}{}?acquire={}", - self.url, self.kv_prefix, key, session - ); - let http = self.client.put(&url).body(bytes).send().await?; - let resp: bool = http.json().await?; - Ok(resp) - } - - pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> { - debug!("release {}", key); - - let url = format!( - "{}/v1/kv/{}{}?release={}", - self.url, self.kv_prefix, key, session - ); - let http = self.client.put(&url).body(bytes).send().await?; - http.error_for_status()?; - Ok(()) - } + pub fn new(config: ConsulConfig, kv_prefix: &str) -> Result { + let client = match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + if config.tls_skip_verify { + reqwest::Client::builder() + .use_rustls_tls() + .danger_accept_invalid_certs(true) + .identity(identity) + .build()? + } else if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + + reqwest::Client::builder() + .use_rustls_tls() + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) + .identity(identity) + .build()? + } else { + reqwest::Client::builder() + .use_rustls_tls() + .identity(identity) + .build()? + } + } + (None, None) => reqwest::Client::new(), + _ => bail!("Incomplete Consul TLS configuration parameters"), + }; + + Ok(Self { + client, + url: config.addr.trim_end_matches('/').to_string(), + kv_prefix: kv_prefix.to_string(), + }) + } } diff --git a/src/locking.rs b/src/locking.rs new file mode 100644 index 0000000..12e9ac0 --- /dev/null +++ b/src/locking.rs @@ -0,0 +1,65 @@ +use anyhow::Result; +use bytes::Bytes; +use log::*; +use serde::{Deserialize, Serialize}; + +use crate::Consul; + +#[derive(Serialize, Deserialize, Debug)] +pub struct ConsulSessionRequest { + #[serde(rename = "Name")] + pub name: String, + + #[serde(rename = "Node")] + pub node: Option, + + #[serde(rename = "LockDelay")] + pub lock_delay: Option, + + #[serde(rename = "TTL")] + pub ttl: Option, + + #[serde(rename = "Behavior")] + pub behavior: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ConsulSessionResponse { + #[serde(rename = "ID")] + pub id: String, +} + +impl Consul { + pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result { + debug!("create_session {:?}", req); + + let url = format!("{}/v1/session/create", self.url); + let http = self.client.put(&url).json(req).send().await?; + let resp: ConsulSessionResponse = http.json().await?; + Ok(resp.id) + } + + pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result { + debug!("acquire {}", key); + + let url = format!( + "{}/v1/kv/{}{}?acquire={}", + self.url, self.kv_prefix, key, session + ); + let http = self.client.put(&url).body(bytes).send().await?; + let resp: bool = http.json().await?; + Ok(resp) + } + + pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> { + debug!("release {}", key); + + let url = format!( + "{}/v1/kv/{}{}?release={}", + self.url, self.kv_prefix, key, session + ); + let http = self.client.put(&url).body(bytes).send().await?; + http.error_for_status()?; + Ok(()) + } +} diff --git a/src/with_index.rs b/src/with_index.rs new file mode 100644 index 0000000..90e06be --- /dev/null +++ b/src/with_index.rs @@ -0,0 +1,75 @@ +use std::fmt::{Debug, Display}; + +use anyhow::{bail, Result}; +use reqwest::Response; + +pub struct WithIndex { + value: T, + index: usize, +} + +impl WithIndex { + pub fn index_from(resp: &Response) -> Result> { + let index = match resp.headers().get("X-Consul-Index") { + Some(v) => v.to_str()?.parse::()?, + None => bail!("X-Consul-Index header not found"), + }; + Ok(WithIndexBuilder { + index, + _phantom: Default::default(), + }) + } + + pub fn into_inner(self) -> T { + self.value + } + + pub fn index(&self) -> usize { + self.index + } +} + +impl std::convert::AsRef for WithIndex { + fn as_ref(&self) -> &T { + &self.value + } +} + +impl std::borrow::Borrow for WithIndex { + fn borrow(&self) -> &T { + &self.value + } +} + +impl std::ops::Deref for WithIndex { + type Target = T; + fn deref(&self) -> &T { + &self.value + } +} + +impl Debug for WithIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + ::fmt(self, f) + } +} + +impl Display for WithIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + ::fmt(self, f) + } +} + +pub struct WithIndexBuilder { + _phantom: std::marker::PhantomData, + index: usize, +} + +impl WithIndexBuilder { + pub fn value(self, value: T) -> WithIndex { + WithIndex { + value, + index: self.index, + } + } +} -- cgit v1.2.3