diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/layout.rs | 118 | ||||
-rw-r--r-- | src/rpc/system.rs | 3 |
2 files changed, 73 insertions, 48 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 16d573c7..8d2b3e17 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -7,7 +7,7 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; -use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; +use garage_util::crdt::{AutoCrdt, Crdt, LwwMap, Lww}; use garage_util::data::*; use garage_util::error::*; @@ -27,12 +27,10 @@ pub struct ClusterLayout { pub version: u64, pub replication_factor: usize, - #[serde(default="default_one")] - pub zone_redundancy: usize, //This attribute is only used to retain the previously computed partition size, //to know to what extent does it change with the layout update. - #[serde(default="default_zero")] + #[serde(default="default_partition_size")] pub partition_size: u32, pub roles: LwwMap<Uuid, NodeRoleV>, @@ -51,17 +49,31 @@ pub struct ClusterLayout { pub ring_assignation_data: Vec<CompactNodeType>, /// Role changes which are staged for the next version of the layout + #[serde(default="default_layout_parameters")] + pub parameters: Lww<LayoutParameters>, pub staging: LwwMap<Uuid, NodeRoleV>, pub staging_hash: Hash, } -fn default_one() -> usize{ - return 1; -} -fn default_zero() -> u32{ +fn default_partition_size() -> u32{ return 0; } +fn default_layout_parameters() -> Lww<LayoutParameters>{ + Lww::<LayoutParameters>::new(LayoutParameters{ zone_redundancy: 1}) +} + +///This struct is used to set the parameters to be used in the assignation computation +///algorithm. It is stored as a Crdt. +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct LayoutParameters { + pub zone_redundancy:usize, +} + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + const NB_PARTITIONS : usize = 1usize << PARTITION_BITS; #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] @@ -108,18 +120,24 @@ impl NodeRole { } impl ClusterLayout { - pub fn new(replication_factor: usize, zone_redundancy: usize) -> Self { + pub fn new(replication_factor: usize) -> Self { + + //We set the default zone redundancy to be equal to the replication factor, + //i.e. as strict as possible. + let default_parameters = Lww::<LayoutParameters>::new( + LayoutParameters{ zone_redundancy: replication_factor}); + let empty_lwwmap = LwwMap::new(); let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]); ClusterLayout { version: 0, replication_factor, - zone_redundancy, partition_size: 0, roles: LwwMap::new(), node_id_vec: Vec::new(), ring_assignation_data: Vec::new(), + parameters: default_parameters, staging: empty_lwwmap, staging_hash: empty_lwwmap_hash, } @@ -132,6 +150,7 @@ impl ClusterLayout { true } Ordering::Equal => { + self.parameters.merge(&other.parameters); self.staging.merge(&other.staging); let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); @@ -145,7 +164,7 @@ impl ClusterLayout { } } - pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self,Message), Error> { match version { None => { let error = r#" @@ -164,16 +183,14 @@ To know the correct value of the new layout version, invoke `garage layout show` self.roles.merge(&self.staging); self.roles.retain(|(_, _, v)| v.0.is_some()); - if !self.calculate_partition_assignation() { - return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); - } + let msg = self.calculate_partition_assignation()?; self.staging.clear(); self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); self.version += 1; - Ok(self) + Ok((self,msg)) } pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { @@ -231,24 +248,24 @@ To know the correct value of the new layout version, invoke `garage layout show` } ///Given a node uuids, this function returns the label of its zone - pub fn get_node_zone(&self, uuid : &Uuid) -> Result<String,String> { + pub fn get_node_zone(&self, uuid : &Uuid) -> Result<String,Error> { match self.node_role(uuid) { Some(role) => return Ok(role.zone.clone()), - _ => return Err("The Uuid does not correspond to a node present in the cluster.".to_string()) + _ => return Err(Error::Message("The Uuid does not correspond to a node present in the cluster.".into())) } } ///Given a node uuids, this function returns its capacity or fails if it does not have any - pub fn get_node_capacity(&self, uuid : &Uuid) -> Result<u32,String> { + pub fn get_node_capacity(&self, uuid : &Uuid) -> Result<u32,Error> { match self.node_role(uuid) { Some(NodeRole{capacity : Some(cap), zone: _, tags: _}) => return Ok(*cap), - _ => return Err("The Uuid does not correspond to a node present in the \ - cluster or this node does not have a positive capacity.".to_string()) + _ => return Err(Error::Message("The Uuid does not correspond to a node present in the \ + cluster or this node does not have a positive capacity.".into())) } } ///Returns the sum of capacities of non gateway nodes in the cluster - pub fn get_total_capacity(&self) -> Result<u32,String> { + pub fn get_total_capacity(&self) -> Result<u32,Error> { let mut total_capacity = 0; for uuid in self.useful_nodes().iter() { total_capacity += self.get_node_capacity(uuid)?; @@ -311,7 +328,8 @@ To know the correct value of the new layout version, invoke `garage layout show` let zones_of_p = nodes_of_p.iter() .map(|n| self.get_node_zone(&self.node_id_vec[*n as usize]) .expect("Zone not found.")); - if zones_of_p.unique().count() < self.zone_redundancy { + let redundancy = self.parameters.get().zone_redundancy; + if zones_of_p.unique().count() < redundancy { return false; } } @@ -354,7 +372,7 @@ impl ClusterLayout { /// Among such optimal assignation, it minimizes the distance to /// the former assignation (if any) to minimize the amount of /// data to be moved. - pub fn calculate_partition_assignation(&mut self, replication:usize, redundancy:usize) -> Result<Message,String> { + pub fn calculate_partition_assignation(&mut self) -> Result<Message,Error> { //The nodes might have been updated, some might have been deleted. //So we need to first update the list of nodes and retrieve the //assignation. @@ -362,12 +380,12 @@ impl ClusterLayout { //We update the node ids, since the node list might have changed with the staged //changes in the layout. We retrieve the old_assignation reframed with the new ids let old_assignation_opt = self.update_node_id_vec()?; - self.replication_factor = replication; - self.zone_redundancy = redundancy; + let redundancy = self.parameters.get().zone_redundancy; + let mut msg = Message::new(); msg.push(format!("Computation of a new cluster layout where partitions are \ - replicated {} times on at least {} distinct zones.", replication, redundancy)); + replicated {} times on at least {} distinct zones.", self.replication_factor, redundancy)); //We generate for once numerical ids for the zone, to use them as indices in the //flow graphs. @@ -381,6 +399,7 @@ impl ClusterLayout { //In this case, integer rounding plays a marginal role in the percentages of //optimality. let partition_size = self.compute_optimal_partition_size(&zone_to_id)?; + if old_assignation_opt != None { msg.push(format!("Given the replication and redundancy constraint, the \ optimal size of a partition is {}. In the previous layout, it used to \ @@ -392,6 +411,12 @@ impl ClusterLayout { } self.partition_size = partition_size; + if partition_size < 100 { + msg.push("WARNING: The partition size is low (< 100), you might consider to \ + give the nodes capacities in a smaller unit (e.g. Mb instead of Gb) to \ + achieve a more tailored use of your storage ressources.".into()); + } + //We compute a first flow/assignment that is heuristically close to the previous //assignment let mut gflow = self.compute_candidate_assignment( &zone_to_id, &old_assignation_opt)?; @@ -413,7 +438,7 @@ impl ClusterLayout { /// None if the node is not present anymore. /// We work with the assumption that only this function and calculate_new_assignation /// do modify assignation_ring and node_id_vec. - fn update_node_id_vec(&mut self) -> Result< Option< Vec<Vec<usize> > > ,String> { + fn update_node_id_vec(&mut self) -> Result< Option< Vec<Vec<usize> > > ,Error> { // (1) We compute the new node list //Non gateway nodes should be coded on 8bits, hence they must be first in the list //We build the new node ids @@ -423,8 +448,8 @@ impl ClusterLayout { .map(|(k, _, _)| *k).collect(); if new_non_gateway_nodes.len() > MAX_NODE_NUMBER { - return Err(format!("There are more than {} non-gateway nodes in the new \ - layout. This is not allowed.", MAX_NODE_NUMBER).to_string()); + return Err(Error::Message(format!("There are more than {} non-gateway nodes in the new \ + layout. This is not allowed.", MAX_NODE_NUMBER).into() )); } let mut new_gateway_nodes: Vec<Uuid> = self.roles.items().iter() @@ -449,8 +474,8 @@ impl ClusterLayout { return Ok(None); } if self.ring_assignation_data.len() != nb_partitions * self.replication_factor { - return Err("The old assignation does not have a size corresponding to \ - the old replication factor or the number of partitions.".to_string()); + return Err(Error::Message("The old assignation does not have a size corresponding to \ + the old replication factor or the number of partitions.".into())); } //We build a translation table between the uuid and new ids @@ -482,14 +507,14 @@ impl ClusterLayout { ///This function generates ids for the zone of the nodes appearing in ///self.node_id_vec. - fn generate_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>),String>{ + fn generate_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>),Error>{ let mut id_to_zone = Vec::<String>::new(); let mut zone_to_id = HashMap::<String,usize>::new(); for uuid in self.node_id_vec.iter() { if self.roles.get(uuid) == None { - return Err("The uuid was not found in the node roles (this should \ - not happen, it might be a critical error).".to_string()); + return Err(Error::Message("The uuid was not found in the node roles (this should \ + not happen, it might be a critical error).".into())); } match self.node_role(&uuid) { Some(r) => if !zone_to_id.contains_key(&r.zone) && r.capacity != None { @@ -504,14 +529,14 @@ impl ClusterLayout { ///This function computes by dichotomy the largest realizable partition size, given ///the layout. - fn compute_optimal_partition_size(&self, zone_to_id: &HashMap<String, usize>) -> Result<u32,String>{ + fn compute_optimal_partition_size(&self, zone_to_id: &HashMap<String, usize>) -> Result<u32,Error>{ let nb_partitions = 1usize << PARTITION_BITS; let empty_set = HashSet::<(usize,usize)>::new(); let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?; g.compute_maximal_flow()?; if g.get_flow_value()? < (nb_partitions*self.replication_factor).try_into().unwrap() { - return Err("The storage capacity of he cluster is to small. It is \ - impossible to store partitions of size 1.".to_string()); + return Err(Error::Message("The storage capacity of he cluster is to small. It is \ + impossible to store partitions of size 1.".into())); } let mut s_down = 1; @@ -545,14 +570,15 @@ impl ClusterLayout { return vertices; } - fn generate_flow_graph(&self, size: u32, zone_to_id: &HashMap<String, usize>, exclude_assoc : &HashSet<(usize,usize)>) -> Result<Graph<FlowEdge>, String> { + fn generate_flow_graph(&self, size: u32, zone_to_id: &HashMap<String, usize>, exclude_assoc : &HashSet<(usize,usize)>) -> Result<Graph<FlowEdge>, Error> { let vertices = ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.useful_nodes().len()); let mut g= Graph::<FlowEdge>::new(&vertices); let nb_zones = zone_to_id.len(); + let redundancy = self.parameters.get().zone_redundancy; for p in 0..NB_PARTITIONS { - g.add_edge(Vertex::Source, Vertex::Pup(p), self.zone_redundancy as u32)?; - g.add_edge(Vertex::Source, Vertex::Pdown(p), (self.replication_factor - self.zone_redundancy) as u32)?; + g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u32)?; + g.add_edge(Vertex::Source, Vertex::Pdown(p), (self.replication_factor - redundancy) as u32)?; for z in 0..nb_zones { g.add_edge(Vertex::Pup(p) , Vertex::PZ(p,z) , 1)?; g.add_edge(Vertex::Pdown(p) , Vertex::PZ(p,z) , @@ -574,7 +600,7 @@ impl ClusterLayout { fn compute_candidate_assignment(&self, zone_to_id: &HashMap<String, usize>, - old_assoc_opt : &Option<Vec< Vec<usize> >>) -> Result<Graph<FlowEdge>, String > { + old_assoc_opt : &Option<Vec< Vec<usize> >>) -> Result<Graph<FlowEdge>, Error > { //We list the edges that are not used in the old association let mut exclude_edge = HashSet::<(usize,usize)>::new(); @@ -601,7 +627,7 @@ impl ClusterLayout { return Ok(g); } - fn minimize_rebalance_load(&self, gflow: &mut Graph<FlowEdge>, zone_to_id: &HashMap<String, usize>, old_assoc : &Vec< Vec<usize> >) -> Result<(), String > { + fn minimize_rebalance_load(&self, gflow: &mut Graph<FlowEdge>, zone_to_id: &HashMap<String, usize>, old_assoc : &Vec< Vec<usize> >) -> Result<(), Error > { let mut cost = CostFunction::new(); for p in 0..NB_PARTITIONS { for n in old_assoc[p].iter() { @@ -616,7 +642,7 @@ impl ClusterLayout { return Ok(()); } - fn update_ring_from_flow(&mut self, nb_zones : usize, gflow: &Graph<FlowEdge> ) -> Result<(), String>{ + fn update_ring_from_flow(&mut self, nb_zones : usize, gflow: &Graph<FlowEdge> ) -> Result<(), Error>{ self.ring_assignation_data = Vec::<CompactNodeType>::new(); for p in 0..NB_PARTITIONS { for z in 0..nb_zones { @@ -631,8 +657,8 @@ impl ClusterLayout { } if self.ring_assignation_data.len() != NB_PARTITIONS*self.replication_factor { - return Err("Critical Error : the association ring we produced does not \ - have the right size.".to_string()); + return Err(Error::Message("Critical Error : the association ring we produced does not \ + have the right size.".into())); } return Ok(()); } @@ -643,7 +669,7 @@ impl ClusterLayout { fn output_stat(&self , gflow : &Graph<FlowEdge>, old_assoc_opt : &Option< Vec<Vec<usize>> >, zone_to_id: &HashMap<String, usize>, - id_to_zone : &Vec<String>) -> Result<Message, String>{ + id_to_zone : &Vec<String>) -> Result<Message, Error>{ let mut msg = Message::new(); let nb_partitions = 1usize << PARTITION_BITS; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 7eb25195..9e0bfa11 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -196,7 +196,6 @@ impl System { network_key: NetworkKey, background: Arc<BackgroundRunner>, replication_factor: usize, - zone_redundancy: usize, config: &Config, ) -> Result<Arc<Self>, Error> { let node_key = @@ -226,7 +225,7 @@ impl System { "No valid previous cluster layout stored ({}), starting fresh.", e ); - ClusterLayout::new(replication_factor, zone_redundancy) + ClusterLayout::new(replication_factor) } }; |