diff options
-rw-r--r-- | src/block/manager.rs | 42 | ||||
-rw-r--r-- | src/block/metrics.rs | 12 | ||||
-rw-r--r-- | src/block/rc.rs | 7 | ||||
-rw-r--r-- | src/block/repair.rs | 58 | ||||
-rw-r--r-- | src/block/resync.rs | 57 | ||||
-rw-r--r-- | src/db/lib.rs | 7 | ||||
-rw-r--r-- | src/db/lmdb_adapter.rs | 4 | ||||
-rw-r--r-- | src/db/sqlite_adapter.rs | 4 | ||||
-rw-r--r-- | src/garage/admin.rs | 306 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 18 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 53 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 163 | ||||
-rw-r--r-- | src/garage/main.rs | 36 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 14 | ||||
-rw-r--r-- | src/model/index_counter.rs | 11 | ||||
-rw-r--r-- | src/table/data.rs | 8 | ||||
-rw-r--r-- | src/table/gc.rs | 10 | ||||
-rw-r--r-- | src/table/merkle.rs | 16 | ||||
-rw-r--r-- | src/table/metrics.rs | 38 | ||||
-rw-r--r-- | src/table/sync.rs | 10 | ||||
-rw-r--r-- | src/table/util.rs | 6 | ||||
-rw-r--r-- | src/util/background/mod.rs | 13 | ||||
-rw-r--r-- | src/util/background/worker.rs | 12 | ||||
-rw-r--r-- | src/util/formater.rs | 8 |
24 files changed, 724 insertions, 189 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 7f439b96..28523a93 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -90,6 +90,15 @@ pub struct BlockManager { tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>, } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct BlockResyncErrorInfo { + pub hash: Hash, + pub refcount: u64, + pub error_count: u64, + pub last_try: u64, + pub next_try: u64, +} + // This custom struct contains functions that must only be ran // when the lock is held. We ensure that it is the case by storing // it INSIDE a Mutex. @@ -114,7 +123,8 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); - let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone()); + let metrics = + BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); let (scrub_tx, scrub_rx) = mpsc::channel(1); @@ -309,11 +319,41 @@ impl BlockManager { Ok(self.rc.rc.len()?) } + /// Get number of items in the refcount table + pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> { + Ok(self.rc.rc.fast_len()?) + } + /// Send command to start/stop/manager scrub worker pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) { let _ = self.tx_scrub_command.send(cmd).await; } + /// Get the reference count of a block + pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> { + Ok(self.rc.get_block_rc(hash)?.as_u64()) + } + + /// List all resync errors + pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> { + let mut blocks = Vec::with_capacity(self.resync.errors.len()); + for ent in self.resync.errors.iter()? { + let (hash, cnt) = ent?; + let cnt = ErrorCounter::decode(&cnt); + blocks.push(BlockResyncErrorInfo { + hash: Hash::try_from(&hash).unwrap(), + refcount: 0, + error_count: cnt.errors, + last_try: cnt.last_try, + next_try: cnt.next_try(), + }); + } + for block in blocks.iter_mut() { + block.refcount = self.get_block_rc(&block.hash)?; + } + Ok(blocks) + } + //// ----- Managing the reference counter ---- /// Increment the number of time a block is used, putting it to resynchronization if it is diff --git a/src/block/metrics.rs b/src/block/metrics.rs index 477add66..fbef95af 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -1,9 +1,11 @@ use opentelemetry::{global, metrics::*}; +use garage_db as db; use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct BlockManagerMetrics { + pub(crate) _rc_size: ValueObserver<u64>, pub(crate) _resync_queue_len: ValueObserver<u64>, pub(crate) _resync_errored_blocks: ValueObserver<u64>, @@ -23,9 +25,17 @@ pub struct BlockManagerMetrics { } impl BlockManagerMetrics { - pub fn new(resync_queue: CountedTree, resync_errors: CountedTree) -> Self { + pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self { let meter = global::meter("garage_model/block"); Self { + _rc_size: meter + .u64_value_observer("block.rc_size", move |observer| { + if let Ok(Some(v)) = rc_tree.fast_len() { + observer.observe(v as u64, &[]) + } + }) + .with_description("Number of blocks known to the reference counter") + .init(), _resync_queue_len: meter .u64_value_observer("block.resync_queue_length", move |observer| { observer.observe(resync_queue.len() as u64, &[]) diff --git a/src/block/rc.rs b/src/block/rc.rs index ce6defad..8dae3960 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -169,4 +169,11 @@ impl RcEntry { pub(crate) fn is_needed(&self) -> bool { !self.is_deletable() } + + pub(crate) fn as_u64(&self) -> u64 { + match self { + RcEntry::Present { count } => *count, + _ => 0, + } + } } diff --git a/src/block/repair.rs b/src/block/repair.rs index e2884b69..1878027e 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -53,7 +53,7 @@ impl Worker for RepairWorker { "Block repair worker".into() } - fn info(&self) -> Option<String> { + fn status(&self) -> WorkerStatus { match self.block_iter.as_ref() { None => { let idx_bytes = self @@ -66,9 +66,20 @@ impl Worker for RepairWorker { } else { idx_bytes }; - Some(format!("Phase 1: {}", hex::encode(idx_bytes))) + WorkerStatus { + progress: Some("0.00%".into()), + freeform: vec![format!( + "Currently in phase 1, iterator position: {}", + hex::encode(idx_bytes) + )], + ..Default::default() + } } - Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)), + Some(bi) => WorkerStatus { + progress: Some(format!("{:.2}%", bi.progress() * 100.)), + freeform: vec!["Currently in phase 2".into()], + ..Default::default() + }, } } @@ -271,29 +282,28 @@ impl Worker for ScrubWorker { "Block scrub worker".into() } - fn info(&self) -> Option<String> { - let s = match &self.work { - ScrubWorkerState::Running(bsi) => format!( - "{:.2}% done (tranquility = {})", - bsi.progress() * 100., - self.persisted.tranquility - ), + fn status(&self) -> WorkerStatus { + let mut s = WorkerStatus { + persistent_errors: Some(self.persisted.corruptions_detected), + tranquility: Some(self.persisted.tranquility), + ..Default::default() + }; + match &self.work { + ScrubWorkerState::Running(bsi) => { + s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + } ScrubWorkerState::Paused(bsi, rt) => { - format!( - "Paused, {:.2}% done, resumes at {}", - bsi.progress() * 100., - msec_to_rfc3339(*rt) - ) + s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))]; } - ScrubWorkerState::Finished => format!( - "Last completed scrub: {}", - msec_to_rfc3339(self.persisted.time_last_complete_scrub) - ), - }; - Some(format!( - "{} ; corruptions detected: {}", - s, self.persisted.corruptions_detected - )) + ScrubWorkerState::Finished => { + s.freeform = vec![format!( + "Last scrub completed at {}", + msec_to_rfc3339(self.persisted.time_last_complete_scrub) + )]; + } + } + s } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { diff --git a/src/block/resync.rs b/src/block/resync.rs index ada3ac54..8231b55d 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -123,6 +123,24 @@ impl BlockResyncManager { Ok(self.errors.len()) } + /// Clear the error counter for a block and put it in queue immediately + pub fn clear_backoff(&self, hash: &Hash) -> Result<(), Error> { + let now = now_msec(); + if let Some(ec) = self.errors.get(hash)? { + let mut ec = ErrorCounter::decode(&ec); + if ec.errors > 0 { + ec.last_try = now - ec.delay_msec(); + self.errors.insert(hash, ec.encode())?; + self.put_to_resync_at(hash, now)?; + return Ok(()); + } + } + Err(Error::Message(format!( + "Block {:?} was not in an errored state", + hash + ))) + } + // ---- Resync loop ---- // This part manages a queue of blocks that need to be @@ -257,7 +275,7 @@ impl BlockResyncManager { if let Err(e) = &res { manager.metrics.resync_error_counter.add(1); - warn!("Error when resyncing {:?}: {}", hash, e); + error!("Error when resyncing {:?}: {}", hash, e); let err_counter = match self.errors.get(hash.as_slice())? { Some(ec) => ErrorCounter::decode(&ec).add1(now + 1), @@ -477,27 +495,22 @@ impl Worker for ResyncWorker { format!("Block resync worker #{}", self.index + 1) } - fn info(&self) -> Option<String> { + fn status(&self) -> WorkerStatus { let persisted = self.manager.resync.persisted.load(); if self.index >= persisted.n_workers { - return Some("(unused)".into()); - } - - let mut ret = vec![]; - ret.push(format!("tranquility = {}", persisted.tranquility)); - - let qlen = self.manager.resync.queue_len().unwrap_or(0); - if qlen > 0 { - ret.push(format!("{} blocks in queue", qlen)); + return WorkerStatus { + freeform: vec!["This worker is currently disabled".into()], + ..Default::default() + }; } - let elen = self.manager.resync.errors_len().unwrap_or(0); - if elen > 0 { - ret.push(format!("{} blocks in error state", elen)); + WorkerStatus { + queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), + tranquility: Some(persisted.tranquility), + persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), + ..Default::default() } - - Some(ret.join(", ")) } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { @@ -545,9 +558,9 @@ impl Worker for ResyncWorker { /// and the time of the last try. /// Used to implement exponential backoff. #[derive(Clone, Copy, Debug)] -struct ErrorCounter { - errors: u64, - last_try: u64, +pub(crate) struct ErrorCounter { + pub(crate) errors: u64, + pub(crate) last_try: u64, } impl ErrorCounter { @@ -558,12 +571,13 @@ impl ErrorCounter { } } - fn decode(data: &[u8]) -> Self { + pub(crate) fn decode(data: &[u8]) -> Self { Self { errors: u64::from_be_bytes(data[0..8].try_into().unwrap()), last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()), } } + fn encode(&self) -> Vec<u8> { [ u64::to_be_bytes(self.errors), @@ -583,7 +597,8 @@ impl ErrorCounter { (RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER) } - fn next_try(&self) -> u64 { + + pub(crate) fn next_try(&self) -> u64 { self.last_try + self.delay_msec() } } diff --git a/src/db/lib.rs b/src/db/lib.rs index d96586be..11cae4e3 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -181,6 +181,10 @@ impl Tree { pub fn len(&self) -> Result<usize> { self.0.len(self.1) } + #[inline] + pub fn fast_len(&self) -> Result<Option<usize>> { + self.0.fast_len(self.1) + } #[inline] pub fn first(&self) -> Result<Option<(Value, Value)>> { @@ -323,6 +327,9 @@ pub(crate) trait IDb: Send + Sync { fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; fn len(&self, tree: usize) -> Result<usize>; + fn fast_len(&self, _tree: usize) -> Result<Option<usize>> { + Ok(None) + } fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>; fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index c036c990..31956612 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -121,6 +121,10 @@ impl IDb for LmdbDb { Ok(tree.len(&tx)?.try_into().unwrap()) } + fn fast_len(&self, tree: usize) -> Result<Option<usize>> { + Ok(Some(self.len(tree)?)) + } + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { let tree = self.get_tree(tree)?; let mut tx = self.db.write_txn()?; diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 886fda6e..63b4506e 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -144,6 +144,10 @@ impl IDb for SqliteDb { } } + fn fast_len(&self, tree: usize) -> Result<Option<usize>> { + Ok(Some(self.len(tree)?)) + } + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { trace!("insert {}: lock db", tree); let this = self.0.lock().unwrap(); diff --git a/src/garage/admin.rs b/src/garage/admin.rs index e973cfe7..1ca3698a 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize}; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::Error as GarageError; +use garage_util::formater::format_table_to_string; use garage_util::time::*; use garage_table::replication::*; @@ -15,6 +16,7 @@ use garage_table::*; use garage_rpc::*; +use garage_block::manager::BlockResyncErrorInfo; use garage_block::repair::ScrubWorkerCommand; use garage_model::bucket_alias_table::*; @@ -24,6 +26,8 @@ use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; use garage_model::permission::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::Version; use crate::cli::*; use crate::repair::online::launch_online_repair; @@ -38,7 +42,8 @@ pub enum AdminRpc { LaunchRepair(RepairOpt), Migrate(MigrateOpt), Stats(StatsOpt), - Worker(WorkerOpt), + Worker(WorkerOperation), + BlockOperation(BlockOperation), // Replies Ok(String), @@ -54,6 +59,13 @@ pub enum AdminRpc { HashMap<usize, garage_util::background::WorkerInfo>, WorkerListOpt, ), + WorkerInfo(usize, garage_util::background::WorkerInfo), + BlockErrorList(Vec<BlockResyncErrorInfo>), + BlockInfo { + hash: Hash, + refcount: u64, + versions: Vec<Result<Version, Uuid>>, + }, } impl Rpc for AdminRpc { @@ -73,6 +85,8 @@ impl AdminRpcHandler { admin } + // ================ BUCKET COMMANDS ==================== + async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> { match cmd { BucketOperation::List => self.handle_list_buckets().await, @@ -551,6 +565,8 @@ impl AdminRpcHandler { Ok(AdminRpc::Ok(ret)) } + // ================ KEY COMMANDS ==================== + async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> { match cmd { KeyOperation::List => self.handle_list_keys().await, @@ -688,6 +704,8 @@ impl AdminRpcHandler { Ok(AdminRpc::KeyInfo(key, relevant_buckets)) } + // ================ MIGRATION COMMANDS ==================== + async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> { if !opt.yes { return Err(Error::BadRequest( @@ -704,6 +722,8 @@ impl AdminRpcHandler { Ok(AdminRpc::Ok("Migration successfull.".into())) } + // ================ REPAIR COMMANDS ==================== + async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> { if !opt.yes { return Err(Error::BadRequest( @@ -747,6 +767,8 @@ impl AdminRpcHandler { } } + // ================ STATS COMMANDS ==================== + async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> { if opt.all_nodes { let mut ret = String::new(); @@ -763,11 +785,12 @@ impl AdminRpcHandler { match self .endpoint .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) - .await? + .await { - Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), - Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), - Err(e) => writeln!(&mut ret, "Error: {}", e).unwrap(), + Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(), + Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), + Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(), + Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), } } Ok(AdminRpc::Ok(ret)) @@ -787,6 +810,7 @@ impl AdminRpcHandler { .unwrap_or_else(|| "(unknown)".into()), ) .unwrap(); + writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); // Gather ring statistics @@ -805,21 +829,38 @@ impl AdminRpcHandler { writeln!(&mut ret, " {:?} {}", n, c).unwrap(); } - self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?; - self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?; - self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?; - self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?; - self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?; + // Gather table statistics + let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; + table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?); + table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?); + table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?); + table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?); + table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?); + write!( + &mut ret, + "\nTable stats:\n{}", + format_table_to_string(table) + ) + .unwrap(); + // Gather block manager statistics writeln!(&mut ret, "\nBlock manager stats:").unwrap(); - if opt.detailed { - writeln!( - &mut ret, - " number of RC entries (~= number of blocks): {}", - self.garage.block_manager.rc_len()? - ) - .unwrap(); - } + let rc_len = if opt.detailed { + self.garage.block_manager.rc_len()?.to_string() + } else { + self.garage + .block_manager + .rc_fast_len()? + .map(|x| x.to_string()) + .unwrap_or_else(|| "NC".into()) + }; + + writeln!( + &mut ret, + " number of RC entries (~= number of blocks): {}", + rc_len + ) + .unwrap(); writeln!( &mut ret, " resync queue length: {}", @@ -833,67 +874,84 @@ impl AdminRpcHandler { ) .unwrap(); + if !opt.detailed { + writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap(); + } + Ok(ret) } fn gather_table_stats<F, R>( &self, - to: &mut String, t: &Arc<Table<F, R>>, - opt: &StatsOpt, - ) -> Result<(), Error> + detailed: bool, + ) -> Result<String, Error> where F: TableSchema + 'static, R: TableReplication + 'static, { - writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap(); - if opt.detailed { - writeln!( - to, - " number of items: {}", - t.data.store.len().map_err(GarageError::from)? + let (data_len, mkl_len) = if detailed { + ( + t.data.store.len().map_err(GarageError::from)?.to_string(), + t.merkle_updater.merkle_tree_len()?.to_string(), ) - .unwrap(); - writeln!( - to, - " Merkle tree size: {}", - t.merkle_updater.merkle_tree_len()? + } else { + ( + t.data + .store + .fast_len() + .map_err(GarageError::from)? + .map(|x| x.to_string()) + .unwrap_or_else(|| "NC".into()), + t.merkle_updater + .merkle_tree_fast_len()? + .map(|x| x.to_string()) + .unwrap_or_else(|| "NC".into()), ) - .unwrap(); - } - writeln!( - to, - " Merkle updater todo queue length: {}", - t.merkle_updater.todo_len()? - ) - .unwrap(); - writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap(); + }; - Ok(()) + Ok(format!( + " {}\t{}\t{}\t{}\t{}", + F::TABLE_NAME, + data_len, + mkl_len, + t.merkle_updater.todo_len()?, + t.data.gc_todo_len()? + )) } - // ---- + // ================ WORKER COMMANDS ==================== - async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> { - match opt.cmd { - WorkerCmd::List { opt } => { + async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> { + match cmd { + WorkerOperation::List { opt } => { let workers = self.garage.background.get_worker_info(); - Ok(AdminRpc::WorkerList(workers, opt)) + Ok(AdminRpc::WorkerList(workers, *opt)) } - WorkerCmd::Set { opt } => match opt { + WorkerOperation::Info { tid } => { + let info = self + .garage + .background + .get_worker_info() + .get(tid) + .ok_or_bad_request(format!("No worker with TID {}", tid))? + .clone(); + Ok(AdminRpc::WorkerInfo(*tid, info)) + } + WorkerOperation::Set { opt } => match opt { WorkerSetCmd::ScrubTranquility { tranquility } => { - let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility); + let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility); self.garage .block_manager .send_scrub_command(scrub_command) .await; Ok(AdminRpc::Ok("Scrub tranquility updated".into())) } - WorkerSetCmd::ResyncNWorkers { n_workers } => { + WorkerSetCmd::ResyncWorkerCount { worker_count } => { self.garage .block_manager .resync - .set_n_workers(n_workers) + .set_n_workers(*worker_count) .await?; Ok(AdminRpc::Ok("Number of resync workers updated".into())) } @@ -901,13 +959,154 @@ impl AdminRpcHandler { self.garage .block_manager .resync - .set_tranquility(tranquility) + .set_tranquility(*tranquility) .await?; Ok(AdminRpc::Ok("Resync tranquility updated".into())) } }, } } + + // ================ BLOCK COMMANDS ==================== + + async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> { + match cmd { + BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList( + self.garage.block_manager.list_resync_errors()?, + )), + BlockOperation::Info { hash } => { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + let refcount = self.garage.block_manager.get_block_rc(&hash)?; + let block_refs = self + .garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + let mut versions = vec![]; + for br in block_refs { + if let Some(v) = self + .garage + .version_table + .get(&br.version, &EmptyKey) + .await? + { + versions.push(Ok(v)); + } else { + versions.push(Err(br.version)); + } + } + Ok(AdminRpc::BlockInfo { + hash, + refcount, + versions, + }) + } + BlockOperation::RetryNow { all, blocks } => { + if *all { + if !blocks.is_empty() { + return Err(Error::BadRequest( + "--all was specified, cannot also specify blocks".into(), + )); + } + let blocks = self.garage.block_manager.list_resync_errors()?; + for b in blocks.iter() { + self.garage.block_manager.resync.clear_backoff(&b.hash)?; + } + Ok(AdminRpc::Ok(format!( + "{} blocks returned in queue for a retry now (check logs to see results)", + blocks.len() + ))) + } else { + for hash in blocks { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + self.garage.block_manager.resync.clear_backoff(&hash)?; + } + Ok(AdminRpc::Ok(format!( + "{} blocks returned in queue for a retry now (check logs to see results)", + blocks.len() + ))) + } + } + BlockOperation::Purge { yes, blocks } => { + if !yes { + return Err(Error::BadRequest( + "Pass the --yes flag to confirm block purge operation.".into(), + )); + } + + let mut obj_dels = 0; + let mut ver_dels = 0; + + for hash in blocks { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + let block_refs = self + .garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + + for br in block_refs { + let version = match self + .garage + .version_table + .get(&br.version, &EmptyKey) + .await? + { + Some(v) => v, + None => continue, + }; + + if let Some(object) = self + .garage + .object_table + .get(&version.bucket_id, &version.key) + .await? + { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == br.version { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + version.bucket_id, + version.key.clone(), + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete( + ObjectVersionData::DeleteMarker, + ), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + obj_dels += 1; + } + } + } + + if !version.deleted.get() { + let deleted_version = Version::new( + version.uuid, + version.bucket_id, + version.key.clone(), + true, + ); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + Ok(AdminRpc::Ok(format!( + "{} blocks were purged: {} object deletion markers added, {} versions marked deleted", + blocks.len(), + obj_dels, + ver_dels + ))) + } + } + } } #[async_trait] @@ -923,7 +1122,8 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler { AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await, AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, - AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await, + AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, + AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, m => Err(GarageError::unexpected_rpc_message(m).into()), } } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index c8b96489..6c5598b1 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -41,6 +41,9 @@ pub async fn cli_command_dispatch( } Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await, + Command::Block(bo) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await + } _ => unreachable!(), } } @@ -186,7 +189,20 @@ pub async fn cmd_admin( print_key_info(&key, &rb); } AdminRpc::WorkerList(wi, wlo) => { - print_worker_info(wi, wlo); + print_worker_list(wi, wlo); + } + AdminRpc::WorkerInfo(tid, wi) => { + print_worker_info(tid, wi); + } + AdminRpc::BlockErrorList(el) => { + print_block_error_list(el); + } + AdminRpc::BlockInfo { + hash, + refcount, + versions, + } => { + print_block_info(hash, refcount, versions); } r => { error!("Unexpected response: {:?}", r); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index cb085813..e2f632f3 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -49,7 +49,11 @@ pub enum Command { /// Manage background workers #[structopt(name = "worker", version = garage_version())] - Worker(WorkerOpt), + Worker(WorkerOperation), + + /// Low-level debug operations on data blocks + #[structopt(name = "block", version = garage_version())] + Block(BlockOperation), } #[derive(StructOpt, Debug)] @@ -502,20 +506,17 @@ pub struct StatsOpt { pub detailed: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] -pub struct WorkerOpt { - #[structopt(subcommand)] - pub cmd: WorkerCmd, -} - #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] -pub enum WorkerCmd { +pub enum WorkerOperation { /// List all workers on Garage node #[structopt(name = "list", version = garage_version())] List { #[structopt(flatten)] opt: WorkerListOpt, }, + /// Get detailed information about a worker + #[structopt(name = "info", version = garage_version())] + Info { tid: usize }, /// Set worker parameter #[structopt(name = "set", version = garage_version())] Set { @@ -540,9 +541,41 @@ pub enum WorkerSetCmd { #[structopt(name = "scrub-tranquility", version = garage_version())] ScrubTranquility { tranquility: u32 }, /// Set number of concurrent block resync workers - #[structopt(name = "resync-n-workers", version = garage_version())] - ResyncNWorkers { n_workers: usize }, + #[structopt(name = "resync-worker-count", version = garage_version())] + ResyncWorkerCount { worker_count: usize }, /// Set tranquility of block resync operations #[structopt(name = "resync-tranquility", version = garage_version())] ResyncTranquility { tranquility: u32 }, } + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum BlockOperation { + /// List all blocks that currently have a resync error + #[structopt(name = "list-errors", version = garage_version())] + ListErrors, + /// Get detailed information about a single block + #[structopt(name = "info", version = garage_version())] + Info { + /// Hash of the block for which to retrieve information + hash: String, + }, + /// Retry now the resync of one or many blocks + #[structopt(name = "retry-now", version = garage_version())] + RetryNow { + /// Retry all blocks that have a resync error + #[structopt(long = "all")] + all: bool, + /// Hashes of the block to retry to resync now + blocks: Vec<String>, + }, + /// Delete all objects referencing a missing block + #[structopt(name = "purge", version = garage_version())] + Purge { + /// Mandatory to confirm this operation + #[structopt(long = "yes")] + yes: bool, + /// Hashes of the block to purge + #[structopt(required = true)] + blocks: Vec<String>, + }, +} diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 396938ae..63fd9eba 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -3,14 +3,17 @@ use std::time::Duration; use garage_util::background::*; use garage_util::crdt::*; -use garage_util::data::Uuid; +use garage_util::data::*; use garage_util::error::*; use garage_util::formater::format_table; use garage_util::time::*; +use garage_block::manager::BlockResyncErrorInfo; + use garage_model::bucket_table::*; use garage_model::key_table::*; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; +use garage_model::s3::version_table::Version; use crate::cli::structs::WorkerListOpt; @@ -241,7 +244,7 @@ pub fn find_matching_node( } } -pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { +pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { let mut wi = wi.into_iter().collect::<Vec<_>>(); wi.sort_by_key(|(tid, info)| { ( @@ -254,7 +257,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { ) }); - let mut table = vec![]; + let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; for (tid, info) in wi.iter() { if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { continue; @@ -263,33 +266,147 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { continue; } - table.push(format!("{}\t{}\t{}", tid, info.state, info.name)); - if let Some(i) = &info.info { - table.push(format!("\t\t {}", i)); - } let tf = timeago::Formatter::new(); - let (err_ago, err_msg) = info + let err_ago = info .last_error .as_ref() - .map(|(m, t)| { - ( - tf.convert(Duration::from_millis(now_msec() - t)), - m.as_str(), - ) - }) - .unwrap_or(("(?) ago".into(), "(?)")); - if info.consecutive_errors > 0 { + .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t))) + .unwrap_or_default(); + let (total_err, consec_err) = if info.errors > 0 { + (info.errors.to_string(), info.consecutive_errors.to_string()) + } else { + ("-".into(), "-".into()) + }; + + table.push(format!( + "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", + tid, + info.state, + info.name, + info.status + .tranquility + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + info.status.progress.as_deref().unwrap_or("-"), + info.status + .queue_length + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + total_err, + consec_err, + err_ago, + )); + } + format_table(table); +} + +pub fn print_worker_info(tid: usize, info: WorkerInfo) { + let mut table = vec![]; + table.push(format!("Task id:\t{}", tid)); + table.push(format!("Worker name:\t{}", info.name)); + match info.state { + WorkerState::Throttled(t) => { table.push(format!( - "\t\t {} consecutive errors ({} total), last {}", - info.consecutive_errors, info.errors, err_ago, + "Worker state:\tBusy (throttled, paused for {:.3}s)", + t )); - table.push(format!("\t\t {}", err_msg)); - } else if info.errors > 0 { - table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,)); - if wlo.errors { - table.push(format!("\t\t {}", err_msg)); + } + s => { + table.push(format!("Worker state:\t{}", s)); + } + }; + if let Some(tql) = info.status.tranquility { + table.push(format!("Tranquility:\t{}", tql)); + } + + table.push("".into()); + table.push(format!("Total errors:\t{}", info.errors)); + table.push(format!("Consecutive errs:\t{}", info.consecutive_errors)); + if let Some((s, t)) = info.last_error { + table.push(format!("Last error:\t{}", s)); + let tf = timeago::Formatter::new(); + table.push(format!( + "Last error time:\t{}", + tf.convert(Duration::from_millis(now_msec() - t)) + )); + } + + table.push("".into()); + if let Some(p) = info.status.progress { + table.push(format!("Progress:\t{}", p)); + } + if let Some(ql) = info.status.queue_length { + table.push(format!("Queue length:\t{}", ql)); + } + if let Some(pe) = info.status.persistent_errors { + table.push(format!("Persistent errors:\t{}", pe)); + } + + for (i, s) in info.status.freeform.iter().enumerate() { + if i == 0 { + if table.last() != Some(&"".into()) { + table.push("".into()); } + table.push(format!("Message:\t{}", s)); + } else { + table.push(format!("\t{}", s)); } } format_table(table); } + +pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { + let now = now_msec(); + let tf = timeago::Formatter::new(); + let mut tf2 = timeago::Formatter::new(); + tf2.ago(""); + + let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; + for e in el { + table.push(format!( + "{}\t{}\t{}\t{}\tin {}", + hex::encode(e.hash.as_slice()), + e.refcount, + e.error_count, + tf.convert(Duration::from_millis(now - e.last_try)), + tf2.convert(Duration::from_millis(e.next_try - now)) + )); + } + format_table(table); +} + +pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) { + println!("Block hash: {}", hex::encode(hash.as_slice())); + println!("Refcount: {}", refcount); + println!(); + + let mut table = vec!["Version\tBucket\tKey\tDeleted".into()]; + let mut nondeleted_count = 0; + for v in versions.iter() { + match v { + Ok(ver) => { + table.push(format!( + "{:?}\t{:?}\t{}\t{:?}", + ver.uuid, + ver.bucket_id, + ver.key, + ver.deleted.get() + )); + if !ver.deleted.get() { + nondeleted_count += 1; + } + } + Err(vh) => { + table.push(format!("{:?}\t\t\tyes", vh)); + } + } + } + format_table(table); + + if refcount != nondeleted_count { + println!(); + println!("Warning: refcount does not match number of non-deleted versions"); + } +} diff --git a/src/garage/main.rs b/src/garage/main.rs index edda734b..107b1389 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -127,9 +127,16 @@ async fn main() { std::process::abort(); })); + // Parse arguments and dispatch command line + let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches()); + // Initialize logging as well as other libraries used in Garage if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "netapp=info,garage=info") + let default_log = match &opt.cmd { + Command::Server => "netapp=info,garage=info", + _ => "netapp=warn,garage=warn", + }; + std::env::set_var("RUST_LOG", default_log) } tracing_subscriber::fmt() .with_writer(std::io::stderr) @@ -137,9 +144,6 @@ async fn main() { .init(); sodiumoxide::init().expect("Unable to init sodiumoxide"); - // Parse arguments and dispatch command line - let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches()); - let res = match opt.cmd { Command::Server => server::run_server(opt.config_file).await, Command::OfflineRepair(repair_opt) => { @@ -182,9 +186,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk); // Find and parse the address of the target host - let (id, addr) = if let Some(h) = opt.rpc_host { + let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host { let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?; - (id, addrs[0]) + (id, addrs[0], false) } else { let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) .err_context(READ_KEY_ERROR)?; @@ -195,24 +199,26 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { .ok_or_message("unable to resolve rpc_public_addr specified in config file")? .next() .ok_or_message("unable to resolve rpc_public_addr specified in config file")?; - (node_id, a) + (node_id, a, false) } else { let default_addr = SocketAddr::new( "127.0.0.1".parse().unwrap(), config.as_ref().unwrap().rpc_bind_addr.port(), ); - warn!( - "Trying to contact Garage node at default address {}", - default_addr - ); - warn!("If this doesn't work, consider adding rpc_public_addr in your config file or specifying the -h command line parameter."); - (node_id, default_addr) + (node_id, default_addr, true) } }; // Connect to target host - netapp.clone().try_connect(addr, id).await - .err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?; + if let Err(e) = netapp.clone().try_connect(addr, id).await { + if is_default_addr { + warn!( + "Tried to contact Garage node at default address {}, which didn't work. If that address is wrong, consider setting rpc_public_addr in your config file.", + addr + ); + } + Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?; + } let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into()); let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into()); diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index e33cf097..42221c2a 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -85,8 +85,11 @@ impl Worker for RepairVersionsWorker { "Version repair worker".into() } - fn info(&self) -> Option<String> { - Some(format!("{} items done", self.counter)) + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(self.counter.to_string()), + ..Default::default() + } } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { @@ -163,8 +166,11 @@ impl Worker for RepairBlockrefsWorker { "Block refs repair worker".into() } - fn info(&self) -> Option<String> { - Some(format!("{} items done", self.counter)) + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(self.counter.to_string()), + ..Default::default() + } } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index e6394f0c..b9594406 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -404,14 +404,13 @@ impl<T: CountedItem> IndexPropagatorWorker<T> { #[async_trait] impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { fn name(&self) -> String { - format!("{} index counter propagator", T::COUNTER_TABLE_NAME) + format!("{} counter", T::COUNTER_TABLE_NAME) } - fn info(&self) -> Option<String> { - if !self.buf.is_empty() { - Some(format!("{} items in queue", self.buf.len())) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.buf.len() as u64), + ..Default::default() } } diff --git a/src/table/data.rs b/src/table/data.rs index 3212e82b..93da2110 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -58,7 +58,13 @@ where .expect("Unable to open DB tree"); let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); - let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); + let metrics = TableMetrics::new( + F::TABLE_NAME, + store.clone(), + merkle_tree.clone(), + merkle_todo.clone(), + gc_todo.clone(), + ); Arc::new(Self { system, diff --git a/src/table/gc.rs b/src/table/gc.rs index 83e7eeff..cfdc9d2d 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -330,12 +330,10 @@ where format!("{} GC", F::TABLE_NAME) } - fn info(&self) -> Option<String> { - let l = self.gc.data.gc_todo_len().unwrap_or(0); - if l > 0 { - Some(format!("{} items in queue", l)) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64), + ..Default::default() } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index a5c29723..e977bfb5 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -293,6 +293,10 @@ where Ok(self.data.merkle_tree.len()?) } + pub fn merkle_tree_fast_len(&self) -> Result<Option<usize>, Error> { + Ok(self.data.merkle_tree.fast_len()?) + } + pub fn todo_len(&self) -> Result<usize, Error> { Ok(self.data.merkle_todo.len()?) } @@ -310,15 +314,13 @@ where R: TableReplication + 'static, { fn name(&self) -> String { - format!("{} Merkle tree updater", F::TABLE_NAME) + format!("{} Merkle", F::TABLE_NAME) } - fn info(&self) -> Option<String> { - let l = self.0.todo_len().unwrap_or(0); - if l > 0 { - Some(format!("{} items in queue", l)) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.0.todo_len().unwrap_or(0) as u64), + ..Default::default() } } diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 3a1783e0..8318a84f 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -5,6 +5,8 @@ use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct TableMetrics { + pub(crate) _table_size: ValueObserver<u64>, + pub(crate) _merkle_tree_size: ValueObserver<u64>, pub(crate) _merkle_todo_len: ValueObserver<u64>, pub(crate) _gc_todo_len: ValueObserver<u64>, @@ -20,9 +22,43 @@ pub struct TableMetrics { pub(crate) sync_items_received: Counter<u64>, } impl TableMetrics { - pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self { + pub fn new( + table_name: &'static str, + store: db::Tree, + merkle_tree: db::Tree, + merkle_todo: db::Tree, + gc_todo: CountedTree, + ) -> Self { let meter = global::meter(table_name); TableMetrics { + _table_size: meter + .u64_value_observer( + "table.size", + move |observer| { + if let Ok(Some(v)) = store.fast_len() { + observer.observe( + v as u64, + &[KeyValue::new("table_name", table_name)], + ); + } + }, + ) + .with_description("Number of items in table") + .init(), + _merkle_tree_size: meter + .u64_value_observer( + "table.merkle_tree_size", + move |observer| { + if let Ok(Some(v)) = merkle_tree.fast_len() { + observer.observe( + v as u64, + &[KeyValue::new("table_name", table_name)], + ); + } + }, + ) + .with_description("Number of nodes in table's Merkle tree") + .init(), _merkle_todo_len: meter .u64_value_observer( "table.merkle_updater_todo_queue_length", diff --git a/src/table/sync.rs b/src/table/sync.rs index 9d79d856..af7aa640 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -570,12 +570,10 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor format!("{} sync", F::TABLE_NAME) } - fn info(&self) -> Option<String> { - let l = self.todo.len(); - if l > 0 { - Some(format!("{} partitions remaining", l)) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.todo.len() as u64), + ..Default::default() } } diff --git a/src/table/util.rs b/src/table/util.rs index 20595a94..0b10cf3f 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -49,3 +49,9 @@ impl EnumerationOrder { } } } + +impl Default for EnumerationOrder { + fn default() -> Self { + EnumerationOrder::Forward + } +} diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 619f5068..fd9258b8 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -29,13 +29,24 @@ pub struct BackgroundRunner { #[derive(Clone, Serialize, Deserialize, Debug)] pub struct WorkerInfo { pub name: String, - pub info: Option<String>, + pub status: WorkerStatus, pub state: WorkerState, pub errors: usize, pub consecutive_errors: usize, pub last_error: Option<(String, u64)>, } +/// WorkerStatus is a struct returned by the worker with a bunch of canonical +/// fields to indicate their status to CLI users. All fields are optional. +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct WorkerStatus { + pub tranquility: Option<u32>, + pub progress: Option<String>, + pub queue_length: Option<u64>, + pub persistent_errors: Option<u64>, + pub freeform: Vec<String>, +} + impl BackgroundRunner { /// Create a new BackgroundRunner pub fn new( diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index f5e3addb..7e9da7f8 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::{mpsc, watch}; -use crate::background::WorkerInfo; +use crate::background::{WorkerInfo, WorkerStatus}; use crate::error::Error; use crate::time::now_msec; @@ -26,7 +26,7 @@ impl std::fmt::Display for WorkerState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { WorkerState::Busy => write!(f, "Busy"), - WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t), + WorkerState::Throttled(_) => write!(f, "Busy*"), WorkerState::Idle => write!(f, "Idle"), WorkerState::Done => write!(f, "Done"), } @@ -37,8 +37,8 @@ impl std::fmt::Display for WorkerState { pub trait Worker: Send { fn name(&self) -> String; - fn info(&self) -> Option<String> { - None + fn status(&self) -> WorkerStatus { + Default::default() } /// Work: do a basic unit of work, if one is available (otherwise, should return @@ -119,7 +119,7 @@ impl WorkerProcessor { match wi.get_mut(&worker.task_id) { Some(i) => { i.state = worker.state; - i.info = worker.worker.info(); + i.status = worker.worker.status(); i.errors = worker.errors; i.consecutive_errors = worker.consecutive_errors; if worker.last_error.is_some() { @@ -130,7 +130,7 @@ impl WorkerProcessor { wi.insert(worker.task_id, WorkerInfo { name: worker.worker.name(), state: worker.state, - info: worker.worker.info(), + status: worker.worker.status(), errors: worker.errors, consecutive_errors: worker.consecutive_errors, last_error: worker.last_error.take(), diff --git a/src/util/formater.rs b/src/util/formater.rs index 95324f9a..2ea53ebb 100644 --- a/src/util/formater.rs +++ b/src/util/formater.rs @@ -1,4 +1,4 @@ -pub fn format_table(data: Vec<String>) { +pub fn format_table_to_string(data: Vec<String>) -> String { let data = data .iter() .map(|s| s.split('\t').collect::<Vec<_>>()) @@ -24,5 +24,9 @@ pub fn format_table(data: Vec<String>) { out.push('\n'); } - print!("{}", out); + out +} + +pub fn format_table(data: Vec<String>) { + print!("{}", format_table_to_string(data)); } |