From f8a6fff2b7bb754fa315e1c00b0f6adea4586fa8 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 8 Jun 2022 17:37:16 +0200
Subject: First implementation of counter repair procedure

---
 src/garage/admin.rs          |   4 +-
 src/garage/cli/structs.rs    |  24 +++++-
 src/garage/main.rs           |  18 ++---
 src/garage/repair.rs         | 163 ----------------------------------------
 src/garage/repair/mod.rs     |   2 +
 src/garage/repair/offline.rs |  52 +++++++++++++
 src/garage/repair/online.rs  | 163 ++++++++++++++++++++++++++++++++++++++++
 src/garage/server.rs         |  55 +-------------
 src/model/garage.rs          |  59 +++++++++++++--
 src/model/index_counter.rs   | 173 ++++++++++++++++++++++++++++++++++++++-----
 10 files changed, 463 insertions(+), 250 deletions(-)
 delete mode 100644 src/garage/repair.rs
 create mode 100644 src/garage/repair/mod.rs
 create mode 100644 src/garage/repair/offline.rs
 create mode 100644 src/garage/repair/online.rs

diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c662aa00..afe7fe7a 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -24,7 +24,7 @@ use garage_model::migrate::Migrate;
 use garage_model::permission::*;
 
 use crate::cli::*;
-use crate::repair::Repair;
+use crate::repair::online::OnlineRepair;
 
 pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
 
@@ -619,7 +619,7 @@ impl AdminRpcHandler {
 			)))
 		}
 	} else {
-		let repair = Repair {
+		let repair = OnlineRepair {
 			garage: self.garage.clone(),
 		};
 		self.garage
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index a0c49aeb..575ac857 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -33,10 +33,15 @@ pub enum Command {
 	#[structopt(name = "migrate")]
 	Migrate(MigrateOpt),
 
-	/// Start repair of node data
+	/// Start repair of node data on remote node
 	#[structopt(name = "repair")]
 	Repair(RepairOpt),
 
+	/// Offline reparation of node data (these repairs must be run offline
+	/// directly on the server node)
+	#[structopt(name = "offline-repair")]
+	OfflineRepair(OfflineRepairOpt),
+
 	/// Gather node statistics
 	#[structopt(name = "stats")]
 	Stats(StatsOpt),
@@ -405,6 +410,23 @@ pub enum RepairWhat {
 	},
 }
 
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct OfflineRepairOpt {
+	/// Confirm the launch of the repair operation
+	#[structopt(long = "yes")]
+	pub yes: bool,
+
+	#[structopt(subcommand)]
+	pub what: OfflineRepairWhat,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum OfflineRepairWhat {
+	/// Repair K2V item counters
+	#[structopt(name = "k2v_item_counters")]
+	K2VItemCounters,
+}
+
 #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
 pub struct StatsOpt {
 	/// Gather statistics from all nodes
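The two new StructOpt types above are all clap needs to expose a `garage offline-repair --yes k2v_item_counters` invocation. A minimal, self-contained sketch of that parse, assuming structopt 0.3 as a dependency; the type names mirror the declarations in the hunk, but this `main` and its `from_iter` call are illustrative, not part of the patch:

    // Cargo.toml (assumed): structopt = "0.3"
    use structopt::StructOpt;

    #[derive(StructOpt, Debug)]
    enum Command {
        /// Offline repair of node data
        #[structopt(name = "offline-repair")]
        OfflineRepair(OfflineRepairOpt),
    }

    #[derive(StructOpt, Debug)]
    struct OfflineRepairOpt {
        /// Confirm the launch of the repair operation
        #[structopt(long = "yes")]
        yes: bool,

        #[structopt(subcommand)]
        what: OfflineRepairWhat,
    }

    #[derive(StructOpt, Debug)]
    enum OfflineRepairWhat {
        /// Repair K2V item counters
        #[structopt(name = "k2v_item_counters")]
        K2VItemCounters,
    }

    fn main() {
        // Equivalent to running: garage offline-repair --yes k2v_item_counters
        let cmd = Command::from_iter(vec!["garage", "offline-repair", "--yes", "k2v_item_counters"]);
        // Prints: OfflineRepair(OfflineRepairOpt { yes: true, what: K2VItemCounters })
        println!("{:?}", cmd);
    }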
diff --git a/src/garage/main.rs b/src/garage/main.rs
index bd09b6ea..3fa5c3c0 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -61,17 +61,17 @@ async fn main() {
 	pretty_env_logger::init();
 	sodiumoxide::init().expect("Unable to init sodiumoxide");
 
-	let opt = Opt::from_args();
+	// Abort on panic (same behavior as in Go)
+	std::panic::set_hook(Box::new(|panic_info| {
+		error!("{}", panic_info.to_string());
+		std::process::abort();
+	}));
 
+	let opt = Opt::from_args();
 	let res = match opt.cmd {
-		Command::Server => {
-			// Abort on panic (same behavior as in Go)
-			std::panic::set_hook(Box::new(|panic_info| {
-				error!("{}", panic_info.to_string());
-				std::process::abort();
-			}));
-
-			server::run_server(opt.config_file).await
+		Command::Server => server::run_server(opt.config_file).await,
+		Command::OfflineRepair(repair_opt) => {
+			repair::offline::offline_repair(opt.config_file, repair_opt).await
 		}
 		Command::Node(NodeOperation::NodeId(node_id_opt)) => {
 			node_id_command(opt.config_file, node_id_opt.quiet)
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
deleted file mode 100644
index 17e14b8b..00000000
--- a/src/garage/repair.rs
+++ /dev/null
@@ -1,163 +0,0 @@
-use std::sync::Arc;
-
-use tokio::sync::watch;
-
-use garage_model::garage::Garage;
-use garage_model::s3::block_ref_table::*;
-use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::*;
-use garage_table::*;
-use garage_util::error::Error;
-
-use crate::*;
-
-pub struct Repair {
-	pub garage: Arc<Garage>,
-}
-
-impl Repair {
-	pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
-		if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
-			warn!("Repair worker failed with error: {}", e);
-		}
-	}
-
-	async fn repair_worker_aux(
-		&self,
-		opt: RepairOpt,
-		must_exit: watch::Receiver<bool>,
-	) -> Result<(), Error> {
-		match opt.what {
-			RepairWhat::Tables => {
-				info!("Launching a full sync of tables");
-				self.garage.bucket_table.syncer.add_full_sync();
-				self.garage.object_table.syncer.add_full_sync();
-				self.garage.version_table.syncer.add_full_sync();
-				self.garage.block_ref_table.syncer.add_full_sync();
-				self.garage.key_table.syncer.add_full_sync();
-			}
-			RepairWhat::Versions => {
-				info!("Repairing the versions table");
-				self.repair_versions(&must_exit).await?;
-			}
-			RepairWhat::BlockRefs => {
-				info!("Repairing the block refs table");
-				self.repair_block_ref(&must_exit).await?;
-			}
-			RepairWhat::Blocks => {
-				info!("Repairing the stored blocks");
-				self.garage
-					.block_manager
-					.repair_data_store(&must_exit)
-					.await?;
-			}
-			RepairWhat::Scrub { tranquility } => {
-				info!("Verifying integrity of stored blocks");
-				self.garage
-					.block_manager
-					.scrub_data_store(&must_exit, tranquility)
-					.await?;
-			}
-		}
-		Ok(())
-	}
-
-	async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
-		let mut pos = vec![];
-		let mut i = 0;
-
-		while !*must_exit.borrow() {
-			let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? {
-				Some((k, v)) => {
-					pos = k;
-					v
-				}
-				None => break,
-			};
-
-			i += 1;
-			if i % 1000 == 0 {
-				info!("repair_versions: {}", i);
-			}
-
-			let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
-			if version.deleted.get() {
-				continue;
-			}
-			let object = self
-				.garage
-				.object_table
-				.get(&version.bucket_id, &version.key)
-				.await?;
-			let version_exists = match object {
-				Some(o) => o
-					.versions()
-					.iter()
-					.any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
-				None => false,
-			};
-			if !version_exists {
-				info!("Repair versions: marking version as deleted: {:?}", version);
-				self.garage
-					.version_table
-					.insert(&Version::new(
-						version.uuid,
-						version.bucket_id,
-						version.key,
-						true,
-					))
-					.await?;
-			}
-		}
-		info!("repair_versions: finished, done {}", i);
-		Ok(())
-	}
-
-	async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
-		let mut pos = vec![];
-		let mut i = 0;
-
-		while !*must_exit.borrow() {
-			let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? {
-				Some((k, v)) => {
-					pos = k;
-					v
-				}
-				None => break,
-			};
-
-			i += 1;
-			if i % 1000 == 0 {
-				info!("repair_block_ref: {}", i);
-			}
-
-			let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
-			if block_ref.deleted.get() {
-				continue;
-			}
-			let version = self
-				.garage
-				.version_table
-				.get(&block_ref.version, &EmptyKey)
-				.await?;
-			// The version might not exist if it has been GC'ed
-			let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
-			if !ref_exists {
-				info!(
-					"Repair block ref: marking block_ref as deleted: {:?}",
-					block_ref
-				);
-				self.garage
-					.block_ref_table
-					.insert(&BlockRef {
-						block: block_ref.block,
-						version: block_ref.version,
-						deleted: true.into(),
-					})
-					.await?;
-			}
-		}
-		info!("repair_block_ref: finished, done {}", i);
-		Ok(())
-	}
-}
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
new file mode 100644
index 00000000..4699ace5
--- /dev/null
+++ b/src/garage/repair/mod.rs
@@ -0,0 +1,2 @@
+pub mod offline;
+pub mod online;
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
new file mode 100644
index 00000000..853bfdf3
--- /dev/null
+++ b/src/garage/repair/offline.rs
@@ -0,0 +1,52 @@
+use std::path::PathBuf;
+
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::config::*;
+use garage_util::error::*;
+
+use garage_model::garage::Garage;
+
+use crate::cli::structs::*;
+
+pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
+	if !opt.yes {
+		return Err(Error::Message(
+			"Please add the --yes flag to launch repair operation".into(),
+		));
+	}
+
+	info!("Loading configuration...");
+	let config = read_config(config_file)?;
+
+	info!("Initializing background runner...");
+	let (done_tx, done_rx) = watch::channel(false);
+	let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
+
+	info!("Initializing Garage main data store...");
+	let garage = Garage::new(config.clone(), background)?;
+
+	info!("Launching repair operation...");
+	match opt.what {
+		OfflineRepairWhat::K2VItemCounters => {
+			#[cfg(feature = "k2v")]
+			garage
+				.k2v
+				.counter_table
+				.offline_recount_all(&garage.k2v.item_table)?;
+			#[cfg(not(feature = "k2v"))]
+			error!("K2V not enabled in this build.");
+		}
+	}
+
+	info!("Repair operation finished, shutting down Garage internals...");
+	done_tx.send(true).unwrap();
+	drop(garage);
+
+	await_background_done.await?;
+
+	info!("Cleaning up...");
+
+	Ok(())
+}
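offline_repair hands a watch receiver (`done_rx`) to the BackgroundRunner, flips it to true once the repair is done, and then awaits `await_background_done` so that queued background work is flushed before the process exits. BackgroundRunner's internals are not shown in this patch; the toy worker below stands in for it to illustrate the same shutdown handshake, assuming tokio 1.x (`sync`, `macros`, `rt` features):

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        let (done_tx, mut done_rx) = watch::channel(false);

        // Stand-in for the background runner: runs until the "done" flag flips.
        let worker = tokio::spawn(async move {
            while !*done_rx.borrow() {
                if done_rx.changed().await.is_err() {
                    break; // sender dropped
                }
            }
            println!("background worker drained, exiting");
        });

        // ... the repair work would happen here ...

        done_tx.send(true).unwrap(); // signal shutdown
        worker.await.unwrap();       // wait for the worker to finish cleanly
    }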
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
new file mode 100644
index 00000000..d6a71742
--- /dev/null
+++ b/src/garage/repair/online.rs
@@ -0,0 +1,163 @@
+use std::sync::Arc;
+
+use tokio::sync::watch;
+
+use garage_model::garage::Garage;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+use garage_table::*;
+use garage_util::error::Error;
+
+use crate::*;
+
+pub struct OnlineRepair {
+	pub garage: Arc<Garage>,
+}
+
+impl OnlineRepair {
+	pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
+		if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
+			warn!("Repair worker failed with error: {}", e);
+		}
+	}
+
+	async fn repair_worker_aux(
+		&self,
+		opt: RepairOpt,
+		must_exit: watch::Receiver<bool>,
+	) -> Result<(), Error> {
+		match opt.what {
+			RepairWhat::Tables => {
+				info!("Launching a full sync of tables");
+				self.garage.bucket_table.syncer.add_full_sync();
+				self.garage.object_table.syncer.add_full_sync();
+				self.garage.version_table.syncer.add_full_sync();
+				self.garage.block_ref_table.syncer.add_full_sync();
+				self.garage.key_table.syncer.add_full_sync();
+			}
+			RepairWhat::Versions => {
+				info!("Repairing the versions table");
+				self.repair_versions(&must_exit).await?;
+			}
+			RepairWhat::BlockRefs => {
+				info!("Repairing the block refs table");
+				self.repair_block_ref(&must_exit).await?;
+			}
+			RepairWhat::Blocks => {
+				info!("Repairing the stored blocks");
+				self.garage
+					.block_manager
+					.repair_data_store(&must_exit)
+					.await?;
+			}
+			RepairWhat::Scrub { tranquility } => {
+				info!("Verifying integrity of stored blocks");
+				self.garage
+					.block_manager
+					.scrub_data_store(&must_exit, tranquility)
+					.await?;
+			}
+		}
+		Ok(())
+	}
+
+	async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+		let mut pos = vec![];
+		let mut i = 0;
+
+		while !*must_exit.borrow() {
+			let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? {
+				Some((k, v)) => {
+					pos = k;
+					v
+				}
+				None => break,
+			};
+
+			i += 1;
+			if i % 1000 == 0 {
+				info!("repair_versions: {}", i);
+			}
+
+			let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
+			if version.deleted.get() {
+				continue;
+			}
+			let object = self
+				.garage
+				.object_table
+				.get(&version.bucket_id, &version.key)
+				.await?;
+			let version_exists = match object {
+				Some(o) => o
+					.versions()
+					.iter()
+					.any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
+				None => false,
+			};
+			if !version_exists {
+				info!("Repair versions: marking version as deleted: {:?}", version);
+				self.garage
+					.version_table
+					.insert(&Version::new(
+						version.uuid,
+						version.bucket_id,
+						version.key,
+						true,
+					))
+					.await?;
+			}
+		}
+		info!("repair_versions: finished, done {}", i);
+		Ok(())
+	}
+
+	async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+		let mut pos = vec![];
+		let mut i = 0;
+
+		while !*must_exit.borrow() {
+			let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? {
+				Some((k, v)) => {
+					pos = k;
+					v
+				}
+				None => break,
+			};
+
+			i += 1;
+			if i % 1000 == 0 {
+				info!("repair_block_ref: {}", i);
+			}
+
+			let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
+			if block_ref.deleted.get() {
+				continue;
+			}
+			let version = self
+				.garage
+				.version_table
+				.get(&block_ref.version, &EmptyKey)
+				.await?;
+			// The version might not exist if it has been GC'ed
+			let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
+			if !ref_exists {
+				info!(
+					"Repair block ref: marking block_ref as deleted: {:?}",
+					block_ref
+				);
+				self.garage
+					.block_ref_table
+					.insert(&BlockRef {
+						block: block_ref.block,
+						version: block_ref.version,
+						deleted: true.into(),
+					})
+					.await?;
+			}
+		}
+		info!("repair_block_ref: finished, done {}", i);
+		Ok(())
+	}
+}
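Both scans above advance a `pos` cursor with `get_gt` instead of holding a store iterator across `.await` points. A minimal sketch of that cursor pattern, with a `std::collections::BTreeMap` standing in for the real `db::Tree` and a hypothetical `get_gt` helper (Garage's actual db API is not reproduced here):

    use std::collections::BTreeMap;
    use std::ops::Bound;

    // Return the first entry with a key strictly greater than `pos`, if any.
    fn get_gt<'a>(
        store: &'a BTreeMap<Vec<u8>, Vec<u8>>,
        pos: &[u8],
    ) -> Option<(&'a Vec<u8>, &'a Vec<u8>)> {
        store
            .range((Bound::Excluded(pos.to_vec()), Bound::Unbounded))
            .next()
    }

    fn main() {
        let mut store = BTreeMap::new();
        store.insert(b"a".to_vec(), b"1".to_vec());
        store.insert(b"b".to_vec(), b"2".to_vec());

        // Walk the whole tree; each step restarts from the saved cursor,
        // so nothing is borrowed from the store between steps.
        let mut pos: Vec<u8> = vec![];
        while let Some((k, v)) = get_gt(&store, &pos) {
            println!("{:?} => {:?}", k, v);
            pos = k.clone();
        }
    }

The price of this design is one point lookup per item, but it lets the loop check `must_exit` and perform table inserts between steps without keeping a read transaction or iterator alive.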
{ info!("Initialize Admin API server and metrics collector..."); let admin_server = AdminApiServer::new(garage.clone()); + info!("Launching internal Garage cluster communications..."); let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); diff --git a/src/model/garage.rs b/src/model/garage.rs index 106ff858..eed9445c 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -6,6 +6,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::config::*; +use garage_util::error::Error; use garage_rpc::system::System; @@ -73,7 +74,56 @@ pub struct GarageK2V { impl Garage { /// Create and run garage - pub fn new(config: Config, db: db::Db, background: Arc) -> Arc { + pub fn new(config: Config, background: Arc) -> Result, Error> { + info!("Opening database..."); + let mut db_path = config.metadata_dir.clone(); + std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); + let db = match config.db_engine.as_str() { + "sled" => { + db_path.push("db"); + info!("Opening Sled database at: {}", db_path.display()); + let db = db::sled_adapter::sled::Config::default() + .path(&db_path) + .cache_capacity(config.sled_cache_capacity) + .flush_every_ms(Some(config.sled_flush_every_ms)) + .open() + .expect("Unable to open sled DB"); + db::sled_adapter::SledDb::init(db) + } + "sqlite" | "sqlite3" | "rusqlite" => { + db_path.push("db.sqlite"); + info!("Opening Sqlite database at: {}", db_path.display()); + let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) + .expect("Unable to open sqlite DB"); + db::sqlite_adapter::SqliteDb::init(db) + } + "lmdb" | "heed" => { + db_path.push("db.lmdb"); + info!("Opening LMDB database at: {}", db_path.display()); + std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); + let map_size = + if u32::MAX as usize == usize::MAX { + warn!("LMDB is not recommended on 32-bit systems, database size will be limited"); + 1usize << 30 // 1GB for 32-bit systems + } else { + 1usize << 40 // 1TB for 64-bit systems + }; + + let db = db::lmdb_adapter::heed::EnvOpenOptions::new() + .max_dbs(100) + .map_size(map_size) + .open(&db_path) + .expect("Unable to open LMDB DB"); + db::lmdb_adapter::LmdbDb::init(db) + } + e => { + return Err(Error::Message(format!( + "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", + e + ))); + } + }; + let network_key = NetworkKey::from_slice( &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], ) @@ -171,9 +221,8 @@ impl Garage { #[cfg(feature = "k2v")] let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); - info!("Initialize Garage..."); - - Arc::new(Self { + // -- done -- + Ok(Arc::new(Self { config, db, background, @@ -187,7 +236,7 @@ impl Garage { block_ref_table, #[cfg(feature = "k2v")] k2v, - }) + })) } pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 109b9828..313d8612 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,3 +1,4 @@ +use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; @@ -12,9 +13,10 @@ use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_table::crdt::*; -use garage_table::replication::TableShardedReplication; +use garage_table::replication::*; use garage_table::*; pub trait 
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 106ff858..eed9445c 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -6,6 +6,7 @@ use garage_db as db;
 
 use garage_util::background::*;
 use garage_util::config::*;
+use garage_util::error::Error;
 
 use garage_rpc::system::System;
 
@@ -73,7 +74,56 @@ pub struct GarageK2V {
 
 impl Garage {
 	/// Create and run garage
-	pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+	pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
+		info!("Opening database...");
+		let mut db_path = config.metadata_dir.clone();
+		std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
+		let db = match config.db_engine.as_str() {
+			"sled" => {
+				db_path.push("db");
+				info!("Opening Sled database at: {}", db_path.display());
+				let db = db::sled_adapter::sled::Config::default()
+					.path(&db_path)
+					.cache_capacity(config.sled_cache_capacity)
+					.flush_every_ms(Some(config.sled_flush_every_ms))
+					.open()
+					.expect("Unable to open sled DB");
+				db::sled_adapter::SledDb::init(db)
+			}
+			"sqlite" | "sqlite3" | "rusqlite" => {
+				db_path.push("db.sqlite");
+				info!("Opening Sqlite database at: {}", db_path.display());
+				let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+					.expect("Unable to open sqlite DB");
+				db::sqlite_adapter::SqliteDb::init(db)
+			}
+			"lmdb" | "heed" => {
+				db_path.push("db.lmdb");
+				info!("Opening LMDB database at: {}", db_path.display());
+				std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
+				let map_size =
+					if u32::MAX as usize == usize::MAX {
+						warn!("LMDB is not recommended on 32-bit systems, database size will be limited");
+						1usize << 30 // 1GB for 32-bit systems
+					} else {
+						1usize << 40 // 1TB for 64-bit systems
+					};
+
+				let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
+					.max_dbs(100)
+					.map_size(map_size)
+					.open(&db_path)
+					.expect("Unable to open LMDB DB");
+				db::lmdb_adapter::LmdbDb::init(db)
+			}
+			e => {
+				return Err(Error::Message(format!(
+					"Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
+					e
+				)));
+			}
+		};
+
 		let network_key = NetworkKey::from_slice(
 			&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
 		)
@@ -171,9 +221,8 @@ impl Garage {
 		#[cfg(feature = "k2v")]
 		let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
 
-		info!("Initialize Garage...");
-
-		Arc::new(Self {
+		// -- done --
+		Ok(Arc::new(Self {
 			config,
 			db,
 			background,
@@ -187,7 +236,7 @@ impl Garage {
 			block_ref_table,
 			#[cfg(feature = "k2v")]
 			k2v,
-		})
+		}))
 	}
 
 	pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
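The LMDB arm sizes its memory map from the target's pointer width: on a 32-bit target `usize::MAX == u32::MAX as usize`, so the map is capped at 1 GiB instead of 1 TiB. The check can be read in isolation; the constants below mirror the patch (and the 1 TiB branch assumes a 64-bit host to compile as-is):

    fn lmdb_map_size() -> usize {
        if u32::MAX as usize == usize::MAX {
            1usize << 30 // 1GB for 32-bit systems
        } else {
            1usize << 40 // 1TB for 64-bit systems
        }
    }

    fn main() {
        println!("LMDB map_size = {} bytes", lmdb_map_size());
    }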
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 109b9828..313d8612 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -1,3 +1,4 @@
+use core::ops::Bound;
 use std::collections::{hash_map, BTreeMap, HashMap};
 use std::marker::PhantomData;
 use std::sync::Arc;
@@ -12,9 +13,10 @@ use garage_rpc::ring::Ring;
 use garage_rpc::system::System;
 use garage_util::data::*;
 use garage_util::error::*;
+use garage_util::time::*;
 
 use garage_table::crdt::*;
-use garage_table::replication::TableShardedReplication;
+use garage_table::replication::*;
 use garage_table::*;
 
 pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
@@ -139,7 +141,7 @@ impl<T: CountedItem> TableSchema for CounterTable<T> {
 pub struct IndexCounter<T: CountedItem> {
 	this_node: Uuid,
 	local_counter: db::Tree,
-	propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>,
+	propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
 	pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
 }
 
@@ -203,17 +205,22 @@ impl<T: CountedItem> IndexCounter<T> {
 		let tree_key = self.table.data.tree_key(pk, sk);
 
 		let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
-			Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
-				.map_err(Error::RmpDecode)
-				.map_err(db::TxError::Abort)?,
+			Some(old_bytes) => {
+				rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
+					.map_err(Error::RmpDecode)
+					.map_err(db::TxError::Abort)?
+			}
 			None => LocalCounterEntry {
+				pk: pk.clone(),
+				sk: sk.clone(),
 				values: BTreeMap::new(),
 			},
 		};
 
+		let now = now_msec();
 		for (s, inc) in counts.iter() {
 			let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
-			ent.0 += 1;
+			ent.0 = std::cmp::max(ent.0 + 1, now);
 			ent.1 += *inc;
 		}
 
@@ -234,7 +241,7 @@ impl<T: CountedItem> IndexCounter<T> {
 
 	async fn propagate_loop(
 		self: Arc<Self>,
-		mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>,
+		mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
 		must_exit: watch::Receiver<bool>,
 	) {
 		// This loop batches updates to counters to be sent all at once.
@@ -257,7 +264,7 @@ impl<T: CountedItem> IndexCounter<T> {
 
 			if let Some((pk, sk, counters)) = ent {
 				let tree_key = self.table.data.tree_key(&pk, &sk);
-				let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+				let dist_entry = counters.into_counter_entry(self.this_node);
 				match buf.entry(tree_key) {
 					hash_map::Entry::Vacant(e) => {
 						e.insert(dist_entry);
@@ -293,23 +300,153 @@ impl<T: CountedItem> IndexCounter<T> {
 			}
 		}
 	}
+
+	pub fn offline_recount_all<TS, TR>(
+		&self,
+		counted_table: &Arc<Table<TS, TR>>,
+	) -> Result<(), Error>
+	where
+		TS: TableSchema<E = T>,
+		TR: TableReplication,
+	{
+		let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
+			let entry_k = self
+				.table
+				.data
+				.tree_key(entry.partition_key(), entry.sort_key());
+			self.table
+				.data
+				.update_entry_with(&entry_k, |ent| match ent {
+					Some(mut ent) => {
+						ent.merge(&entry);
+						ent
+					}
+					None => entry.clone(),
+				})?;
+			Ok(())
+		};
+
+		// 1. Set all old local counters to zero
+		let now = now_msec();
+		let mut next_start: Option<Vec<u8>> = None;
+		loop {
+			let low_bound = match next_start.take() {
+				Some(v) => Bound::Excluded(v),
+				None => Bound::Unbounded,
+			};
+			let mut batch = vec![];
+			for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
+				batch.push(item?);
+				if batch.len() > 1000 {
+					break;
+				}
+			}
+
+			if batch.is_empty() {
+				break;
+			}
+
+			for (local_counter_k, local_counter) in batch {
+				let mut local_counter =
+					rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+
+				for (_, tv) in local_counter.values.iter_mut() {
+					tv.0 = std::cmp::max(tv.0 + 1, now);
+					tv.1 = 0;
+				}
+
+				let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+				self.local_counter
+					.insert(&local_counter_k, &local_counter_bytes)?;
+
+				let counter_entry = local_counter.into_counter_entry(self.this_node);
+				save_counter_entry(counter_entry)?;
+
+				next_start = Some(local_counter_k);
+			}
+		}
+
+		// 2. Recount all table entries
+		let now = now_msec();
+		let mut next_start: Option<Vec<u8>> = None;
+		loop {
+			let low_bound = match next_start.take() {
+				Some(v) => Bound::Excluded(v),
+				None => Bound::Unbounded,
+			};
+			let mut batch = vec![];
+			for item in counted_table
+				.data
+				.store
+				.range((low_bound, Bound::Unbounded))?
+			{
+				batch.push(item?);
+				if batch.len() > 1000 {
+					break;
+				}
+			}
+
+			if batch.is_empty() {
+				break;
+			}
+
+			for (counted_entry_k, counted_entry) in batch {
+				let counted_entry = counted_table.data.decode_entry(&counted_entry)?;
+
+				let pk = counted_entry.counter_partition_key();
+				let sk = counted_entry.counter_sort_key();
+				let counts = counted_entry.counts();
+
+				let local_counter_key = self.table.data.tree_key(pk, sk);
+				let mut local_counter = match self.local_counter.get(&local_counter_key)? {
+					Some(old_bytes) => {
+						let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
+							&old_bytes,
+						)?;
+						assert!(ent.pk == *pk);
+						assert!(ent.sk == *sk);
+						ent
+					}
+					None => LocalCounterEntry {
+						pk: pk.clone(),
+						sk: sk.clone(),
+						values: BTreeMap::new(),
+					},
+				};
+
+				for (s, v) in counts.iter() {
+					let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
+					tv.0 = std::cmp::max(tv.0 + 1, now);
+					tv.1 += v;
+				}
+
+				let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+				self.local_counter
+					.insert(&local_counter_key, local_counter_bytes)?;
+
+				let counter_entry = local_counter.into_counter_entry(self.this_node);
+				save_counter_entry(counter_entry)?;
+
+				next_start = Some(counted_entry_k);
+			}
+		}
+
+		// Done
+		Ok(())
+	}
 }
 
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-struct LocalCounterEntry {
+struct LocalCounterEntry<T: CountedItem> {
+	pk: T::CP,
+	sk: T::CS,
 	values: BTreeMap<String, (u64, i64)>,
 }
 
-impl LocalCounterEntry {
-	fn into_counter_entry<T: CountedItem>(
-		self,
-		this_node: Uuid,
-		pk: T::CP,
-		sk: T::CS,
-	) -> CounterEntry<T> {
+impl<T: CountedItem> LocalCounterEntry<T> {
+	fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
 		CounterEntry {
-			pk: self.pk,
-			sk: self.sk,
+			pk: self.pk,
+			sk: self.sk,
 			values: self
 				.values
 				.into_iter()
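The repair procedure relies on the timestamps stored next to each counter value: both the zeroing pass and the recount pass bump them with `std::cmp::max(old + 1, now_msec())`, so the repaired values supersede stale ones when counter entries from different nodes are merged. `CounterEntry::merge` itself is not part of this diff; the sketch below is an assumed model of that per-stat, per-node last-writer-wins rule (here reduced to a single node's slot), with illustrative names only:

    use std::collections::BTreeMap;

    // stat name -> (timestamp, value), one slot per node in the real structure
    type Values = BTreeMap<String, (u64, i64)>;

    fn merge_values(ours: &mut Values, theirs: &Values) {
        for (stat, &(their_ts, their_v)) in theirs {
            let ent = ours.entry(stat.clone()).or_insert((their_ts, their_v));
            // Keep the pair with the highest timestamp (LWW register per slot).
            if their_ts > ent.0 {
                *ent = (their_ts, their_v);
            }
        }
    }

    fn main() {
        let mut a: Values = [("items".to_string(), (10, 42))].into_iter().collect();
        // The repair pass zeroed the value and bumped the timestamp:
        let b: Values = [("items".to_string(), (11, 0))].into_iter().collect();
        merge_values(&mut a, &b);
        assert_eq!(a["items"], (11, 0)); // the newer, repaired value wins
    }

Under such a rule, propagating a recounted entry is idempotent: replaying an older (timestamp, value) pair can never overwrite the repaired one, which is what makes the two-pass zero-then-recount procedure safe to re-run.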