From 45e12c3bcd1198bdfce3a4f7f000ef4177067a5c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 21 Apr 2023 13:10:41 +0200 Subject: add kv_get_prefix --- src/catalog.rs | 20 -------------------- src/kv.rs | 38 +++++++++++++++++++++++++++++++++++++- src/with_index.rs | 31 ++++++++++++++++++++++++++++--- 3 files changed, 65 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/catalog.rs b/src/catalog.rs index 042f745..fe82c5a 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -5,7 +5,6 @@ use std::cmp; use std::collections::HashMap; -use std::fmt::Write; use std::sync::Arc; use std::time::Duration; @@ -178,25 +177,6 @@ impl Consul { rx } - - async fn get_with_index Deserialize<'de>>( - &self, - mut url: String, - last_index: Option, - ) -> Result> { - if let Some(i) = last_index { - if url.contains('?') { - write!(&mut url, "&index={}", i).unwrap(); - } else { - write!(&mut url, "?index={}", i).unwrap(); - } - } - debug!("GET {} as {}", url, std::any::type_name::()); - - let http = self.client.get(&url).send().await?; - - Ok(WithIndex::::index_from(&http)?.value(http.json().await?)) - } } async fn do_watch_all_service_health( diff --git a/src/kv.rs b/src/kv.rs index 6c6f15a..5373ec6 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -1,10 +1,19 @@ +use std::collections::HashMap; + use anyhow::{anyhow, Result}; use bytes::Bytes; use log::*; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; -use crate::Consul; +use crate::{Consul, WithIndex}; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +pub struct KvGetPrefixEntry { + pub key: String, + pub value: String, +} impl Consul { pub async fn kv_get(&self, key: &str) -> Result> { @@ -37,6 +46,33 @@ impl Consul { } } + pub async fn kv_get_prefix( + &self, + key_prefix: &str, + last_index: Option, + ) -> Result>> { + debug!("kv_get_prefix {} index={:?}", key_prefix, last_index); + let results: WithIndex> = self + .get_with_index( + format!( + "{}/v1/kv/{}{}?recurse", + self.url, self.kv_prefix, key_prefix + ), + last_index, + ) + .await?; + + let mut res = HashMap::new(); + for ent in results.value { + res.insert(ent.key, Bytes::from(base64::decode(&ent.value)?)); + } + + Ok(WithIndex { + value: res, + index: results.index, + }) + } + pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { debug!("kv_put {}", key); diff --git a/src/with_index.rs b/src/with_index.rs index adce169..ba94779 100644 --- a/src/with_index.rs +++ b/src/with_index.rs @@ -1,14 +1,39 @@ -use std::fmt::{Debug, Display}; +use std::fmt::{Debug, Display, Write}; use anyhow::{bail, Result}; +use log::*; use reqwest::Response; +use serde::Deserialize; + +use crate::Consul; + +impl Consul { + pub(crate) async fn get_with_index Deserialize<'de>>( + &self, + mut url: String, + last_index: Option, + ) -> Result> { + if let Some(i) = last_index { + if url.contains('?') { + write!(&mut url, "&index={}", i).unwrap(); + } else { + write!(&mut url, "?index={}", i).unwrap(); + } + } + debug!("GET {} as {}", url, std::any::type_name::()); + + let http = self.client.get(&url).send().await?; + + Ok(WithIndex::::index_from(&http)?.value(http.json().await?)) + } +} /// Wraps the returned value of an [API call with blocking /// possibility](https://developer.hashicorp.com/consul/api-docs/features/blocking) with the /// returned Consul index pub struct WithIndex { - value: T, - index: usize, + pub(crate) value: T, + pub(crate) index: usize, } impl WithIndex { -- cgit v1.2.3