diff options
author | Alex <alex@adnab.me> | 2023-09-27 09:04:32 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-09-27 09:04:32 +0000 |
commit | aa7eadc799ebd0d668ff29b155255acfdfa1d9b5 (patch) | |
tree | fc50c4b784cc0d380ded4306a83d8237c482149f /src/rpc | |
parent | 1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f (diff) | |
parent | 0e5925fff6d9b3147de4967e1963b4c785d0055f (diff) | |
download | garage-0.9.0-beta3.tar.gz garage-0.9.0-beta3.zip |
Merge pull request 'New layout: fixes and UX improvements' (#634) from new-layout-ux into nextv0.9.0-beta3
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/634
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/rpc/graph_algo.rs | 16 | ||||
-rw-r--r-- | src/rpc/layout.rs | 240 | ||||
-rw-r--r-- | src/rpc/system.rs | 4 |
4 files changed, 163 insertions, 98 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 8ccee46f..2f22cd28 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +format_table.workspace = true garage_db.workspace = true garage_util.workspace = true diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs index 65450d64..d8c6c9b9 100644 --- a/src/rpc/graph_algo.rs +++ b/src/rpc/graph_algo.rs @@ -1,7 +1,7 @@ //! This module deals with graph algorithms. //! It is used in layout.rs to build the partition to node assignment. -use rand::prelude::SliceRandom; +use rand::prelude::{SeedableRng, SliceRandom}; use std::cmp::{max, min}; use std::collections::HashMap; use std::collections::VecDeque; @@ -143,7 +143,11 @@ impl Graph<FlowEdge> { /// This function shuffles the order of the edge lists. It keeps the ids of the /// reversed edges consistent. fn shuffle_edges(&mut self) { - let mut rng = rand::thread_rng(); + // We use deterministic randomness so that the layout calculation algorihtm + // will output the same thing every time it is run. This way, the results + // pre-calculated in `garage layout show` will match exactly those used + // in practice with `garage layout apply` + let mut rng = rand::rngs::StdRng::from_seed([0x12u8; 32]); for i in 0..self.graph.len() { self.graph[i].shuffle(&mut rng); // We need to update the ids of the reverse edges. @@ -189,7 +193,7 @@ impl Graph<FlowEdge> { let mut fifo = VecDeque::new(); fifo.push_back((idsource, 0)); while let Some((id, lvl)) = fifo.pop_front() { - if level[id] == None { + if level[id].is_none() { // it means id has not yet been reached level[id] = Some(lvl); for edge in self.graph[id].iter() { @@ -199,7 +203,7 @@ impl Graph<FlowEdge> { } } } - if level[idsink] == None { + if level[idsink].is_none() { // There is no residual flow break; } @@ -383,7 +387,7 @@ fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> { for t in 0..forest.len() { let mut id = t; // while we are on a valid undiscovered node - while time_of_discovery[id] == None { + while time_of_discovery[id].is_none() { time_of_discovery[id] = Some(t); if let Some(i) = forest[id] { id = i; @@ -391,7 +395,7 @@ fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> { break; } } - if forest[id] != None && time_of_discovery[id] == Some(t) { + if forest[id].is_some() && time_of_discovery[id] == Some(t) { // We discovered an id that we explored at this iteration t. // It means we are on a cycle let mut cy = vec![id; 1]; diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index c2655e59..e02a180b 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -1,6 +1,7 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::collections::HashSet; +use std::fmt; use bytesize::ByteSize; use itertools::Itertools; @@ -115,7 +116,16 @@ mod v09 { /// algorithm. It is stored as a Crdt. #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] pub struct LayoutParameters { - pub zone_redundancy: usize, + pub zone_redundancy: ZoneRedundancy, + } + + /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies + /// of each partition on at least that number of different zones. + /// Otherwise, copies will be stored on the maximum possible number of zones. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub enum ZoneRedundancy { + AtLeast(usize), + Maximum, } impl garage_util::migrate::Migrate for ClusterLayout { @@ -125,7 +135,6 @@ mod v09 { fn migrate(previous: Self::Previous) -> Self { use itertools::Itertools; - use std::collections::HashSet; // In the old layout, capacities are in an arbitrary unit, // but in the new layout they are in bytes. @@ -152,17 +161,10 @@ mod v09 { .min() .unwrap_or(0); - // Determine zone redundancy parameter - let zone_redundancy = std::cmp::min( - previous.replication_factor, - roles - .items() - .iter() - .filter_map(|(_, _, r)| r.0.as_ref().map(|p| p.zone.as_str())) - .collect::<HashSet<&str>>() - .len(), - ); - let parameters = LayoutParameters { zone_redundancy }; + // By default, zone_redundancy is maximum possible value + let parameters = LayoutParameters { + zone_redundancy: ZoneRedundancy::Maximum, + }; let mut res = Self { version: previous.version, @@ -193,7 +195,7 @@ mod v09 { .. })) = role { - *cap = *cap * mul; + *cap *= mul; } new_roles.merge_raw(node, *ts, &role); } @@ -224,15 +226,39 @@ impl NodeRole { } } +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::<usize>() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + // Implementation of the ClusterLayout methods unrelated to the assignment algorithm. impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { - // We set the default zone redundancy to be equal to the replication factor, - // i.e. as strict as possible. + // We set the default zone redundancy to be Maximum, meaning that the maximum + // possible value will be used depending on the cluster topology let parameters = LayoutParameters { - zone_redundancy: replication_factor, + zone_redundancy: ZoneRedundancy::Maximum, }; - let staging_parameters = Lww::<LayoutParameters>::new(parameters.clone()); + let staging_parameters = Lww::<LayoutParameters>::new(parameters); let empty_lwwmap = LwwMap::new(); @@ -296,7 +322,7 @@ To know the correct value of the new layout version, invoke `garage layout show` self.roles.merge(&self.staging_roles); self.roles.retain(|(_, _, v)| v.0.is_some()); - self.parameters = self.staging_parameters.get().clone(); + self.parameters = *self.staging_parameters.get(); self.staging_roles.clear(); self.staging_hash = self.calculate_staging_hash(); @@ -325,7 +351,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } self.staging_roles.clear(); - self.staging_parameters.update(self.parameters.clone()); + self.staging_parameters.update(self.parameters); self.staging_hash = self.calculate_staging_hash(); self.version += 1; @@ -356,7 +382,7 @@ To know the correct value of the new layout version, invoke `garage layout show` let mut result = Vec::<Uuid>::new(); for uuid in self.node_id_vec.iter() { match self.node_role(uuid) { - Some(role) if role.capacity != None => result.push(*uuid), + Some(role) if role.capacity.is_some() => result.push(*uuid), _ => (), } } @@ -418,6 +444,23 @@ To know the correct value of the new layout version, invoke `garage layout show` Ok(total_capacity) } + /// Returns the effective value of the zone_redundancy parameter + fn effective_zone_redundancy(&self) -> usize { + match self.parameters.zone_redundancy { + ZoneRedundancy::AtLeast(v) => v, + ZoneRedundancy::Maximum => { + let n_zones = self + .roles + .items() + .iter() + .filter_map(|(_, _, role)| role.0.as_ref().map(|x| x.zone.as_str())) + .collect::<HashSet<&str>>() + .len(); + std::cmp::min(n_zones, self.replication_factor) + } + } + } + /// Check a cluster layout for internal consistency /// (assignment, roles, parameters, partition size) /// returns true if consistent, false if error @@ -471,6 +514,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } // Check that every partition is associated to distinct nodes + let zone_redundancy = self.effective_zone_redundancy(); let rf = self.replication_factor; for p in 0..(1 << PARTITION_BITS) { let nodes_of_p = self.ring_assignment_data[rf * p..rf * (p + 1)].to_vec(); @@ -485,11 +529,10 @@ To know the correct value of the new layout version, invoke `garage layout show` .expect("Zone not found.") }) .collect::<Vec<_>>(); - let redundancy = self.parameters.zone_redundancy; - if zones_of_p.iter().unique().count() < redundancy { + if zones_of_p.iter().unique().count() < zone_redundancy { return Err(format!( "nodes of partition are in less than {} distinct zones", - redundancy + zone_redundancy )); } } @@ -518,7 +561,7 @@ To know the correct value of the new layout version, invoke `garage layout show` // algorithm. let cl2 = self.clone(); let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().unwrap(); - match cl2.compute_optimal_partition_size(&zone_to_id) { + match cl2.compute_optimal_partition_size(&zone_to_id, zone_redundancy) { Ok(s) if s != self.partition_size => { return Err(format!( "partition_size ({}) is different than optimal value ({})", @@ -533,6 +576,8 @@ To know the correct value of the new layout version, invoke `garage layout show` } } +// ==================================================================================== + // Implementation of the ClusterLayout methods related to the assignment algorithm. impl ClusterLayout { /// This function calculates a new partition-to-node assignment. @@ -549,13 +594,15 @@ impl ClusterLayout { // changes in the layout. We retrieve the old_assignment reframed with new ids let old_assignment_opt = self.update_node_id_vec()?; + let zone_redundancy = self.effective_zone_redundancy(); + let mut msg = Message::new(); msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into()); msg.push("".into()); msg.push(format!( "Partitions are \ replicated {} times on at least {} distinct zones.", - self.replication_factor, self.parameters.zone_redundancy + self.replication_factor, zone_redundancy )); // We generate for once numerical ids for the zones of non gateway nodes, @@ -570,12 +617,12 @@ impl ClusterLayout { nb_nongateway_nodes, self.replication_factor ))); } - if id_to_zone.len() < self.parameters.zone_redundancy { + if id_to_zone.len() < zone_redundancy { return Err(Error::Message(format!( "The number of zones with non-gateway \ nodes ({}) is smaller than the redundancy parameter ({})", id_to_zone.len(), - self.parameters.zone_redundancy + zone_redundancy ))); } @@ -583,18 +630,18 @@ impl ClusterLayout { // Capacities should be given in a unit so that partition size is at least 100. // In this case, integer rounding plays a marginal role in the percentages of // optimality. - let partition_size = self.compute_optimal_partition_size(&zone_to_id)?; + let partition_size = self.compute_optimal_partition_size(&zone_to_id, zone_redundancy)?; - if old_assignment_opt != None { + msg.push("".into()); + if old_assignment_opt.is_some() { msg.push(format!( - "Optimal size of a partition: {} (was {} in the previous layout).", + "Optimal partition size: {} ({} in previous layout)", ByteSize::b(partition_size).to_string_as(false), ByteSize::b(self.partition_size).to_string_as(false) )); } else { msg.push(format!( - "Given the replication and redundancy constraints, the \ - optimal size of a partition is {}.", + "Optimal partition size: {}", ByteSize::b(partition_size).to_string_as(false) )); } @@ -610,7 +657,8 @@ impl ClusterLayout { // We compute a first flow/assignment that is heuristically close to the previous // assignment - let mut gflow = self.compute_candidate_assignment(&zone_to_id, &old_assignment_opt)?; + let mut gflow = + self.compute_candidate_assignment(&zone_to_id, &old_assignment_opt, zone_redundancy)?; if let Some(assoc) = &old_assignment_opt { // We minimize the distance to the previous assignment. self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; @@ -618,7 +666,6 @@ impl ClusterLayout { // We display statistics of the computation msg.extend(self.output_stat(&gflow, &old_assignment_opt, &zone_to_id, &id_to_zone)?); - msg.push("".to_string()); // We update the layout structure self.update_ring_from_flow(id_to_zone.len(), &gflow)?; @@ -645,7 +692,7 @@ impl ClusterLayout { .roles .items() .iter() - .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity != None)) + .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity.is_some())) .map(|(k, _, _)| *k) .collect(); @@ -661,7 +708,7 @@ impl ClusterLayout { .roles .items() .iter() - .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity == None)) + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_none())) .map(|(k, _, _)| *k) .collect(); @@ -723,7 +770,7 @@ impl ClusterLayout { for uuid in self.nongateway_nodes().iter() { let r = self.node_role(uuid).unwrap(); - if !zone_to_id.contains_key(&r.zone) && r.capacity != None { + if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() { zone_to_id.insert(r.zone.clone(), id_to_zone.len()); id_to_zone.push(r.zone.clone()); } @@ -736,9 +783,10 @@ impl ClusterLayout { fn compute_optimal_partition_size( &self, zone_to_id: &HashMap<String, usize>, + zone_redundancy: usize, ) -> Result<u64, Error> { let empty_set = HashSet::<(usize, usize)>::new(); - let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?; + let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set, zone_redundancy)?; g.compute_maximal_flow()?; if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 { return Err(Error::Message( @@ -751,7 +799,12 @@ impl ClusterLayout { let mut s_down = 1; let mut s_up = self.get_total_capacity()?; while s_down + 1 < s_up { - g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?; + g = self.generate_flow_graph( + (s_down + s_up) / 2, + zone_to_id, + &empty_set, + zone_redundancy, + )?; g.compute_maximal_flow()?; if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 { s_up = (s_down + s_up) / 2; @@ -790,18 +843,18 @@ impl ClusterLayout { partition_size: u64, zone_to_id: &HashMap<String, usize>, exclude_assoc: &HashSet<(usize, usize)>, + zone_redundancy: usize, ) -> Result<Graph<FlowEdge>, Error> { let vertices = ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len()); let mut g = Graph::<FlowEdge>::new(&vertices); let nb_zones = zone_to_id.len(); - let redundancy = self.parameters.zone_redundancy; for p in 0..NB_PARTITIONS { - g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u64)?; + g.add_edge(Vertex::Source, Vertex::Pup(p), zone_redundancy as u64)?; g.add_edge( Vertex::Source, Vertex::Pdown(p), - (self.replication_factor - redundancy) as u64, + (self.replication_factor - zone_redundancy) as u64, )?; for z in 0..nb_zones { g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?; @@ -830,6 +883,7 @@ impl ClusterLayout { &self, zone_to_id: &HashMap<String, usize>, prev_assign_opt: &Option<Vec<Vec<usize>>>, + zone_redundancy: usize, ) -> Result<Graph<FlowEdge>, Error> { // We list the (partition,node) associations that are not used in the // previous assignment @@ -847,7 +901,12 @@ impl ClusterLayout { } // We compute the best flow using only the edges used in the previous assignment - let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?; + let mut g = self.generate_flow_graph( + self.partition_size, + zone_to_id, + &exclude_edge, + zone_redundancy, + )?; g.compute_maximal_flow()?; // We add the excluded edges and compute the maximal flow with the full graph. @@ -931,29 +990,33 @@ impl ClusterLayout { let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64; let total_cap = self.get_total_capacity()?; let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32); - msg.push("".into()); msg.push(format!( - "Usable capacity / Total cluster capacity: {} / {} ({:.1} %)", + "Usable capacity / total cluster capacity: {} / {} ({:.1} %)", ByteSize::b(used_cap).to_string_as(false), ByteSize::b(total_cap).to_string_as(false), percent_cap )); - msg.push("".into()); - msg.push( - "If the percentage is too low, it might be that the \ - replication/redundancy constraints force the use of nodes/zones with small \ - storage capacities. \ - You might want to rebalance the storage capacities or relax the constraints. \ - See the detailed statistics below and look for saturated nodes/zones." - .into(), - ); msg.push(format!( - "Recall that because of the replication factor, the actual available \ - storage capacity is {} / {} = {}.", - ByteSize::b(used_cap).to_string_as(false), + "Effective capacity (replication factor {}): {}", self.replication_factor, ByteSize::b(used_cap / self.replication_factor as u64).to_string_as(false) )); + if percent_cap < 80. { + msg.push("".into()); + msg.push( + "If the percentage is too low, it might be that the \ + cluster topology and redundancy constraints are forcing the use of nodes/zones with small \ + storage capacities." + .into(), + ); + msg.push( + "You might want to move storage capacity between zones or relax the redundancy constraint." + .into(), + ); + msg.push( + "See the detailed statistics below and look for saturated nodes/zones.".into(), + ); + } // We define and fill in the following tables let storing_nodes = self.nongateway_nodes(); @@ -992,25 +1055,25 @@ impl ClusterLayout { } } - if *prev_assign_opt == None { + if prev_assign_opt.is_none() { new_partitions = stored_partitions.clone(); - new_partitions_zone = stored_partitions_zone.clone(); + //new_partitions_zone = stored_partitions_zone.clone(); } // We display the statistics msg.push("".into()); - if *prev_assign_opt != None { + if prev_assign_opt.is_some() { let total_new_partitions: usize = new_partitions.iter().sum(); msg.push(format!( "A total of {} new copies of partitions need to be \ transferred.", total_new_partitions )); + msg.push("".into()); } - msg.push("".into()); - msg.push("==== DETAILED STATISTICS BY ZONES AND NODES ====".into()); + let mut table = vec![]; for z in 0..id_to_zone.len() { let mut nodes_of_z = Vec::<usize>::new(); for n in 0..storing_nodes.len() { @@ -1020,15 +1083,9 @@ impl ClusterLayout { } let replicated_partitions: usize = nodes_of_z.iter().map(|n| stored_partitions[*n]).sum(); - msg.push("".into()); - - msg.push(format!( - "Zone {}: {} distinct partitions stored ({} new, \ - {} partition copies) ", - id_to_zone[z], - stored_partitions_zone[z], - new_partitions_zone[z], - replicated_partitions + table.push(format!( + "{}\tTags\tPartitions\tCapacity\tUsable capacity", + id_to_zone[z] )); let available_cap_z: u64 = self.partition_size * replicated_partitions as u64; @@ -1037,33 +1094,35 @@ impl ClusterLayout { total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; } let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); - msg.push(format!( - " Usable capacity / Total capacity: {} / {} ({:.1}%).", - ByteSize::b(available_cap_z).to_string_as(false), - ByteSize::b(total_cap_z).to_string_as(false), - percent_cap_z - )); for n in nodes_of_z.iter() { let available_cap_n = stored_partitions[*n] as u64 * self.partition_size; let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; - let tags_n = (self - .node_role(&self.node_id_vec[*n]) - .ok_or("Node not found."))? - .tags_string(); - msg.push(format!( - " Node {:?}: {} partitions ({} new) ; \ - usable/total capacity: {} / {} ({:.1}%) ; tags:{}", + let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or("<??>"))?.tags_string(); + table.push(format!( + " {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)", self.node_id_vec[*n], + tags_n, stored_partitions[*n], new_partitions[*n], - ByteSize::b(available_cap_n).to_string_as(false), ByteSize::b(total_cap_n).to_string_as(false), + ByteSize::b(available_cap_n).to_string_as(false), (available_cap_n as f32) / (total_cap_n as f32) * 100.0, - tags_n )); } + + table.push(format!( + " TOTAL\t\t{} ({} unique)\t{}\t{} ({:.1}%)", + replicated_partitions, + stored_partitions_zone[z], + //new_partitions_zone[z], + ByteSize::b(total_cap_z).to_string_as(false), + ByteSize::b(available_cap_z).to_string_as(false), + percent_cap_z + )); + table.push("".into()); } + msg.push(format_table::format_table_to_string(table)); Ok(msg) } @@ -1125,7 +1184,7 @@ mod tests { let mut curr_zone = 0; - let redundancy = cl.parameters.zone_redundancy; + let redundancy = cl.effective_zone_redundancy(); for replic in 0..cl.replication_factor { for p in 0..NB_PARTITIONS { @@ -1177,8 +1236,9 @@ mod tests { ); cl.staging_roles.merge(&update); } - cl.staging_parameters - .update(LayoutParameters { zone_redundancy }); + cl.staging_parameters.update(LayoutParameters { + zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy), + }); cl.staging_hash = cl.calculate_staging_hash(); } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 8fba9580..7fc3c20c 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -668,7 +668,7 @@ impl System { let prev_layout_check = layout.check().is_ok(); if layout.merge(adv) { - if prev_layout_check && !layout.check().is_ok() { + if prev_layout_check && layout.check().is_err() { error!("New cluster layout is invalid, discarding."); return Err(Error::Message( "New cluster layout is invalid, discarding.".into(), @@ -724,7 +724,7 @@ impl System { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { - let not_configured = !self.ring.borrow().layout.check().is_ok(); + let not_configured = self.ring.borrow().layout.check().is_err(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; let expected_n_nodes = self.ring.borrow().layout.num_nodes(); let bad_peers = self |