aboutsummaryrefslogtreecommitdiff
path: root/src/garage/cli
diff options
context:
space:
mode:
authorMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
committerMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
commit829f815a897b04986559910bbcbf53625adcdf20 (patch)
tree6db3c27cff2aded754a641d1f2b05c83be701267 /src/garage/cli
parent99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff)
parenta096ced35562bd0a8877a1ee2f755be1edafe343 (diff)
downloadgarage-829f815a897b04986559910bbcbf53625adcdf20.tar.gz
garage-829f815a897b04986559910bbcbf53625adcdf20.zip
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/garage/cli')
-rw-r--r--src/garage/cli/cmd.rs30
-rw-r--r--src/garage/cli/layout.rs79
-rw-r--r--src/garage/cli/structs.rs202
-rw-r--r--src/garage/cli/util.rs137
4 files changed, 313 insertions, 135 deletions
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index a90277a0..c8b96489 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,6 +1,8 @@
use std::collections::HashSet;
+use std::time::Duration;
use garage_util::error::*;
+use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
@@ -38,13 +40,14 @@ pub async fn cli_command_dispatch(
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
+ Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
_ => unreachable!(),
}
}
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
@@ -85,19 +88,21 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
format_table(healthy_nodes);
let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
- let failure_case_1 = status.iter().any(|adv| !adv.is_up);
+ let failure_case_1 = status
+ .iter()
+ .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_)))));
let failure_case_2 = layout
.roles
.items()
.iter()
- .filter(|(_, _, v)| v.0.is_some())
- .any(|(id, _, _)| !status_keys.contains(id));
+ .any(|(id, _, v)| !status_keys.contains(id) && v.0.is_some());
if failure_case_1 || failure_case_2 {
println!("\n==== FAILED NODES ====");
let mut failed_nodes =
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
+ let tf = timeago::Formatter::new();
failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id,
@@ -108,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
capacity = cfg.capacity_string(),
last_seen = adv
.last_seen_secs_ago
- .map(|s| format!("{}s ago", s))
+ .map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into()),
));
}
@@ -144,7 +149,7 @@ pub async fn cmd_connect(
args: ConnectNodeOpt,
) -> Result<(), Error> {
match rpc_cli
- .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
.await??
{
SystemRpc::Ok => {
@@ -160,15 +165,19 @@ pub async fn cmd_admin(
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), HelperError> {
- match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
+ match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
AdminRpc::BucketList(bl) => {
print_bucket_list(bl);
}
- AdminRpc::BucketInfo(bucket, rk) => {
- print_bucket_info(&bucket, &rk);
+ AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ } => {
+ print_bucket_info(&bucket, &relevant_keys, &counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);
@@ -176,6 +185,9 @@ pub async fn cmd_admin(
AdminRpc::KeyInfo(key, rb) => {
print_key_info(&key, &rb);
}
+ AdminRpc::WorkerList(wi, wlo) => {
+ print_worker_info(wi, wlo);
+ }
r => {
error!("Unexpected response: {:?}", r);
}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index e76f7737..3884bb92 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,6 +1,6 @@
use garage_util::crdt::Crdt;
-use garage_util::data::*;
use garage_util::error::*;
+use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
@@ -36,21 +36,29 @@ pub async fn cmd_assign_role(
args: AssignRoleOpt,
) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
let added_nodes = args
.node_ids
.iter()
- .map(|node_id| find_matching_node(status.iter().map(|adv| adv.id), node_id))
+ .map(|node_id| {
+ find_matching_node(
+ status
+ .iter()
+ .map(|adv| adv.id)
+ .chain(layout.node_ids().iter().cloned()),
+ node_id,
+ )
+ })
.collect::<Result<Vec<_>, _>>()?;
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
let mut roles = layout.roles.clone();
roles.merge(&layout.staging);
@@ -203,31 +211,9 @@ pub async fn cmd_apply_layout(
rpc_host: NodeID,
apply_opt: ApplyLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match apply_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.roles.merge(&layout.staging);
-
- if !layout.calculate_partition_assignation() {
- return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
- }
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
-
- layout.version += 1;
+ let layout = layout.apply_staged_changes(apply_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -242,25 +228,9 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match revert_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.version += 1;
+ let layout = layout.revert_staged_changes(revert_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -275,7 +245,7 @@ pub async fn fetch_layout(
rpc_host: NodeID,
) -> Result<ClusterLayout, Error> {
match rpc_cli
- .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
@@ -291,7 +261,7 @@ pub async fn send_layout(
rpc_cli
.call(
&rpc_host,
- &SystemRpc::AdvertiseClusterLayout(layout),
+ SystemRpc::AdvertiseClusterLayout(layout),
PRIO_NORMAL,
)
.await??;
@@ -323,11 +293,20 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
}
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
- if !layout.staging.items().is_empty() {
+ let has_changes = layout
+ .staging
+ .items()
+ .iter()
+ .any(|(k, _, v)| layout.roles.get(k) != Some(v));
+
+ if has_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
for (id, _, role) in layout.staging.items().iter() {
+ if layout.roles.get(id) == Some(role) {
+ continue;
+ }
if let Some(role) = &role.0 {
let tags = role.tags.join(",");
table.push(format!(
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index a0c49aeb..06548e89 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -1,55 +1,65 @@
use serde::{Deserialize, Serialize};
-
use structopt::StructOpt;
+use garage_util::version::garage_version;
+
#[derive(StructOpt, Debug)]
pub enum Command {
/// Run Garage server
- #[structopt(name = "server")]
+ #[structopt(name = "server", version = garage_version())]
Server,
/// Get network status
- #[structopt(name = "status")]
+ #[structopt(name = "status", version = garage_version())]
Status,
/// Operations on individual Garage nodes
- #[structopt(name = "node")]
+ #[structopt(name = "node", version = garage_version())]
Node(NodeOperation),
/// Operations on the assignation of node roles in the cluster layout
- #[structopt(name = "layout")]
+ #[structopt(name = "layout", version = garage_version())]
Layout(LayoutOperation),
/// Operations on buckets
- #[structopt(name = "bucket")]
+ #[structopt(name = "bucket", version = garage_version())]
Bucket(BucketOperation),
/// Operations on S3 access keys
- #[structopt(name = "key")]
+ #[structopt(name = "key", version = garage_version())]
Key(KeyOperation),
/// Run migrations from previous Garage version
/// (DO NOT USE WITHOUT READING FULL DOCUMENTATION)
- #[structopt(name = "migrate")]
+ #[structopt(name = "migrate", version = garage_version())]
Migrate(MigrateOpt),
- /// Start repair of node data
- #[structopt(name = "repair")]
+ /// Start repair of node data on remote node
+ #[structopt(name = "repair", version = garage_version())]
Repair(RepairOpt),
+ /// Offline reparation of node data (these repairs must be run offline
+ /// directly on the server node)
+ #[structopt(name = "offline-repair", version = garage_version())]
+ OfflineRepair(OfflineRepairOpt),
+
/// Gather node statistics
- #[structopt(name = "stats")]
+ #[structopt(name = "stats", version = garage_version())]
Stats(StatsOpt),
+
+ /// Manage background workers
+ #[structopt(name = "worker", version = garage_version())]
+ Worker(WorkerOpt),
}
#[derive(StructOpt, Debug)]
pub enum NodeOperation {
/// Print identifier (public key) of this Garage node
- #[structopt(name = "id")]
+ #[structopt(name = "id", version = garage_version())]
NodeId(NodeIdOpt),
/// Connect to Garage node that is currently isolated from the system
- #[structopt(name = "connect")]
+ #[structopt(name = "connect", version = garage_version())]
Connect(ConnectNodeOpt),
}
@@ -70,23 +80,23 @@ pub struct ConnectNodeOpt {
#[derive(StructOpt, Debug)]
pub enum LayoutOperation {
/// Assign role to Garage node
- #[structopt(name = "assign")]
+ #[structopt(name = "assign", version = garage_version())]
Assign(AssignRoleOpt),
/// Remove role from Garage cluster node
- #[structopt(name = "remove")]
+ #[structopt(name = "remove", version = garage_version())]
Remove(RemoveRoleOpt),
/// Show roles currently assigned to nodes and changes staged for commit
- #[structopt(name = "show")]
+ #[structopt(name = "show", version = garage_version())]
Show,
/// Apply staged changes to cluster layout
- #[structopt(name = "apply")]
+ #[structopt(name = "apply", version = garage_version())]
Apply(ApplyLayoutOpt),
/// Revert staged changes to cluster layout
- #[structopt(name = "revert")]
+ #[structopt(name = "revert", version = garage_version())]
Revert(RevertLayoutOpt),
}
@@ -141,40 +151,44 @@ pub struct RevertLayoutOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
- #[structopt(name = "list")]
+ #[structopt(name = "list", version = garage_version())]
List,
/// Get bucket info
- #[structopt(name = "info")]
+ #[structopt(name = "info", version = garage_version())]
Info(BucketOpt),
/// Create bucket
- #[structopt(name = "create")]
+ #[structopt(name = "create", version = garage_version())]
Create(BucketOpt),
/// Delete bucket
- #[structopt(name = "delete")]
+ #[structopt(name = "delete", version = garage_version())]
Delete(DeleteBucketOpt),
/// Alias bucket under new name
- #[structopt(name = "alias")]
+ #[structopt(name = "alias", version = garage_version())]
Alias(AliasBucketOpt),
/// Remove bucket alias
- #[structopt(name = "unalias")]
+ #[structopt(name = "unalias", version = garage_version())]
Unalias(UnaliasBucketOpt),
/// Allow key to read or write to bucket
- #[structopt(name = "allow")]
+ #[structopt(name = "allow", version = garage_version())]
Allow(PermBucketOpt),
/// Deny key from reading or writing to bucket
- #[structopt(name = "deny")]
+ #[structopt(name = "deny", version = garage_version())]
Deny(PermBucketOpt),
/// Expose as website or not
- #[structopt(name = "website")]
+ #[structopt(name = "website", version = garage_version())]
Website(WebsiteOpt),
+
+ /// Set the quotas for this bucket
+ #[structopt(name = "set-quotas", version = garage_version())]
+ SetQuotas(SetQuotasOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
@@ -262,37 +276,52 @@ pub struct PermBucketOpt {
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct SetQuotasOpt {
+ /// Bucket name
+ pub bucket: String,
+
+ /// Set a maximum size for the bucket (specify a size e.g. in MiB or GiB,
+ /// or `none` for no size restriction)
+ #[structopt(long = "max-size")]
+ pub max_size: Option<String>,
+
+ /// Set a maximum number of objects for the bucket (or `none` for no restriction)
+ #[structopt(long = "max-objects")]
+ pub max_objects: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
- #[structopt(name = "list")]
+ #[structopt(name = "list", version = garage_version())]
List,
/// Get key info
- #[structopt(name = "info")]
+ #[structopt(name = "info", version = garage_version())]
Info(KeyOpt),
/// Create new key
- #[structopt(name = "new")]
+ #[structopt(name = "new", version = garage_version())]
New(KeyNewOpt),
/// Rename key
- #[structopt(name = "rename")]
+ #[structopt(name = "rename", version = garage_version())]
Rename(KeyRenameOpt),
/// Delete key
- #[structopt(name = "delete")]
+ #[structopt(name = "delete", version = garage_version())]
Delete(KeyDeleteOpt),
/// Set permission flags for key
- #[structopt(name = "allow")]
+ #[structopt(name = "allow", version = garage_version())]
Allow(KeyPermOpt),
/// Unset permission flags for key
- #[structopt(name = "deny")]
+ #[structopt(name = "deny", version = garage_version())]
Deny(KeyPermOpt),
/// Import key
- #[structopt(name = "import")]
+ #[structopt(name = "import", version = garage_version())]
Import(KeyImportOpt),
}
@@ -364,7 +393,7 @@ pub struct MigrateOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum MigrateWhat {
/// Migrate buckets and permissions from v0.5.0
- #[structopt(name = "buckets050")]
+ #[structopt(name = "buckets050", version = garage_version())]
Buckets050,
}
@@ -385,27 +414,69 @@ pub struct RepairOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
/// Only do a full sync of metadata tables
- #[structopt(name = "tables")]
+ #[structopt(name = "tables", version = garage_version())]
Tables,
/// Only repair (resync/rebalance) the set of stored blocks
- #[structopt(name = "blocks")]
+ #[structopt(name = "blocks", version = garage_version())]
Blocks,
/// Only redo the propagation of object deletions to the version table (slow)
- #[structopt(name = "versions")]
+ #[structopt(name = "versions", version = garage_version())]
Versions,
/// Only redo the propagation of version deletions to the block ref table (extremely slow)
- #[structopt(name = "block_refs")]
+ #[structopt(name = "block_refs", version = garage_version())]
BlockRefs,
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
- #[structopt(name = "scrub")]
+ #[structopt(name = "scrub", version = garage_version())]
Scrub {
- /// Tranquility factor (see tranquilizer documentation)
- #[structopt(name = "tranquility", default_value = "2")]
+ #[structopt(subcommand)]
+ cmd: ScrubCmd,
+ },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum ScrubCmd {
+ /// Start scrub
+ #[structopt(name = "start", version = garage_version())]
+ Start,
+ /// Pause scrub (it will resume automatically after 24 hours)
+ #[structopt(name = "pause", version = garage_version())]
+ Pause,
+ /// Resume paused scrub
+ #[structopt(name = "resume", version = garage_version())]
+ Resume,
+ /// Cancel scrub in progress
+ #[structopt(name = "cancel", version = garage_version())]
+ Cancel,
+ /// Set tranquility level for in-progress and future scrubs
+ #[structopt(name = "set-tranquility", version = garage_version())]
+ SetTranquility {
+ #[structopt()]
tranquility: u32,
},
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct OfflineRepairOpt {
+ /// Confirm the launch of the repair operation
+ #[structopt(long = "yes")]
+ pub yes: bool,
+
+ #[structopt(subcommand)]
+ pub what: OfflineRepairWhat,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum OfflineRepairWhat {
+ /// Repair K2V item counters
+ #[cfg(feature = "k2v")]
+ #[structopt(name = "k2v_item_counters", version = garage_version())]
+ K2VItemCounters,
+ /// Repair object counters
+ #[structopt(name = "object_counters", version = garage_version())]
+ ObjectCounters,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
@@ -415,3 +486,48 @@ pub struct StatsOpt {
#[structopt(short = "d", long = "detailed")]
pub detailed: bool,
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct WorkerOpt {
+ #[structopt(subcommand)]
+ pub cmd: WorkerCmd,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum WorkerCmd {
+ /// List all workers on Garage node
+ #[structopt(name = "list", version = garage_version())]
+ List {
+ #[structopt(flatten)]
+ opt: WorkerListOpt,
+ },
+ /// Set worker parameter
+ #[structopt(name = "set", version = garage_version())]
+ Set {
+ #[structopt(subcommand)]
+ opt: WorkerSetCmd,
+ },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+pub struct WorkerListOpt {
+ /// Show only busy workers
+ #[structopt(short = "b", long = "busy")]
+ pub busy: bool,
+ /// Show only workers with errors
+ #[structopt(short = "e", long = "errors")]
+ pub errors: bool,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum WorkerSetCmd {
+ /// Set tranquility of scrub operations
+ #[structopt(name = "scrub-tranquility", version = garage_version())]
+ ScrubTranquility { tranquility: u32 },
+ /// Set number of concurrent block resync workers
+ #[structopt(name = "resync-n-workers", version = garage_version())]
+ ResyncNWorkers { n_workers: usize },
+ /// Set tranquility of block resync operations
+ #[structopt(name = "resync-tranquility", version = garage_version())]
+ ResyncTranquility { tranquility: u32 },
+}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 7d496507..396938ae 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,11 +1,18 @@
use std::collections::HashMap;
+use std::time::Duration;
+use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::Uuid;
use garage_util::error::*;
+use garage_util::formater::format_table;
+use garage_util::time::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
+use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
+
+use crate::cli::structs::WorkerListOpt;
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
@@ -28,11 +35,12 @@ pub fn print_bucket_list(bl: Vec<Bucket>) {
[((k, n), _, _)] => format!("{}:{}", k, n),
s => format!("[{} local aliases]", s.len()),
};
+
table.push(format!(
"\t{}\t{}\t{}",
aliases.join(","),
local_aliases_n,
- hex::encode(bucket.id)
+ hex::encode(bucket.id),
));
}
format_table(table);
@@ -120,7 +128,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
}
}
-pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) {
+pub fn print_bucket_info(
+ bucket: &Bucket,
+ relevant_keys: &HashMap<String, Key>,
+ counters: &HashMap<String, i64>,
+) {
let key_name = |k| {
relevant_keys
.get(k)
@@ -132,7 +144,42 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
match &bucket.state {
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
- println!("Website access: {}", p.website_config.get().is_some());
+ let size =
+ bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+ println!(
+ "\nSize: {} ({})",
+ size.to_string_as(true),
+ size.to_string_as(false)
+ );
+ println!(
+ "Objects: {}",
+ counters.get(OBJECTS).cloned().unwrap_or_default()
+ );
+ println!(
+ "Unfinished multipart uploads: {}",
+ counters
+ .get(UNFINISHED_UPLOADS)
+ .cloned()
+ .unwrap_or_default()
+ );
+
+ println!("\nWebsite access: {}", p.website_config.get().is_some());
+
+ let quotas = p.quotas.get();
+ if quotas.max_size.is_some() || quotas.max_objects.is_some() {
+ println!("\nQuotas:");
+ if let Some(ms) = quotas.max_size {
+ let ms = bytesize::ByteSize::b(ms);
+ println!(
+ " maximum size: {} ({})",
+ ms.to_string_as(true),
+ ms.to_string_as(false)
+ );
+ }
+ if let Some(mo) = quotas.max_objects {
+ println!(" maximum number of objects: {}", mo);
+ }
+ }
println!("\nGlobal aliases:");
for (alias, _, active) in p.aliases.items().iter() {
@@ -173,42 +220,13 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
};
}
-pub fn format_table(data: Vec<String>) {
- let data = data
- .iter()
- .map(|s| s.split('\t').collect::<Vec<_>>())
- .collect::<Vec<_>>();
-
- let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max);
- let mut column_size = vec![0; columns];
-
- let mut out = String::new();
-
- for row in data.iter() {
- for (i, col) in row.iter().enumerate() {
- column_size[i] = std::cmp::max(column_size[i], col.chars().count());
- }
- }
-
- for row in data.iter() {
- for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) {
- out.push_str(col);
- (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' '));
- }
- out.push_str(row[row.len() - 1]);
- out.push('\n');
- }
-
- print!("{}", out);
-}
-
pub fn find_matching_node(
cand: impl std::iter::Iterator<Item = Uuid>,
pattern: &str,
) -> Result<Uuid, Error> {
let mut candidates = vec![];
for c in cand {
- if hex::encode(&c).starts_with(&pattern) {
+ if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) {
candidates.push(c);
}
}
@@ -222,3 +240,56 @@ pub fn find_matching_node(
Ok(candidates[0])
}
}
+
+pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
+ let mut wi = wi.into_iter().collect::<Vec<_>>();
+ wi.sort_by_key(|(tid, info)| {
+ (
+ match info.state {
+ WorkerState::Busy | WorkerState::Throttled(_) => 0,
+ WorkerState::Idle => 1,
+ WorkerState::Done => 2,
+ },
+ *tid,
+ )
+ });
+
+ let mut table = vec![];
+ for (tid, info) in wi.iter() {
+ if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
+ continue;
+ }
+ if wlo.errors && info.errors == 0 {
+ continue;
+ }
+
+ table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
+ if let Some(i) = &info.info {
+ table.push(format!("\t\t {}", i));
+ }
+ let tf = timeago::Formatter::new();
+ let (err_ago, err_msg) = info
+ .last_error
+ .as_ref()
+ .map(|(m, t)| {
+ (
+ tf.convert(Duration::from_millis(now_msec() - t)),
+ m.as_str(),
+ )
+ })
+ .unwrap_or(("(?) ago".into(), "(?)"));
+ if info.consecutive_errors > 0 {
+ table.push(format!(
+ "\t\t {} consecutive errors ({} total), last {}",
+ info.consecutive_errors, info.errors, err_ago,
+ ));
+ table.push(format!("\t\t {}", err_msg));
+ } else if info.errors > 0 {
+ table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
+ if wlo.errors {
+ table.push(format!("\t\t {}", err_msg));
+ }
+ }
+ }
+ format_table(table);
+}