diff options
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/Cargo.toml | 15 | ||||
-rw-r--r-- | src/util/background/mod.rs | 1 | ||||
-rw-r--r-- | src/util/background/vars.rs | 113 | ||||
-rw-r--r-- | src/util/build.rs | 8 | ||||
-rw-r--r-- | src/util/config.rs | 176 | ||||
-rw-r--r-- | src/util/data.rs | 4 | ||||
-rw-r--r-- | src/util/forwarded_headers.rs | 63 | ||||
-rw-r--r-- | src/util/lib.rs | 2 | ||||
-rw-r--r-- | src/util/persister.rs | 34 | ||||
-rw-r--r-- | src/util/time.rs | 2 | ||||
-rw-r--r-- | src/util/token_bucket.rs | 40 | ||||
-rw-r--r-- | src/util/version.rs | 4 |
12 files changed, 411 insertions, 51 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 0468b7f4..387471ed 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_util" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,11 +14,11 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.1", path = "../db" } +garage_db = { version = "0.8.2", path = "../db" } arc-swap = "1.0" async-trait = "0.1" -blake2 = "0.9" +blake2 = "0.10" bytes = "1.0" digest = "0.10" err-derive = "0.3" @@ -27,7 +27,7 @@ hexdump = "0.1" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" lazy_static = "1.4" -tracing = "0.1.30" +tracing = "0.1" rand = "0.8" sha2 = "0.10" @@ -35,7 +35,7 @@ chrono = "0.4" rmp-serde = "1.1" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_json = "1.0" -toml = "0.5" +toml = "0.6" futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } @@ -47,6 +47,11 @@ hyper = "0.14" opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] } +[build-dependencies] +rustc_version = "0.4.0" + +[dev-dependencies] +mktemp = "0.5" [features] k2v = [] diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 41b48e93..607cd7a3 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,5 +1,6 @@ //! Job runner for futures and async functions +pub mod vars; pub mod worker; use std::collections::HashMap; diff --git a/src/util/background/vars.rs b/src/util/background/vars.rs new file mode 100644 index 00000000..7a449c95 --- /dev/null +++ b/src/util/background/vars.rs @@ -0,0 +1,113 @@ +use std::collections::HashMap; +use std::str::FromStr; + +use crate::error::{Error, OkOrMessage}; +use crate::migrate::Migrate; +use crate::persister::PersisterShared; + +pub struct BgVars { + vars: HashMap<&'static str, Box<dyn BgVarTrait>>, +} + +impl BgVars { + pub fn new() -> Self { + Self { + vars: HashMap::new(), + } + } + + pub fn register_rw<V, T, GF, SF>( + &mut self, + p: &PersisterShared<V>, + name: &'static str, + get_fn: GF, + set_fn: SF, + ) where + V: Migrate + Default + Send + Sync, + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static, + SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static, + { + let p1 = p.clone(); + let get_fn = move || get_fn(&p1); + + let p2 = p.clone(); + let set_fn = move |v| set_fn(&p2, v); + + self.vars.insert(name, Box::new(BgVar { get_fn, set_fn })); + } + + pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF) + where + V: Migrate + Default + Send + Sync, + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static, + { + let p1 = p.clone(); + let get_fn = move || get_fn(&p1); + + let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name))); + + self.vars.insert(name, Box::new(BgVar { get_fn, set_fn })); + } + + pub fn get(&self, var: &str) -> Result<String, Error> { + Ok(self + .vars + .get(var) + .ok_or_message("variable does not exist")? + .get()) + } + + pub fn get_all(&self) -> Vec<(&'static str, String)> { + self.vars.iter().map(|(k, v)| (*k, v.get())).collect() + } + + pub fn set(&self, var: &str, val: &str) -> Result<(), Error> { + self.vars + .get(var) + .ok_or_message("variable does not exist")? + .set(val) + } +} + +impl Default for BgVars { + fn default() -> Self { + Self::new() + } +} + +// ---- + +trait BgVarTrait: Send + Sync + 'static { + fn get(&self) -> String; + fn set(&self, v: &str) -> Result<(), Error>; +} + +struct BgVar<T, GF, SF> +where + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn() -> T + Send + Sync + 'static, + SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static, +{ + get_fn: GF, + set_fn: SF, +} + +impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF> +where + T: FromStr + ToString + Sync + Send + 'static, + GF: Fn() -> T + Sync + Send + 'static, + SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static, +{ + fn get(&self) -> String { + (self.get_fn)().to_string() + } + + fn set(&self, vstr: &str) -> Result<(), Error> { + let value = vstr + .parse() + .map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?; + (self.set_fn)(value) + } +} diff --git a/src/util/build.rs b/src/util/build.rs new file mode 100644 index 00000000..a4e955b8 --- /dev/null +++ b/src/util/build.rs @@ -0,0 +1,8 @@ +use rustc_version::version; + +fn main() { + // Acquire the version of Rust used to compile, this is added as a label to + // the garage_build_info metric. + let v = version().unwrap(); + println!("cargo:rustc-env=RUSTC_VERSION={v}"); +} diff --git a/src/util/config.rs b/src/util/config.rs index 04f8375a..2176353e 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -34,7 +34,9 @@ pub struct Config { pub compression_level: Option<i32>, /// RPC secret key: 32 bytes hex encoded - pub rpc_secret: String, + pub rpc_secret: Option<String>, + /// Optional file where RPC secret key is read from + pub rpc_secret_file: Option<String>, /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, @@ -118,10 +120,17 @@ pub struct WebConfig { pub struct AdminConfig { /// Address and port to bind for admin API serving pub api_bind_addr: Option<SocketAddr>, + /// Bearer token to use to scrape metrics pub metrics_token: Option<String>, + /// File to read metrics token from + pub metrics_token_file: Option<String>, + /// Bearer token to use to access Admin API endpoints pub admin_token: Option<String>, + /// File to read admin token from + pub admin_token_file: Option<String>, + /// OTLP server to where to export traces pub trace_sink: Option<String>, } @@ -177,7 +186,57 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { let mut config = String::new(); file.read_to_string(&mut config)?; - Ok(toml::from_str(&config)?) + let mut parsed_config: Config = toml::from_str(&config)?; + + secret_from_file( + &mut parsed_config.rpc_secret, + &parsed_config.rpc_secret_file, + "rpc_secret", + )?; + secret_from_file( + &mut parsed_config.admin.metrics_token, + &parsed_config.admin.metrics_token_file, + "admin.metrics_token", + )?; + secret_from_file( + &mut parsed_config.admin.admin_token, + &parsed_config.admin.admin_token_file, + "admin.admin_token", + )?; + + Ok(parsed_config) +} + +fn secret_from_file( + secret: &mut Option<String>, + secret_file: &Option<String>, + name: &'static str, +) -> Result<(), Error> { + match (&secret, &secret_file) { + (_, None) => { + // no-op + } + (Some(_), Some(_)) => { + return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into()); + } + (None, Some(file_path)) => { + #[cfg(unix)] + if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") { + use std::os::unix::fs::MetadataExt; + let metadata = std::fs::metadata(&file_path)?; + if metadata.mode() & 0o077 != 0 { + return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into()); + } + } + let mut file = std::fs::OpenOptions::new().read(true).open(file_path)?; + let mut secret_buf = String::new(); + file.read_to_string(&mut secret_buf)?; + // trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`. + // also editors sometimes add a trailing newline + *secret = Some(String::from(secret_buf.trim_end())); + } + } + Ok(()) } fn default_compression() -> Option<i32> { @@ -233,3 +292,116 @@ where deserializer.deserialize_any(OptionVisitor) } + +#[cfg(test)] +mod tests { + use crate::error::Error; + use std::fs::File; + use std::io::Write; + + #[test] + fn test_rpc_secret() -> Result<(), Error> { + let path2 = mktemp::Temp::new_file()?; + let mut file2 = File::create(path2.as_path())?; + writeln!( + file2, + r#" + metadata_dir = "/tmp/garage/meta" + data_dir = "/tmp/garage/data" + replication_mode = "3" + rpc_bind_addr = "[::]:3901" + rpc_secret = "foo" + + [s3_api] + s3_region = "garage" + api_bind_addr = "[::]:3900" + "# + )?; + + let config = super::read_config(path2.to_path_buf())?; + assert_eq!("foo", config.rpc_secret.unwrap()); + drop(path2); + drop(file2); + + Ok(()) + } + + #[test] + fn test_rpc_secret_file_works() -> Result<(), Error> { + let path_secret = mktemp::Temp::new_file()?; + let mut file_secret = File::create(path_secret.as_path())?; + writeln!(file_secret, "foo")?; + drop(file_secret); + + let path_config = mktemp::Temp::new_file()?; + let mut file_config = File::create(path_config.as_path())?; + let path_secret_path = path_secret.as_path(); + writeln!( + file_config, + r#" + metadata_dir = "/tmp/garage/meta" + data_dir = "/tmp/garage/data" + replication_mode = "3" + rpc_bind_addr = "[::]:3901" + rpc_secret_file = "{}" + + [s3_api] + s3_region = "garage" + api_bind_addr = "[::]:3900" + "#, + path_secret_path.display() + )?; + let config = super::read_config(path_config.to_path_buf())?; + assert_eq!("foo", config.rpc_secret.unwrap()); + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let metadata = std::fs::metadata(&path_secret_path)?; + let mut perm = metadata.permissions(); + perm.set_mode(0o660); + std::fs::set_permissions(&path_secret_path, perm)?; + + std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "false"); + assert!(super::read_config(path_config.to_path_buf()).is_err()); + + std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "true"); + assert!(super::read_config(path_config.to_path_buf()).is_ok()); + } + + drop(path_config); + drop(path_secret); + drop(file_config); + Ok(()) + } + + #[test] + fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> { + let path_config = mktemp::Temp::new_file()?; + let mut file_config = File::create(path_config.as_path())?; + writeln!( + file_config, + r#" + metadata_dir = "/tmp/garage/meta" + data_dir = "/tmp/garage/data" + replication_mode = "3" + rpc_bind_addr = "[::]:3901" + rpc_secret= "dummy" + rpc_secret_file = "dummy" + + [s3_api] + s3_region = "garage" + api_bind_addr = "[::]:3900" + "# + )?; + assert_eq!( + "only one of `rpc_secret` and `rpc_secret_file` can be set", + super::read_config(path_config.to_path_buf()) + .unwrap_err() + .to_string() + ); + drop(path_config); + drop(file_config); + Ok(()) + } +} diff --git a/src/util/data.rs b/src/util/data.rs index 3f61e301..bdd8daee 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -115,9 +115,9 @@ pub fn sha256sum(data: &[u8]) -> Hash { /// Compute the blake2 of a slice pub fn blake2sum(data: &[u8]) -> Hash { - use blake2::{Blake2b, Digest}; + use blake2::{Blake2b512, Digest}; - let mut hasher = Blake2b::new(); + let mut hasher = Blake2b512::new(); hasher.update(data); let mut hash = [0u8; 32]; hash.copy_from_slice(&hasher.finalize()[..32]); diff --git a/src/util/forwarded_headers.rs b/src/util/forwarded_headers.rs new file mode 100644 index 00000000..6ae275aa --- /dev/null +++ b/src/util/forwarded_headers.rs @@ -0,0 +1,63 @@ +use http::{HeaderMap, HeaderValue}; +use std::net::IpAddr; +use std::str::FromStr; + +use crate::error::{Error, OkOrMessage}; + +pub fn handle_forwarded_for_headers(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> { + let forwarded_for_header = headers + .get("x-forwarded-for") + .ok_or_message("X-Forwarded-For header not provided")?; + + let forwarded_for_ip_str = forwarded_for_header + .to_str() + .ok_or_message("Error parsing X-Forwarded-For header")?; + + let client_ip = IpAddr::from_str(&forwarded_for_ip_str) + .ok_or_message("Valid IP address not found in X-Forwarded-For header")?; + + Ok(client_ip.to_string()) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_handle_forwarded_for_headers_ipv4_client() { + let mut test_headers = HeaderMap::new(); + test_headers.insert("X-Forwarded-For", "192.0.2.100".parse().unwrap()); + + if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) { + assert_eq!(forwarded_ip, "192.0.2.100"); + } + } + + #[test] + fn test_handle_forwarded_for_headers_ipv6_client() { + let mut test_headers = HeaderMap::new(); + test_headers.insert("X-Forwarded-For", "2001:db8::f00d:cafe".parse().unwrap()); + + if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) { + assert_eq!(forwarded_ip, "2001:db8::f00d:cafe"); + } + } + + #[test] + fn test_handle_forwarded_for_headers_invalid_ip() { + let mut test_headers = HeaderMap::new(); + test_headers.insert("X-Forwarded-For", "www.example.com".parse().unwrap()); + + let result = handle_forwarded_for_headers(&test_headers); + assert!(result.is_err()); + } + + #[test] + fn test_handle_forwarded_for_headers_missing() { + let mut test_headers = HeaderMap::new(); + test_headers.insert("Host", "www.deuxfleurs.fr".parse().unwrap()); + + let result = handle_forwarded_for_headers(&test_headers); + assert!(result.is_err()); + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index be82061f..c9110fb2 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -11,10 +11,10 @@ pub mod data; pub mod encode; pub mod error; pub mod formater; +pub mod forwarded_headers; pub mod metrics; pub mod migrate; pub mod persister; pub mod time; -pub mod token_bucket; pub mod tranquilizer; pub mod version; diff --git a/src/util/persister.rs b/src/util/persister.rs index 4b9adf51..5c66bbed 100644 --- a/src/util/persister.rs +++ b/src/util/persister.rs @@ -1,5 +1,6 @@ use std::io::{Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -84,3 +85,36 @@ impl<T: Migrate> Persister<T> { Ok(()) } } + +pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>); + +impl<V: Migrate + Default> Clone for PersisterShared<V> { + fn clone(&self) -> PersisterShared<V> { + PersisterShared(self.0.clone()) + } +} + +impl<V: Migrate + Default> PersisterShared<V> { + pub fn new(base_dir: &Path, file_name: &str) -> Self { + let persister = Persister::new(base_dir, file_name); + let value = persister.load().unwrap_or_default(); + Self(Arc::new((persister, RwLock::new(value)))) + } + + pub fn get_with<F, R>(&self, f: F) -> R + where + F: FnOnce(&V) -> R, + { + let value = self.0 .1.read().unwrap(); + f(&value) + } + + pub fn set_with<F>(&self, f: F) -> Result<(), Error> + where + F: FnOnce(&mut V), + { + let mut value = self.0 .1.write().unwrap(); + f(&mut value); + self.0 .0.save(&value) + } +} diff --git a/src/util/time.rs b/src/util/time.rs index 257b4d2a..42f41a44 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -25,6 +25,6 @@ pub fn increment_logical_clock_2(prev: u64, prev2: u64) -> u64 { pub fn msec_to_rfc3339(msecs: u64) -> String { let secs = msecs as i64 / 1000; let nanos = (msecs as i64 % 1000) as u32 * 1_000_000; - let timestamp = Utc.timestamp(secs, nanos); + let timestamp = Utc.timestamp_opt(secs, nanos).unwrap(); timestamp.to_rfc3339_opts(SecondsFormat::Millis, true) } diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs deleted file mode 100644 index cc0dfa1f..00000000 --- a/src/util/token_bucket.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::time::{Duration, Instant}; - -use tokio::time::sleep; - -pub struct TokenBucket { - // Replenish rate: number of tokens per second - replenish_rate: u64, - // Current number of tokens - tokens: u64, - // Last replenish time - last_replenish: Instant, -} - -impl TokenBucket { - pub fn new(replenish_rate: u64) -> Self { - Self { - replenish_rate, - tokens: 0, - last_replenish: Instant::now(), - } - } - - pub async fn take(&mut self, tokens: u64) { - while self.tokens < tokens { - let needed = tokens - self.tokens; - let delay = (needed as f64) / (self.replenish_rate as f64); - sleep(Duration::from_secs_f64(delay)).await; - self.replenish(); - } - self.tokens -= tokens; - } - - pub fn replenish(&mut self) { - let now = Instant::now(); - let new_tokens = - ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64; - self.tokens += new_tokens; - self.last_replenish = now; - } -} diff --git a/src/util/version.rs b/src/util/version.rs index b515dccc..2b2ea271 100644 --- a/src/util/version.rs +++ b/src/util/version.rs @@ -26,3 +26,7 @@ pub fn init_version(version: &'static str) { pub fn init_features(features: &'static [&'static str]) { FEATURES.store(Some(Arc::new(features))); } + +pub fn rust_version() -> &'static str { + env!("RUSTC_VERSION") +} |