aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/Cargo.toml15
-rw-r--r--src/util/background/mod.rs1
-rw-r--r--src/util/background/vars.rs113
-rw-r--r--src/util/build.rs8
-rw-r--r--src/util/config.rs176
-rw-r--r--src/util/data.rs4
-rw-r--r--src/util/forwarded_headers.rs63
-rw-r--r--src/util/lib.rs2
-rw-r--r--src/util/persister.rs34
-rw-r--r--src/util/time.rs2
-rw-r--r--src/util/token_bucket.rs40
-rw-r--r--src/util/version.rs4
12 files changed, 411 insertions, 51 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 0468b7f4..387471ed 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_util"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,11 +14,11 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.1", path = "../db" }
+garage_db = { version = "0.8.2", path = "../db" }
arc-swap = "1.0"
async-trait = "0.1"
-blake2 = "0.9"
+blake2 = "0.10"
bytes = "1.0"
digest = "0.10"
err-derive = "0.3"
@@ -27,7 +27,7 @@ hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
lazy_static = "1.4"
-tracing = "0.1.30"
+tracing = "0.1"
rand = "0.8"
sha2 = "0.10"
@@ -35,7 +35,7 @@ chrono = "0.4"
rmp-serde = "1.1"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
-toml = "0.5"
+toml = "0.6"
futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
@@ -47,6 +47,11 @@ hyper = "0.14"
opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] }
+[build-dependencies]
+rustc_version = "0.4.0"
+
+[dev-dependencies]
+mktemp = "0.5"
[features]
k2v = []
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 41b48e93..607cd7a3 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -1,5 +1,6 @@
//! Job runner for futures and async functions
+pub mod vars;
pub mod worker;
use std::collections::HashMap;
diff --git a/src/util/background/vars.rs b/src/util/background/vars.rs
new file mode 100644
index 00000000..7a449c95
--- /dev/null
+++ b/src/util/background/vars.rs
@@ -0,0 +1,113 @@
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use crate::error::{Error, OkOrMessage};
+use crate::migrate::Migrate;
+use crate::persister::PersisterShared;
+
+pub struct BgVars {
+ vars: HashMap<&'static str, Box<dyn BgVarTrait>>,
+}
+
+impl BgVars {
+ pub fn new() -> Self {
+ Self {
+ vars: HashMap::new(),
+ }
+ }
+
+ pub fn register_rw<V, T, GF, SF>(
+ &mut self,
+ p: &PersisterShared<V>,
+ name: &'static str,
+ get_fn: GF,
+ set_fn: SF,
+ ) where
+ V: Migrate + Default + Send + Sync,
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
+ SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static,
+ {
+ let p1 = p.clone();
+ let get_fn = move || get_fn(&p1);
+
+ let p2 = p.clone();
+ let set_fn = move |v| set_fn(&p2, v);
+
+ self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
+ }
+
+ pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF)
+ where
+ V: Migrate + Default + Send + Sync,
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
+ {
+ let p1 = p.clone();
+ let get_fn = move || get_fn(&p1);
+
+ let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name)));
+
+ self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
+ }
+
+ pub fn get(&self, var: &str) -> Result<String, Error> {
+ Ok(self
+ .vars
+ .get(var)
+ .ok_or_message("variable does not exist")?
+ .get())
+ }
+
+ pub fn get_all(&self) -> Vec<(&'static str, String)> {
+ self.vars.iter().map(|(k, v)| (*k, v.get())).collect()
+ }
+
+ pub fn set(&self, var: &str, val: &str) -> Result<(), Error> {
+ self.vars
+ .get(var)
+ .ok_or_message("variable does not exist")?
+ .set(val)
+ }
+}
+
+impl Default for BgVars {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ----
+
+trait BgVarTrait: Send + Sync + 'static {
+ fn get(&self) -> String;
+ fn set(&self, v: &str) -> Result<(), Error>;
+}
+
+struct BgVar<T, GF, SF>
+where
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn() -> T + Send + Sync + 'static,
+ SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
+{
+ get_fn: GF,
+ set_fn: SF,
+}
+
+impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF>
+where
+ T: FromStr + ToString + Sync + Send + 'static,
+ GF: Fn() -> T + Sync + Send + 'static,
+ SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
+{
+ fn get(&self) -> String {
+ (self.get_fn)().to_string()
+ }
+
+ fn set(&self, vstr: &str) -> Result<(), Error> {
+ let value = vstr
+ .parse()
+ .map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?;
+ (self.set_fn)(value)
+ }
+}
diff --git a/src/util/build.rs b/src/util/build.rs
new file mode 100644
index 00000000..a4e955b8
--- /dev/null
+++ b/src/util/build.rs
@@ -0,0 +1,8 @@
+use rustc_version::version;
+
+fn main() {
+ // Acquire the version of Rust used to compile, this is added as a label to
+ // the garage_build_info metric.
+ let v = version().unwrap();
+ println!("cargo:rustc-env=RUSTC_VERSION={v}");
+}
diff --git a/src/util/config.rs b/src/util/config.rs
index 04f8375a..2176353e 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -34,7 +34,9 @@ pub struct Config {
pub compression_level: Option<i32>,
/// RPC secret key: 32 bytes hex encoded
- pub rpc_secret: String,
+ pub rpc_secret: Option<String>,
+ /// Optional file where RPC secret key is read from
+ pub rpc_secret_file: Option<String>,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
@@ -118,10 +120,17 @@ pub struct WebConfig {
pub struct AdminConfig {
/// Address and port to bind for admin API serving
pub api_bind_addr: Option<SocketAddr>,
+
/// Bearer token to use to scrape metrics
pub metrics_token: Option<String>,
+ /// File to read metrics token from
+ pub metrics_token_file: Option<String>,
+
/// Bearer token to use to access Admin API endpoints
pub admin_token: Option<String>,
+ /// File to read admin token from
+ pub admin_token_file: Option<String>,
+
/// OTLP server to where to export traces
pub trace_sink: Option<String>,
}
@@ -177,7 +186,57 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut config = String::new();
file.read_to_string(&mut config)?;
- Ok(toml::from_str(&config)?)
+ let mut parsed_config: Config = toml::from_str(&config)?;
+
+ secret_from_file(
+ &mut parsed_config.rpc_secret,
+ &parsed_config.rpc_secret_file,
+ "rpc_secret",
+ )?;
+ secret_from_file(
+ &mut parsed_config.admin.metrics_token,
+ &parsed_config.admin.metrics_token_file,
+ "admin.metrics_token",
+ )?;
+ secret_from_file(
+ &mut parsed_config.admin.admin_token,
+ &parsed_config.admin.admin_token_file,
+ "admin.admin_token",
+ )?;
+
+ Ok(parsed_config)
+}
+
+fn secret_from_file(
+ secret: &mut Option<String>,
+ secret_file: &Option<String>,
+ name: &'static str,
+) -> Result<(), Error> {
+ match (&secret, &secret_file) {
+ (_, None) => {
+ // no-op
+ }
+ (Some(_), Some(_)) => {
+ return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into());
+ }
+ (None, Some(file_path)) => {
+ #[cfg(unix)]
+ if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") {
+ use std::os::unix::fs::MetadataExt;
+ let metadata = std::fs::metadata(&file_path)?;
+ if metadata.mode() & 0o077 != 0 {
+ return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into());
+ }
+ }
+ let mut file = std::fs::OpenOptions::new().read(true).open(file_path)?;
+ let mut secret_buf = String::new();
+ file.read_to_string(&mut secret_buf)?;
+ // trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`.
+ // also editors sometimes add a trailing newline
+ *secret = Some(String::from(secret_buf.trim_end()));
+ }
+ }
+ Ok(())
}
fn default_compression() -> Option<i32> {
@@ -233,3 +292,116 @@ where
deserializer.deserialize_any(OptionVisitor)
}
+
+#[cfg(test)]
+mod tests {
+ use crate::error::Error;
+ use std::fs::File;
+ use std::io::Write;
+
+ #[test]
+ fn test_rpc_secret() -> Result<(), Error> {
+ let path2 = mktemp::Temp::new_file()?;
+ let mut file2 = File::create(path2.as_path())?;
+ writeln!(
+ file2,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret = "foo"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#
+ )?;
+
+ let config = super::read_config(path2.to_path_buf())?;
+ assert_eq!("foo", config.rpc_secret.unwrap());
+ drop(path2);
+ drop(file2);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_rpc_secret_file_works() -> Result<(), Error> {
+ let path_secret = mktemp::Temp::new_file()?;
+ let mut file_secret = File::create(path_secret.as_path())?;
+ writeln!(file_secret, "foo")?;
+ drop(file_secret);
+
+ let path_config = mktemp::Temp::new_file()?;
+ let mut file_config = File::create(path_config.as_path())?;
+ let path_secret_path = path_secret.as_path();
+ writeln!(
+ file_config,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret_file = "{}"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#,
+ path_secret_path.display()
+ )?;
+ let config = super::read_config(path_config.to_path_buf())?;
+ assert_eq!("foo", config.rpc_secret.unwrap());
+
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::PermissionsExt;
+ let metadata = std::fs::metadata(&path_secret_path)?;
+ let mut perm = metadata.permissions();
+ perm.set_mode(0o660);
+ std::fs::set_permissions(&path_secret_path, perm)?;
+
+ std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "false");
+ assert!(super::read_config(path_config.to_path_buf()).is_err());
+
+ std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "true");
+ assert!(super::read_config(path_config.to_path_buf()).is_ok());
+ }
+
+ drop(path_config);
+ drop(path_secret);
+ drop(file_config);
+ Ok(())
+ }
+
+ #[test]
+ fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> {
+ let path_config = mktemp::Temp::new_file()?;
+ let mut file_config = File::create(path_config.as_path())?;
+ writeln!(
+ file_config,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret= "dummy"
+ rpc_secret_file = "dummy"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#
+ )?;
+ assert_eq!(
+ "only one of `rpc_secret` and `rpc_secret_file` can be set",
+ super::read_config(path_config.to_path_buf())
+ .unwrap_err()
+ .to_string()
+ );
+ drop(path_config);
+ drop(file_config);
+ Ok(())
+ }
+}
diff --git a/src/util/data.rs b/src/util/data.rs
index 3f61e301..bdd8daee 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -115,9 +115,9 @@ pub fn sha256sum(data: &[u8]) -> Hash {
/// Compute the blake2 of a slice
pub fn blake2sum(data: &[u8]) -> Hash {
- use blake2::{Blake2b, Digest};
+ use blake2::{Blake2b512, Digest};
- let mut hasher = Blake2b::new();
+ let mut hasher = Blake2b512::new();
hasher.update(data);
let mut hash = [0u8; 32];
hash.copy_from_slice(&hasher.finalize()[..32]);
diff --git a/src/util/forwarded_headers.rs b/src/util/forwarded_headers.rs
new file mode 100644
index 00000000..6ae275aa
--- /dev/null
+++ b/src/util/forwarded_headers.rs
@@ -0,0 +1,63 @@
+use http::{HeaderMap, HeaderValue};
+use std::net::IpAddr;
+use std::str::FromStr;
+
+use crate::error::{Error, OkOrMessage};
+
+pub fn handle_forwarded_for_headers(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
+ let forwarded_for_header = headers
+ .get("x-forwarded-for")
+ .ok_or_message("X-Forwarded-For header not provided")?;
+
+ let forwarded_for_ip_str = forwarded_for_header
+ .to_str()
+ .ok_or_message("Error parsing X-Forwarded-For header")?;
+
+ let client_ip = IpAddr::from_str(&forwarded_for_ip_str)
+ .ok_or_message("Valid IP address not found in X-Forwarded-For header")?;
+
+ Ok(client_ip.to_string())
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_handle_forwarded_for_headers_ipv4_client() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("X-Forwarded-For", "192.0.2.100".parse().unwrap());
+
+ if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) {
+ assert_eq!(forwarded_ip, "192.0.2.100");
+ }
+ }
+
+ #[test]
+ fn test_handle_forwarded_for_headers_ipv6_client() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("X-Forwarded-For", "2001:db8::f00d:cafe".parse().unwrap());
+
+ if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) {
+ assert_eq!(forwarded_ip, "2001:db8::f00d:cafe");
+ }
+ }
+
+ #[test]
+ fn test_handle_forwarded_for_headers_invalid_ip() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("X-Forwarded-For", "www.example.com".parse().unwrap());
+
+ let result = handle_forwarded_for_headers(&test_headers);
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_handle_forwarded_for_headers_missing() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("Host", "www.deuxfleurs.fr".parse().unwrap());
+
+ let result = handle_forwarded_for_headers(&test_headers);
+ assert!(result.is_err());
+ }
+}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index be82061f..c9110fb2 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -11,10 +11,10 @@ pub mod data;
pub mod encode;
pub mod error;
pub mod formater;
+pub mod forwarded_headers;
pub mod metrics;
pub mod migrate;
pub mod persister;
pub mod time;
-pub mod token_bucket;
pub mod tranquilizer;
pub mod version;
diff --git a/src/util/persister.rs b/src/util/persister.rs
index 4b9adf51..5c66bbed 100644
--- a/src/util/persister.rs
+++ b/src/util/persister.rs
@@ -1,5 +1,6 @@
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
+use std::sync::{Arc, RwLock};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -84,3 +85,36 @@ impl<T: Migrate> Persister<T> {
Ok(())
}
}
+
+pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>);
+
+impl<V: Migrate + Default> Clone for PersisterShared<V> {
+ fn clone(&self) -> PersisterShared<V> {
+ PersisterShared(self.0.clone())
+ }
+}
+
+impl<V: Migrate + Default> PersisterShared<V> {
+ pub fn new(base_dir: &Path, file_name: &str) -> Self {
+ let persister = Persister::new(base_dir, file_name);
+ let value = persister.load().unwrap_or_default();
+ Self(Arc::new((persister, RwLock::new(value))))
+ }
+
+ pub fn get_with<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&V) -> R,
+ {
+ let value = self.0 .1.read().unwrap();
+ f(&value)
+ }
+
+ pub fn set_with<F>(&self, f: F) -> Result<(), Error>
+ where
+ F: FnOnce(&mut V),
+ {
+ let mut value = self.0 .1.write().unwrap();
+ f(&mut value);
+ self.0 .0.save(&value)
+ }
+}
diff --git a/src/util/time.rs b/src/util/time.rs
index 257b4d2a..42f41a44 100644
--- a/src/util/time.rs
+++ b/src/util/time.rs
@@ -25,6 +25,6 @@ pub fn increment_logical_clock_2(prev: u64, prev2: u64) -> u64 {
pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
- let timestamp = Utc.timestamp(secs, nanos);
+ let timestamp = Utc.timestamp_opt(secs, nanos).unwrap();
timestamp.to_rfc3339_opts(SecondsFormat::Millis, true)
}
diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs
deleted file mode 100644
index cc0dfa1f..00000000
--- a/src/util/token_bucket.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use std::time::{Duration, Instant};
-
-use tokio::time::sleep;
-
-pub struct TokenBucket {
- // Replenish rate: number of tokens per second
- replenish_rate: u64,
- // Current number of tokens
- tokens: u64,
- // Last replenish time
- last_replenish: Instant,
-}
-
-impl TokenBucket {
- pub fn new(replenish_rate: u64) -> Self {
- Self {
- replenish_rate,
- tokens: 0,
- last_replenish: Instant::now(),
- }
- }
-
- pub async fn take(&mut self, tokens: u64) {
- while self.tokens < tokens {
- let needed = tokens - self.tokens;
- let delay = (needed as f64) / (self.replenish_rate as f64);
- sleep(Duration::from_secs_f64(delay)).await;
- self.replenish();
- }
- self.tokens -= tokens;
- }
-
- pub fn replenish(&mut self) {
- let now = Instant::now();
- let new_tokens =
- ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
- self.tokens += new_tokens;
- self.last_replenish = now;
- }
-}
diff --git a/src/util/version.rs b/src/util/version.rs
index b515dccc..2b2ea271 100644
--- a/src/util/version.rs
+++ b/src/util/version.rs
@@ -26,3 +26,7 @@ pub fn init_version(version: &'static str) {
pub fn init_features(features: &'static [&'static str]) {
FEATURES.store(Some(Arc::new(features)));
}
+
+pub fn rust_version() -> &'static str {
+ env!("RUSTC_VERSION")
+}