aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-03-18 20:17:54 +0100
committerAlex Auvolat <alex@adnab.me>2024-03-18 20:19:30 +0100
commit0038ca8a78f147b9c0ec07ef0121773aaf110dc9 (patch)
tree43f39f30c63a6affa62eeea62cfec674f217c2b4 /src/model
parent81191d2d92e58ff82ace0f4d82b275c157673ade (diff)
parent1a0bffae3491fae6af5a8d4defc5c6b84839e197 (diff)
downloadgarage-0038ca8a78f147b9c0ec07ef0121773aaf110dc9.tar.gz
garage-0038ca8a78f147b9c0ec07ef0121773aaf110dc9.zip
Merge branch 'main' into next-0.10
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml1
-rw-r--r--src/model/garage.rs28
-rw-r--r--src/model/lib.rs1
-rw-r--r--src/model/snapshot.rs136
4 files changed, 157 insertions, 9 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index a6bcfbe7..1e1ce0f7 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -29,6 +29,7 @@ err-derive.workspace = true
hex.workspace = true
http.workspace = true
base64.workspace = true
+parse_duration.workspace = true
tracing.workspace = true
rand.workspace = true
zstd.workspace = true
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 8987c594..4405d22d 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -170,14 +170,7 @@ impl Garage {
};
info!("Initialize block manager...");
- let block_manager = BlockManager::new(
- &db,
- config.data_dir.clone(),
- config.data_fsync,
- config.compression_level,
- data_rep_param,
- system.clone(),
- )?;
+ let block_manager = BlockManager::new(&db, &config, data_rep_param, system.clone())?;
block_manager.register_bg_vars(&mut bg_vars);
// ---- admin tables ----
@@ -278,7 +271,7 @@ impl Garage {
}))
}
- pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) -> Result<(), Error> {
self.block_manager.spawn_workers(bg);
self.bucket_table.spawn_workers(bg);
@@ -299,6 +292,23 @@ impl Garage {
#[cfg(feature = "k2v")]
self.k2v.spawn_workers(bg);
+
+ if let Some(itv) = self.config.metadata_auto_snapshot_interval.as_deref() {
+ let interval = parse_duration::parse(itv)
+ .ok_or_message("Invalid `metadata_auto_snapshot_interval`")?;
+ if interval < std::time::Duration::from_secs(600) {
+ return Err(Error::Message(
+ "metadata_auto_snapshot_interval too small or negative".into(),
+ ));
+ }
+
+ bg.spawn_worker(crate::snapshot::AutoSnapshotWorker::new(
+ self.clone(),
+ interval,
+ ));
+ }
+
+ Ok(())
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 2166105f..1939a7a9 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -15,3 +15,4 @@ pub mod s3;
pub mod garage;
pub mod helper;
+pub mod snapshot;
diff --git a/src/model/snapshot.rs b/src/model/snapshot.rs
new file mode 100644
index 00000000..36f9ec7d
--- /dev/null
+++ b/src/model/snapshot.rs
@@ -0,0 +1,136 @@
+use std::fs;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use rand::prelude::*;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::*;
+
+use crate::garage::Garage;
+
+// The two most recent snapshots are kept
+const KEEP_SNAPSHOTS: usize = 2;
+
+static SNAPSHOT_MUTEX: Mutex<()> = Mutex::new(());
+
+// ================ snapshotting logic =====================
+
+/// Run snashot_metadata in a blocking thread and async await on it
+pub async fn async_snapshot_metadata(garage: &Arc<Garage>) -> Result<(), Error> {
+ let garage = garage.clone();
+ let worker = tokio::task::spawn_blocking(move || snapshot_metadata(&garage));
+ worker.await.unwrap()?;
+ Ok(())
+}
+
+/// Take a snapshot of the metadata database, and erase older
+/// snapshots if necessary.
+/// This is not an async function, it should be spawned on a thread pool
+pub fn snapshot_metadata(garage: &Garage) -> Result<(), Error> {
+ let lock = match SNAPSHOT_MUTEX.try_lock() {
+ Ok(lock) => lock,
+ Err(_) => {
+ return Err(Error::Message(
+ "Cannot acquire lock, another snapshot might be in progress".into(),
+ ))
+ }
+ };
+
+ let mut snapshots_dir = garage.config.metadata_dir.clone();
+ snapshots_dir.push("snapshots");
+ fs::create_dir_all(&snapshots_dir)?;
+
+ let mut new_path = snapshots_dir.clone();
+ new_path.push(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true));
+
+ info!("Snapshotting metadata db to {}", new_path.display());
+ garage.db.snapshot(&new_path)?;
+ info!("Metadata db snapshot finished");
+
+ if let Err(e) = cleanup_snapshots(&snapshots_dir) {
+ error!("Failed to do cleanup in snapshots directory: {}", e);
+ }
+
+ drop(lock);
+
+ Ok(())
+}
+
+fn cleanup_snapshots(snapshots_dir: &PathBuf) -> Result<(), Error> {
+ let mut snapshots =
+ fs::read_dir(&snapshots_dir)?.collect::<Result<Vec<fs::DirEntry>, std::io::Error>>()?;
+
+ snapshots.retain(|x| x.file_name().len() > 8);
+ snapshots.sort_by_key(|x| x.file_name());
+
+ for to_delete in snapshots.iter().rev().skip(KEEP_SNAPSHOTS) {
+ let path = snapshots_dir.join(to_delete.path());
+ if to_delete.metadata()?.file_type().is_dir() {
+ for file in fs::read_dir(&path)? {
+ let file = file?;
+ if file.metadata()?.is_file() {
+ fs::remove_file(path.join(file.path()))?;
+ }
+ }
+ std::fs::remove_dir(&path)?;
+ } else {
+ std::fs::remove_file(&path)?;
+ }
+ }
+ Ok(())
+}
+
+// ================ auto snapshot worker =====================
+
+pub struct AutoSnapshotWorker {
+ garage: Arc<Garage>,
+ next_snapshot: Instant,
+ snapshot_interval: Duration,
+}
+
+impl AutoSnapshotWorker {
+ pub(crate) fn new(garage: Arc<Garage>, snapshot_interval: Duration) -> Self {
+ Self {
+ garage,
+ snapshot_interval,
+ next_snapshot: Instant::now() + (snapshot_interval / 2),
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for AutoSnapshotWorker {
+ fn name(&self) -> String {
+ "Metadata snapshot worker".into()
+ }
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ freeform: vec![format!(
+ "Next snapshot: {}",
+ (chrono::Utc::now() + (self.next_snapshot - Instant::now())).to_rfc3339()
+ )],
+ ..Default::default()
+ }
+ }
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if Instant::now() < self.next_snapshot {
+ return Ok(WorkerState::Idle);
+ }
+
+ async_snapshot_metadata(&self.garage).await?;
+
+ let rand_factor = 1f32 + thread_rng().gen::<f32>() / 5f32;
+ self.next_snapshot = Instant::now() + self.snapshot_interval.mul_f32(rand_factor);
+
+ Ok(WorkerState::Idle)
+ }
+ async fn wait_for_work(&mut self) -> WorkerState {
+ tokio::time::sleep_until(self.next_snapshot.into()).await;
+ WorkerState::Busy
+ }
+}