diff options
author | Alex <alex@adnab.me> | 2023-01-04 13:47:42 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-01-04 13:47:42 +0000 |
commit | 329c0e64f9044511f1a0d46b1b3ed99bdd890630 (patch) | |
tree | 222ec61f03c5abcca6678239111e94ad4dcfeb67 /src/util | |
parent | 4eb8ca3a528dae2848141f5cc3eb607eb7d40114 (diff) | |
parent | 29dbcb82780dcdb6f2a01a9da5122e70abaf93bf (diff) | |
download | garage-329c0e64f9044511f1a0d46b1b3ed99bdd890630.tar.gz garage-329c0e64f9044511f1a0d46b1b3ed99bdd890630.zip |
Merge pull request 'Improve `garage worker set` and add `garage worker get`' (#464) from worker-get into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/464
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/background/mod.rs | 1 | ||||
-rw-r--r-- | src/util/background/vars.rs | 113 | ||||
-rw-r--r-- | src/util/lib.rs | 1 | ||||
-rw-r--r-- | src/util/persister.rs | 34 | ||||
-rw-r--r-- | src/util/token_bucket.rs | 40 |
5 files changed, 148 insertions, 41 deletions
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 41b48e93..607cd7a3 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,5 +1,6 @@ //! Job runner for futures and async functions +pub mod vars; pub mod worker; use std::collections::HashMap; diff --git a/src/util/background/vars.rs b/src/util/background/vars.rs new file mode 100644 index 00000000..7a449c95 --- /dev/null +++ b/src/util/background/vars.rs @@ -0,0 +1,113 @@ +use std::collections::HashMap; +use std::str::FromStr; + +use crate::error::{Error, OkOrMessage}; +use crate::migrate::Migrate; +use crate::persister::PersisterShared; + +pub struct BgVars { + vars: HashMap<&'static str, Box<dyn BgVarTrait>>, +} + +impl BgVars { + pub fn new() -> Self { + Self { + vars: HashMap::new(), + } + } + + pub fn register_rw<V, T, GF, SF>( + &mut self, + p: &PersisterShared<V>, + name: &'static str, + get_fn: GF, + set_fn: SF, + ) where + V: Migrate + Default + Send + Sync, + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static, + SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static, + { + let p1 = p.clone(); + let get_fn = move || get_fn(&p1); + + let p2 = p.clone(); + let set_fn = move |v| set_fn(&p2, v); + + self.vars.insert(name, Box::new(BgVar { get_fn, set_fn })); + } + + pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF) + where + V: Migrate + Default + Send + Sync, + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static, + { + let p1 = p.clone(); + let get_fn = move || get_fn(&p1); + + let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name))); + + self.vars.insert(name, Box::new(BgVar { get_fn, set_fn })); + } + + pub fn get(&self, var: &str) -> Result<String, Error> { + Ok(self + .vars + .get(var) + .ok_or_message("variable does not exist")? + .get()) + } + + pub fn get_all(&self) -> Vec<(&'static str, String)> { + self.vars.iter().map(|(k, v)| (*k, v.get())).collect() + } + + pub fn set(&self, var: &str, val: &str) -> Result<(), Error> { + self.vars + .get(var) + .ok_or_message("variable does not exist")? + .set(val) + } +} + +impl Default for BgVars { + fn default() -> Self { + Self::new() + } +} + +// ---- + +trait BgVarTrait: Send + Sync + 'static { + fn get(&self) -> String; + fn set(&self, v: &str) -> Result<(), Error>; +} + +struct BgVar<T, GF, SF> +where + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn() -> T + Send + Sync + 'static, + SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static, +{ + get_fn: GF, + set_fn: SF, +} + +impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF> +where + T: FromStr + ToString + Sync + Send + 'static, + GF: Fn() -> T + Sync + Send + 'static, + SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static, +{ + fn get(&self) -> String { + (self.get_fn)().to_string() + } + + fn set(&self, vstr: &str) -> Result<(), Error> { + let value = vstr + .parse() + .map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?; + (self.set_fn)(value) + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index be82061f..d35ca72f 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -15,6 +15,5 @@ pub mod metrics; pub mod migrate; pub mod persister; pub mod time; -pub mod token_bucket; pub mod tranquilizer; pub mod version; diff --git a/src/util/persister.rs b/src/util/persister.rs index 4b9adf51..5c66bbed 100644 --- a/src/util/persister.rs +++ b/src/util/persister.rs @@ -1,5 +1,6 @@ use std::io::{Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -84,3 +85,36 @@ impl<T: Migrate> Persister<T> { Ok(()) } } + +pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>); + +impl<V: Migrate + Default> Clone for PersisterShared<V> { + fn clone(&self) -> PersisterShared<V> { + PersisterShared(self.0.clone()) + } +} + +impl<V: Migrate + Default> PersisterShared<V> { + pub fn new(base_dir: &Path, file_name: &str) -> Self { + let persister = Persister::new(base_dir, file_name); + let value = persister.load().unwrap_or_default(); + Self(Arc::new((persister, RwLock::new(value)))) + } + + pub fn get_with<F, R>(&self, f: F) -> R + where + F: FnOnce(&V) -> R, + { + let value = self.0 .1.read().unwrap(); + f(&value) + } + + pub fn set_with<F>(&self, f: F) -> Result<(), Error> + where + F: FnOnce(&mut V), + { + let mut value = self.0 .1.write().unwrap(); + f(&mut value); + self.0 .0.save(&value) + } +} diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs deleted file mode 100644 index cc0dfa1f..00000000 --- a/src/util/token_bucket.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::time::{Duration, Instant}; - -use tokio::time::sleep; - -pub struct TokenBucket { - // Replenish rate: number of tokens per second - replenish_rate: u64, - // Current number of tokens - tokens: u64, - // Last replenish time - last_replenish: Instant, -} - -impl TokenBucket { - pub fn new(replenish_rate: u64) -> Self { - Self { - replenish_rate, - tokens: 0, - last_replenish: Instant::now(), - } - } - - pub async fn take(&mut self, tokens: u64) { - while self.tokens < tokens { - let needed = tokens - self.tokens; - let delay = (needed as f64) / (self.replenish_rate as f64); - sleep(Duration::from_secs_f64(delay)).await; - self.replenish(); - } - self.tokens -= tokens; - } - - pub fn replenish(&mut self) { - let now = Instant::now(); - let new_tokens = - ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64; - self.tokens += new_tokens; - self.last_replenish = now; - } -} |