aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/garage/admin.rs4
-rw-r--r--src/garage/cli/structs.rs24
-rw-r--r--src/garage/main.rs18
-rw-r--r--src/garage/repair/mod.rs2
-rw-r--r--src/garage/repair/offline.rs52
-rw-r--r--src/garage/repair/online.rs (renamed from src/garage/repair.rs)4
-rw-r--r--src/garage/server.rs55
-rw-r--r--src/model/garage.rs59
-rw-r--r--src/model/index_counter.rs173
9 files changed, 302 insertions, 89 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c662aa00..afe7fe7a 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -24,7 +24,7 @@ use garage_model::migrate::Migrate;
use garage_model::permission::*;
use crate::cli::*;
-use crate::repair::Repair;
+use crate::repair::online::OnlineRepair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
@@ -619,7 +619,7 @@ impl AdminRpcHandler {
)))
}
} else {
- let repair = Repair {
+ let repair = OnlineRepair {
garage: self.garage.clone(),
};
self.garage
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index a0c49aeb..575ac857 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -33,10 +33,15 @@ pub enum Command {
#[structopt(name = "migrate")]
Migrate(MigrateOpt),
- /// Start repair of node data
+ /// Start repair of node data on remote node
#[structopt(name = "repair")]
Repair(RepairOpt),
+ /// Offline reparation of node data (these repairs must be run offline
+ /// directly on the server node)
+ #[structopt(name = "offline-repair")]
+ OfflineRepair(OfflineRepairOpt),
+
/// Gather node statistics
#[structopt(name = "stats")]
Stats(StatsOpt),
@@ -406,6 +411,23 @@ pub enum RepairWhat {
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct OfflineRepairOpt {
+ /// Confirm the launch of the repair operation
+ #[structopt(long = "yes")]
+ pub yes: bool,
+
+ #[structopt(subcommand)]
+ pub what: OfflineRepairWhat,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum OfflineRepairWhat {
+ /// Repair K2V item counters
+ #[structopt(name = "k2v_item_counters")]
+ K2VItemCounters,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
diff --git a/src/garage/main.rs b/src/garage/main.rs
index bd09b6ea..3fa5c3c0 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -61,17 +61,17 @@ async fn main() {
pretty_env_logger::init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
- let opt = Opt::from_args();
+ // Abort on panic (same behavior as in Go)
+ std::panic::set_hook(Box::new(|panic_info| {
+ error!("{}", panic_info.to_string());
+ std::process::abort();
+ }));
+ let opt = Opt::from_args();
let res = match opt.cmd {
- Command::Server => {
- // Abort on panic (same behavior as in Go)
- std::panic::set_hook(Box::new(|panic_info| {
- error!("{}", panic_info.to_string());
- std::process::abort();
- }));
-
- server::run_server(opt.config_file).await
+ Command::Server => server::run_server(opt.config_file).await,
+ Command::OfflineRepair(repair_opt) => {
+ repair::offline::offline_repair(opt.config_file, repair_opt).await
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
node_id_command(opt.config_file, node_id_opt.quiet)
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
new file mode 100644
index 00000000..4699ace5
--- /dev/null
+++ b/src/garage/repair/mod.rs
@@ -0,0 +1,2 @@
+pub mod offline;
+pub mod online;
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
new file mode 100644
index 00000000..853bfdf3
--- /dev/null
+++ b/src/garage/repair/offline.rs
@@ -0,0 +1,52 @@
+use std::path::PathBuf;
+
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::config::*;
+use garage_util::error::*;
+
+use garage_model::garage::Garage;
+
+use crate::cli::structs::*;
+
+pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
+ if !opt.yes {
+ return Err(Error::Message(
+ "Please add the --yes flag to launch repair operation".into(),
+ ));
+ }
+
+ info!("Loading configuration...");
+ let config = read_config(config_file)?;
+
+ info!("Initializing background runner...");
+ let (done_tx, done_rx) = watch::channel(false);
+ let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
+
+ info!("Initializing Garage main data store...");
+ let garage = Garage::new(config.clone(), background)?;
+
+ info!("Launching repair operation...");
+ match opt.what {
+ OfflineRepairWhat::K2VItemCounters => {
+ #[cfg(feature = "k2v")]
+ garage
+ .k2v
+ .counter_table
+ .offline_recount_all(&garage.k2v.item_table)?;
+ #[cfg(not(feature = "k2v"))]
+ error!("K2V not enabled in this build.");
+ }
+ }
+
+ info!("Repair operation finished, shutting down Garage internals...");
+ done_tx.send(true).unwrap();
+ drop(garage);
+
+ await_background_done.await?;
+
+ info!("Cleaning up...");
+
+ Ok(())
+}
diff --git a/src/garage/repair.rs b/src/garage/repair/online.rs
index 17e14b8b..d6a71742 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair/online.rs
@@ -11,11 +11,11 @@ use garage_util::error::Error;
use crate::*;
-pub struct Repair {
+pub struct OnlineRepair {
pub garage: Arc<Garage>,
}
-impl Repair {
+impl OnlineRepair {
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
warn!("Repair worker failed with error: {}", e);
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 697d3358..6321357a 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -2,8 +2,6 @@ use std::path::PathBuf;
use tokio::sync::watch;
-use garage_db as db;
-
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
@@ -29,62 +27,14 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Loading configuration...");
- let config = read_config(config_file).expect("Unable to read config file");
-
- info!("Opening database...");
- let mut db_path = config.metadata_dir.clone();
- std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
- let db = match config.db_engine.as_str() {
- "sled" => {
- db_path.push("db");
- info!("Opening Sled database at: {}", db_path.display());
- let db = db::sled_adapter::sled::Config::default()
- .path(&db_path)
- .cache_capacity(config.sled_cache_capacity)
- .flush_every_ms(Some(config.sled_flush_every_ms))
- .open()
- .expect("Unable to open sled DB");
- db::sled_adapter::SledDb::init(db)
- }
- "sqlite" | "sqlite3" | "rusqlite" => {
- db_path.push("db.sqlite");
- info!("Opening Sqlite database at: {}", db_path.display());
- let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
- .expect("Unable to open sqlite DB");
- db::sqlite_adapter::SqliteDb::init(db)
- }
- "lmdb" | "heed" => {
- db_path.push("db.lmdb");
- info!("Opening LMDB database at: {}", db_path.display());
- std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
- let map_size = if u32::MAX as usize == usize::MAX {
- warn!("LMDB is not recommended on 32-bit systems, database size will be limited");
- 1usize << 30 // 1GB for 32-bit systems
- } else {
- 1usize << 40 // 1TB for 64-bit systems
- };
-
- let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
- .max_dbs(100)
- .map_size(map_size)
- .open(&db_path)
- .expect("Unable to open LMDB DB");
- db::lmdb_adapter::LmdbDb::init(db)
- }
- e => {
- return Err(Error::Message(format!(
- "Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
- e
- )));
- }
- };
+ let config = read_config(config_file)?;
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), db, background);
+ let garage = Garage::new(config.clone(), background)?;
info!("Initialize tracing...");
if let Some(export_to) = config.admin.trace_sink {
@@ -94,6 +44,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initialize Admin API server and metrics collector...");
let admin_server = AdminApiServer::new(garage.clone());
+ info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 106ff858..eed9445c 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -6,6 +6,7 @@ use garage_db as db;
use garage_util::background::*;
use garage_util::config::*;
+use garage_util::error::Error;
use garage_rpc::system::System;
@@ -73,7 +74,56 @@ pub struct GarageK2V {
impl Garage {
/// Create and run garage
- pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
+ info!("Opening database...");
+ let mut db_path = config.metadata_dir.clone();
+ std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
+ let db = match config.db_engine.as_str() {
+ "sled" => {
+ db_path.push("db");
+ info!("Opening Sled database at: {}", db_path.display());
+ let db = db::sled_adapter::sled::Config::default()
+ .path(&db_path)
+ .cache_capacity(config.sled_cache_capacity)
+ .flush_every_ms(Some(config.sled_flush_every_ms))
+ .open()
+ .expect("Unable to open sled DB");
+ db::sled_adapter::SledDb::init(db)
+ }
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ db_path.push("db.sqlite");
+ info!("Opening Sqlite database at: {}", db_path.display());
+ let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+ .expect("Unable to open sqlite DB");
+ db::sqlite_adapter::SqliteDb::init(db)
+ }
+ "lmdb" | "heed" => {
+ db_path.push("db.lmdb");
+ info!("Opening LMDB database at: {}", db_path.display());
+ std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
+ let map_size =
+ if u32::MAX as usize == usize::MAX {
+ warn!("LMDB is not recommended on 32-bit systems, database size will be limited");
+ 1usize << 30 // 1GB for 32-bit systems
+ } else {
+ 1usize << 40 // 1TB for 64-bit systems
+ };
+
+ let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
+ .max_dbs(100)
+ .map_size(map_size)
+ .open(&db_path)
+ .expect("Unable to open LMDB DB");
+ db::lmdb_adapter::LmdbDb::init(db)
+ }
+ e => {
+ return Err(Error::Message(format!(
+ "Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
+ e
+ )));
+ }
+ };
+
let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
)
@@ -171,9 +221,8 @@ impl Garage {
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
- info!("Initialize Garage...");
-
- Arc::new(Self {
+ // -- done --
+ Ok(Arc::new(Self {
config,
db,
background,
@@ -187,7 +236,7 @@ impl Garage {
block_ref_table,
#[cfg(feature = "k2v")]
k2v,
- })
+ }))
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 109b9828..313d8612 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -1,3 +1,4 @@
+use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
@@ -12,9 +13,10 @@ use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::*;
use garage_table::crdt::*;
-use garage_table::replication::TableShardedReplication;
+use garage_table::replication::*;
use garage_table::*;
pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
@@ -139,7 +141,7 @@ impl<T: CountedItem> TableSchema for CounterTable<T> {
pub struct IndexCounter<T: CountedItem> {
this_node: Uuid,
local_counter: db::Tree,
- propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>,
+ propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
@@ -203,17 +205,22 @@ impl<T: CountedItem> IndexCounter<T> {
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
- Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(db::TxError::Abort)?,
+ Some(old_bytes) => {
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(db::TxError::Abort)?
+ }
None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
values: BTreeMap::new(),
},
};
+ let now = now_msec();
for (s, inc) in counts.iter() {
let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
- ent.0 += 1;
+ ent.0 = std::cmp::max(ent.0 + 1, now);
ent.1 += *inc;
}
@@ -234,7 +241,7 @@ impl<T: CountedItem> IndexCounter<T> {
async fn propagate_loop(
self: Arc<Self>,
- mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>,
+ mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
must_exit: watch::Receiver<bool>,
) {
// This loop batches updates to counters to be sent all at once.
@@ -257,7 +264,7 @@ impl<T: CountedItem> IndexCounter<T> {
if let Some((pk, sk, counters)) = ent {
let tree_key = self.table.data.tree_key(&pk, &sk);
- let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+ let dist_entry = counters.into_counter_entry(self.this_node);
match buf.entry(tree_key) {
hash_map::Entry::Vacant(e) => {
e.insert(dist_entry);
@@ -293,23 +300,153 @@ impl<T: CountedItem> IndexCounter<T> {
}
}
}
+
+ pub fn offline_recount_all<TS, TR>(
+ &self,
+ counted_table: &Arc<Table<TS, TR>>,
+ ) -> Result<(), Error>
+ where
+ TS: TableSchema<E = T>,
+ TR: TableReplication,
+ {
+ let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
+ let entry_k = self
+ .table
+ .data
+ .tree_key(entry.partition_key(), entry.sort_key());
+ self.table
+ .data
+ .update_entry_with(&entry_k, |ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&entry);
+ ent
+ }
+ None => entry.clone(),
+ })?;
+ Ok(())
+ };
+
+ // 1. Set all old local counters to zero
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
+ batch.push(item?);
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ for (local_counter_k, local_counter) in batch {
+ let mut local_counter =
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+
+ for (_, tv) in local_counter.values.iter_mut() {
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 = 0;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_k, &local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(local_counter_k);
+ }
+ }
+
+ // 2. Recount all table entries
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in counted_table
+ .data
+ .store
+ .range((low_bound, Bound::Unbounded))?
+ {
+ batch.push(item?);
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ for (counted_entry_k, counted_entry) in batch {
+ let counted_entry = counted_table.data.decode_entry(&counted_entry)?;
+
+ let pk = counted_entry.counter_partition_key();
+ let sk = counted_entry.counter_sort_key();
+ let counts = counted_entry.counts();
+
+ let local_counter_key = self.table.data.tree_key(pk, sk);
+ let mut local_counter = match self.local_counter.get(&local_counter_key)? {
+ Some(old_bytes) => {
+ let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
+ &old_bytes,
+ )?;
+ assert!(ent.pk == *pk);
+ assert!(ent.sk == *sk);
+ ent
+ }
+ None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
+ values: BTreeMap::new(),
+ },
+ };
+ for (s, v) in counts.iter() {
+ let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 += v;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_key, local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(counted_entry_k);
+ }
+ }
+
+ // Done
+ Ok(())
+ }
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-struct LocalCounterEntry {
+struct LocalCounterEntry<T: CountedItem> {
+ pk: T::CP,
+ sk: T::CS,
values: BTreeMap<String, (u64, i64)>,
}
-impl LocalCounterEntry {
- fn into_counter_entry<T: CountedItem>(
- self,
- this_node: Uuid,
- pk: T::CP,
- sk: T::CS,
- ) -> CounterEntry<T> {
+impl<T: CountedItem> LocalCounterEntry<T> {
+ fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry {
- pk,
- sk,
+ pk: self.pk,
+ sk: self.sk,
values: self
.values
.into_iter()