diff options
author | Alex Auvolat <alex@adnab.me> | 2022-11-07 21:12:11 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-11-07 21:12:11 +0100 |
commit | 73a4ca8b1515f95bf7860fc292c12db83d3c6228 (patch) | |
tree | da57568dca4a6fbf4cb812f8dc8d5d9fef92d2b2 /src/rpc | |
parent | fd5bc142b553d716c8265d83cff0bb633aa09e6b (diff) | |
download | garage-73a4ca8b1515f95bf7860fc292c12db83d3c6228.tar.gz garage-73a4ca8b1515f95bf7860fc292c12db83d3c6228.zip |
Use bytes as capacity units
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/rpc/graph_algo.rs | 34 | ||||
-rw-r--r-- | src/rpc/layout.rs | 62 |
3 files changed, 50 insertions, 47 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 5a427131..1b411c6a 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -18,6 +18,7 @@ garage_util = { version = "0.8.0", path = "../util" } arc-swap = "1.0" bytes = "1.0" +bytesize = "1.1" gethostname = "0.2" hex = "0.4" tracing = "0.1.30" diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs index 1e4a819b..f181e2ba 100644 --- a/src/rpc/graph_algo.rs +++ b/src/rpc/graph_algo.rs @@ -23,8 +23,8 @@ pub enum Vertex { /// Edge data structure for the flow algorithm. #[derive(Clone, Copy, Debug)] pub struct FlowEdge { - cap: u32, // flow maximal capacity of the edge - flow: i32, // flow value on the edge + cap: u64, // flow maximal capacity of the edge + flow: i64, // flow value on the edge dest: usize, // destination vertex id rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v } @@ -32,7 +32,7 @@ pub struct FlowEdge { /// Edge data structure for the detection of negative cycles. #[derive(Clone, Copy, Debug)] pub struct WeightedEdge { - w: i32, // weight of the edge + w: i64, // weight of the edge dest: usize, } @@ -51,7 +51,7 @@ pub struct Graph<E: Edge> { graph: Vec<Vec<E>>, } -pub type CostFunction = HashMap<(Vertex, Vertex), i32>; +pub type CostFunction = HashMap<(Vertex, Vertex), i64>; impl<E: Edge> Graph<E> { pub fn new(vertices: &[Vertex]) -> Self { @@ -77,7 +77,7 @@ impl<E: Edge> Graph<E> { impl Graph<FlowEdge> { /// This function adds a directed edge to the graph with capacity c, and the /// corresponding reversed edge with capacity 0. - pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> { + pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> { let idu = self.get_vertex_id(&u)?; let idv = self.get_vertex_id(&v)?; if idu == idv { @@ -115,7 +115,7 @@ impl Graph<FlowEdge> { } /// This function returns the value of the flow incoming to v. - pub fn get_inflow(&self, v: Vertex) -> Result<i32, String> { + pub fn get_inflow(&self, v: Vertex) -> Result<i64, String> { let idv = self.get_vertex_id(&v)?; let mut result = 0; for edge in self.graph[idv].iter() { @@ -125,7 +125,7 @@ impl Graph<FlowEdge> { } /// This function returns the value of the flow outgoing from v. - pub fn get_outflow(&self, v: Vertex) -> Result<i32, String> { + pub fn get_outflow(&self, v: Vertex) -> Result<i64, String> { let idv = self.get_vertex_id(&v)?; let mut result = 0; for edge in self.graph[idv].iter() { @@ -136,7 +136,7 @@ impl Graph<FlowEdge> { /// This function computes the flow total value by computing the outgoing flow /// from the source. - pub fn get_flow_value(&mut self) -> Result<i32, String> { + pub fn get_flow_value(&mut self) -> Result<i64, String> { self.get_outflow(Vertex::Source) } @@ -156,7 +156,7 @@ impl Graph<FlowEdge> { } /// Computes an upper bound of the flow on the graph - pub fn flow_upper_bound(&self) -> Result<u32, String> { + pub fn flow_upper_bound(&self) -> Result<u64, String> { let idsource = self.get_vertex_id(&Vertex::Source)?; let mut flow_upper_bound = 0; for edge in self.graph[idsource].iter() { @@ -193,7 +193,7 @@ impl Graph<FlowEdge> { // it means id has not yet been reached level[id] = Some(lvl); for edge in self.graph[id].iter() { - if edge.cap as i32 - edge.flow > 0 { + if edge.cap as i64 - edge.flow > 0 { fifo.push_back((edge.dest, lvl + 1)); } } @@ -216,10 +216,10 @@ impl Graph<FlowEdge> { lifo.pop(); while let Some((id, _)) = lifo.pop() { let nbd = next_nbd[id]; - self.graph[id][nbd].flow += f as i32; + self.graph[id][nbd].flow += f as i64; let id_rev = self.graph[id][nbd].dest; let nbd_rev = self.graph[id][nbd].rev; - self.graph[id_rev][nbd_rev].flow -= f as i32; + self.graph[id_rev][nbd_rev].flow -= f as i64; } lifo.push((idsource, flow_upper_bound)); continue; @@ -236,9 +236,9 @@ impl Graph<FlowEdge> { } // else we can try to send flow from id to its nbd let new_flow = min( - f as i32, - self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow, - ) as u32; + f as i64, + self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow, + ) as u64; if new_flow == 0 { next_nbd[id] += 1; continue; @@ -302,7 +302,7 @@ impl Graph<FlowEdge> { let nb_vertices = self.id_to_vertex.len(); for i in 0..nb_vertices { for edge in self.graph[i].iter() { - if edge.cap as i32 - edge.flow > 0 { + if edge.cap as i64 - edge.flow > 0 { // It is possible to send overflow through this edge let u = self.id_to_vertex[i]; let v = self.id_to_vertex[edge.dest]; @@ -322,7 +322,7 @@ impl Graph<FlowEdge> { impl Graph<WeightedEdge> { /// This function adds a single directed weighted edge to the graph. - pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> { + pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> { let idu = self.get_vertex_id(&u)?; let idv = self.get_vertex_id(&v)?; self.graph[idu].push(WeightedEdge { w, dest: idv }); diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 15765662..3c80b213 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -2,7 +2,7 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::collections::HashSet; -use hex::ToHex; +use bytesize::ByteSize; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -32,7 +32,7 @@ pub struct ClusterLayout { /// This attribute is only used to retain the previously computed partition size, /// to know to what extent does it change with the layout update. - pub partition_size: u32, + pub partition_size: u64, /// Parameters used to compute the assignation currently given by /// ring_assignation_data pub parameters: LayoutParameters, @@ -86,8 +86,7 @@ pub struct NodeRole { /// The capacity of the node /// If this is set to None, the node does not participate in storing data for the system /// and is only active as an API gateway to other nodes - // TODO : change the capacity to u64 and use byte unit input/output - pub capacity: Option<u32>, + pub capacity: Option<u64>, /// A set of tags to recognize the node pub tags: Vec<String>, } @@ -95,7 +94,7 @@ pub struct NodeRole { impl NodeRole { pub fn capacity_string(&self) -> String { match self.capacity { - Some(c) => format!("{}", c), + Some(c) => ByteSize::b(c).to_string_as(false), None => "gateway".to_string(), } } @@ -264,7 +263,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } /// Given a node uuids, this function returns its capacity or fails if it does not have any - pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u32, Error> { + pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> { match self.node_role(uuid) { Some(NodeRole { capacity: Some(cap), @@ -300,7 +299,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } /// Returns the sum of capacities of non gateway nodes in the cluster - pub fn get_total_capacity(&self) -> Result<u32, Error> { + pub fn get_total_capacity(&self) -> Result<u64, Error> { let mut total_capacity = 0; for uuid in self.nongateway_nodes().iter() { total_capacity += self.get_node_capacity(uuid)?; @@ -458,13 +457,14 @@ impl ClusterLayout { if old_assignation_opt != None { msg.push(format!( "Optimal size of a partition: {} (was {} in the previous layout).", - partition_size, self.partition_size + ByteSize::b(partition_size).to_string_as(false), + ByteSize::b(self.partition_size).to_string_as(false) )); } else { msg.push(format!( "Given the replication and redundancy constraints, the \ optimal size of a partition is {}.", - partition_size + ByteSize::b(partition_size).to_string_as(false) )); } // We write the partition size. @@ -613,7 +613,7 @@ impl ClusterLayout { fn compute_optimal_partition_size( &self, zone_to_id: &HashMap<String, usize>, - ) -> Result<u32, Error> { + ) -> Result<u64, Error> { let empty_set = HashSet::<(usize, usize)>::new(); let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?; g.compute_maximal_flow()?; @@ -672,7 +672,7 @@ impl ClusterLayout { /// previous one. fn generate_flow_graph( &self, - partition_size: u32, + partition_size: u64, zone_to_id: &HashMap<String, usize>, exclude_assoc: &HashSet<(usize, usize)>, ) -> Result<Graph<FlowEdge>, Error> { @@ -682,18 +682,18 @@ impl ClusterLayout { let nb_zones = zone_to_id.len(); let redundancy = self.parameters.zone_redundancy; for p in 0..NB_PARTITIONS { - g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u32)?; + g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u64)?; g.add_edge( Vertex::Source, Vertex::Pdown(p), - (self.replication_factor - redundancy) as u32, + (self.replication_factor - redundancy) as u64, )?; for z in 0..nb_zones { g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?; g.add_edge( Vertex::Pdown(p), Vertex::PZ(p, z), - self.replication_factor as u32, + self.replication_factor as u64, )?; } } @@ -813,17 +813,19 @@ impl ClusterLayout { ) -> Result<Message, Error> { let mut msg = Message::new(); - let used_cap = self.partition_size * NB_PARTITIONS as u32 * self.replication_factor as u32; + let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64; let total_cap = self.get_total_capacity()?; let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32); msg.push("".into()); msg.push(format!( "Usable capacity / Total cluster capacity: {} / {} ({:.1} %)", - used_cap, total_cap, percent_cap + ByteSize::b(used_cap).to_string_as(false), + ByteSize::b(total_cap).to_string_as(false), + percent_cap )); msg.push("".into()); msg.push( - "If the percentage is to low, it might be that the \ + "If the percentage is too low, it might be that the \ replication/redundancy constraints force the use of nodes/zones with small \ storage capacities. \ You might want to rebalance the storage capacities or relax the constraints. \ @@ -833,9 +835,9 @@ impl ClusterLayout { msg.push(format!( "Recall that because of the replication factor, the actual available \ storage capacity is {} / {} = {}.", - used_cap, + ByteSize::b(used_cap).to_string_as(false), self.replication_factor, - used_cap / self.replication_factor as u32 + ByteSize::b(used_cap / self.replication_factor as u64).to_string_as(false) )); // We define and fill in the following tables @@ -914,34 +916,34 @@ impl ClusterLayout { replicated_partitions )); - let available_cap_z: u32 = self.partition_size * replicated_partitions as u32; + let available_cap_z: u64 = self.partition_size * replicated_partitions as u64; let mut total_cap_z = 0; for n in nodes_of_z.iter() { total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; } let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); msg.push(format!( - " Usable capacity / Total capacity: {}/{} ({:.1}%).", - available_cap_z, total_cap_z, percent_cap_z + " Usable capacity / Total capacity: {} / {} ({:.1}%).", + ByteSize::b(available_cap_z).to_string_as(false), + ByteSize::b(total_cap_z).to_string_as(false), + percent_cap_z )); for n in nodes_of_z.iter() { - let available_cap_n = stored_partitions[*n] as u32 * self.partition_size; + let available_cap_n = stored_partitions[*n] as u64 * self.partition_size; let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; let tags_n = (self .node_role(&self.node_id_vec[*n]) .ok_or("Node not found."))? .tags_string(); msg.push(format!( - " Node {}: {} partitions ({} new) ; \ + " Node {:?}: {} partitions ({} new) ; \ usable/total capacity: {} / {} ({:.1}%) ; tags:{}", - &self.node_id_vec[*n].to_vec()[0..2] - .to_vec() - .encode_hex::<String>(), + self.node_id_vec[*n], stored_partitions[*n], new_partitions[*n], - available_cap_n, - total_cap_n, + ByteSize::b(available_cap_n).to_string_as(false), + ByteSize::b(total_cap_n).to_string_as(false), (available_cap_n as f32) / (total_cap_n as f32) * 100.0, tags_n )); @@ -1041,7 +1043,7 @@ mod tests { fn update_layout( cl: &mut ClusterLayout, node_id_vec: &Vec<u8>, - node_capacity_vec: &Vec<u32>, + node_capacity_vec: &Vec<u64>, node_zone_vec: &Vec<String>, zone_redundancy: usize, ) { |