summaryrefslogtreecommitdiff
path: root/src/locking.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/locking.rs')
-rw-r--r--src/locking.rs65
1 files changed, 65 insertions, 0 deletions
diff --git a/src/locking.rs b/src/locking.rs
new file mode 100644
index 0000000..12e9ac0
--- /dev/null
+++ b/src/locking.rs
@@ -0,0 +1,65 @@
+use anyhow::Result;
+use bytes::Bytes;
+use log::*;
+use serde::{Deserialize, Serialize};
+
+use crate::Consul;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionRequest {
+ #[serde(rename = "Name")]
+ pub name: String,
+
+ #[serde(rename = "Node")]
+ pub node: Option<String>,
+
+ #[serde(rename = "LockDelay")]
+ pub lock_delay: Option<String>,
+
+ #[serde(rename = "TTL")]
+ pub ttl: Option<String>,
+
+ #[serde(rename = "Behavior")]
+ pub behavior: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionResponse {
+ #[serde(rename = "ID")]
+ pub id: String,
+}
+
+impl Consul {
+ pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
+ debug!("create_session {:?}", req);
+
+ let url = format!("{}/v1/session/create", self.url);
+ let http = self.client.put(&url).json(req).send().await?;
+ let resp: ConsulSessionResponse = http.json().await?;
+ Ok(resp.id)
+ }
+
+ pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
+ debug!("acquire {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?acquire={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ let resp: bool = http.json().await?;
+ Ok(resp)
+ }
+
+ pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
+ debug!("release {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?release={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
+}