diff options
author | Alex Auvolat <alex@adnab.me> | 2024-03-18 20:17:54 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-03-18 20:19:30 +0100 |
commit | 0038ca8a78f147b9c0ec07ef0121773aaf110dc9 (patch) | |
tree | 43f39f30c63a6affa62eeea62cfec674f217c2b4 /src/model | |
parent | 81191d2d92e58ff82ace0f4d82b275c157673ade (diff) | |
parent | 1a0bffae3491fae6af5a8d4defc5c6b84839e197 (diff) | |
download | garage-0038ca8a78f147b9c0ec07ef0121773aaf110dc9.tar.gz garage-0038ca8a78f147b9c0ec07ef0121773aaf110dc9.zip |
Merge branch 'main' into next-0.10
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 1 | ||||
-rw-r--r-- | src/model/garage.rs | 28 | ||||
-rw-r--r-- | src/model/lib.rs | 1 | ||||
-rw-r--r-- | src/model/snapshot.rs | 136 |
4 files changed, 157 insertions, 9 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index a6bcfbe7..1e1ce0f7 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -29,6 +29,7 @@ err-derive.workspace = true hex.workspace = true http.workspace = true base64.workspace = true +parse_duration.workspace = true tracing.workspace = true rand.workspace = true zstd.workspace = true diff --git a/src/model/garage.rs b/src/model/garage.rs index 8987c594..4405d22d 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -170,14 +170,7 @@ impl Garage { }; info!("Initialize block manager..."); - let block_manager = BlockManager::new( - &db, - config.data_dir.clone(), - config.data_fsync, - config.compression_level, - data_rep_param, - system.clone(), - )?; + let block_manager = BlockManager::new(&db, &config, data_rep_param, system.clone())?; block_manager.register_bg_vars(&mut bg_vars); // ---- admin tables ---- @@ -278,7 +271,7 @@ impl Garage { })) } - pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { + pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) -> Result<(), Error> { self.block_manager.spawn_workers(bg); self.bucket_table.spawn_workers(bg); @@ -299,6 +292,23 @@ impl Garage { #[cfg(feature = "k2v")] self.k2v.spawn_workers(bg); + + if let Some(itv) = self.config.metadata_auto_snapshot_interval.as_deref() { + let interval = parse_duration::parse(itv) + .ok_or_message("Invalid `metadata_auto_snapshot_interval`")?; + if interval < std::time::Duration::from_secs(600) { + return Err(Error::Message( + "metadata_auto_snapshot_interval too small or negative".into(), + )); + } + + bg.spawn_worker(crate::snapshot::AutoSnapshotWorker::new( + self.clone(), + interval, + )); + } + + Ok(()) } pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { diff --git a/src/model/lib.rs b/src/model/lib.rs index 2166105f..1939a7a9 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -15,3 +15,4 @@ pub mod s3; pub mod garage; pub mod helper; +pub mod snapshot; diff --git a/src/model/snapshot.rs b/src/model/snapshot.rs new file mode 100644 index 00000000..36f9ec7d --- /dev/null +++ b/src/model/snapshot.rs @@ -0,0 +1,136 @@ +use std::fs; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use rand::prelude::*; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::error::*; + +use crate::garage::Garage; + +// The two most recent snapshots are kept +const KEEP_SNAPSHOTS: usize = 2; + +static SNAPSHOT_MUTEX: Mutex<()> = Mutex::new(()); + +// ================ snapshotting logic ===================== + +/// Run snashot_metadata in a blocking thread and async await on it +pub async fn async_snapshot_metadata(garage: &Arc<Garage>) -> Result<(), Error> { + let garage = garage.clone(); + let worker = tokio::task::spawn_blocking(move || snapshot_metadata(&garage)); + worker.await.unwrap()?; + Ok(()) +} + +/// Take a snapshot of the metadata database, and erase older +/// snapshots if necessary. +/// This is not an async function, it should be spawned on a thread pool +pub fn snapshot_metadata(garage: &Garage) -> Result<(), Error> { + let lock = match SNAPSHOT_MUTEX.try_lock() { + Ok(lock) => lock, + Err(_) => { + return Err(Error::Message( + "Cannot acquire lock, another snapshot might be in progress".into(), + )) + } + }; + + let mut snapshots_dir = garage.config.metadata_dir.clone(); + snapshots_dir.push("snapshots"); + fs::create_dir_all(&snapshots_dir)?; + + let mut new_path = snapshots_dir.clone(); + new_path.push(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)); + + info!("Snapshotting metadata db to {}", new_path.display()); + garage.db.snapshot(&new_path)?; + info!("Metadata db snapshot finished"); + + if let Err(e) = cleanup_snapshots(&snapshots_dir) { + error!("Failed to do cleanup in snapshots directory: {}", e); + } + + drop(lock); + + Ok(()) +} + +fn cleanup_snapshots(snapshots_dir: &PathBuf) -> Result<(), Error> { + let mut snapshots = + fs::read_dir(&snapshots_dir)?.collect::<Result<Vec<fs::DirEntry>, std::io::Error>>()?; + + snapshots.retain(|x| x.file_name().len() > 8); + snapshots.sort_by_key(|x| x.file_name()); + + for to_delete in snapshots.iter().rev().skip(KEEP_SNAPSHOTS) { + let path = snapshots_dir.join(to_delete.path()); + if to_delete.metadata()?.file_type().is_dir() { + for file in fs::read_dir(&path)? { + let file = file?; + if file.metadata()?.is_file() { + fs::remove_file(path.join(file.path()))?; + } + } + std::fs::remove_dir(&path)?; + } else { + std::fs::remove_file(&path)?; + } + } + Ok(()) +} + +// ================ auto snapshot worker ===================== + +pub struct AutoSnapshotWorker { + garage: Arc<Garage>, + next_snapshot: Instant, + snapshot_interval: Duration, +} + +impl AutoSnapshotWorker { + pub(crate) fn new(garage: Arc<Garage>, snapshot_interval: Duration) -> Self { + Self { + garage, + snapshot_interval, + next_snapshot: Instant::now() + (snapshot_interval / 2), + } + } +} + +#[async_trait] +impl Worker for AutoSnapshotWorker { + fn name(&self) -> String { + "Metadata snapshot worker".into() + } + fn status(&self) -> WorkerStatus { + WorkerStatus { + freeform: vec![format!( + "Next snapshot: {}", + (chrono::Utc::now() + (self.next_snapshot - Instant::now())).to_rfc3339() + )], + ..Default::default() + } + } + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + if Instant::now() < self.next_snapshot { + return Ok(WorkerState::Idle); + } + + async_snapshot_metadata(&self.garage).await?; + + let rand_factor = 1f32 + thread_rng().gen::<f32>() / 5f32; + self.next_snapshot = Instant::now() + self.snapshot_interval.mul_f32(rand_factor); + + Ok(WorkerState::Idle) + } + async fn wait_for_work(&mut self) -> WorkerState { + tokio::time::sleep_until(self.next_snapshot.into()).await; + WorkerState::Busy + } +} |