aboutsummaryrefslogtreecommitdiff
path: root/src/garage/cli
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/cli')
-rw-r--r--src/garage/cli/cmd.rs8
-rw-r--r--src/garage/cli/layout.rs178
-rw-r--r--src/garage/cli/structs.rs47
-rw-r--r--src/garage/cli/util.rs66
4 files changed, 221 insertions, 78 deletions
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index cb7a898c..48359614 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -85,7 +85,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
));
}
_ => {
- let new_role = match layout.staging.get(&adv.id) {
+ let new_role = match layout.staging_roles.get(&adv.id) {
Some(NodeRoleV(Some(_))) => "(pending)",
_ => "NO ROLE ASSIGNED",
};
@@ -190,8 +190,9 @@ pub async fn cmd_admin(
bucket,
relevant_keys,
counters,
+ mpu_counters,
} => {
- print_bucket_info(&bucket, &relevant_keys, &counters);
+ print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);
@@ -215,8 +216,9 @@ pub async fn cmd_admin(
hash,
refcount,
versions,
+ uploads,
} => {
- print_block_info(hash, refcount, versions);
+ print_block_info(hash, refcount, versions, uploads);
}
r => {
error!("Unexpected response: {:?}", r);
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index dc5315a1..3932f115 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,3 +1,5 @@
+use bytesize::ByteSize;
+
use format_table::format_table;
use garage_util::crdt::Crdt;
use garage_util::error::*;
@@ -14,8 +16,8 @@ pub async fn cli_layout_command_dispatch(
rpc_host: NodeID,
) -> Result<(), Error> {
match cmd {
- LayoutOperation::Assign(configure_opt) => {
- cmd_assign_role(system_rpc_endpoint, rpc_host, configure_opt).await
+ LayoutOperation::Assign(assign_opt) => {
+ cmd_assign_role(system_rpc_endpoint, rpc_host, assign_opt).await
}
LayoutOperation::Remove(remove_opt) => {
cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
@@ -27,6 +29,9 @@ pub async fn cli_layout_command_dispatch(
LayoutOperation::Revert(revert_opt) => {
cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
}
+ LayoutOperation::Config(config_opt) => {
+ cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
+ }
}
}
@@ -60,14 +65,14 @@ pub async fn cmd_assign_role(
.collect::<Result<Vec<_>, _>>()?;
let mut roles = layout.roles.clone();
- roles.merge(&layout.staging);
+ roles.merge(&layout.staging_roles);
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => {
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
}
_ => {
@@ -83,7 +88,7 @@ pub async fn cmd_assign_role(
return Err(Error::Message(
"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
}
- if args.capacity == Some(0) {
+ if args.capacity == Some(ByteSize::b(0)) {
return Err(Error::Message("Invalid capacity value: 0".into()));
}
@@ -91,7 +96,7 @@ pub async fn cmd_assign_role(
let new_entry = match roles.get(&added_node) {
Some(NodeRoleV(Some(old))) => {
let capacity = match args.capacity {
- Some(c) => Some(c),
+ Some(c) => Some(c.as_u64()),
None if args.gateway => None,
None => old.capacity,
};
@@ -108,7 +113,7 @@ pub async fn cmd_assign_role(
}
_ => {
let capacity = match args.capacity {
- Some(c) => Some(c),
+ Some(c) => Some(c.as_u64()),
None if args.gateway => None,
None => return Err(Error::Message(
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
@@ -125,7 +130,7 @@ pub async fn cmd_assign_role(
};
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
}
@@ -145,13 +150,13 @@ pub async fn cmd_remove_role(
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut roles = layout.roles.clone();
- roles.merge(&layout.staging);
+ roles.merge(&layout.staging_roles);
let deleted_node =
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -166,7 +171,7 @@ pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== CURRENT CLUSTER LAYOUT ====");
if !print_cluster_layout(&layout) {
@@ -176,30 +181,41 @@ pub async fn cmd_show_layout(
println!();
println!("Current cluster layout version: {}", layout.version);
- if print_staging_role_changes(&layout) {
- layout.roles.merge(&layout.staging);
-
- println!();
- println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
- if !print_cluster_layout(&layout) {
- println!("No nodes have a role in the new layout.");
- }
- println!();
+ let has_role_changes = print_staging_role_changes(&layout);
+ let has_param_changes = print_staging_parameters_changes(&layout);
+ if has_role_changes || has_param_changes {
+ let v = layout.version;
+ let res_apply = layout.apply_staged_changes(Some(v + 1));
// this will print the stats of what partitions
// will move around when we apply
- if layout.calculate_partition_assignation() {
- println!("To enact the staged role changes, type:");
- println!();
- println!(" garage layout apply --version {}", layout.version + 1);
- println!();
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- layout.version + 1
- );
- } else {
- println!("Not enough nodes have an assigned role to maintain enough copies of data.");
- println!("This new layout cannot yet be applied.");
+ match res_apply {
+ Ok((layout, msg)) => {
+ println!();
+ println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
+ if !print_cluster_layout(&layout) {
+ println!("No nodes have a role in the new layout.");
+ }
+ println!();
+
+ for line in msg.iter() {
+ println!("{}", line);
+ }
+ println!("To enact the staged role changes, type:");
+ println!();
+ println!(" garage layout apply --version {}", v + 1);
+ println!();
+ println!(
+ "You can also revert all proposed changes with: garage layout revert --version {}",
+ v + 1)
+ }
+ Err(e) => {
+ println!("Error while trying to compute the assignment: {}", e);
+ println!("This new layout cannot yet be applied.");
+ println!(
+ "You can also revert all proposed changes with: garage layout revert --version {}",
+ v + 1)
+ }
}
}
@@ -213,11 +229,14 @@ pub async fn cmd_apply_layout(
) -> Result<(), Error> {
let layout = fetch_layout(rpc_cli, rpc_host).await?;
- let layout = layout.apply_staged_changes(apply_opt.version)?;
+ let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?;
+ for line in msg.iter() {
+ println!("{}", line);
+ }
send_layout(rpc_cli, rpc_host, layout).await?;
- println!("New cluster layout with updated role assignation has been applied in cluster.");
+ println!("New cluster layout with updated role assignment has been applied in cluster.");
println!("Data will now be moved around between nodes accordingly.");
Ok(())
@@ -238,6 +257,45 @@ pub async fn cmd_revert_layout(
Ok(())
}
+pub async fn cmd_config_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ config_opt: ConfigLayoutOpt,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ let mut did_something = false;
+ match config_opt.redundancy {
+ None => (),
+ Some(r) => {
+ if r > layout.replication_factor {
+ println!(
+ "The zone redundancy must be smaller or equal to the \
+ replication factor ({}).",
+ layout.replication_factor
+ );
+ } else if r < 1 {
+ println!("The zone redundancy must be at least 1.");
+ } else {
+ layout
+ .staging_parameters
+ .update(LayoutParameters { zone_redundancy: r });
+ println!("The new zone redundancy has been saved ({}).", r);
+ }
+ did_something = true;
+ }
+ }
+
+ if !did_something {
+ return Err(Error::Message(
+ "Please specify an action for `garage layout config` to do".into(),
+ ));
+ }
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+ Ok(())
+}
+
// --- utility ---
pub async fn fetch_layout(
@@ -269,21 +327,39 @@ pub async fn send_layout(
}
pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
- let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+ let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
for (id, _, role) in layout.roles.items().iter() {
let role = match &role.0 {
Some(r) => r,
_ => continue,
};
let tags = role.tags.join(",");
- table.push(format!(
- "{:?}\t{}\t{}\t{}",
- id,
- tags,
- role.zone,
- role.capacity_string()
- ));
+ let usage = layout.get_node_usage(id).unwrap_or(0);
+ let capacity = layout.get_node_capacity(id).unwrap_or(0);
+ if capacity > 0 {
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}\t{} ({:.1}%)",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string(),
+ ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false),
+ (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32)
+ ));
+ } else {
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string()
+ ));
+ };
}
+ println!();
+ println!("Parameters of the layout computation:");
+ println!("Zone redundancy: {}", layout.parameters.zone_redundancy);
+ println!();
if table.len() == 1 {
false
} else {
@@ -292,9 +368,23 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
}
}
+pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool {
+ let has_changes = *layout.staging_parameters.get() != layout.parameters;
+ if has_changes {
+ println!();
+ println!("==== NEW LAYOUT PARAMETERS ====");
+ println!(
+ "Zone redundancy: {}",
+ layout.staging_parameters.get().zone_redundancy
+ );
+ println!();
+ }
+ has_changes
+}
+
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
let has_changes = layout
- .staging
+ .staging_roles
.items()
.iter()
.any(|(k, _, v)| layout.roles.get(k) != Some(v));
@@ -303,7 +393,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
println!();
println!("==== STAGED ROLE CHANGES ====");
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
- for (id, _, role) in layout.staging.items().iter() {
+ for (id, _, role) in layout.staging_roles.items().iter() {
if layout.roles.get(id) == Some(role) {
continue;
}
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 6e585e53..9ca4a059 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -17,7 +17,7 @@ pub enum Command {
#[structopt(name = "node", version = garage_version())]
Node(NodeOperation),
- /// Operations on the assignation of node roles in the cluster layout
+ /// Operations on the assignment of node roles in the cluster layout
#[structopt(name = "layout", version = garage_version())]
Layout(LayoutOperation),
@@ -91,6 +91,10 @@ pub enum LayoutOperation {
#[structopt(name = "remove", version = garage_version())]
Remove(RemoveRoleOpt),
+ /// Configure parameters value for the layout computation
+ #[structopt(name = "config", version = garage_version())]
+ Config(ConfigLayoutOpt),
+
/// Show roles currently assigned to nodes and changes staged for commit
#[structopt(name = "show", version = garage_version())]
Show,
@@ -114,9 +118,9 @@ pub struct AssignRoleOpt {
#[structopt(short = "z", long = "zone")]
pub(crate) zone: Option<String>,
- /// Capacity (in relative terms, use 1 to represent your smallest server)
+ /// Storage capacity, in bytes (supported suffixes: B, KB, MB, GB, TB, PB)
#[structopt(short = "c", long = "capacity")]
- pub(crate) capacity: Option<u32>,
+ pub(crate) capacity: Option<bytesize::ByteSize>,
/// Gateway-only node
#[structopt(short = "g", long = "gateway")]
@@ -138,6 +142,13 @@ pub struct RemoveRoleOpt {
}
#[derive(StructOpt, Debug)]
+pub struct ConfigLayoutOpt {
+ /// Zone redundancy parameter
+ #[structopt(short = "r", long = "redundancy")]
+ pub(crate) redundancy: Option<usize>,
+}
+
+#[derive(StructOpt, Debug)]
pub struct ApplyLayoutOpt {
/// Version number of new configuration: this command will fail if
/// it is not exactly 1 + the previous configuration's version
@@ -317,11 +328,11 @@ pub enum KeyOperation {
/// Get key info
#[structopt(name = "info", version = garage_version())]
- Info(KeyOpt),
+ Info(KeyInfoOpt),
/// Create new key
- #[structopt(name = "new", version = garage_version())]
- New(KeyNewOpt),
+ #[structopt(name = "create", version = garage_version())]
+ Create(KeyNewOpt),
/// Rename key
#[structopt(name = "rename", version = garage_version())]
@@ -345,15 +356,18 @@ pub enum KeyOperation {
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct KeyOpt {
+pub struct KeyInfoOpt {
/// ID or name of the key
pub key_pattern: String,
+ /// Whether to display the secret key
+ #[structopt(long = "show-secret")]
+ pub show_secret: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyNewOpt {
/// Name of the key
- #[structopt(long = "name", default_value = "Unnamed key")]
+ #[structopt(default_value = "Unnamed key")]
pub name: String,
}
@@ -397,6 +411,10 @@ pub struct KeyImportOpt {
/// Key name
#[structopt(short = "n", default_value = "Imported key")]
pub name: String,
+
+ /// Confirm key import
+ #[structopt(long = "yes")]
+ pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
@@ -432,19 +450,22 @@ pub struct RepairOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
- /// Only do a full sync of metadata tables
+ /// Do a full sync of metadata tables
#[structopt(name = "tables", version = garage_version())]
Tables,
- /// Only repair (resync/rebalance) the set of stored blocks
+ /// Repair (resync/rebalance) the set of stored blocks
#[structopt(name = "blocks", version = garage_version())]
Blocks,
- /// Only redo the propagation of object deletions to the version table (slow)
+ /// Repropagate object deletions to the version table
#[structopt(name = "versions", version = garage_version())]
Versions,
- /// Only redo the propagation of version deletions to the block ref table (extremely slow)
+ /// Repropagate object deletions to the multipart upload table
+ #[structopt(name = "mpu", version = garage_version())]
+ MultipartUploads,
+ /// Repropagate version deletions to the block ref table
#[structopt(name = "block_refs", version = garage_version())]
BlockRefs,
- /// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
+ /// Verify integrity of all blocks on disc
#[structopt(name = "scrub", version = garage_version())]
Scrub {
#[structopt(subcommand)]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 1140cf22..2232d395 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -12,8 +12,9 @@ use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
-use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
-use garage_model::s3::version_table::Version;
+use garage_model::s3::mpu_table::{self, MultipartUpload};
+use garage_model::s3::object_table;
+use garage_model::s3::version_table::*;
use crate::cli::structs::WorkerListOpt;
@@ -135,6 +136,7 @@ pub fn print_bucket_info(
bucket: &Bucket,
relevant_keys: &HashMap<String, Key>,
counters: &HashMap<String, i64>,
+ mpu_counters: &HashMap<String, i64>,
) {
let key_name = |k| {
relevant_keys
@@ -148,7 +150,7 @@ pub fn print_bucket_info(
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
let size =
- bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+ bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64);
println!(
"\nSize: {} ({})",
size.to_string_as(true),
@@ -156,14 +158,22 @@ pub fn print_bucket_info(
);
println!(
"Objects: {}",
- counters.get(OBJECTS).cloned().unwrap_or_default()
+ *counters.get(object_table::OBJECTS).unwrap_or(&0)
+ );
+ println!(
+ "Unfinished uploads (multipart and non-multipart): {}",
+ *counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0)
);
println!(
"Unfinished multipart uploads: {}",
- counters
- .get(UNFINISHED_UPLOADS)
- .cloned()
- .unwrap_or_default()
+ *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0)
+ );
+ let mpu_size =
+ bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64);
+ println!(
+ "Size of unfinished multipart uploads: {} ({})",
+ mpu_size.to_string_as(true),
+ mpu_size.to_string_as(false),
);
println!("\nWebsite access: {}", p.website_config.get().is_some());
@@ -390,29 +400,49 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
format_table(table);
}
-pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) {
+pub fn print_block_info(
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ uploads: Vec<MultipartUpload>,
+) {
println!("Block hash: {}", hex::encode(hash.as_slice()));
println!("Refcount: {}", refcount);
println!();
- let mut table = vec!["Version\tBucket\tKey\tDeleted".into()];
+ let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
let mut nondeleted_count = 0;
for v in versions.iter() {
match v {
Ok(ver) => {
- table.push(format!(
- "{:?}\t{:?}\t{}\t{:?}",
- ver.uuid,
- ver.bucket_id,
- ver.key,
- ver.deleted.get()
- ));
+ match &ver.backlink {
+ VersionBacklink::Object { bucket_id, key } => {
+ table.push(format!(
+ "{:?}\t{:?}\t{}\t\t{:?}",
+ ver.uuid,
+ bucket_id,
+ key,
+ ver.deleted.get()
+ ));
+ }
+ VersionBacklink::MultipartUpload { upload_id } => {
+ let upload = uploads.iter().find(|x| x.upload_id == *upload_id);
+ table.push(format!(
+ "{:?}\t{:?}\t{}\t{:?}\t{:?}",
+ ver.uuid,
+ upload.map(|u| u.bucket_id).unwrap_or_default(),
+ upload.map(|u| u.key.as_str()).unwrap_or_default(),
+ upload_id,
+ ver.deleted.get()
+ ));
+ }
+ }
if !ver.deleted.get() {
nondeleted_count += 1;
}
}
Err(vh) => {
- table.push(format!("{:?}\t\t\tyes", vh));
+ table.push(format!("{:?}\t\t\t\tyes", vh));
}
}
}