summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-04-21 13:10:41 +0200
committerAlex Auvolat <alex@adnab.me>2023-04-21 13:10:41 +0200
commit45e12c3bcd1198bdfce3a4f7f000ef4177067a5c (patch)
tree603808716ac2fb6b6b639acdcefe6e76fc48e3b6
parenta11ed03031cb59b412950d62960fc27cf919e2bf (diff)
downloaddf-consul-45e12c3bcd1198bdfce3a4f7f000ef4177067a5c.tar.gz
df-consul-45e12c3bcd1198bdfce3a4f7f000ef4177067a5c.zip
add kv_get_prefixv0.3.4
-rw-r--r--Cargo.toml3
-rw-r--r--examples/test.rs15
-rw-r--r--src/catalog.rs20
-rw-r--r--src/kv.rs38
-rw-r--r--src/with_index.rs31
5 files changed, 82 insertions, 25 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 3f967b4..99c4aa0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
name = "df-consul"
description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API"
authors = [ "Alex Auvolat <alex@adnab.me>" ]
-version = "0.3.3"
+version = "0.3.4"
edition = "2021"
license = "MIT"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul"
@@ -14,6 +14,7 @@ anyhow = "1.0.66"
serde = { version = "1.0.149", features = ["derive"] }
log = "0.4"
bytes = "1"
+base64 = "0.13"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-manual-roots" ] }
tokio = { version = "1.23", default-features = false, features = [ "macros" ] }
futures = "0.3.25"
diff --git a/examples/test.rs b/examples/test.rs
index 7e38c66..111c8c8 100644
--- a/examples/test.rs
+++ b/examples/test.rs
@@ -45,4 +45,19 @@ async fn main() {
.unwrap()
);
}
+
+ println!("== LIST PREFIX ==");
+ let prefix = consul
+ .kv_get_prefix("diplonat/autodiscovery", None)
+ .await
+ .unwrap();
+ println!("{:?}", prefix);
+ for i in 0..3 {
+ println!("-- wait for update... --");
+ let prefix = consul
+ .kv_get_prefix("diplonat/autodiscovery", Some(prefix.index()))
+ .await
+ .unwrap();
+ println!("{:?}", prefix);
+ }
}
diff --git a/src/catalog.rs b/src/catalog.rs
index 042f745..fe82c5a 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -5,7 +5,6 @@
use std::cmp;
use std::collections::HashMap;
-use std::fmt::Write;
use std::sync::Arc;
use std::time::Duration;
@@ -178,25 +177,6 @@ impl Consul {
rx
}
-
- async fn get_with_index<T: for<'de> Deserialize<'de>>(
- &self,
- mut url: String,
- last_index: Option<usize>,
- ) -> Result<WithIndex<T>> {
- if let Some(i) = last_index {
- if url.contains('?') {
- write!(&mut url, "&index={}", i).unwrap();
- } else {
- write!(&mut url, "?index={}", i).unwrap();
- }
- }
- debug!("GET {} as {}", url, std::any::type_name::<T>());
-
- let http = self.client.get(&url).send().await?;
-
- Ok(WithIndex::<T>::index_from(&http)?.value(http.json().await?))
- }
}
async fn do_watch_all_service_health(
diff --git a/src/kv.rs b/src/kv.rs
index 6c6f15a..5373ec6 100644
--- a/src/kv.rs
+++ b/src/kv.rs
@@ -1,10 +1,19 @@
+use std::collections::HashMap;
+
use anyhow::{anyhow, Result};
use bytes::Bytes;
use log::*;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
-use crate::Consul;
+use crate::{Consul, WithIndex};
+
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+#[serde(rename_all = "PascalCase")]
+pub struct KvGetPrefixEntry {
+ pub key: String,
+ pub value: String,
+}
impl Consul {
pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
@@ -37,6 +46,33 @@ impl Consul {
}
}
+ pub async fn kv_get_prefix(
+ &self,
+ key_prefix: &str,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<HashMap<String, Bytes>>> {
+ debug!("kv_get_prefix {} index={:?}", key_prefix, last_index);
+ let results: WithIndex<Vec<KvGetPrefixEntry>> = self
+ .get_with_index(
+ format!(
+ "{}/v1/kv/{}{}?recurse",
+ self.url, self.kv_prefix, key_prefix
+ ),
+ last_index,
+ )
+ .await?;
+
+ let mut res = HashMap::new();
+ for ent in results.value {
+ res.insert(ent.key, Bytes::from(base64::decode(&ent.value)?));
+ }
+
+ Ok(WithIndex {
+ value: res,
+ index: results.index,
+ })
+ }
+
pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
debug!("kv_put {}", key);
diff --git a/src/with_index.rs b/src/with_index.rs
index adce169..ba94779 100644
--- a/src/with_index.rs
+++ b/src/with_index.rs
@@ -1,14 +1,39 @@
-use std::fmt::{Debug, Display};
+use std::fmt::{Debug, Display, Write};
use anyhow::{bail, Result};
+use log::*;
use reqwest::Response;
+use serde::Deserialize;
+
+use crate::Consul;
+
+impl Consul {
+ pub(crate) async fn get_with_index<T: for<'de> Deserialize<'de>>(
+ &self,
+ mut url: String,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<T>> {
+ if let Some(i) = last_index {
+ if url.contains('?') {
+ write!(&mut url, "&index={}", i).unwrap();
+ } else {
+ write!(&mut url, "?index={}", i).unwrap();
+ }
+ }
+ debug!("GET {} as {}", url, std::any::type_name::<T>());
+
+ let http = self.client.get(&url).send().await?;
+
+ Ok(WithIndex::<T>::index_from(&http)?.value(http.json().await?))
+ }
+}
/// Wraps the returned value of an [API call with blocking
/// possibility](https://developer.hashicorp.com/consul/api-docs/features/blocking) with the
/// returned Consul index
pub struct WithIndex<T> {
- value: T,
- index: usize,
+ pub(crate) value: T,
+ pub(crate) index: usize,
}
impl<T> WithIndex<T> {