aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-01-04 13:47:42 +0000
committerAlex <alex@adnab.me>2023-01-04 13:47:42 +0000
commit329c0e64f9044511f1a0d46b1b3ed99bdd890630 (patch)
tree222ec61f03c5abcca6678239111e94ad4dcfeb67 /src/util
parent4eb8ca3a528dae2848141f5cc3eb607eb7d40114 (diff)
parent29dbcb82780dcdb6f2a01a9da5122e70abaf93bf (diff)
downloadgarage-329c0e64f9044511f1a0d46b1b3ed99bdd890630.tar.gz
garage-329c0e64f9044511f1a0d46b1b3ed99bdd890630.zip
Merge pull request 'Improve `garage worker set` and add `garage worker get`' (#464) from worker-get into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/464
Diffstat (limited to 'src/util')
-rw-r--r--src/util/background/mod.rs1
-rw-r--r--src/util/background/vars.rs113
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/persister.rs34
-rw-r--r--src/util/token_bucket.rs40
5 files changed, 148 insertions, 41 deletions
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 41b48e93..607cd7a3 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -1,5 +1,6 @@
//! Job runner for futures and async functions
+pub mod vars;
pub mod worker;
use std::collections::HashMap;
diff --git a/src/util/background/vars.rs b/src/util/background/vars.rs
new file mode 100644
index 00000000..7a449c95
--- /dev/null
+++ b/src/util/background/vars.rs
@@ -0,0 +1,113 @@
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use crate::error::{Error, OkOrMessage};
+use crate::migrate::Migrate;
+use crate::persister::PersisterShared;
+
+pub struct BgVars {
+ vars: HashMap<&'static str, Box<dyn BgVarTrait>>,
+}
+
+impl BgVars {
+ pub fn new() -> Self {
+ Self {
+ vars: HashMap::new(),
+ }
+ }
+
+ pub fn register_rw<V, T, GF, SF>(
+ &mut self,
+ p: &PersisterShared<V>,
+ name: &'static str,
+ get_fn: GF,
+ set_fn: SF,
+ ) where
+ V: Migrate + Default + Send + Sync,
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
+ SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static,
+ {
+ let p1 = p.clone();
+ let get_fn = move || get_fn(&p1);
+
+ let p2 = p.clone();
+ let set_fn = move |v| set_fn(&p2, v);
+
+ self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
+ }
+
+ pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF)
+ where
+ V: Migrate + Default + Send + Sync,
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
+ {
+ let p1 = p.clone();
+ let get_fn = move || get_fn(&p1);
+
+ let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name)));
+
+ self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
+ }
+
+ pub fn get(&self, var: &str) -> Result<String, Error> {
+ Ok(self
+ .vars
+ .get(var)
+ .ok_or_message("variable does not exist")?
+ .get())
+ }
+
+ pub fn get_all(&self) -> Vec<(&'static str, String)> {
+ self.vars.iter().map(|(k, v)| (*k, v.get())).collect()
+ }
+
+ pub fn set(&self, var: &str, val: &str) -> Result<(), Error> {
+ self.vars
+ .get(var)
+ .ok_or_message("variable does not exist")?
+ .set(val)
+ }
+}
+
+impl Default for BgVars {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ----
+
+trait BgVarTrait: Send + Sync + 'static {
+ fn get(&self) -> String;
+ fn set(&self, v: &str) -> Result<(), Error>;
+}
+
+struct BgVar<T, GF, SF>
+where
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn() -> T + Send + Sync + 'static,
+ SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
+{
+ get_fn: GF,
+ set_fn: SF,
+}
+
+impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF>
+where
+ T: FromStr + ToString + Sync + Send + 'static,
+ GF: Fn() -> T + Sync + Send + 'static,
+ SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
+{
+ fn get(&self) -> String {
+ (self.get_fn)().to_string()
+ }
+
+ fn set(&self, vstr: &str) -> Result<(), Error> {
+ let value = vstr
+ .parse()
+ .map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?;
+ (self.set_fn)(value)
+ }
+}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index be82061f..d35ca72f 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -15,6 +15,5 @@ pub mod metrics;
pub mod migrate;
pub mod persister;
pub mod time;
-pub mod token_bucket;
pub mod tranquilizer;
pub mod version;
diff --git a/src/util/persister.rs b/src/util/persister.rs
index 4b9adf51..5c66bbed 100644
--- a/src/util/persister.rs
+++ b/src/util/persister.rs
@@ -1,5 +1,6 @@
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
+use std::sync::{Arc, RwLock};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -84,3 +85,36 @@ impl<T: Migrate> Persister<T> {
Ok(())
}
}
+
+pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>);
+
+impl<V: Migrate + Default> Clone for PersisterShared<V> {
+ fn clone(&self) -> PersisterShared<V> {
+ PersisterShared(self.0.clone())
+ }
+}
+
+impl<V: Migrate + Default> PersisterShared<V> {
+ pub fn new(base_dir: &Path, file_name: &str) -> Self {
+ let persister = Persister::new(base_dir, file_name);
+ let value = persister.load().unwrap_or_default();
+ Self(Arc::new((persister, RwLock::new(value))))
+ }
+
+ pub fn get_with<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&V) -> R,
+ {
+ let value = self.0 .1.read().unwrap();
+ f(&value)
+ }
+
+ pub fn set_with<F>(&self, f: F) -> Result<(), Error>
+ where
+ F: FnOnce(&mut V),
+ {
+ let mut value = self.0 .1.write().unwrap();
+ f(&mut value);
+ self.0 .0.save(&value)
+ }
+}
diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs
deleted file mode 100644
index cc0dfa1f..00000000
--- a/src/util/token_bucket.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use std::time::{Duration, Instant};
-
-use tokio::time::sleep;
-
-pub struct TokenBucket {
- // Replenish rate: number of tokens per second
- replenish_rate: u64,
- // Current number of tokens
- tokens: u64,
- // Last replenish time
- last_replenish: Instant,
-}
-
-impl TokenBucket {
- pub fn new(replenish_rate: u64) -> Self {
- Self {
- replenish_rate,
- tokens: 0,
- last_replenish: Instant::now(),
- }
- }
-
- pub async fn take(&mut self, tokens: u64) {
- while self.tokens < tokens {
- let needed = tokens - self.tokens;
- let delay = (needed as f64) / (self.replenish_rate as f64);
- sleep(Duration::from_secs_f64(delay)).await;
- self.replenish();
- }
- self.tokens -= tokens;
- }
-
- pub fn replenish(&mut self) {
- let now = Instant::now();
- let new_tokens =
- ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
- self.tokens += new_tokens;
- self.last_replenish = now;
- }
-}