aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/generic_server.rs13
-rw-r--r--src/block/manager.rs4
-rw-r--r--src/block/repair.rs96
-rw-r--r--src/model/garage.rs13
-rw-r--r--src/util/forwarded_headers.rs63
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/web/web_server.rs15
7 files changed, 174 insertions, 31 deletions
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index aa90868a..d0354d28 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -19,6 +19,7 @@ use opentelemetry::{
};
use garage_util::error::Error as GarageError;
+use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
@@ -126,15 +127,9 @@ impl<A: ApiHandler> ApiServer<A> {
) -> Result<Response<Body>, GarageError> {
let uri = req.uri().clone();
- let has_forwarded_for_header = req.headers().contains_key("x-forwarded-for");
- if has_forwarded_for_header {
- let forwarded_for_ip_addr = &req
- .headers()
- .get("x-forwarded-for")
- .expect("Could not parse X-Forwarded-For header")
- .to_str()
- .unwrap_or_default();
-
+ if let Ok(forwarded_for_ip_addr) =
+ forwarded_headers::handle_forwarded_for_headers(&req.headers())
+ {
info!(
"{} (via {}) {} {}",
forwarded_for_ip_addr,
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 051a9f93..26278974 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -152,6 +152,7 @@ impl BlockManager {
tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
+ block_manager.scrub_persister.set_with(|_| ()).unwrap();
block_manager
}
@@ -185,6 +186,9 @@ impl BlockManager {
vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
});
+ vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
+ });
vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
p.get_with(|x| x.corruptions_detected)
});
diff --git a/src/block/repair.rs b/src/block/repair.rs
index d4593dbf..5476bf8a 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
+use rand::Rng;
use tokio::fs;
use tokio::select;
use tokio::sync::mpsc;
@@ -19,8 +19,8 @@ use garage_util::tranquilizer::Tranquilizer;
use crate::manager::*;
-// Full scrub every 30 days
-const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30);
+// Full scrub every 25 days with a random element of 10 days mixed in below
+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 25);
// Scrub tranquility is initially set to 4, but can be changed in the CLI
// and the updated version is persisted over Garage restarts
const INITIAL_SCRUB_TRANQUILITY: u32 = 4;
@@ -161,6 +161,50 @@ impl Worker for RepairWorker {
// and whose parameter (esp. speed) can be controlled at runtime.
// ---- ---- ----
+mod v081 {
+ use serde::{Deserialize, Serialize};
+
+ #[derive(Serialize, Deserialize)]
+ pub struct ScrubWorkerPersisted {
+ pub tranquility: u32,
+ pub(crate) time_last_complete_scrub: u64,
+ pub(crate) corruptions_detected: u64,
+ }
+
+ impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
+}
+
+mod v082 {
+ use serde::{Deserialize, Serialize};
+
+ use super::v081;
+
+ #[derive(Serialize, Deserialize)]
+ pub struct ScrubWorkerPersisted {
+ pub tranquility: u32,
+ pub(crate) time_last_complete_scrub: u64,
+ pub(crate) time_next_run_scrub: u64,
+ pub(crate) corruptions_detected: u64,
+ }
+
+ impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
+ type Previous = v081::ScrubWorkerPersisted;
+
+ fn migrate(old: v081::ScrubWorkerPersisted) -> ScrubWorkerPersisted {
+ use crate::repair::randomize_next_scrub_run_time;
+
+ ScrubWorkerPersisted {
+ tranquility: old.tranquility,
+ time_last_complete_scrub: old.time_last_complete_scrub,
+ time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub),
+ corruptions_detected: old.corruptions_detected,
+ }
+ }
+ }
+}
+
+pub use v082::*;
+
pub struct ScrubWorker {
manager: Arc<BlockManager>,
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
@@ -171,17 +215,25 @@ pub struct ScrubWorker {
persister: PersisterShared<ScrubWorkerPersisted>,
}
-#[derive(Serialize, Deserialize)]
-pub struct ScrubWorkerPersisted {
- pub tranquility: u32,
- pub(crate) time_last_complete_scrub: u64,
- pub(crate) corruptions_detected: u64,
+fn randomize_next_scrub_run_time(timestamp: u64) -> u64 {
+ // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to
+ // balance scrub load across different cluster nodes.
+
+ let next_run_timestamp = timestamp
+ + SCRUB_INTERVAL
+ .saturating_add(Duration::from_secs(
+ rand::thread_rng().gen_range(0..3600 * 24 * 10),
+ ))
+ .as_millis() as u64;
+
+ next_run_timestamp
}
-impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
+
impl Default for ScrubWorkerPersisted {
fn default() -> Self {
ScrubWorkerPersisted {
time_last_complete_scrub: 0,
+ time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
}
@@ -279,12 +331,13 @@ impl Worker for ScrubWorker {
}
fn status(&self) -> WorkerStatus {
- let (corruptions_detected, tranquility, time_last_complete_scrub) =
+ let (corruptions_detected, tranquility, time_last_complete_scrub, time_next_run_scrub) =
self.persister.get_with(|p| {
(
p.corruptions_detected,
p.tranquility,
p.time_last_complete_scrub,
+ p.time_next_run_scrub,
)
});
@@ -302,10 +355,16 @@ impl Worker for ScrubWorker {
s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
}
ScrubWorkerState::Finished => {
- s.freeform = vec![format!(
- "Last scrub completed at {}",
- msec_to_rfc3339(time_last_complete_scrub)
- )];
+ s.freeform = vec![
+ format!(
+ "Last scrub completed at {}",
+ msec_to_rfc3339(time_last_complete_scrub),
+ ),
+ format!(
+ "Next scrub scheduled for {}",
+ msec_to_rfc3339(time_next_run_scrub)
+ ),
+ ];
}
}
s
@@ -334,8 +393,10 @@ impl Worker for ScrubWorker {
.tranquilizer
.tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
- self.persister
- .set_with(|p| p.time_last_complete_scrub = now_msec())?;
+ self.persister.set_with(|p| {
+ p.time_last_complete_scrub = now_msec();
+ p.time_next_run_scrub = randomize_next_scrub_run_time(now_msec());
+ })?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
Ok(WorkerState::Idle)
@@ -350,8 +411,7 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
- self.persister.get_with(|p| p.time_last_complete_scrub)
- + SCRUB_INTERVAL.as_millis() as u64,
+ self.persister.get_with(|p| p.time_next_run_scrub),
ScrubWorkerCommand::Start,
),
};
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 0a9ec608..3daa1b33 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -136,9 +136,16 @@ impl Garage {
env_builder.flag(heed::flags::Flags::MdbNoSync);
env_builder.flag(heed::flags::Flags::MdbNoMetaSync);
}
- let db = env_builder
- .open(&db_path)
- .ok_or_message("Unable to open LMDB DB")?;
+ let db = match env_builder.open(&db_path) {
+ Err(heed::Error::Io(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
+ return Err(Error::Message(
+ "OutOfMemory error while trying to open LMDB database. This can happen \
+ if your operating system is not allowing you to use sufficient virtual \
+ memory address space. Please check that no limit is set (ulimit -v). \
+ On 32-bit machines, you should probably switch to another database engine.".into()))
+ }
+ x => x.ok_or_message("Unable to open LMDB DB")?,
+ };
db::lmdb_adapter::LmdbDb::init(db)
}
#[cfg(not(feature = "lmdb"))]
diff --git a/src/util/forwarded_headers.rs b/src/util/forwarded_headers.rs
new file mode 100644
index 00000000..6ae275aa
--- /dev/null
+++ b/src/util/forwarded_headers.rs
@@ -0,0 +1,63 @@
+use http::{HeaderMap, HeaderValue};
+use std::net::IpAddr;
+use std::str::FromStr;
+
+use crate::error::{Error, OkOrMessage};
+
+pub fn handle_forwarded_for_headers(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
+ let forwarded_for_header = headers
+ .get("x-forwarded-for")
+ .ok_or_message("X-Forwarded-For header not provided")?;
+
+ let forwarded_for_ip_str = forwarded_for_header
+ .to_str()
+ .ok_or_message("Error parsing X-Forwarded-For header")?;
+
+ let client_ip = IpAddr::from_str(&forwarded_for_ip_str)
+ .ok_or_message("Valid IP address not found in X-Forwarded-For header")?;
+
+ Ok(client_ip.to_string())
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_handle_forwarded_for_headers_ipv4_client() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("X-Forwarded-For", "192.0.2.100".parse().unwrap());
+
+ if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) {
+ assert_eq!(forwarded_ip, "192.0.2.100");
+ }
+ }
+
+ #[test]
+ fn test_handle_forwarded_for_headers_ipv6_client() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("X-Forwarded-For", "2001:db8::f00d:cafe".parse().unwrap());
+
+ if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) {
+ assert_eq!(forwarded_ip, "2001:db8::f00d:cafe");
+ }
+ }
+
+ #[test]
+ fn test_handle_forwarded_for_headers_invalid_ip() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("X-Forwarded-For", "www.example.com".parse().unwrap());
+
+ let result = handle_forwarded_for_headers(&test_headers);
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_handle_forwarded_for_headers_missing() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("Host", "www.deuxfleurs.fr".parse().unwrap());
+
+ let result = handle_forwarded_for_headers(&test_headers);
+ assert!(result.is_err());
+ }
+}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index d35ca72f..c9110fb2 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -11,6 +11,7 @@ pub mod data;
pub mod encode;
pub mod error;
pub mod formater;
+pub mod forwarded_headers;
pub mod metrics;
pub mod migrate;
pub mod persister;
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 5719da54..0c7edf23 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -29,6 +29,7 @@ use garage_model::garage::Garage;
use garage_table::*;
use garage_util::error::Error as GarageError;
+use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
struct WebMetrics {
@@ -104,7 +105,19 @@ impl WebServer {
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
- info!("{} {} {}", addr, req.method(), req.uri());
+ if let Ok(forwarded_for_ip_addr) =
+ forwarded_headers::handle_forwarded_for_headers(&req.headers())
+ {
+ info!(
+ "{} (via {}) {} {}",
+ forwarded_for_ip_addr,
+ addr,
+ req.method(),
+ req.uri()
+ );
+ } else {
+ info!("{} {} {}", addr, req.method(), req.uri());
+ }
// Lots of instrumentation
let tracer = opentelemetry::global::tracer("garage");