From 45e12c3bcd1198bdfce3a4f7f000ef4177067a5c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 21 Apr 2023 13:10:41 +0200 Subject: add kv_get_prefix --- Cargo.toml | 3 ++- examples/test.rs | 15 +++++++++++++++ src/catalog.rs | 20 -------------------- src/kv.rs | 38 +++++++++++++++++++++++++++++++++++++- src/with_index.rs | 31 ++++++++++++++++++++++++++++--- 5 files changed, 82 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3f967b4..99c4aa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "df-consul" description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API" authors = [ "Alex Auvolat " ] -version = "0.3.3" +version = "0.3.4" edition = "2021" license = "MIT" repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul" @@ -14,6 +14,7 @@ anyhow = "1.0.66" serde = { version = "1.0.149", features = ["derive"] } log = "0.4" bytes = "1" +base64 = "0.13" reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-manual-roots" ] } tokio = { version = "1.23", default-features = false, features = [ "macros" ] } futures = "0.3.25" diff --git a/examples/test.rs b/examples/test.rs index 7e38c66..111c8c8 100644 --- a/examples/test.rs +++ b/examples/test.rs @@ -45,4 +45,19 @@ async fn main() { .unwrap() ); } + + println!("== LIST PREFIX =="); + let prefix = consul + .kv_get_prefix("diplonat/autodiscovery", None) + .await + .unwrap(); + println!("{:?}", prefix); + for i in 0..3 { + println!("-- wait for update... --"); + let prefix = consul + .kv_get_prefix("diplonat/autodiscovery", Some(prefix.index())) + .await + .unwrap(); + println!("{:?}", prefix); + } } diff --git a/src/catalog.rs b/src/catalog.rs index 042f745..fe82c5a 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -5,7 +5,6 @@ use std::cmp; use std::collections::HashMap; -use std::fmt::Write; use std::sync::Arc; use std::time::Duration; @@ -178,25 +177,6 @@ impl Consul { rx } - - async fn get_with_index Deserialize<'de>>( - &self, - mut url: String, - last_index: Option, - ) -> Result> { - if let Some(i) = last_index { - if url.contains('?') { - write!(&mut url, "&index={}", i).unwrap(); - } else { - write!(&mut url, "?index={}", i).unwrap(); - } - } - debug!("GET {} as {}", url, std::any::type_name::()); - - let http = self.client.get(&url).send().await?; - - Ok(WithIndex::::index_from(&http)?.value(http.json().await?)) - } } async fn do_watch_all_service_health( diff --git a/src/kv.rs b/src/kv.rs index 6c6f15a..5373ec6 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -1,10 +1,19 @@ +use std::collections::HashMap; + use anyhow::{anyhow, Result}; use bytes::Bytes; use log::*; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; -use crate::Consul; +use crate::{Consul, WithIndex}; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +pub struct KvGetPrefixEntry { + pub key: String, + pub value: String, +} impl Consul { pub async fn kv_get(&self, key: &str) -> Result> { @@ -37,6 +46,33 @@ impl Consul { } } + pub async fn kv_get_prefix( + &self, + key_prefix: &str, + last_index: Option, + ) -> Result>> { + debug!("kv_get_prefix {} index={:?}", key_prefix, last_index); + let results: WithIndex> = self + .get_with_index( + format!( + "{}/v1/kv/{}{}?recurse", + self.url, self.kv_prefix, key_prefix + ), + last_index, + ) + .await?; + + let mut res = HashMap::new(); + for ent in results.value { + res.insert(ent.key, Bytes::from(base64::decode(&ent.value)?)); + } + + Ok(WithIndex { + value: res, + index: results.index, + }) + } + pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { debug!("kv_put {}", key); diff --git a/src/with_index.rs b/src/with_index.rs index adce169..ba94779 100644 --- a/src/with_index.rs +++ b/src/with_index.rs @@ -1,14 +1,39 @@ -use std::fmt::{Debug, Display}; +use std::fmt::{Debug, Display, Write}; use anyhow::{bail, Result}; +use log::*; use reqwest::Response; +use serde::Deserialize; + +use crate::Consul; + +impl Consul { + pub(crate) async fn get_with_index Deserialize<'de>>( + &self, + mut url: String, + last_index: Option, + ) -> Result> { + if let Some(i) = last_index { + if url.contains('?') { + write!(&mut url, "&index={}", i).unwrap(); + } else { + write!(&mut url, "?index={}", i).unwrap(); + } + } + debug!("GET {} as {}", url, std::any::type_name::()); + + let http = self.client.get(&url).send().await?; + + Ok(WithIndex::::index_from(&http)?.value(http.json().await?)) + } +} /// Wraps the returned value of an [API call with blocking /// possibility](https://developer.hashicorp.com/consul/api-docs/features/blocking) with the /// returned Consul index pub struct WithIndex { - value: T, - index: usize, + pub(crate) value: T, + pub(crate) index: usize, } impl WithIndex { -- cgit v1.2.3