diff options
-rw-r--r-- | Cargo.lock | 15 | ||||
-rw-r--r-- | Cargo.nix | 19 | ||||
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | src/garage/admin.rs | 125 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 15 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 5 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/rpc/system.rs | 73 |
8 files changed, 220 insertions, 35 deletions
@@ -1230,6 +1230,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", + "systemstat", "tokio", "tokio-stream", "tracing", @@ -3568,6 +3569,20 @@ dependencies = [ ] [[package]] +name = "systemstat" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a24aec24a9312c83999a28e3ef9db7e2afd5c64bf47725b758cdc1cafd5b0bd2" +dependencies = [ + "bytesize", + "lazy_static", + "libc", + "nom", + "time 0.3.9", + "winapi", +] + +[[package]] name = "tempfile" version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -32,7 +32,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "8461dcfb984a8d042fecb5745d5da17912135dbf2a8ef7e6c3ae8e64c03d9744"; + nixifiedLockHash = "e59ef222aaaada125e2a5fccbd215b545740b2abd21ce381c42a7a6e3a7e672a"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -1753,6 +1753,7 @@ in serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; serde_json = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.81" { inherit profileName; }).out; + systemstat = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".systemstat."0.2.3" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out; tokio_stream = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }).out; tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; }).out; @@ -4917,6 +4918,21 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".systemstat."0.2.3" = overridableMkRustCrate (profileName: rec { + name = "systemstat"; + version = "0.2.3"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "a24aec24a9312c83999a28e3ef9db7e2afd5c64bf47725b758cdc1cafd5b0bd2"; }; + dependencies = { + bytesize = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytesize."1.1.0" { inherit profileName; }).out; + lazy_static = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; }).out; + libc = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }).out; + ${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "nom" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.1" { inherit profileName; }).out; + time = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".time."0.3.9" { inherit profileName; }).out; + ${ if hostPlatform.isWindows then "winapi" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".winapi."0.3.9" { inherit profileName; }).out; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".tempfile."3.3.0" = overridableMkRustCrate (profileName: rec { name = "tempfile"; version = "3.3.0"; @@ -5890,6 +5906,7 @@ in [ "ntsecapi" ] [ "ntstatus" ] [ "objbase" ] + [ "pdh" ] [ "processenv" ] [ "processthreadsapi" ] [ "profileapi" ] @@ -4,7 +4,7 @@ all: clear; cargo build release: - nix-build --arg release true + nix-build --attr pkgs.amd64.release --no-build-output shell: nix-shell diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 305c5c65..4eabebca 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -15,6 +15,7 @@ use garage_util::time::*; use garage_table::replication::*; use garage_table::*; +use garage_rpc::ring::PARTITION_BITS; use garage_rpc::*; use garage_block::manager::BlockResyncErrorInfo; @@ -783,6 +784,7 @@ impl AdminRpcHandler { for node in ring.layout.node_ids().iter() { let mut opt = opt.clone(); opt.all_nodes = false; + opt.skip_global = true; writeln!(&mut ret, "\n======================").unwrap(); writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); @@ -799,6 +801,15 @@ impl AdminRpcHandler { Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), } } + + writeln!(&mut ret, "\n======================").unwrap(); + write!( + &mut ret, + "Cluster statistics:\n\n{}", + self.gather_cluster_stats() + ) + .unwrap(); + Ok(AdminRpc::Ok(ret)) } else { Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) @@ -819,22 +830,6 @@ impl AdminRpcHandler { writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); - // Gather ring statistics - let ring = self.garage.system.ring.borrow().clone(); - let mut ring_nodes = HashMap::new(); - for (_i, loc) in ring.partitions().iter() { - for n in ring.get_nodes(loc, ring.replication_factor).iter() { - if !ring_nodes.contains_key(n) { - ring_nodes.insert(*n, 0usize); - } - *ring_nodes.get_mut(n).unwrap() += 1; - } - } - writeln!(&mut ret, "\nRing nodes & partition count:").unwrap(); - for (n, c) in ring_nodes.iter() { - writeln!(&mut ret, " {:?} {}", n, c).unwrap(); - } - // Gather table statistics let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?); @@ -881,12 +876,108 @@ impl AdminRpcHandler { .unwrap(); if !opt.detailed { - writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap(); + writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap(); + } + + if !opt.skip_global { + write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); } Ok(ret) } + fn gather_cluster_stats(&self) -> String { + let mut ret = String::new(); + + // Gather storage node and free space statistics + let layout = &self.garage.system.ring.borrow().layout; + let mut node_partition_count = HashMap::<Uuid, u64>::new(); + for short_id in layout.ring_assignation_data.iter() { + let id = layout.node_id_vec[*short_id as usize]; + *node_partition_count.entry(id).or_default() += 1; + } + let node_info = self + .garage + .system + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::<HashMap<_, _>>(); + + let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; + for (id, parts) in node_partition_count.iter() { + let info = node_info.get(id); + let status = info.map(|x| &x.status); + let role = layout.roles.get(id).and_then(|x| x.0.as_ref()); + let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?"); + let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); + let capacity = role.map(|x| x.capacity_string()).unwrap_or("?".into()); + let avail_str = |x| match x { + Some((avail, total)) => { + let pct = (avail as f64) / (total as f64) * 100.; + let avail = bytesize::ByteSize::b(avail); + let total = bytesize::ByteSize::b(total); + format!("{}/{} ({:.1}%)", avail, total, pct) + } + None => "?".into(), + }; + let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); + let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); + table.push(format!( + " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", + id, hostname, zone, capacity, parts, data_avail, meta_avail + )); + } + write!( + &mut ret, + "Storage nodes:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + let meta_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.meta_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + let data_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.data_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { + let meta_avail = + bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + let data_avail = + bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + writeln!( + &mut ret, + "\nEstimated available storage space cluster-wide (might be lower in practice):" + ) + .unwrap(); + if meta_part_avail.len() < node_partition_count.len() + || data_part_avail.len() < node_partition_count.len() + { + writeln!(&mut ret, " data: < {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); + writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); + } else { + writeln!(&mut ret, " data: {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); + } + } + + ret + } + fn gather_table_stats<F, R>( &self, t: &Arc<Table<F, R>>, diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 46e9113c..af7f1aa1 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -59,18 +59,29 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> let layout = fetch_layout(rpc_cli, rpc_host).await?; println!("==== HEALTHY NODES ===="); - let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()]; + let mut healthy_nodes = + vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail\tMetaAvail".to_string()]; for adv in status.iter().filter(|adv| adv.is_up) { match layout.roles.get(&adv.id) { Some(NodeRoleV(Some(cfg))) => { + let data_avail = match &adv.status.data_disk_avail { + _ if cfg.capacity.is_none() => "N/A".into(), + Some((avail, total)) => { + let pct = (*avail as f64) / (*total as f64) * 100.; + let avail = bytesize::ByteSize::b(*avail); + format!("{} ({:.1}%)", avail, pct) + } + None => "?".into(), + }; healthy_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}", + "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}", id = adv.id, host = adv.status.hostname, addr = adv.addr, tags = cfg.tags.join(","), zone = cfg.zone, capacity = cfg.capacity_string(), + data_avail = data_avail, )); } _ => { diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 661a71f0..01ae92da 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -504,6 +504,11 @@ pub struct StatsOpt { /// Gather detailed statistics (this can be long) #[structopt(short = "d", long = "detailed")] pub detailed: bool, + + /// Don't show global cluster stats (internal use in RPC) + #[structopt(skip)] + #[serde(default)] + pub skip_global: bool, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index e9a0929a..2daa0176 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -23,6 +23,7 @@ hex = "0.4" tracing = "0.1.30" rand = "0.8" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } +systemstat = "0.2.3" async-trait = "0.1.7" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 90f6a4c2..a9e91e19 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -118,18 +118,28 @@ pub struct System { /// Path to metadata directory pub metadata_dir: PathBuf, + /// Path to data directory + pub data_dir: PathBuf, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeStatus { /// Hostname of the node pub hostname: String, + /// Replication factor configured on the node pub replication_factor: usize, /// Cluster layout version pub cluster_layout_version: u64, /// Hash of cluster layout staging data pub cluster_layout_staging_hash: Hash, + + /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) + #[serde(default)] + pub meta_disk_avail: Option<(u64, u64)>, + /// Disk usage on partition containing data directory (tuple: `(avail, total)`) + #[serde(default)] + pub data_disk_avail: Option<(u64, u64)>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -271,14 +281,8 @@ impl System { } }; - let local_status = NodeStatus { - hostname: gethostname::gethostname() - .into_string() - .unwrap_or_else(|_| "<invalid utf-8>".to_string()), - replication_factor, - cluster_layout_version: cluster_layout.version, - cluster_layout_staging_hash: cluster_layout.staging_hash, - }; + let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); + local_status.update_disk_usage(&config.metadata_dir, &config.data_dir); #[cfg(feature = "metrics")] let metrics = SystemMetrics::new(replication_factor); @@ -379,6 +383,7 @@ impl System { ring, update_ring: Mutex::new(update_ring), metadata_dir: config.metadata_dir.clone(), + data_dir: config.data_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); Ok(sys) @@ -416,12 +421,7 @@ impl System { .get(&n.id.into()) .cloned() .map(|(_, st)| st) - .unwrap_or(NodeStatus { - hostname: "?".to_string(), - replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), - }), + .unwrap_or(NodeStatus::unknown()), }) .collect::<Vec<_>>(); known_nodes @@ -600,6 +600,9 @@ impl System { let ring = self.ring.borrow(); new_si.cluster_layout_version = ring.layout.version; new_si.cluster_layout_staging_hash = ring.layout.staging_hash; + + new_si.update_disk_usage(&self.metadata_dir, &self.data_dir); + self.local_status.swap(Arc::new(new_si)); } @@ -864,6 +867,48 @@ impl EndpointHandler<SystemRpc> for System { } } +impl NodeStatus { + fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self { + NodeStatus { + hostname: gethostname::gethostname() + .into_string() + .unwrap_or_else(|_| "<invalid utf-8>".to_string()), + replication_factor, + cluster_layout_version: layout.version, + cluster_layout_staging_hash: layout.staging_hash, + meta_disk_avail: None, + data_disk_avail: None, + } + } + + fn unknown() -> Self { + NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), + meta_disk_avail: None, + data_disk_avail: None, + } + } + + fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path) { + use systemstat::{Platform, System}; + let mounts = System::new().mounts().unwrap_or_default(); + + let mount_avail = |path: &Path| { + mounts + .iter() + .filter(|x| path.starts_with(&x.fs_mounted_on)) + .max_by_key(|x| x.fs_mounted_on.len()) + .map(|x| (x.avail.as_u64(), x.total.as_u64())) + }; + + self.meta_disk_avail = mount_avail(meta_dir); + self.data_disk_avail = mount_avail(data_dir); + } +} + fn get_default_ip() -> Option<IpAddr> { pnet_datalink::interfaces() .iter() |