From f8a6fff2b7bb754fa315e1c00b0f6adea4586fa8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Jun 2022 17:37:16 +0200 Subject: First implementation of counter repair procedure --- src/garage/admin.rs | 4 +- src/garage/cli/structs.rs | 24 ++++++- src/garage/main.rs | 18 ++--- src/garage/repair.rs | 163 ------------------------------------------- src/garage/repair/mod.rs | 2 + src/garage/repair/offline.rs | 52 ++++++++++++++ src/garage/repair/online.rs | 163 +++++++++++++++++++++++++++++++++++++++++++ src/garage/server.rs | 55 +-------------- 8 files changed, 254 insertions(+), 227 deletions(-) delete mode 100644 src/garage/repair.rs create mode 100644 src/garage/repair/mod.rs create mode 100644 src/garage/repair/offline.rs create mode 100644 src/garage/repair/online.rs (limited to 'src/garage') diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c662aa00..afe7fe7a 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -24,7 +24,7 @@ use garage_model::migrate::Migrate; use garage_model::permission::*; use crate::cli::*; -use crate::repair::Repair; +use crate::repair::online::OnlineRepair; pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; @@ -619,7 +619,7 @@ impl AdminRpcHandler { ))) } } else { - let repair = Repair { + let repair = OnlineRepair { garage: self.garage.clone(), }; self.garage diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index a0c49aeb..575ac857 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -33,10 +33,15 @@ pub enum Command { #[structopt(name = "migrate")] Migrate(MigrateOpt), - /// Start repair of node data + /// Start repair of node data on remote node #[structopt(name = "repair")] Repair(RepairOpt), + /// Offline reparation of node data (these repairs must be run offline + /// directly on the server node) + #[structopt(name = "offline-repair")] + OfflineRepair(OfflineRepairOpt), + /// Gather node statistics #[structopt(name = "stats")] Stats(StatsOpt), @@ -405,6 +410,23 @@ pub enum RepairWhat { }, } +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct OfflineRepairOpt { + /// Confirm the launch of the repair operation + #[structopt(long = "yes")] + pub yes: bool, + + #[structopt(subcommand)] + pub what: OfflineRepairWhat, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum OfflineRepairWhat { + /// Repair K2V item counters + #[structopt(name = "k2v_item_counters")] + K2VItemCounters, +} + #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct StatsOpt { /// Gather statistics from all nodes diff --git a/src/garage/main.rs b/src/garage/main.rs index bd09b6ea..3fa5c3c0 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -61,17 +61,17 @@ async fn main() { pretty_env_logger::init(); sodiumoxide::init().expect("Unable to init sodiumoxide"); - let opt = Opt::from_args(); + // Abort on panic (same behavior as in Go) + std::panic::set_hook(Box::new(|panic_info| { + error!("{}", panic_info.to_string()); + std::process::abort(); + })); + let opt = Opt::from_args(); let res = match opt.cmd { - Command::Server => { - // Abort on panic (same behavior as in Go) - std::panic::set_hook(Box::new(|panic_info| { - error!("{}", panic_info.to_string()); - std::process::abort(); - })); - - server::run_server(opt.config_file).await + Command::Server => server::run_server(opt.config_file).await, + Command::OfflineRepair(repair_opt) => { + repair::offline::offline_repair(opt.config_file, repair_opt).await } Command::Node(NodeOperation::NodeId(node_id_opt)) => { node_id_command(opt.config_file, node_id_opt.quiet) diff --git a/src/garage/repair.rs b/src/garage/repair.rs deleted file mode 100644 index 17e14b8b..00000000 --- a/src/garage/repair.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::watch; - -use garage_model::garage::Garage; -use garage_model::s3::block_ref_table::*; -use garage_model::s3::object_table::*; -use garage_model::s3::version_table::*; -use garage_table::*; -use garage_util::error::Error; - -use crate::*; - -pub struct Repair { - pub garage: Arc, -} - -impl Repair { - pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver) { - if let Err(e) = self.repair_worker_aux(opt, must_exit).await { - warn!("Repair worker failed with error: {}", e); - } - } - - async fn repair_worker_aux( - &self, - opt: RepairOpt, - must_exit: watch::Receiver, - ) -> Result<(), Error> { - match opt.what { - RepairWhat::Tables => { - info!("Launching a full sync of tables"); - self.garage.bucket_table.syncer.add_full_sync(); - self.garage.object_table.syncer.add_full_sync(); - self.garage.version_table.syncer.add_full_sync(); - self.garage.block_ref_table.syncer.add_full_sync(); - self.garage.key_table.syncer.add_full_sync(); - } - RepairWhat::Versions => { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - RepairWhat::BlockRefs => { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - RepairWhat::Blocks => { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - RepairWhat::Scrub { tranquility } => { - info!("Verifying integrity of stored blocks"); - self.garage - .block_manager - .scrub_data_store(&must_exit, tranquility) - .await?; - } - } - Ok(()) - } - - async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; - - while !*must_exit.borrow() { - let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; - - i += 1; - if i % 1000 == 0 { - info!("repair_versions: {}", i); - } - - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; - if version.deleted.get() { - continue; - } - let object = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await?; - let version_exists = match object { - Some(o) => o - .versions() - .iter() - .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), - None => false, - }; - if !version_exists { - info!("Repair versions: marking version as deleted: {:?}", version); - self.garage - .version_table - .insert(&Version::new( - version.uuid, - version.bucket_id, - version.key, - true, - )) - .await?; - } - } - info!("repair_versions: finished, done {}", i); - Ok(()) - } - - async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; - - while !*must_exit.borrow() { - let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; - - i += 1; - if i % 1000 == 0 { - info!("repair_block_ref: {}", i); - } - - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; - if block_ref.deleted.get() { - continue; - } - let version = self - .garage - .version_table - .get(&block_ref.version, &EmptyKey) - .await?; - // The version might not exist if it has been GC'ed - let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); - if !ref_exists { - info!( - "Repair block ref: marking block_ref as deleted: {:?}", - block_ref - ); - self.garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true.into(), - }) - .await?; - } - } - info!("repair_block_ref: finished, done {}", i); - Ok(()) - } -} diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs new file mode 100644 index 00000000..4699ace5 --- /dev/null +++ b/src/garage/repair/mod.rs @@ -0,0 +1,2 @@ +pub mod offline; +pub mod online; diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs new file mode 100644 index 00000000..853bfdf3 --- /dev/null +++ b/src/garage/repair/offline.rs @@ -0,0 +1,52 @@ +use std::path::PathBuf; + +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::config::*; +use garage_util::error::*; + +use garage_model::garage::Garage; + +use crate::cli::structs::*; + +pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> { + if !opt.yes { + return Err(Error::Message( + "Please add the --yes flag to launch repair operation".into(), + )); + } + + info!("Loading configuration..."); + let config = read_config(config_file)?; + + info!("Initializing background runner..."); + let (done_tx, done_rx) = watch::channel(false); + let (background, await_background_done) = BackgroundRunner::new(16, done_rx); + + info!("Initializing Garage main data store..."); + let garage = Garage::new(config.clone(), background)?; + + info!("Launching repair operation..."); + match opt.what { + OfflineRepairWhat::K2VItemCounters => { + #[cfg(feature = "k2v")] + garage + .k2v + .counter_table + .offline_recount_all(&garage.k2v.item_table)?; + #[cfg(not(feature = "k2v"))] + error!("K2V not enabled in this build."); + } + } + + info!("Repair operation finished, shutting down Garage internals..."); + done_tx.send(true).unwrap(); + drop(garage); + + await_background_done.await?; + + info!("Cleaning up..."); + + Ok(()) +} diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs new file mode 100644 index 00000000..d6a71742 --- /dev/null +++ b/src/garage/repair/online.rs @@ -0,0 +1,163 @@ +use std::sync::Arc; + +use tokio::sync::watch; + +use garage_model::garage::Garage; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; +use garage_table::*; +use garage_util::error::Error; + +use crate::*; + +pub struct OnlineRepair { + pub garage: Arc, +} + +impl OnlineRepair { + pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver) { + if let Err(e) = self.repair_worker_aux(opt, must_exit).await { + warn!("Repair worker failed with error: {}", e); + } + } + + async fn repair_worker_aux( + &self, + opt: RepairOpt, + must_exit: watch::Receiver, + ) -> Result<(), Error> { + match opt.what { + RepairWhat::Tables => { + info!("Launching a full sync of tables"); + self.garage.bucket_table.syncer.add_full_sync(); + self.garage.object_table.syncer.add_full_sync(); + self.garage.version_table.syncer.add_full_sync(); + self.garage.block_ref_table.syncer.add_full_sync(); + self.garage.key_table.syncer.add_full_sync(); + } + RepairWhat::Versions => { + info!("Repairing the versions table"); + self.repair_versions(&must_exit).await?; + } + RepairWhat::BlockRefs => { + info!("Repairing the block refs table"); + self.repair_block_ref(&must_exit).await?; + } + RepairWhat::Blocks => { + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + } + RepairWhat::Scrub { tranquility } => { + info!("Verifying integrity of stored blocks"); + self.garage + .block_manager + .scrub_data_store(&must_exit, tranquility) + .await?; + } + } + Ok(()) + } + + async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + let mut pos = vec![]; + let mut i = 0; + + while !*must_exit.borrow() { + let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? { + Some((k, v)) => { + pos = k; + v + } + None => break, + }; + + i += 1; + if i % 1000 == 0 { + info!("repair_versions: {}", i); + } + + let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; + if version.deleted.get() { + continue; + } + let object = self + .garage + .object_table + .get(&version.bucket_id, &version.key) + .await?; + let version_exists = match object { + Some(o) => o + .versions() + .iter() + .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), + None => false, + }; + if !version_exists { + info!("Repair versions: marking version as deleted: {:?}", version); + self.garage + .version_table + .insert(&Version::new( + version.uuid, + version.bucket_id, + version.key, + true, + )) + .await?; + } + } + info!("repair_versions: finished, done {}", i); + Ok(()) + } + + async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + let mut pos = vec![]; + let mut i = 0; + + while !*must_exit.borrow() { + let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? { + Some((k, v)) => { + pos = k; + v + } + None => break, + }; + + i += 1; + if i % 1000 == 0 { + info!("repair_block_ref: {}", i); + } + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; + if block_ref.deleted.get() { + continue; + } + let version = self + .garage + .version_table + .get(&block_ref.version, &EmptyKey) + .await?; + // The version might not exist if it has been GC'ed + let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); + if !ref_exists { + info!( + "Repair block ref: marking block_ref as deleted: {:?}", + block_ref + ); + self.garage + .block_ref_table + .insert(&BlockRef { + block: block_ref.block, + version: block_ref.version, + deleted: true.into(), + }) + .await?; + } + } + info!("repair_block_ref: finished, done {}", i); + Ok(()) + } +} diff --git a/src/garage/server.rs b/src/garage/server.rs index 697d3358..6321357a 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -2,8 +2,6 @@ use std::path::PathBuf; use tokio::sync::watch; -use garage_db as db; - use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; @@ -29,62 +27,14 @@ async fn wait_from(mut chan: watch::Receiver) { pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Loading configuration..."); - let config = read_config(config_file).expect("Unable to read config file"); - - info!("Opening database..."); - let mut db_path = config.metadata_dir.clone(); - std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); - let db = match config.db_engine.as_str() { - "sled" => { - db_path.push("db"); - info!("Opening Sled database at: {}", db_path.display()); - let db = db::sled_adapter::sled::Config::default() - .path(&db_path) - .cache_capacity(config.sled_cache_capacity) - .flush_every_ms(Some(config.sled_flush_every_ms)) - .open() - .expect("Unable to open sled DB"); - db::sled_adapter::SledDb::init(db) - } - "sqlite" | "sqlite3" | "rusqlite" => { - db_path.push("db.sqlite"); - info!("Opening Sqlite database at: {}", db_path.display()); - let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) - .expect("Unable to open sqlite DB"); - db::sqlite_adapter::SqliteDb::init(db) - } - "lmdb" | "heed" => { - db_path.push("db.lmdb"); - info!("Opening LMDB database at: {}", db_path.display()); - std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); - let map_size = if u32::MAX as usize == usize::MAX { - warn!("LMDB is not recommended on 32-bit systems, database size will be limited"); - 1usize << 30 // 1GB for 32-bit systems - } else { - 1usize << 40 // 1TB for 64-bit systems - }; - - let db = db::lmdb_adapter::heed::EnvOpenOptions::new() - .max_dbs(100) - .map_size(map_size) - .open(&db_path) - .expect("Unable to open LMDB DB"); - db::lmdb_adapter::LmdbDb::init(db) - } - e => { - return Err(Error::Message(format!( - "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", - e - ))); - } - }; + let config = read_config(config_file)?; info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), db, background); + let garage = Garage::new(config.clone(), background)?; info!("Initialize tracing..."); if let Some(export_to) = config.admin.trace_sink { @@ -94,6 +44,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initialize Admin API server and metrics collector..."); let admin_server = AdminApiServer::new(garage.clone()); + info!("Launching internal Garage cluster communications..."); let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); -- cgit v1.2.3