aboutsummaryrefslogtreecommitdiff
path: root/src/consul.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/consul.rs')
-rw-r--r--src/consul.rs70
1 files changed, 69 insertions, 1 deletions
diff --git a/src/consul.rs b/src/consul.rs
index 5ef96c5..6fc031c 100644
--- a/src/consul.rs
+++ b/src/consul.rs
@@ -26,21 +26,52 @@ pub struct ConsulNodeCatalog {
pub services: HashMap<String, ConsulServiceEntry>,
}
+// ---- Consul session management ----
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionRequest {
+ #[serde(rename = "Name")]
+ pub name: String,
+
+ #[serde(rename = "Node")]
+ pub node: Option<String>,
+
+ #[serde(rename = "LockDelay")]
+ pub lock_delay: Option<String>,
+
+ #[serde(rename = "TTL")]
+ pub ttl: Option<String>,
+
+ #[serde(rename = "Behavior")]
+ pub behavior: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionResponse {
+ #[serde(rename = "ID")]
+ pub id: String,
+}
+
#[derive(Clone)]
pub struct Consul {
client: reqwest::Client,
+
url: String,
kv_prefix: String,
+
idx: Option<u64>,
+
+ pub local_node: String,
}
impl Consul {
- pub fn new(url: &str, kv_prefix: &str) -> Self {
+ pub fn new(url: &str, kv_prefix: &str, local_node: &str) -> Self {
return Self {
client: reqwest::Client::new(),
url: url.to_string(),
kv_prefix: kv_prefix.to_string(),
idx: None,
+ local_node: local_node.into(),
};
}
@@ -64,6 +95,8 @@ impl Consul {
return Ok(resp);
}
+ // ---- KV get and put ----
+
pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
debug!("kv_get {}", key);
@@ -118,4 +151,39 @@ impl Consul {
http.error_for_status()?;
Ok(())
}
+
+ // ---- Locking ----
+
+ pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
+ debug!("create_session {:?}", req);
+
+ let url = format!("{}/v1/session/create", self.url);
+ let http = self.client.put(&url).json(req).send().await?;
+ let resp: ConsulSessionResponse = http.json().await?;
+ Ok(resp.id)
+ }
+
+ pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
+ debug!("acquire {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?acquire={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ let resp: bool = http.json().await?;
+ Ok(resp)
+ }
+
+ pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
+ debug!("release {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?release={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
}