aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/db/Cargo.toml2
-rw-r--r--src/db/lib.rs12
-rw-r--r--src/db/lmdb_adapter.rs10
-rw-r--r--src/db/sled_adapter.rs8
-rw-r--r--src/db/sqlite_adapter.rs12
-rw-r--r--src/garage/admin/mod.rs40
-rw-r--r--src/garage/cli/cmd.rs3
-rw-r--r--src/garage/cli/structs.rs15
-rw-r--r--src/garage/server.rs2
-rw-r--r--src/model/Cargo.toml1
-rw-r--r--src/model/garage.rs19
-rw-r--r--src/model/lib.rs1
-rw-r--r--src/model/snapshot.rs136
-rw-r--r--src/util/config.rs4
14 files changed, 262 insertions, 3 deletions
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index d7c89620..324de74c 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -17,7 +17,7 @@ hexdump.workspace = true
tracing.workspace = true
heed = { workspace = true, optional = true }
-rusqlite = { workspace = true, optional = true }
+rusqlite = { workspace = true, optional = true, features = ["backup"] }
sled = { workspace = true, optional = true }
[dev-dependencies]
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 0fb457ce..7f19172f 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -19,6 +19,7 @@ use core::ops::{Bound, RangeBounds};
use std::borrow::Cow;
use std::cell::Cell;
+use std::path::PathBuf;
use std::sync::Arc;
use err_derive::Error;
@@ -48,6 +49,12 @@ pub type TxValueIter<'a> = Box<dyn std::iter::Iterator<Item = TxOpResult<(Value,
#[error(display = "{}", _0)]
pub struct Error(pub Cow<'static, str>);
+impl From<std::io::Error> for Error {
+ fn from(e: std::io::Error) -> Error {
+ Error(format!("IO: {}", e).into())
+ }
+}
+
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Error)]
@@ -129,6 +136,10 @@ impl Db {
}
}
+ pub fn snapshot(&self, path: &PathBuf) -> Result<()> {
+ self.0.snapshot(path)
+ }
+
pub fn import(&self, other: &Db) -> Result<()> {
let existing_trees = self.list_trees()?;
if !existing_trees.is_empty() {
@@ -325,6 +336,7 @@ pub(crate) trait IDb: Send + Sync {
fn engine(&self) -> String;
fn open_tree(&self, name: &str) -> Result<usize>;
fn list_trees(&self) -> Result<Vec<String>>;
+ fn snapshot(&self, path: &PathBuf) -> Result<()>;
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
fn len(&self, tree: usize) -> Result<usize>;
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
index 59fa132d..4b131aff 100644
--- a/src/db/lmdb_adapter.rs
+++ b/src/db/lmdb_adapter.rs
@@ -3,6 +3,7 @@ use core::ptr::NonNull;
use std::collections::HashMap;
use std::convert::TryInto;
+use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use heed::types::ByteSlice;
@@ -102,6 +103,15 @@ impl IDb for LmdbDb {
Ok(ret2)
}
+ fn snapshot(&self, to: &PathBuf) -> Result<()> {
+ std::fs::create_dir_all(to)?;
+ let mut path = to.clone();
+ path.push("data.mdb");
+ self.db
+ .copy_to_path(path, heed::CompactionOption::Disabled)?;
+ Ok(())
+ }
+
// ----
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs
index 84f2001b..c34b4d81 100644
--- a/src/db/sled_adapter.rs
+++ b/src/db/sled_adapter.rs
@@ -2,6 +2,7 @@ use core::ops::Bound;
use std::cell::Cell;
use std::collections::HashMap;
+use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use sled::transaction::{
@@ -96,6 +97,13 @@ impl IDb for SledDb {
Ok(trees)
}
+ fn snapshot(&self, to: &PathBuf) -> Result<()> {
+ let to_db = sled::open(to)?;
+ let export = self.db.export();
+ to_db.import(export);
+ Ok(())
+ }
+
// ----
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 9f967c66..827f3cc3 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -2,6 +2,7 @@ use core::ops::Bound;
use std::borrow::BorrowMut;
use std::marker::PhantomPinned;
+use std::path::PathBuf;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex, MutexGuard};
@@ -119,6 +120,17 @@ impl IDb for SqliteDb {
Ok(trees)
}
+ fn snapshot(&self, to: &PathBuf) -> Result<()> {
+ fn progress(p: rusqlite::backup::Progress) {
+ let percent = (p.pagecount - p.remaining) * 100 / p.pagecount;
+ info!("Sqlite snapshot progres: {}%", percent);
+ }
+ let this = self.0.lock().unwrap();
+ this.db
+ .backup(rusqlite::DatabaseName::Main, to, Some(progress))?;
+ Ok(())
+ }
+
// ----
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index b6f9c426..f01ef3d6 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -46,6 +46,7 @@ pub enum AdminRpc {
Stats(StatsOpt),
Worker(WorkerOperation),
BlockOperation(BlockOperation),
+ MetaOperation(MetaOperation),
// Replies
Ok(String),
@@ -518,6 +519,44 @@ impl AdminRpcHandler {
)]))
}
}
+
+ // ================ META DB COMMANDS ====================
+
+ async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
+ match mo {
+ MetaOperation::Snapshot { all: true } => {
+ let ring = self.garage.system.ring.borrow().clone();
+ let to = ring.layout.node_ids().to_vec();
+
+ let resps = futures::future::join_all(to.iter().map(|to| async move {
+ let to = (*to).into();
+ self.endpoint
+ .call(
+ &to,
+ AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
+ PRIO_NORMAL,
+ )
+ .await
+ }))
+ .await;
+
+ let mut ret = vec![];
+ for (to, resp) in to.iter().zip(resps.iter()) {
+ let res_str = match resp {
+ Ok(_) => "ok".to_string(),
+ Err(e) => format!("error: {}", e),
+ };
+ ret.push(format!("{:?}\t{}", to, res_str));
+ }
+
+ Ok(AdminRpc::Ok(format_table_to_string(ret)))
+ }
+ MetaOperation::Snapshot { all: false } => {
+ garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
+ Ok(AdminRpc::Ok("Snapshot has been saved.".into()))
+ }
+ }
+ }
}
#[async_trait]
@@ -535,6 +574,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
+ AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 48359614..4c0a5322 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -44,6 +44,9 @@ pub async fn cli_command_dispatch(
Command::Block(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
}
+ Command::Meta(mo) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
+ }
_ => unreachable!(),
}
}
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index be4d5bd6..51d2bed3 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -57,6 +57,10 @@ pub enum Command {
#[structopt(name = "block", version = garage_version())]
Block(BlockOperation),
+ /// Operations on the metadata db
+ #[structopt(name = "meta", version = garage_version())]
+ Meta(MetaOperation),
+
/// Convert metadata db between database engine formats
#[structopt(name = "convert-db", version = garage_version())]
ConvertDb(convert_db::ConvertDbOpt),
@@ -617,3 +621,14 @@ pub enum BlockOperation {
blocks: Vec<String>,
},
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+pub enum MetaOperation {
+ /// Save a snapshot of the metadata db file
+ #[structopt(name = "snapshot", version = garage_version())]
+ Snapshot {
+ /// Run on all nodes instead of only local node
+ #[structopt(long = "all")]
+ all: bool,
+ },
+}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 6323f957..65bf34db 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -51,7 +51,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone());
info!("Spawning Garage workers...");
- garage.spawn_workers(&background);
+ garage.spawn_workers(&background)?;
if config.admin.trace_sink.is_some() {
info!("Initialize tracing...");
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 2e5b047d..bde354b5 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -28,6 +28,7 @@ chrono.workspace = true
err-derive.workspace = true
hex.workspace = true
base64.workspace = true
+parse_duration.workspace = true
tracing.workspace = true
rand.workspace = true
zstd.workspace = true
diff --git a/src/model/garage.rs b/src/model/garage.rs
index acf943f6..a6f60546 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -278,7 +278,7 @@ impl Garage {
}))
}
- pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) -> Result<(), Error> {
self.block_manager.spawn_workers(bg);
self.bucket_table.spawn_workers(bg);
@@ -299,6 +299,23 @@ impl Garage {
#[cfg(feature = "k2v")]
self.k2v.spawn_workers(bg);
+
+ if let Some(itv) = self.config.metadata_auto_snapshot_interval.as_deref() {
+ let interval = parse_duration::parse(itv)
+ .ok_or_message("Invalid `metadata_auto_snapshot_interval`")?;
+ if interval < std::time::Duration::from_secs(600) {
+ return Err(Error::Message(
+ "metadata_auto_snapshot_interval too small or negative".into(),
+ ));
+ }
+
+ bg.spawn_worker(crate::snapshot::AutoSnapshotWorker::new(
+ self.clone(),
+ interval,
+ ));
+ }
+
+ Ok(())
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 4f20ea46..8ec338da 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -19,3 +19,4 @@ pub mod s3;
pub mod garage;
pub mod helper;
pub mod migrate;
+pub mod snapshot;
diff --git a/src/model/snapshot.rs b/src/model/snapshot.rs
new file mode 100644
index 00000000..36f9ec7d
--- /dev/null
+++ b/src/model/snapshot.rs
@@ -0,0 +1,136 @@
+use std::fs;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use rand::prelude::*;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::*;
+
+use crate::garage::Garage;
+
+// The two most recent snapshots are kept
+const KEEP_SNAPSHOTS: usize = 2;
+
+static SNAPSHOT_MUTEX: Mutex<()> = Mutex::new(());
+
+// ================ snapshotting logic =====================
+
+/// Run snashot_metadata in a blocking thread and async await on it
+pub async fn async_snapshot_metadata(garage: &Arc<Garage>) -> Result<(), Error> {
+ let garage = garage.clone();
+ let worker = tokio::task::spawn_blocking(move || snapshot_metadata(&garage));
+ worker.await.unwrap()?;
+ Ok(())
+}
+
+/// Take a snapshot of the metadata database, and erase older
+/// snapshots if necessary.
+/// This is not an async function, it should be spawned on a thread pool
+pub fn snapshot_metadata(garage: &Garage) -> Result<(), Error> {
+ let lock = match SNAPSHOT_MUTEX.try_lock() {
+ Ok(lock) => lock,
+ Err(_) => {
+ return Err(Error::Message(
+ "Cannot acquire lock, another snapshot might be in progress".into(),
+ ))
+ }
+ };
+
+ let mut snapshots_dir = garage.config.metadata_dir.clone();
+ snapshots_dir.push("snapshots");
+ fs::create_dir_all(&snapshots_dir)?;
+
+ let mut new_path = snapshots_dir.clone();
+ new_path.push(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true));
+
+ info!("Snapshotting metadata db to {}", new_path.display());
+ garage.db.snapshot(&new_path)?;
+ info!("Metadata db snapshot finished");
+
+ if let Err(e) = cleanup_snapshots(&snapshots_dir) {
+ error!("Failed to do cleanup in snapshots directory: {}", e);
+ }
+
+ drop(lock);
+
+ Ok(())
+}
+
+fn cleanup_snapshots(snapshots_dir: &PathBuf) -> Result<(), Error> {
+ let mut snapshots =
+ fs::read_dir(&snapshots_dir)?.collect::<Result<Vec<fs::DirEntry>, std::io::Error>>()?;
+
+ snapshots.retain(|x| x.file_name().len() > 8);
+ snapshots.sort_by_key(|x| x.file_name());
+
+ for to_delete in snapshots.iter().rev().skip(KEEP_SNAPSHOTS) {
+ let path = snapshots_dir.join(to_delete.path());
+ if to_delete.metadata()?.file_type().is_dir() {
+ for file in fs::read_dir(&path)? {
+ let file = file?;
+ if file.metadata()?.is_file() {
+ fs::remove_file(path.join(file.path()))?;
+ }
+ }
+ std::fs::remove_dir(&path)?;
+ } else {
+ std::fs::remove_file(&path)?;
+ }
+ }
+ Ok(())
+}
+
+// ================ auto snapshot worker =====================
+
+pub struct AutoSnapshotWorker {
+ garage: Arc<Garage>,
+ next_snapshot: Instant,
+ snapshot_interval: Duration,
+}
+
+impl AutoSnapshotWorker {
+ pub(crate) fn new(garage: Arc<Garage>, snapshot_interval: Duration) -> Self {
+ Self {
+ garage,
+ snapshot_interval,
+ next_snapshot: Instant::now() + (snapshot_interval / 2),
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for AutoSnapshotWorker {
+ fn name(&self) -> String {
+ "Metadata snapshot worker".into()
+ }
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ freeform: vec![format!(
+ "Next snapshot: {}",
+ (chrono::Utc::now() + (self.next_snapshot - Instant::now())).to_rfc3339()
+ )],
+ ..Default::default()
+ }
+ }
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if Instant::now() < self.next_snapshot {
+ return Ok(WorkerState::Idle);
+ }
+
+ async_snapshot_metadata(&self.garage).await?;
+
+ let rand_factor = 1f32 + thread_rng().gen::<f32>() / 5f32;
+ self.next_snapshot = Instant::now() + self.snapshot_interval.mul_f32(rand_factor);
+
+ Ok(WorkerState::Idle)
+ }
+ async fn wait_for_work(&mut self) -> WorkerState {
+ tokio::time::sleep_until(self.next_snapshot.into()).await;
+ WorkerState::Busy
+ }
+}
diff --git a/src/util/config.rs b/src/util/config.rs
index 7338a506..8ecbdfbb 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -27,6 +27,10 @@ pub struct Config {
#[serde(default)]
pub disable_scrub: bool,
+ /// Automatic snapshot interval for metadata
+ #[serde(default)]
+ pub metadata_auto_snapshot_interval: Option<String>,
+
/// Size of data blocks to save to disk
#[serde(
deserialize_with = "deserialize_capacity",