aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-12-12 17:16:49 +0100
committerAlex Auvolat <alex@adnab.me>2022-12-12 17:17:05 +0100
commitde9d6cddf709e686ada3d1e71de7b31d7704b8b5 (patch)
treece97129877b6d0b1839f22e42fa3d8e5fb9f07c7
parentf7c65e830e66c9887d989a8281d8fae7f76f9c8c (diff)
downloadgarage-de9d6cddf709e686ada3d1e71de7b31d7704b8b5.tar.gz
garage-de9d6cddf709e686ada3d1e71de7b31d7704b8b5.zip
Prettier worker list table; remove useless CLI log messages
-rw-r--r--src/block/repair.rs55
-rw-r--r--src/block/resync.rs25
-rw-r--r--src/garage/cli/util.rs55
-rw-r--r--src/garage/main.rs36
-rw-r--r--src/garage/repair/online.rs14
-rw-r--r--src/model/index_counter.rs11
-rw-r--r--src/table/gc.rs10
-rw-r--r--src/table/merkle.rs12
-rw-r--r--src/table/sync.rs10
-rw-r--r--src/util/background/mod.rs13
-rw-r--r--src/util/background/worker.rs12
11 files changed, 138 insertions, 115 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs
index e2884b69..eed40599 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -53,7 +53,7 @@ impl Worker for RepairWorker {
"Block repair worker".into()
}
- fn info(&self) -> Option<String> {
+ fn status(&self) -> WorkerStatus {
match self.block_iter.as_ref() {
None => {
let idx_bytes = self
@@ -66,9 +66,17 @@ impl Worker for RepairWorker {
} else {
idx_bytes
};
- Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
+ WorkerStatus {
+ progress: Some("Phase 1".into()),
+ freeform: vec![format!("Now at: {}", hex::encode(idx_bytes))],
+ ..Default::default()
+ }
}
- Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
+ Some(bi) => WorkerStatus {
+ progress: Some(format!("{:.2}%", bi.progress() * 100.)),
+ freeform: vec!["Phase 2".into()],
+ ..Default::default()
+ },
}
}
@@ -271,29 +279,28 @@ impl Worker for ScrubWorker {
"Block scrub worker".into()
}
- fn info(&self) -> Option<String> {
- let s = match &self.work {
- ScrubWorkerState::Running(bsi) => format!(
- "{:.2}% done (tranquility = {})",
- bsi.progress() * 100.,
- self.persisted.tranquility
- ),
+ fn status(&self) -> WorkerStatus {
+ let mut s = WorkerStatus {
+ persistent_errors: Some(self.persisted.corruptions_detected),
+ tranquility: Some(self.persisted.tranquility),
+ ..Default::default()
+ };
+ match &self.work {
+ ScrubWorkerState::Running(bsi) => {
+ s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ }
ScrubWorkerState::Paused(bsi, rt) => {
- format!(
- "Paused, {:.2}% done, resumes at {}",
- bsi.progress() * 100.,
- msec_to_rfc3339(*rt)
- )
+ s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ s.freeform = vec![format!("Paused, resumes at {}", msec_to_rfc3339(*rt))];
}
- ScrubWorkerState::Finished => format!(
- "Last completed scrub: {}",
- msec_to_rfc3339(self.persisted.time_last_complete_scrub)
- ),
- };
- Some(format!(
- "{} ; corruptions detected: {}",
- s, self.persisted.corruptions_detected
- ))
+ ScrubWorkerState::Finished => {
+ s.freeform = vec![format!(
+ "Completed {}",
+ msec_to_rfc3339(self.persisted.time_last_complete_scrub)
+ )];
+ }
+ }
+ s
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
diff --git a/src/block/resync.rs b/src/block/resync.rs
index ada3ac54..875ead9b 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -477,27 +477,22 @@ impl Worker for ResyncWorker {
format!("Block resync worker #{}", self.index + 1)
}
- fn info(&self) -> Option<String> {
+ fn status(&self) -> WorkerStatus {
let persisted = self.manager.resync.persisted.load();
if self.index >= persisted.n_workers {
- return Some("(unused)".into());
+ return WorkerStatus {
+ freeform: vec!["(unused)".into()],
+ ..Default::default()
+ };
}
- let mut ret = vec![];
- ret.push(format!("tranquility = {}", persisted.tranquility));
-
- let qlen = self.manager.resync.queue_len().unwrap_or(0);
- if qlen > 0 {
- ret.push(format!("{} blocks in queue", qlen));
- }
-
- let elen = self.manager.resync.errors_len().unwrap_or(0);
- if elen > 0 {
- ret.push(format!("{} blocks in error state", elen));
+ WorkerStatus {
+ queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
+ tranquility: Some(persisted.tranquility),
+ persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
+ ..Default::default()
}
-
- Some(ret.join(", "))
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 396938ae..1f098b47 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -254,7 +254,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
)
});
- let mut table = vec![];
+ let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
for (tid, info) in wi.iter() {
if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
continue;
@@ -263,33 +263,38 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
continue;
}
- table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
- if let Some(i) = &info.info {
- table.push(format!("\t\t {}", i));
- }
let tf = timeago::Formatter::new();
- let (err_ago, err_msg) = info
+ let err_ago = info
.last_error
.as_ref()
- .map(|(m, t)| {
- (
- tf.convert(Duration::from_millis(now_msec() - t)),
- m.as_str(),
- )
- })
- .unwrap_or(("(?) ago".into(), "(?)"));
- if info.consecutive_errors > 0 {
- table.push(format!(
- "\t\t {} consecutive errors ({} total), last {}",
- info.consecutive_errors, info.errors, err_ago,
- ));
- table.push(format!("\t\t {}", err_msg));
- } else if info.errors > 0 {
- table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
- if wlo.errors {
- table.push(format!("\t\t {}", err_msg));
- }
- }
+ .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
+ .unwrap_or_default();
+ let (total_err, consec_err) = if info.errors > 0 {
+ (info.errors.to_string(), info.consecutive_errors.to_string())
+ } else {
+ ("-".into(), "-".into())
+ };
+
+ table.push(format!(
+ "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
+ tid,
+ info.state,
+ info.name,
+ info.status
+ .tranquility
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or("-".into()),
+ info.status.progress.as_deref().unwrap_or("-"),
+ info.status
+ .queue_length
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or("-".into()),
+ total_err,
+ consec_err,
+ err_ago,
+ ));
}
format_table(table);
}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index edda734b..107b1389 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -127,9 +127,16 @@ async fn main() {
std::process::abort();
}));
+ // Parse arguments and dispatch command line
+ let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
+
// Initialize logging as well as other libraries used in Garage
if std::env::var("RUST_LOG").is_err() {
- std::env::set_var("RUST_LOG", "netapp=info,garage=info")
+ let default_log = match &opt.cmd {
+ Command::Server => "netapp=info,garage=info",
+ _ => "netapp=warn,garage=warn",
+ };
+ std::env::set_var("RUST_LOG", default_log)
}
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
@@ -137,9 +144,6 @@ async fn main() {
.init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
- // Parse arguments and dispatch command line
- let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
-
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file).await,
Command::OfflineRepair(repair_opt) => {
@@ -182,9 +186,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk);
// Find and parse the address of the target host
- let (id, addr) = if let Some(h) = opt.rpc_host {
+ let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {
let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?;
- (id, addrs[0])
+ (id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?;
@@ -195,24 +199,26 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?
.next()
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?;
- (node_id, a)
+ (node_id, a, false)
} else {
let default_addr = SocketAddr::new(
"127.0.0.1".parse().unwrap(),
config.as_ref().unwrap().rpc_bind_addr.port(),
);
- warn!(
- "Trying to contact Garage node at default address {}",
- default_addr
- );
- warn!("If this doesn't work, consider adding rpc_public_addr in your config file or specifying the -h command line parameter.");
- (node_id, default_addr)
+ (node_id, default_addr, true)
}
};
// Connect to target host
- netapp.clone().try_connect(addr, id).await
- .err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?;
+ if let Err(e) = netapp.clone().try_connect(addr, id).await {
+ if is_default_addr {
+ warn!(
+ "Tried to contact Garage node at default address {}, which didn't work. If that address is wrong, consider setting rpc_public_addr in your config file.",
+ addr
+ );
+ }
+ Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?;
+ }
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index e33cf097..42221c2a 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -85,8 +85,11 @@ impl Worker for RepairVersionsWorker {
"Version repair worker".into()
}
- fn info(&self) -> Option<String> {
- Some(format!("{} items done", self.counter))
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ progress: Some(self.counter.to_string()),
+ ..Default::default()
+ }
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@@ -163,8 +166,11 @@ impl Worker for RepairBlockrefsWorker {
"Block refs repair worker".into()
}
- fn info(&self) -> Option<String> {
- Some(format!("{} items done", self.counter))
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ progress: Some(self.counter.to_string()),
+ ..Default::default()
+ }
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index e6394f0c..b9594406 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -404,14 +404,13 @@ impl<T: CountedItem> IndexPropagatorWorker<T> {
#[async_trait]
impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
fn name(&self) -> String {
- format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
+ format!("{} counter", T::COUNTER_TABLE_NAME)
}
- fn info(&self) -> Option<String> {
- if !self.buf.is_empty() {
- Some(format!("{} items in queue", self.buf.len()))
- } else {
- None
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.buf.len() as u64),
+ ..Default::default()
}
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 83e7eeff..cfdc9d2d 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -330,12 +330,10 @@ where
format!("{} GC", F::TABLE_NAME)
}
- fn info(&self) -> Option<String> {
- let l = self.gc.data.gc_todo_len().unwrap_or(0);
- if l > 0 {
- Some(format!("{} items in queue", l))
- } else {
- None
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64),
+ ..Default::default()
}
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index a5c29723..6f8a19b6 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -310,15 +310,13 @@ where
R: TableReplication + 'static,
{
fn name(&self) -> String {
- format!("{} Merkle tree updater", F::TABLE_NAME)
+ format!("{} Merkle", F::TABLE_NAME)
}
- fn info(&self) -> Option<String> {
- let l = self.0.todo_len().unwrap_or(0);
- if l > 0 {
- Some(format!("{} items in queue", l))
- } else {
- None
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
+ ..Default::default()
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 9d79d856..af7aa640 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -570,12 +570,10 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
format!("{} sync", F::TABLE_NAME)
}
- fn info(&self) -> Option<String> {
- let l = self.todo.len();
- if l > 0 {
- Some(format!("{} partitions remaining", l))
- } else {
- None
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.todo.len() as u64),
+ ..Default::default()
}
}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 619f5068..fd9258b8 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -29,13 +29,24 @@ pub struct BackgroundRunner {
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerInfo {
pub name: String,
- pub info: Option<String>,
+ pub status: WorkerStatus,
pub state: WorkerState,
pub errors: usize,
pub consecutive_errors: usize,
pub last_error: Option<(String, u64)>,
}
+/// WorkerStatus is a struct returned by the worker with a bunch of canonical
+/// fields to indicate their status to CLI users. All fields are optional.
+#[derive(Clone, Serialize, Deserialize, Debug, Default)]
+pub struct WorkerStatus {
+ pub tranquility: Option<u32>,
+ pub progress: Option<String>,
+ pub queue_length: Option<u64>,
+ pub persistent_errors: Option<u64>,
+ pub freeform: Vec<String>,
+}
+
impl BackgroundRunner {
/// Create a new BackgroundRunner
pub fn new(
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index f5e3addb..7e9da7f8 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
-use crate::background::WorkerInfo;
+use crate::background::{WorkerInfo, WorkerStatus};
use crate::error::Error;
use crate::time::now_msec;
@@ -26,7 +26,7 @@ impl std::fmt::Display for WorkerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkerState::Busy => write!(f, "Busy"),
- WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t),
+ WorkerState::Throttled(_) => write!(f, "Busy*"),
WorkerState::Idle => write!(f, "Idle"),
WorkerState::Done => write!(f, "Done"),
}
@@ -37,8 +37,8 @@ impl std::fmt::Display for WorkerState {
pub trait Worker: Send {
fn name(&self) -> String;
- fn info(&self) -> Option<String> {
- None
+ fn status(&self) -> WorkerStatus {
+ Default::default()
}
/// Work: do a basic unit of work, if one is available (otherwise, should return
@@ -119,7 +119,7 @@ impl WorkerProcessor {
match wi.get_mut(&worker.task_id) {
Some(i) => {
i.state = worker.state;
- i.info = worker.worker.info();
+ i.status = worker.worker.status();
i.errors = worker.errors;
i.consecutive_errors = worker.consecutive_errors;
if worker.last_error.is_some() {
@@ -130,7 +130,7 @@ impl WorkerProcessor {
wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(),
state: worker.state,
- info: worker.worker.info(),
+ status: worker.worker.status(),
errors: worker.errors,
consecutive_errors: worker.consecutive_errors,
last_error: worker.last_error.take(),