diff options
-rw-r--r-- | src/garage/admin.rs | 4 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 24 | ||||
-rw-r--r-- | src/garage/main.rs | 18 | ||||
-rw-r--r-- | src/garage/repair/mod.rs | 2 | ||||
-rw-r--r-- | src/garage/repair/offline.rs | 52 | ||||
-rw-r--r-- | src/garage/repair/online.rs (renamed from src/garage/repair.rs) | 4 | ||||
-rw-r--r-- | src/garage/server.rs | 55 | ||||
-rw-r--r-- | src/model/garage.rs | 59 | ||||
-rw-r--r-- | src/model/index_counter.rs | 173 |
9 files changed, 302 insertions, 89 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c662aa00..afe7fe7a 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -24,7 +24,7 @@ use garage_model::migrate::Migrate; use garage_model::permission::*; use crate::cli::*; -use crate::repair::Repair; +use crate::repair::online::OnlineRepair; pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; @@ -619,7 +619,7 @@ impl AdminRpcHandler { ))) } } else { - let repair = Repair { + let repair = OnlineRepair { garage: self.garage.clone(), }; self.garage diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index a0c49aeb..575ac857 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -33,10 +33,15 @@ pub enum Command { #[structopt(name = "migrate")] Migrate(MigrateOpt), - /// Start repair of node data + /// Start repair of node data on remote node #[structopt(name = "repair")] Repair(RepairOpt), + /// Offline reparation of node data (these repairs must be run offline + /// directly on the server node) + #[structopt(name = "offline-repair")] + OfflineRepair(OfflineRepairOpt), + /// Gather node statistics #[structopt(name = "stats")] Stats(StatsOpt), @@ -406,6 +411,23 @@ pub enum RepairWhat { } #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct OfflineRepairOpt { + /// Confirm the launch of the repair operation + #[structopt(long = "yes")] + pub yes: bool, + + #[structopt(subcommand)] + pub what: OfflineRepairWhat, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum OfflineRepairWhat { + /// Repair K2V item counters + #[structopt(name = "k2v_item_counters")] + K2VItemCounters, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct StatsOpt { /// Gather statistics from all nodes #[structopt(short = "a", long = "all-nodes")] diff --git a/src/garage/main.rs b/src/garage/main.rs index bd09b6ea..3fa5c3c0 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -61,17 +61,17 @@ async fn main() { pretty_env_logger::init(); sodiumoxide::init().expect("Unable to init sodiumoxide"); - let opt = Opt::from_args(); + // Abort on panic (same behavior as in Go) + std::panic::set_hook(Box::new(|panic_info| { + error!("{}", panic_info.to_string()); + std::process::abort(); + })); + let opt = Opt::from_args(); let res = match opt.cmd { - Command::Server => { - // Abort on panic (same behavior as in Go) - std::panic::set_hook(Box::new(|panic_info| { - error!("{}", panic_info.to_string()); - std::process::abort(); - })); - - server::run_server(opt.config_file).await + Command::Server => server::run_server(opt.config_file).await, + Command::OfflineRepair(repair_opt) => { + repair::offline::offline_repair(opt.config_file, repair_opt).await } Command::Node(NodeOperation::NodeId(node_id_opt)) => { node_id_command(opt.config_file, node_id_opt.quiet) diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs new file mode 100644 index 00000000..4699ace5 --- /dev/null +++ b/src/garage/repair/mod.rs @@ -0,0 +1,2 @@ +pub mod offline; +pub mod online; diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs new file mode 100644 index 00000000..853bfdf3 --- /dev/null +++ b/src/garage/repair/offline.rs @@ -0,0 +1,52 @@ +use std::path::PathBuf; + +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::config::*; +use garage_util::error::*; + +use garage_model::garage::Garage; + +use crate::cli::structs::*; + +pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> { + if !opt.yes { + return Err(Error::Message( + "Please add the --yes flag to launch repair operation".into(), + )); + } + + info!("Loading configuration..."); + let config = read_config(config_file)?; + + info!("Initializing background runner..."); + let (done_tx, done_rx) = watch::channel(false); + let (background, await_background_done) = BackgroundRunner::new(16, done_rx); + + info!("Initializing Garage main data store..."); + let garage = Garage::new(config.clone(), background)?; + + info!("Launching repair operation..."); + match opt.what { + OfflineRepairWhat::K2VItemCounters => { + #[cfg(feature = "k2v")] + garage + .k2v + .counter_table + .offline_recount_all(&garage.k2v.item_table)?; + #[cfg(not(feature = "k2v"))] + error!("K2V not enabled in this build."); + } + } + + info!("Repair operation finished, shutting down Garage internals..."); + done_tx.send(true).unwrap(); + drop(garage); + + await_background_done.await?; + + info!("Cleaning up..."); + + Ok(()) +} diff --git a/src/garage/repair.rs b/src/garage/repair/online.rs index 17e14b8b..d6a71742 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair/online.rs @@ -11,11 +11,11 @@ use garage_util::error::Error; use crate::*; -pub struct Repair { +pub struct OnlineRepair { pub garage: Arc<Garage>, } -impl Repair { +impl OnlineRepair { pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) { if let Err(e) = self.repair_worker_aux(opt, must_exit).await { warn!("Repair worker failed with error: {}", e); diff --git a/src/garage/server.rs b/src/garage/server.rs index 697d3358..6321357a 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -2,8 +2,6 @@ use std::path::PathBuf; use tokio::sync::watch; -use garage_db as db; - use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; @@ -29,62 +27,14 @@ async fn wait_from(mut chan: watch::Receiver<bool>) { pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Loading configuration..."); - let config = read_config(config_file).expect("Unable to read config file"); - - info!("Opening database..."); - let mut db_path = config.metadata_dir.clone(); - std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); - let db = match config.db_engine.as_str() { - "sled" => { - db_path.push("db"); - info!("Opening Sled database at: {}", db_path.display()); - let db = db::sled_adapter::sled::Config::default() - .path(&db_path) - .cache_capacity(config.sled_cache_capacity) - .flush_every_ms(Some(config.sled_flush_every_ms)) - .open() - .expect("Unable to open sled DB"); - db::sled_adapter::SledDb::init(db) - } - "sqlite" | "sqlite3" | "rusqlite" => { - db_path.push("db.sqlite"); - info!("Opening Sqlite database at: {}", db_path.display()); - let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) - .expect("Unable to open sqlite DB"); - db::sqlite_adapter::SqliteDb::init(db) - } - "lmdb" | "heed" => { - db_path.push("db.lmdb"); - info!("Opening LMDB database at: {}", db_path.display()); - std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); - let map_size = if u32::MAX as usize == usize::MAX { - warn!("LMDB is not recommended on 32-bit systems, database size will be limited"); - 1usize << 30 // 1GB for 32-bit systems - } else { - 1usize << 40 // 1TB for 64-bit systems - }; - - let db = db::lmdb_adapter::heed::EnvOpenOptions::new() - .max_dbs(100) - .map_size(map_size) - .open(&db_path) - .expect("Unable to open LMDB DB"); - db::lmdb_adapter::LmdbDb::init(db) - } - e => { - return Err(Error::Message(format!( - "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", - e - ))); - } - }; + let config = read_config(config_file)?; info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), db, background); + let garage = Garage::new(config.clone(), background)?; info!("Initialize tracing..."); if let Some(export_to) = config.admin.trace_sink { @@ -94,6 +44,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initialize Admin API server and metrics collector..."); let admin_server = AdminApiServer::new(garage.clone()); + info!("Launching internal Garage cluster communications..."); let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); diff --git a/src/model/garage.rs b/src/model/garage.rs index 106ff858..eed9445c 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -6,6 +6,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::config::*; +use garage_util::error::Error; use garage_rpc::system::System; @@ -73,7 +74,56 @@ pub struct GarageK2V { impl Garage { /// Create and run garage - pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { + pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> { + info!("Opening database..."); + let mut db_path = config.metadata_dir.clone(); + std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); + let db = match config.db_engine.as_str() { + "sled" => { + db_path.push("db"); + info!("Opening Sled database at: {}", db_path.display()); + let db = db::sled_adapter::sled::Config::default() + .path(&db_path) + .cache_capacity(config.sled_cache_capacity) + .flush_every_ms(Some(config.sled_flush_every_ms)) + .open() + .expect("Unable to open sled DB"); + db::sled_adapter::SledDb::init(db) + } + "sqlite" | "sqlite3" | "rusqlite" => { + db_path.push("db.sqlite"); + info!("Opening Sqlite database at: {}", db_path.display()); + let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) + .expect("Unable to open sqlite DB"); + db::sqlite_adapter::SqliteDb::init(db) + } + "lmdb" | "heed" => { + db_path.push("db.lmdb"); + info!("Opening LMDB database at: {}", db_path.display()); + std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); + let map_size = + if u32::MAX as usize == usize::MAX { + warn!("LMDB is not recommended on 32-bit systems, database size will be limited"); + 1usize << 30 // 1GB for 32-bit systems + } else { + 1usize << 40 // 1TB for 64-bit systems + }; + + let db = db::lmdb_adapter::heed::EnvOpenOptions::new() + .max_dbs(100) + .map_size(map_size) + .open(&db_path) + .expect("Unable to open LMDB DB"); + db::lmdb_adapter::LmdbDb::init(db) + } + e => { + return Err(Error::Message(format!( + "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", + e + ))); + } + }; + let network_key = NetworkKey::from_slice( &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], ) @@ -171,9 +221,8 @@ impl Garage { #[cfg(feature = "k2v")] let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); - info!("Initialize Garage..."); - - Arc::new(Self { + // -- done -- + Ok(Arc::new(Self { config, db, background, @@ -187,7 +236,7 @@ impl Garage { block_ref_table, #[cfg(feature = "k2v")] k2v, - }) + })) } pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 109b9828..313d8612 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,3 +1,4 @@ +use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; @@ -12,9 +13,10 @@ use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_table::crdt::*; -use garage_table::replication::TableShardedReplication; +use garage_table::replication::*; use garage_table::*; pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { @@ -139,7 +141,7 @@ impl<T: CountedItem> TableSchema for CounterTable<T> { pub struct IndexCounter<T: CountedItem> { this_node: Uuid, local_counter: db::Tree, - propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>, + propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } @@ -203,17 +205,22 @@ impl<T: CountedItem> IndexCounter<T> { let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { - Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(db::TxError::Abort)?, + Some(old_bytes) => { + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(db::TxError::Abort)? + } None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), values: BTreeMap::new(), }, }; + let now = now_msec(); for (s, inc) in counts.iter() { let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); - ent.0 += 1; + ent.0 = std::cmp::max(ent.0 + 1, now); ent.1 += *inc; } @@ -234,7 +241,7 @@ impl<T: CountedItem> IndexCounter<T> { async fn propagate_loop( self: Arc<Self>, - mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, + mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>, must_exit: watch::Receiver<bool>, ) { // This loop batches updates to counters to be sent all at once. @@ -257,7 +264,7 @@ impl<T: CountedItem> IndexCounter<T> { if let Some((pk, sk, counters)) = ent { let tree_key = self.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk); + let dist_entry = counters.into_counter_entry(self.this_node); match buf.entry(tree_key) { hash_map::Entry::Vacant(e) => { e.insert(dist_entry); @@ -293,23 +300,153 @@ impl<T: CountedItem> IndexCounter<T> { } } } + + pub fn offline_recount_all<TS, TR>( + &self, + counted_table: &Arc<Table<TS, TR>>, + ) -> Result<(), Error> + where + TS: TableSchema<E = T>, + TR: TableReplication, + { + let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> { + let entry_k = self + .table + .data + .tree_key(entry.partition_key(), entry.sort_key()); + self.table + .data + .update_entry_with(&entry_k, |ent| match ent { + Some(mut ent) => { + ent.merge(&entry); + ent + } + None => entry.clone(), + })?; + Ok(()) + }; + + // 1. Set all old local counters to zero + let now = now_msec(); + let mut next_start: Option<Vec<u8>> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in self.local_counter.range((low_bound, Bound::Unbounded))? { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + for (local_counter_k, local_counter) in batch { + let mut local_counter = + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?; + + for (_, tv) in local_counter.values.iter_mut() { + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 = 0; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_k, &local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(local_counter_k); + } + } + + // 2. Recount all table entries + let now = now_msec(); + let mut next_start: Option<Vec<u8>> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in counted_table + .data + .store + .range((low_bound, Bound::Unbounded))? + { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + for (counted_entry_k, counted_entry) in batch { + let counted_entry = counted_table.data.decode_entry(&counted_entry)?; + + let pk = counted_entry.counter_partition_key(); + let sk = counted_entry.counter_sort_key(); + let counts = counted_entry.counts(); + + let local_counter_key = self.table.data.tree_key(pk, sk); + let mut local_counter = match self.local_counter.get(&local_counter_key)? { + Some(old_bytes) => { + let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>( + &old_bytes, + )?; + assert!(ent.pk == *pk); + assert!(ent.sk == *sk); + ent + } + None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), + values: BTreeMap::new(), + }, + }; + for (s, v) in counts.iter() { + let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 += v; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_key, local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(counted_entry_k); + } + } + + // Done + Ok(()) + } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry { +struct LocalCounterEntry<T: CountedItem> { + pk: T::CP, + sk: T::CS, values: BTreeMap<String, (u64, i64)>, } -impl LocalCounterEntry { - fn into_counter_entry<T: CountedItem>( - self, - this_node: Uuid, - pk: T::CP, - sk: T::CS, - ) -> CounterEntry<T> { +impl<T: CountedItem> LocalCounterEntry<T> { + fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { CounterEntry { - pk, - sk, + pk: self.pk, + sk: self.sk, values: self .values .into_iter() |