summaryrefslogblamecommitdiff
path: root/src/kv.rs
blob: 5373ec696b16cd792af2edb825921bdf607f2f38 (plain) (tree)
1
2
3
4
5
6
7
8

                              





                                    







                                                              































                                                                                                   


























                                                                          
























                                                                                       
use std::collections::HashMap;

use anyhow::{anyhow, Result};
use bytes::Bytes;
use log::*;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};

use crate::{Consul, WithIndex};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct KvGetPrefixEntry {
    pub key: String,
    pub value: String,
}

impl Consul {
    pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
        debug!("kv_get {}", key);

        let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
        let http = self.client.get(&url).send().await?;
        match http.status() {
            StatusCode::OK => Ok(Some(http.bytes().await?)),
            StatusCode::NOT_FOUND => Ok(None),
            _ => Err(anyhow!(
                "Consul request failed: {:?}",
                http.error_for_status()
            )),
        }
    }

    pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
        debug!("kv_get_json {}", key);

        let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
        let http = self.client.get(&url).send().await?;
        match http.status() {
            StatusCode::OK => Ok(Some(http.json().await?)),
            StatusCode::NOT_FOUND => Ok(None),
            _ => Err(anyhow!(
                "Consul request failed: {:?}",
                http.error_for_status()
            )),
        }
    }

    pub async fn kv_get_prefix(
        &self,
        key_prefix: &str,
        last_index: Option<usize>,
    ) -> Result<WithIndex<HashMap<String, Bytes>>> {
        debug!("kv_get_prefix {} index={:?}", key_prefix, last_index);
        let results: WithIndex<Vec<KvGetPrefixEntry>> = self
            .get_with_index(
                format!(
                    "{}/v1/kv/{}{}?recurse",
                    self.url, self.kv_prefix, key_prefix
                ),
                last_index,
            )
            .await?;

        let mut res = HashMap::new();
        for ent in results.value {
            res.insert(ent.key, Bytes::from(base64::decode(&ent.value)?));
        }

        Ok(WithIndex {
            value: res,
            index: results.index,
        })
    }

    pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
        debug!("kv_put {}", key);

        let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
        let http = self.client.put(&url).body(bytes).send().await?;
        http.error_for_status()?;
        Ok(())
    }

    pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
        debug!("kv_put_json {}", key);

        let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
        let http = self.client.put(&url).json(value).send().await?;
        http.error_for_status()?;
        Ok(())
    }

    pub async fn kv_delete(&self, key: &str) -> Result<()> {
        let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
        let http = self.client.delete(&url).send().await?;
        http.error_for_status()?;
        Ok(())
    }
}