Diffstat (limited to 'src/rpc/layout.rs')
-rw-r--r-- | src/rpc/layout.rs | 579 |
1 files changed, 579 insertions, 0 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
new file mode 100644
index 00000000..895dbf1c
--- /dev/null
+++ b/src/rpc/layout.rs
@@ -0,0 +1,579 @@
+use std::cmp::Ordering;
+use std::collections::{HashMap, HashSet};
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
+use garage_util::data::*;
+
+use crate::ring::*;
+
+/// The layout of the cluster, i.e. the list of roles
+/// which are assigned to each cluster node
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ClusterLayout {
+	pub version: u64,
+
+	pub replication_factor: usize,
+	pub roles: LwwMap<Uuid, NodeRoleV>,
+
+	/// node_id_vec: a vector of node IDs with a role assigned
+	/// in the system (this includes gateway nodes).
+	/// The order here is different from the vec stored by `roles`, because:
+	/// 1. non-gateway nodes are first so that they have lower numbers
+	/// 2. nodes that don't have a role are excluded (but they need to
+	///    stay in the CRDT as tombstones)
+	pub node_id_vec: Vec<Uuid>,
+	/// The assignation of data partitions to nodes; the values
+	/// are indices in node_id_vec
+	#[serde(with = "serde_bytes")]
+	pub ring_assignation_data: Vec<CompactNodeType>,
+
+	/// Role changes which are staged for the next version of the layout
+	pub staging: LwwMap<Uuid, NodeRoleV>,
+	pub staging_hash: Hash,
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct NodeRoleV(pub Option<NodeRole>);
+
+impl AutoCrdt for NodeRoleV {
+	const WARN_IF_DIFFERENT: bool = true;
+}
+
+/// The user-assigned roles of cluster nodes
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct NodeRole {
+	/// Datacenter to which this entry belongs. This information might be used to perform a better
+	/// geodistribution
+	pub zone: String,
+	/// The (relative) capacity of the node.
+	/// If this is set to None, the node does not participate in storing data for the system
+	/// and is only active as an API gateway to other nodes
+	pub capacity: Option<u32>,
+	/// A set of tags to recognize the node
+	pub tags: Vec<String>,
+}
+
+impl NodeRole {
+	pub fn capacity_string(&self) -> String {
+		match self.capacity {
+			Some(c) => format!("{}", c),
+			None => "gateway".to_string(),
+		}
+	}
+}
+
+impl ClusterLayout {
+	pub fn new(replication_factor: usize) -> Self {
+		let empty_lwwmap = LwwMap::new();
+		let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
+
+		ClusterLayout {
+			version: 0,
+			replication_factor,
+			roles: LwwMap::new(),
+			node_id_vec: Vec::new(),
+			ring_assignation_data: Vec::new(),
+			staging: empty_lwwmap,
+			staging_hash: empty_lwwmap_hash,
+		}
+	}
+
+	pub fn merge(&mut self, other: &ClusterLayout) -> bool {
+		match other.version.cmp(&self.version) {
+			Ordering::Greater => {
+				*self = other.clone();
+				true
+			}
+			Ordering::Equal => {
+				self.staging.merge(&other.staging);
+
+				let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+				let changed = new_staging_hash != self.staging_hash;
+
+				self.staging_hash = new_staging_hash;
+
+				changed
+			}
+			Ordering::Less => false,
+		}
+	}
+
+	/// Returns a list of IDs of nodes that currently have
+	/// a role in the cluster
+	pub fn node_ids(&self) -> &[Uuid] {
+		&self.node_id_vec[..]
+	}
+
+	pub fn num_nodes(&self) -> usize {
+		self.node_id_vec.len()
+	}
+
+	/// Returns the role of a node in the layout
+	pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
+		match self.roles.get(node) {
+			Some(NodeRoleV(Some(v))) => Some(v),
+			_ => None,
+		}
+	}
+
+	/// Check a cluster layout for internal consistency;
+	/// returns true if consistent, false if error
+	pub fn check(&self) -> bool {
+		// Check that the hash of the staging data is correct
+		let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+		if staging_hash != self.staging_hash {
+			return false;
+		}
+
+		// Check that node_id_vec contains the correct list of nodes
+		let mut expected_nodes = self
+			.roles
+			.items()
+			.iter()
+			.filter(|(_, _, v)| v.0.is_some())
+			.map(|(id, _, _)| *id)
+			.collect::<Vec<_>>();
+		expected_nodes.sort();
+		let mut node_id_vec = self.node_id_vec.clone();
+		node_id_vec.sort();
+		if expected_nodes != node_id_vec {
+			return false;
+		}
+
+		// Check that the assignation data has the correct length
+		if self.ring_assignation_data.len() != (1 << PARTITION_BITS) * self.replication_factor {
+			return false;
+		}
+
+		// Check that the assigned nodes are correct identifiers
+		// of nodes that are assigned a role,
+		// and that this role is not that of a gateway node
+		for x in self.ring_assignation_data.iter() {
+			if *x as usize >= self.node_id_vec.len() {
+				return false;
+			}
+			let node = self.node_id_vec[*x as usize];
+			match self.roles.get(&node) {
+				Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (),
+				_ => return false,
+			}
+		}
+
+		true
+	}
+
+	/// Calculate an assignation of partitions to nodes
+	pub fn calculate_partition_assignation(&mut self) -> bool {
+		let (configured_nodes, zones) = self.configured_nodes_and_zones();
+		let n_zones = zones.len();
+
+		println!("Calculating updated partition assignation, this may take some time...");
+		println!();
+
+		let old_partitions = self.parse_assignation_data();
+
+		let mut partitions = old_partitions.clone();
+		for part in partitions.iter_mut() {
+			part.nodes
+				.retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false));
+		}
+
+		// When nodes are removed, or when bootstrapping an assignation from
+		// scratch for a new cluster, the old partitions will have holes (or be empty).
+		// Here we add more nodes to make a complete (sub-optimal) assignation,
+		// using an initial partition assignation that is calculated using the multi-dc maglev trick
+		match self.initial_partition_assignation() {
+			Some(initial_partitions) => {
+				for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
+					for (id, info) in ipart.nodes.iter() {
+						if part.nodes.len() < self.replication_factor {
+							part.add(part.nodes.len() + 1, n_zones, id, info.unwrap());
+						}
+					}
+					assert!(part.nodes.len() == self.replication_factor);
+				}
+			}
+			None => {
+				return false;
+			}
+		}
+
+		// Calculate how many partitions each node should ideally store,
+		// and how many partitions they are storing with the current assignation.
+		// This defines our target for which we will optimize in the following loop.
+		let total_capacity = configured_nodes
+			.iter()
+			.map(|(_, info)| info.capacity.unwrap_or(0))
+			.sum::<u32>() as usize;
+		let total_partitions = self.replication_factor * (1 << PARTITION_BITS);
+		let target_partitions_per_node = configured_nodes
+			.iter()
+			.map(|(id, info)| {
+				(
+					*id,
+					info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity,
+				)
+			})
+			.collect::<HashMap<&Uuid, usize>>();
+
+		let mut partitions_per_node = self.partitions_per_node(&partitions[..]);
+
+		println!("Target number of partitions per node:");
+		for (node, npart) in target_partitions_per_node.iter() {
+			println!("{:?}\t{}", node, npart);
+		}
+		println!();
+
+		// Shuffle partitions between nodes so that nodes will reach (or better approach)
+		// their target number of stored partitions
+		loop {
+			let mut option = None;
+			for (i, part) in partitions.iter_mut().enumerate() {
+				for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
+					let suprm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32
+						- target_partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32;
+
+					for (idadd, infoadd) in configured_nodes.iter() {
+						// skip replacing a node by itself
+						// and skip replacing by gateway nodes
+						if idadd == idrm || infoadd.capacity.is_none() {
+							continue;
+						}
+
+						let supadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32
+							- target_partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32;
+
+						// We want to try replacing node idrm by node idadd
+						// if that brings us closer to our goal.
+						let square = |i: i32| i * i;
+						let oldcost = square(suprm) + square(supadd);
+						let newcost = square(suprm - 1) + square(supadd + 1);
+						if newcost >= oldcost {
+							// not closer to our goal
+							continue;
+						}
+						let gain = oldcost - newcost;
+
+						let mut newpart = part.clone();
+
+						newpart.nodes.remove(irm);
+						if !newpart.add(newpart.nodes.len() + 1, n_zones, idadd, infoadd) {
+							continue;
+						}
+						assert!(newpart.nodes.len() == self.replication_factor);
+
+						if !old_partitions[i]
+							.is_valid_transition_to(&newpart, self.replication_factor)
+						{
+							continue;
+						}
+
+						if option
+							.as_ref()
+							.map(|(old_gain, _, _, _, _)| gain > *old_gain)
+							.unwrap_or(true)
+						{
+							option = Some((gain, i, idadd, idrm, newpart));
+						}
+					}
+				}
+			}
+			if let Some((_gain, i, idadd, idrm, newpart)) = option {
+				*partitions_per_node.entry(idadd).or_insert(0) += 1;
+				*partitions_per_node.get_mut(idrm).unwrap() -= 1;
+				partitions[i] = newpart;
+			} else {
+				break;
+			}
+		}
+
+		// Check we completed the assignation correctly
+		// (this is a set of checks for the algorithm's consistency)
+		assert!(partitions.len() == (1 << PARTITION_BITS));
+		assert!(partitions
+			.iter()
+			.all(|p| p.nodes.len() == self.replication_factor));
+
+		let new_partitions_per_node = self.partitions_per_node(&partitions[..]);
+		assert!(new_partitions_per_node == partitions_per_node);
+
+		// Show statistics
+		println!("New number of partitions per node:");
+		for (node, npart) in partitions_per_node.iter() {
+			println!("{:?}\t{}", node, npart);
+		}
+		println!();
+
+		let mut diffcount = HashMap::new();
+		for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) {
+			let nminus = oldpart.txtplus(newpart);
+			let nplus = newpart.txtplus(oldpart);
+			if nminus != "[...]" || nplus != "[...]" {
+				let tup = (nminus, nplus);
+				*diffcount.entry(tup).or_insert(0) += 1;
+			}
+		}
+		if diffcount.is_empty() {
+			println!("No data will be moved between nodes.");
+		} else {
+			let mut diffcount = diffcount.into_iter().collect::<Vec<_>>();
+			diffcount.sort();
println!("Number of partitions that move:"); + for ((nminus, nplus), npart) in diffcount { + println!("\t{}\t{} -> {}", npart, nminus, nplus); + } + } + println!(); + + // Calculate and save new assignation data + let (nodes, assignation_data) = + self.compute_assignation_data(&configured_nodes[..], &partitions[..]); + + self.node_id_vec = nodes; + self.ring_assignation_data = assignation_data; + + true + } + + fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> { + let (configured_nodes, zones) = self.configured_nodes_and_zones(); + let n_zones = zones.len(); + + // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) + let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); + + // Prepare ring + let mut partitions: Vec<PartitionAss> = partitions_idx + .iter() + .map(|_i| PartitionAss::new()) + .collect::<Vec<_>>(); + + // Create MagLev priority queues for each node + let mut queues = configured_nodes + .iter() + .filter(|(_id, info)| info.capacity.is_some()) + .map(|(node_id, node_info)| { + let mut parts = partitions_idx + .iter() + .map(|i| { + let part_data = + [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat(); + (*i, fasthash(&part_data[..])) + }) + .collect::<Vec<_>>(); + parts.sort_by_key(|(_i, h)| *h); + let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>(); + (node_id, node_info, parts_i, 0) + }) + .collect::<Vec<_>>(); + + let max_capacity = configured_nodes + .iter() + .filter_map(|(_, node_info)| node_info.capacity) + .fold(0, std::cmp::max); + + // Fill up ring + for rep in 0..self.replication_factor { + queues.sort_by_key(|(ni, _np, _q, _p)| { + let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat(); + fasthash(&queue_data[..]) + }); + + for (_, _, _, pos) in queues.iter_mut() { + *pos = 0; + } + + let mut remaining = partitions_idx.len(); + while remaining > 0 { + let remaining0 = remaining; + for i_round in 0..max_capacity { + for (node_id, node_info, q, pos) in queues.iter_mut() { + if i_round >= node_info.capacity.unwrap() { + continue; + } + for (pos2, &qv) in q.iter().enumerate().skip(*pos) { + if partitions[qv].add(rep + 1, n_zones, node_id, node_info) { + remaining -= 1; + *pos = pos2 + 1; + break; + } + } + } + } + if remaining == remaining0 { + // No progress made, exit + return None; + } + } + } + + Some(partitions) + } + + fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) { + let configured_nodes = self + .roles + .items() + .iter() + .filter(|(_id, _, info)| info.0.is_some()) + .map(|(id, _, info)| (id, info.0.as_ref().unwrap())) + .collect::<Vec<(&Uuid, &NodeRole)>>(); + + let zones = configured_nodes + .iter() + .filter(|(_id, info)| info.capacity.is_some()) + .map(|(_id, info)| info.zone.as_str()) + .collect::<HashSet<&str>>(); + + (configured_nodes, zones) + } + + fn compute_assignation_data<'a>( + &self, + configured_nodes: &[(&'a Uuid, &'a NodeRole)], + partitions: &[PartitionAss<'a>], + ) -> (Vec<Uuid>, Vec<CompactNodeType>) { + assert!(partitions.len() == (1 << PARTITION_BITS)); + + // Make a canonical order for nodes + let mut nodes = configured_nodes + .iter() + .filter(|(_id, info)| info.capacity.is_some()) + .map(|(id, _)| **id) + .collect::<Vec<_>>(); + let nodes_rev = nodes + .iter() + .enumerate() + .map(|(i, id)| (*id, i as CompactNodeType)) + .collect::<HashMap<Uuid, CompactNodeType>>(); + + let mut assignation_data = vec![]; + for partition in partitions.iter() { + assert!(partition.nodes.len() == 
+			for (id, _) in partition.nodes.iter() {
+				assignation_data.push(*nodes_rev.get(id).unwrap());
+			}
+		}
+
+		nodes.extend(
+			configured_nodes
+				.iter()
+				.filter(|(_id, info)| info.capacity.is_none())
+				.map(|(id, _)| **id),
+		);
+
+		(nodes, assignation_data)
+	}
+
+	fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> {
+		if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) {
+			// If the previous assignation data is correct, use that
+			let mut partitions = vec![];
+			for i in 0..(1 << PARTITION_BITS) {
+				let mut part = PartitionAss::new();
+				for node_i in self.ring_assignation_data
+					[i * self.replication_factor..(i + 1) * self.replication_factor]
+					.iter()
+				{
+					let node_id = &self.node_id_vec[*node_i as usize];
+
+					if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) {
+						part.nodes.push((node_id, Some(info)));
+					} else {
+						part.nodes.push((node_id, None));
+					}
+				}
+				partitions.push(part);
+			}
+			partitions
+		} else {
+			// Otherwise, start fresh
+			(0..(1 << PARTITION_BITS))
+				.map(|_| PartitionAss::new())
+				.collect()
+		}
+	}
+
+	fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> {
+		let mut partitions_per_node = HashMap::<&Uuid, usize>::new();
+		for p in partitions.iter() {
+			for (id, _) in p.nodes.iter() {
+				*partitions_per_node.entry(*id).or_insert(0) += 1;
+			}
+		}
+		partitions_per_node
+	}
+}
+
+// ---- Internal structs for partition assignation in layout ----
+
+#[derive(Clone)]
+struct PartitionAss<'a> {
+	nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>,
+}
+
+impl<'a> PartitionAss<'a> {
+	fn new() -> Self {
+		Self { nodes: Vec::new() }
+	}
+
+	fn nplus(&self, other: &PartitionAss<'a>) -> usize {
+		self.nodes
+			.iter()
+			.filter(|x| !other.nodes.contains(x))
+			.count()
+	}
+
+	fn txtplus(&self, other: &PartitionAss<'a>) -> String {
+		let mut nodes = self
+			.nodes
+			.iter()
+			.filter(|x| !other.nodes.contains(x))
+			.map(|x| format!("{:?}", x.0))
+			.collect::<Vec<_>>();
+		nodes.sort();
+		if self.nodes.iter().any(|x| other.nodes.contains(x)) {
+			nodes.push("...".into());
+		}
+		format!("[{}]", nodes.join(" "))
+	}
+
+	fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool {
+		let min_keep_nodes_per_part = (replication_factor + 1) / 2;
+		let n_removed = self.nplus(other);
+
+		if self.nodes.len() <= min_keep_nodes_per_part {
+			n_removed == 0
+		} else {
+			n_removed <= self.nodes.len() - min_keep_nodes_per_part
+		}
+	}
+
+	fn add(
+		&mut self,
+		target_len: usize,
+		n_zones: usize,
+		node: &'a Uuid,
+		role: &'a NodeRole,
+	) -> bool {
+		if self.nodes.len() != target_len - 1 {
+			return false;
+		}
+
+		let p_zns = self
+			.nodes
+			.iter()
+			.map(|(_id, info)| info.unwrap().zone.as_str())
+			.collect::<HashSet<&str>>();
+		if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str()))
+			|| (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node))
+		{
+			self.nodes.push((node, Some(role)));
+			true
+		} else {
+			false
+		}
+	}
+}
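
Editor's note, not part of the patch: a minimal sketch of how the assignation data introduced above is meant to be read back. `ring_assignation_data` holds `replication_factor` entries per partition, each entry an index into `node_id_vec` (where storage nodes come first). The helper name `nodes_of_partition` is hypothetical, and the snippet assumes the same imports and crate context as `layout.rs`.

// Hypothetical helper (not in this commit): list the node UUIDs storing one partition,
// given a ClusterLayout as defined above. Assumes `Uuid` from garage_util::data.
fn nodes_of_partition(layout: &ClusterLayout, partition: usize) -> Vec<Uuid> {
	let rf = layout.replication_factor;
	// Each partition occupies `rf` consecutive entries of ring_assignation_data;
	// each entry is an index into node_id_vec.
	layout.ring_assignation_data[partition * rf..(partition + 1) * rf]
		.iter()
		.map(|i| layout.node_id_vec[*i as usize])
		.collect()
}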