From fe9af1dcaae31a117528a9cfa10c422c9a850201 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 17:49:06 +0100 Subject: WIP: garage_rpc: store layout version history --- src/rpc/layout/history.rs | 170 ++++++++ src/rpc/layout/mod.rs | 32 ++ src/rpc/layout/schema.rs | 286 ++++++++++++ src/rpc/layout/tracker.rs | 21 + src/rpc/layout/version.rs | 1052 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 1561 insertions(+) create mode 100644 src/rpc/layout/history.rs create mode 100644 src/rpc/layout/mod.rs create mode 100644 src/rpc/layout/schema.rs create mode 100644 src/rpc/layout/tracker.rs create mode 100644 src/rpc/layout/version.rs (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs new file mode 100644 index 00000000..b3019f58 --- /dev/null +++ b/src/rpc/layout/history.rs @@ -0,0 +1,170 @@ +use std::cmp::Ordering; +use std::sync::Arc; + +use garage_util::crdt::{Crdt, Lww, LwwMap}; +use garage_util::data::*; +use garage_util::encode::nonversioned_encode; +use garage_util::error::*; + +use super::schema::*; +use super::*; + +impl LayoutHistory { + pub fn new(replication_factor: usize) -> Self { + let version = LayoutVersion::new(replication_factor); + + let staging_parameters = Lww::::new(version.parameters); + let empty_lwwmap = LwwMap::new(); + + let mut ret = LayoutHistory { + versions: vec![version].into_boxed_slice().into(), + update_trackers: Default::default(), + staging_parameters, + staging_roles: empty_lwwmap, + staging_hash: [0u8; 32].into(), + }; + ret.staging_hash = ret.calculate_staging_hash(); + ret + } + + pub fn current(&self) -> &LayoutVersion { + self.versions.last().as_ref().unwrap() + } + + pub(crate) fn calculate_staging_hash(&self) -> Hash { + let hashed_tuple = (&self.staging_roles, &self.staging_parameters); + blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) + } + + // ================== updates to layout, public interface =================== + + pub fn merge(&mut self, other: &LayoutHistory) -> bool { + let mut changed = false; + + // Merge staged layout changes + match other.current().version.cmp(&self.current().version) { + Ordering::Greater => { + self.staging_parameters = other.staging_parameters.clone(); + self.staging_roles = other.staging_roles.clone(); + self.staging_hash = other.staging_hash; + changed = true; + } + Ordering::Equal => { + self.staging_parameters.merge(&other.staging_parameters); + self.staging_roles.merge(&other.staging_roles); + + let new_staging_hash = self.calculate_staging_hash(); + if new_staging_hash != self.staging_hash { + changed = true; + } + + self.staging_hash = new_staging_hash; + } + Ordering::Less => (), + } + + // Add any new versions to history + let mut versions = self.versions.to_vec(); + for v2 in other.versions.iter() { + if let Some(v1) = versions.iter().find(|v| v.version == v2.version) { + if v1 != v2 { + error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version); + } + } else if versions.iter().all(|v| v.version != v2.version - 1) { + error!( + "Cannot receive new layout version {}, version {} is missing", + v2.version, + v2.version - 1 + ); + } else { + versions.push(v2.clone()); + changed = true; + } + } + self.versions = Arc::from(versions.into_boxed_slice()); + + // Merge trackers + self.update_trackers.merge(&other.update_trackers); + + changed + } + + pub fn apply_staged_changes(mut self, version: Option) -> Result<(Self, Message), Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.current().version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + let mut new_version = self.current().clone(); + new_version.version += 1; + + new_version.roles.merge(&self.staging_roles); + new_version.roles.retain(|(_, _, v)| v.0.is_some()); + new_version.parameters = *self.staging_parameters.get(); + + self.staging_roles.clear(); + self.staging_hash = self.calculate_staging_hash(); + + let msg = new_version.calculate_partition_assignment()?; + + let mut versions = self.versions.to_vec(); + versions.push(new_version); + self.versions = Arc::from(versions.into_boxed_slice()); + + Ok((self, msg)) + } + + pub fn revert_staged_changes(mut self, version: Option) -> Result { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.current().version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.staging_roles.clear(); + self.staging_parameters.update(self.current().parameters); + self.staging_hash = self.calculate_staging_hash(); + + // TODO this is stupid, we should have a separate version counter/LWW + // for the staging params + let mut new_version = self.current().clone(); + new_version.version += 1; + + let mut versions = self.versions.to_vec(); + versions.push(new_version); + self.versions = Arc::from(versions.into_boxed_slice()); + + Ok(self) + } + + pub fn check(&self) -> Result<(), String> { + // Check that the hash of the staging data is correct + let staging_hash = self.calculate_staging_hash(); + if staging_hash != self.staging_hash { + return Err("staging_hash is incorrect".into()); + } + + // TODO: anythign more ? + + self.current().check() + } +} diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs new file mode 100644 index 00000000..122d4b65 --- /dev/null +++ b/src/rpc/layout/mod.rs @@ -0,0 +1,32 @@ +mod history; +mod schema; +mod tracker; +mod version; + +pub use history::*; +pub use schema::*; +pub use version::*; + +// ---- defines: partitions ---- + +/// A partition id, which is stored on 16 bits +/// i.e. we have up to 2**16 partitions. +/// (in practice we have exactly 2**PARTITION_BITS partitions) +pub type Partition = u16; + +// TODO: make this constant parametrizable in the config file +// For deployments with many nodes it might make sense to bump +// it up to 10. +// Maximum value : 16 +/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in +/// presence of numerous nodes, but exponentially bigger ring. Max 16 +pub const PARTITION_BITS: usize = 8; + +const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; + +// ---- defines: nodes ---- + +// Type to store compactly the id of a node in the system +// Change this to u16 the day we want to have more than 256 nodes in a cluster +pub type CompactNodeType = u8; +pub const MAX_NODE_NUMBER: usize = 256; diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs new file mode 100644 index 00000000..fa0822fa --- /dev/null +++ b/src/rpc/layout/schema.rs @@ -0,0 +1,286 @@ +mod v08 { + use crate::layout::CompactNodeType; + use garage_util::crdt::LwwMap; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + pub roles: LwwMap, + + /// node_id_vec: a vector of node IDs with a role assigned + /// in the system (this includes gateway nodes). + /// The order here is different than the vec stored by `roles`, because: + /// 1. non-gateway nodes are first so that they have lower numbers + /// 2. nodes that don't have a role are excluded (but they need to + /// stay in the CRDT as tombstones) + pub node_id_vec: Vec, + /// the assignation of data partitions to node, the values + /// are indices in node_id_vec + #[serde(with = "serde_bytes")] + pub ring_assignation_data: Vec, + + /// Role changes which are staged for the next version of the layout + pub staging: LwwMap, + pub staging_hash: Hash, + } + + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRoleV(pub Option); + + /// The user-assigned roles of cluster nodes + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRole { + /// Datacenter at which this entry belong. This information is used to + /// perform a better geodistribution + pub zone: String, + /// The capacity of the node + /// If this is set to None, the node does not participate in storing data for the system + /// and is only active as an API gateway to other nodes + pub capacity: Option, + /// A set of tags to recognize the node + pub tags: Vec, + } + + impl garage_util::migrate::InitialFormat for ClusterLayout {} +} + +mod v09 { + use super::v08; + use crate::layout::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + pub use v08::{NodeRole, NodeRoleV}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + pub roles: LwwMap, + + /// see comment in v08::ClusterLayout + pub node_id_vec: Vec, + /// see comment in v08::ClusterLayout + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec, + + /// Parameters to be used in the next partition assignment computation. + pub staging_parameters: Lww, + /// Role changes which are staged for the next version of the layout + pub staging_roles: LwwMap, + pub staging_hash: Hash, + } + + /// This struct is used to set the parameters to be used in the assignment computation + /// algorithm. It is stored as a Crdt. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct LayoutParameters { + pub zone_redundancy: ZoneRedundancy, + } + + /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies + /// of each partition on at least that number of different zones. + /// Otherwise, copies will be stored on the maximum possible number of zones. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub enum ZoneRedundancy { + AtLeast(usize), + Maximum, + } + + impl garage_util::migrate::Migrate for ClusterLayout { + const VERSION_MARKER: &'static [u8] = b"G09layout"; + + type Previous = v08::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + use itertools::Itertools; + + // In the old layout, capacities are in an arbitrary unit, + // but in the new layout they are in bytes. + // Here we arbitrarily multiply everything by 1G, + // such that 1 old capacity unit = 1GB in the new units. + // This is totally arbitrary and won't work for most users. + let cap_mul = 1024 * 1024 * 1024; + let roles = multiply_all_capacities(previous.roles, cap_mul); + let staging_roles = multiply_all_capacities(previous.staging, cap_mul); + let node_id_vec = previous.node_id_vec; + + // Determine partition size + let mut tmp = previous.ring_assignation_data.clone(); + tmp.sort(); + let partition_size = tmp + .into_iter() + .dedup_with_count() + .map(|(npart, node)| { + roles + .get(&node_id_vec[node as usize]) + .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) + .unwrap_or(0) / npart as u64 + }) + .min() + .unwrap_or(0); + + // By default, zone_redundancy is maximum possible value + let parameters = LayoutParameters { + zone_redundancy: ZoneRedundancy::Maximum, + }; + + Self { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size, + parameters, + roles, + node_id_vec, + ring_assignment_data: previous.ring_assignation_data, + staging_parameters: Lww::new(parameters), + staging_roles, + staging_hash: [0u8; 32].into(), // will be set in the next migration + } + } + } + + fn multiply_all_capacities( + old_roles: LwwMap, + mul: u64, + ) -> LwwMap { + let mut new_roles = LwwMap::new(); + for (node, ts, role) in old_roles.items() { + let mut role = role.clone(); + if let NodeRoleV(Some(NodeRole { + capacity: Some(ref mut cap), + .. + })) = role + { + *cap *= mul; + } + new_roles.merge_raw(node, *ts, &role); + } + new_roles + } +} + +mod v010 { + use super::v09; + use crate::layout::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + use std::collections::HashMap; + use std::sync::Arc; + pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutVersion { + pub version: u64, + + pub replication_factor: usize, + + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + pub roles: LwwMap, + + /// see comment in v08::ClusterLayout + pub node_id_vec: Vec, + /// see comment in v08::ClusterLayout + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec, + } + + /// The history of cluster layouts + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct LayoutHistory { + /// The versions currently in use in the cluster + pub versions: Arc<[LayoutVersion]>, + + /// Update trackers + pub update_trackers: UpdateTrackers, + + /// Parameters to be used in the next partition assignment computation. + pub staging_parameters: Lww, + /// Role changes which are staged for the next version of the layout + pub staging_roles: LwwMap, + /// Hash of the serialized staging_parameters + staging_roles + pub staging_hash: Hash, + } + + /// The tracker of acknowlegments and data syncs around the cluster + #[derive(Clone, Debug, Serialize, Deserialize, Default)] + pub struct UpdateTrackers { + /// The highest layout version number each node has ack'ed + pub ack_map: UpdateTracker, + /// The highest layout version number each node has synced data for + pub sync_map: UpdateTracker, + /// The highest layout version number each node has + /// ack'ed that all other nodes have synced data for + pub sync_ack_map: UpdateTracker, + } + + /// The history of cluster layouts + #[derive(Clone, Debug, Serialize, Deserialize, Default)] + pub struct UpdateTracker(pub HashMap); + + impl garage_util::migrate::Migrate for LayoutHistory { + const VERSION_MARKER: &'static [u8] = b"G010lh"; + + type Previous = v09::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + let version = LayoutVersion { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size: previous.partition_size, + parameters: previous.parameters, + roles: previous.roles, + node_id_vec: previous.node_id_vec, + ring_assignment_data: previous.ring_assignment_data, + }; + let update_tracker = UpdateTracker( + version + .nongateway_nodes() + .iter() + .map(|x| (*x, version.version)) + .collect::>(), + ); + let mut ret = Self { + versions: Arc::from(vec![version].into_boxed_slice()), + update_trackers: UpdateTrackers { + ack_map: update_tracker.clone(), + sync_map: update_tracker.clone(), + sync_ack_map: update_tracker.clone(), + }, + staging_parameters: previous.staging_parameters, + staging_roles: previous.staging_roles, + staging_hash: [0u8; 32].into(), + }; + ret.staging_hash = ret.calculate_staging_hash(); + ret + } + } +} + +pub use v010::*; diff --git a/src/rpc/layout/tracker.rs b/src/rpc/layout/tracker.rs new file mode 100644 index 00000000..778121e4 --- /dev/null +++ b/src/rpc/layout/tracker.rs @@ -0,0 +1,21 @@ +use super::*; + +impl UpdateTracker { + fn merge(&mut self, other: &UpdateTracker) { + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { + *v_mut = std::cmp::max(*v_mut, *v); + } else { + self.0.insert(*k, *v); + } + } + } +} + +impl UpdateTrackers { + pub(crate) fn merge(&mut self, other: &UpdateTrackers) { + self.ack_map.merge(&other.ack_map); + self.sync_map.merge(&other.sync_map); + self.sync_ack_map.merge(&other.sync_ack_map); + } +} diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs new file mode 100644 index 00000000..363bc204 --- /dev/null +++ b/src/rpc/layout/version.rs @@ -0,0 +1,1052 @@ +use std::collections::HashMap; +use std::collections::HashSet; +use std::fmt; + +use bytesize::ByteSize; +use itertools::Itertools; + +use garage_util::crdt::{AutoCrdt, LwwMap}; +use garage_util::data::*; +use garage_util::error::*; + +use crate::graph_algo::*; + +use std::convert::TryInto; + +use super::schema::*; +use super::*; + +// The Message type will be used to collect information on the algorithm. +pub type Message = Vec; + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } + } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } +} + +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + +impl LayoutVersion { + pub fn new(replication_factor: usize) -> Self { + // We set the default zone redundancy to be Maximum, meaning that the maximum + // possible value will be used depending on the cluster topology + let parameters = LayoutParameters { + zone_redundancy: ZoneRedundancy::Maximum, + }; + + LayoutVersion { + version: 0, + replication_factor, + partition_size: 0, + roles: LwwMap::new(), + node_id_vec: Vec::new(), + ring_assignment_data: Vec::new(), + parameters, + } + } + + // ===================== accessors ====================== + + /// Returns a list of IDs of nodes that currently have + /// a role in the cluster + pub fn node_ids(&self) -> &[Uuid] { + &self.node_id_vec[..] + } + + pub fn num_nodes(&self) -> usize { + self.node_id_vec.len() + } + + /// Returns the role of a node in the layout + pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> { + match self.roles.get(node) { + Some(NodeRoleV(Some(v))) => Some(v), + _ => None, + } + } + + /// Given a node uuids, this function returns its capacity or fails if it does not have any + pub fn get_node_capacity(&self, uuid: &Uuid) -> Result { + match self.node_role(uuid) { + Some(NodeRole { + capacity: Some(cap), + zone: _, + tags: _, + }) => Ok(*cap), + _ => Err(Error::Message( + "The Uuid does not correspond to a node present in the \ + cluster or this node does not have a positive capacity." + .into(), + )), + } + } + + /// Returns the number of partitions associated to this node in the ring + pub fn get_node_usage(&self, uuid: &Uuid) -> Result { + for (i, id) in self.node_id_vec.iter().enumerate() { + if id == uuid { + let mut count = 0; + for nod in self.ring_assignment_data.iter() { + if i as u8 == *nod { + count += 1 + } + } + return Ok(count); + } + } + Err(Error::Message( + "The Uuid does not correspond to a node present in the \ + cluster or this node does not have a positive capacity." + .into(), + )) + } + + /// Get the partition in which data would fall on + pub fn partition_of(&self, position: &Hash) -> Partition { + let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); + top >> (16 - PARTITION_BITS) + } + + /// Get the list of partitions and the first hash of a partition key that would fall in it + pub fn partitions(&self) -> Vec<(Partition, Hash)> { + (0..(1 << PARTITION_BITS)) + .map(|i| { + let top = (i as u16) << (16 - PARTITION_BITS); + let mut location = [0u8; 32]; + location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); + (i as u16, Hash::from(location)) + }) + .collect::>() + } + + /// Walk the ring to find the n servers in which data should be replicated + pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec { + assert_eq!(n, self.replication_factor); + + let data = &self.ring_assignment_data; + + if data.len() != self.replication_factor * (1 << PARTITION_BITS) { + warn!("Ring not yet ready, read/writes will be lost!"); + return vec![]; + } + + let partition_idx = self.partition_of(position) as usize; + let partition_start = partition_idx * self.replication_factor; + let partition_end = (partition_idx + 1) * self.replication_factor; + let partition_nodes = &data[partition_start..partition_end]; + + partition_nodes + .iter() + .map(|i| self.node_id_vec[*i as usize]) + .collect::>() + } + + // ===================== internal information extractors ====================== + + /// Returns the uuids of the non_gateway nodes in self.node_id_vec. + pub(crate) fn nongateway_nodes(&self) -> Vec { + let mut result = Vec::::new(); + for uuid in self.node_id_vec.iter() { + match self.node_role(uuid) { + Some(role) if role.capacity.is_some() => result.push(*uuid), + _ => (), + } + } + result + } + + /// Given a node uuids, this function returns the label of its zone + fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> { + match self.node_role(uuid) { + Some(role) => Ok(&role.zone), + _ => Err(Error::Message( + "The Uuid does not correspond to a node present in the cluster.".into(), + )), + } + } + + /// Returns the sum of capacities of non gateway nodes in the cluster + fn get_total_capacity(&self) -> Result { + let mut total_capacity = 0; + for uuid in self.nongateway_nodes().iter() { + total_capacity += self.get_node_capacity(uuid)?; + } + Ok(total_capacity) + } + + /// Returns the effective value of the zone_redundancy parameter + fn effective_zone_redundancy(&self) -> usize { + match self.parameters.zone_redundancy { + ZoneRedundancy::AtLeast(v) => v, + ZoneRedundancy::Maximum => { + let n_zones = self + .roles + .items() + .iter() + .filter_map(|(_, _, role)| role.0.as_ref().map(|x| x.zone.as_str())) + .collect::>() + .len(); + std::cmp::min(n_zones, self.replication_factor) + } + } + } + + /// Check a cluster layout for internal consistency + /// (assignment, roles, parameters, partition size) + /// returns true if consistent, false if error + pub fn check(&self) -> Result<(), String> { + // Check that node_id_vec contains the correct list of nodes + let mut expected_nodes = self + .roles + .items() + .iter() + .filter(|(_, _, v)| v.0.is_some()) + .map(|(id, _, _)| *id) + .collect::>(); + expected_nodes.sort(); + let mut node_id_vec = self.node_id_vec.clone(); + node_id_vec.sort(); + if expected_nodes != node_id_vec { + return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes)); + } + + // Check that the assignment data has the correct length + let expected_assignment_data_len = (1 << PARTITION_BITS) * self.replication_factor; + if self.ring_assignment_data.len() != expected_assignment_data_len { + return Err(format!( + "ring_assignment_data has incorrect length {} instead of {}", + self.ring_assignment_data.len(), + expected_assignment_data_len + )); + } + + // Check that the assigned nodes are correct identifiers + // of nodes that are assigned a role + // and that role is not the role of a gateway nodes + for x in self.ring_assignment_data.iter() { + if *x as usize >= self.node_id_vec.len() { + return Err(format!( + "ring_assignment_data contains invalid node id {}", + *x + )); + } + let node = self.node_id_vec[*x as usize]; + match self.roles.get(&node) { + Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (), + _ => return Err("ring_assignment_data contains id of a gateway node".into()), + } + } + + // Check that every partition is associated to distinct nodes + let zone_redundancy = self.effective_zone_redundancy(); + let rf = self.replication_factor; + for p in 0..(1 << PARTITION_BITS) { + let nodes_of_p = self.ring_assignment_data[rf * p..rf * (p + 1)].to_vec(); + if nodes_of_p.iter().unique().count() != rf { + return Err(format!("partition does not contain {} unique node ids", rf)); + } + // Check that every partition is spread over at least zone_redundancy zones. + let zones_of_p = nodes_of_p + .iter() + .map(|n| { + self.get_node_zone(&self.node_id_vec[*n as usize]) + .expect("Zone not found.") + }) + .collect::>(); + if zones_of_p.iter().unique().count() < zone_redundancy { + return Err(format!( + "nodes of partition are in less than {} distinct zones", + zone_redundancy + )); + } + } + + // Check that the nodes capacities is consistent with the stored partitions + let mut node_usage = vec![0; MAX_NODE_NUMBER]; + for n in self.ring_assignment_data.iter() { + node_usage[*n as usize] += 1; + } + for (n, usage) in node_usage.iter().enumerate() { + if *usage > 0 { + let uuid = self.node_id_vec[n]; + let partusage = usage * self.partition_size; + let nodecap = self.get_node_capacity(&uuid).unwrap(); + if partusage > nodecap { + return Err(format!( + "node usage ({}) is bigger than node capacity ({})", + usage * self.partition_size, + nodecap + )); + } + } + } + + // Check that the partition size stored is the one computed by the asignation + // algorithm. + let cl2 = self.clone(); + let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().unwrap(); + match cl2.compute_optimal_partition_size(&zone_to_id, zone_redundancy) { + Ok(s) if s != self.partition_size => { + return Err(format!( + "partition_size ({}) is different than optimal value ({})", + self.partition_size, s + )) + } + Err(e) => return Err(format!("could not calculate optimal partition size: {}", e)), + _ => (), + } + + Ok(()) + } + + // ================== updates to layout, internals =================== + + /// This function calculates a new partition-to-node assignment. + /// The computed assignment respects the node replication factor + /// and the zone redundancy parameter It maximizes the capacity of a + /// partition (assuming all partitions have the same size). + /// Among such optimal assignment, it minimizes the distance to + /// the former assignment (if any) to minimize the amount of + /// data to be moved. + /// Staged role changes must be merged with nodes roles before calling this function, + /// hence it must only be called from apply_staged_changes() and hence is not public. + pub(crate) fn calculate_partition_assignment(&mut self) -> Result { + // We update the node ids, since the node role list might have changed with the + // changes in the layout. We retrieve the old_assignment reframed with new ids + let old_assignment_opt = self.update_node_id_vec()?; + + let zone_redundancy = self.effective_zone_redundancy(); + + let mut msg = Message::new(); + msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into()); + msg.push("".into()); + msg.push(format!( + "Partitions are \ + replicated {} times on at least {} distinct zones.", + self.replication_factor, zone_redundancy + )); + + // We generate for once numerical ids for the zones of non gateway nodes, + // to use them as indices in the flow graphs. + let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; + + let nb_nongateway_nodes = self.nongateway_nodes().len(); + if nb_nongateway_nodes < self.replication_factor { + return Err(Error::Message(format!( + "The number of nodes with positive \ + capacity ({}) is smaller than the replication factor ({}).", + nb_nongateway_nodes, self.replication_factor + ))); + } + if id_to_zone.len() < zone_redundancy { + return Err(Error::Message(format!( + "The number of zones with non-gateway \ + nodes ({}) is smaller than the redundancy parameter ({})", + id_to_zone.len(), + zone_redundancy + ))); + } + + // We compute the optimal partition size + // Capacities should be given in a unit so that partition size is at least 100. + // In this case, integer rounding plays a marginal role in the percentages of + // optimality. + let partition_size = self.compute_optimal_partition_size(&zone_to_id, zone_redundancy)?; + + msg.push("".into()); + if old_assignment_opt.is_some() { + msg.push(format!( + "Optimal partition size: {} ({} in previous layout)", + ByteSize::b(partition_size).to_string_as(false), + ByteSize::b(self.partition_size).to_string_as(false) + )); + } else { + msg.push(format!( + "Optimal partition size: {}", + ByteSize::b(partition_size).to_string_as(false) + )); + } + // We write the partition size. + self.partition_size = partition_size; + + if partition_size < 100 { + msg.push( + "WARNING: The partition size is low (< 100), make sure the capacities of your nodes are correct and are of at least a few MB" + .into(), + ); + } + + // We compute a first flow/assignment that is heuristically close to the previous + // assignment + let mut gflow = + self.compute_candidate_assignment(&zone_to_id, &old_assignment_opt, zone_redundancy)?; + if let Some(assoc) = &old_assignment_opt { + // We minimize the distance to the previous assignment. + self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; + } + + // We display statistics of the computation + msg.extend(self.output_stat(&gflow, &old_assignment_opt, &zone_to_id, &id_to_zone)?); + + // We update the layout structure + self.update_ring_from_flow(id_to_zone.len(), &gflow)?; + + if let Err(e) = self.check() { + return Err(Error::Message( + format!("Layout check returned an error: {}\nOriginal result of computation: <<<<\n{}\n>>>>", e, msg.join("\n")) + )); + } + + Ok(msg) + } + + /// The LwwMap of node roles might have changed. This function updates the node_id_vec + /// and returns the assignment given by ring, with the new indices of the nodes, and + /// None if the node is not present anymore. + /// We work with the assumption that only this function and calculate_new_assignment + /// do modify assignment_ring and node_id_vec. + fn update_node_id_vec(&mut self) -> Result>>, Error> { + // (1) We compute the new node list + // Non gateway nodes should be coded on 8bits, hence they must be first in the list + // We build the new node ids + let new_non_gateway_nodes: Vec = self + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity.is_some())) + .map(|(k, _, _)| *k) + .collect(); + + if new_non_gateway_nodes.len() > MAX_NODE_NUMBER { + return Err(Error::Message(format!( + "There are more than {} non-gateway nodes in the new \ + layout. This is not allowed.", + MAX_NODE_NUMBER + ))); + } + + let new_gateway_nodes: Vec = self + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_none())) + .map(|(k, _, _)| *k) + .collect(); + + let mut new_node_id_vec = Vec::::new(); + new_node_id_vec.extend(new_non_gateway_nodes); + new_node_id_vec.extend(new_gateway_nodes); + + let old_node_id_vec = self.node_id_vec.clone(); + self.node_id_vec = new_node_id_vec.clone(); + + // (2) We retrieve the old association + // We rewrite the old association with the new indices. We only consider partition + // to node assignments where the node is still in use. + if self.ring_assignment_data.is_empty() { + // This is a new association + return Ok(None); + } + + if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor { + return Err(Error::Message( + "The old assignment does not have a size corresponding to \ + the old replication factor or the number of partitions." + .into(), + )); + } + + // We build a translation table between the uuid and new ids + let mut uuid_to_new_id = HashMap::::new(); + + // We add the indices of only the new non-gateway nodes that can be used in the + // association ring + for (i, uuid) in new_node_id_vec.iter().enumerate() { + uuid_to_new_id.insert(*uuid, i); + } + + let mut old_assignment = vec![Vec::::new(); NB_PARTITIONS]; + let rf = self.replication_factor; + + for (p, old_assign_p) in old_assignment.iter_mut().enumerate() { + for old_id in &self.ring_assignment_data[p * rf..(p + 1) * rf] { + let uuid = old_node_id_vec[*old_id as usize]; + if uuid_to_new_id.contains_key(&uuid) { + old_assign_p.push(uuid_to_new_id[&uuid]); + } + } + } + + // We write the ring + self.ring_assignment_data = Vec::::new(); + + Ok(Some(old_assignment)) + } + + /// This function generates ids for the zone of the nodes appearing in + /// self.node_id_vec. + fn generate_nongateway_zone_ids(&self) -> Result<(Vec, HashMap), Error> { + let mut id_to_zone = Vec::::new(); + let mut zone_to_id = HashMap::::new(); + + for uuid in self.nongateway_nodes().iter() { + let r = self.node_role(uuid).unwrap(); + if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() { + zone_to_id.insert(r.zone.clone(), id_to_zone.len()); + id_to_zone.push(r.zone.clone()); + } + } + Ok((id_to_zone, zone_to_id)) + } + + /// This function computes by dichotomy the largest realizable partition size, given + /// the layout roles and parameters. + fn compute_optimal_partition_size( + &self, + zone_to_id: &HashMap, + zone_redundancy: usize, + ) -> Result { + let empty_set = HashSet::<(usize, usize)>::new(); + let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set, zone_redundancy)?; + g.compute_maximal_flow()?; + if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 { + return Err(Error::Message( + "The storage capacity of he cluster is to small. It is \ + impossible to store partitions of size 1." + .into(), + )); + } + + let mut s_down = 1; + let mut s_up = self.get_total_capacity()?; + while s_down + 1 < s_up { + g = self.generate_flow_graph( + (s_down + s_up) / 2, + zone_to_id, + &empty_set, + zone_redundancy, + )?; + g.compute_maximal_flow()?; + if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 { + s_up = (s_down + s_up) / 2; + } else { + s_down = (s_down + s_up) / 2; + } + } + + Ok(s_down) + } + + fn generate_graph_vertices(nb_zones: usize, nb_nodes: usize) -> Vec { + let mut vertices = vec![Vertex::Source, Vertex::Sink]; + for p in 0..NB_PARTITIONS { + vertices.push(Vertex::Pup(p)); + vertices.push(Vertex::Pdown(p)); + for z in 0..nb_zones { + vertices.push(Vertex::PZ(p, z)); + } + } + for n in 0..nb_nodes { + vertices.push(Vertex::N(n)); + } + vertices + } + + /// Generates the graph to compute the maximal flow corresponding to the optimal + /// partition assignment. + /// exclude_assoc is the set of (partition, node) association that we are forbidden + /// to use (hence we do not add the corresponding edge to the graph). This parameter + /// is used to compute a first flow that uses only edges appearing in the previous + /// assignment. This produces a solution that heuristically should be close to the + /// previous one. + fn generate_flow_graph( + &self, + partition_size: u64, + zone_to_id: &HashMap, + exclude_assoc: &HashSet<(usize, usize)>, + zone_redundancy: usize, + ) -> Result, Error> { + let vertices = + LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len()); + let mut g = Graph::::new(&vertices); + let nb_zones = zone_to_id.len(); + for p in 0..NB_PARTITIONS { + g.add_edge(Vertex::Source, Vertex::Pup(p), zone_redundancy as u64)?; + g.add_edge( + Vertex::Source, + Vertex::Pdown(p), + (self.replication_factor - zone_redundancy) as u64, + )?; + for z in 0..nb_zones { + g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?; + g.add_edge( + Vertex::Pdown(p), + Vertex::PZ(p, z), + self.replication_factor as u64, + )?; + } + } + for n in 0..self.nongateway_nodes().len() { + let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; + let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?]; + g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; + for p in 0..NB_PARTITIONS { + if !exclude_assoc.contains(&(p, n)) { + g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?; + } + } + } + Ok(g) + } + + /// This function computes a first optimal assignment (in the form of a flow graph). + fn compute_candidate_assignment( + &self, + zone_to_id: &HashMap, + prev_assign_opt: &Option>>, + zone_redundancy: usize, + ) -> Result, Error> { + // We list the (partition,node) associations that are not used in the + // previous assignment + let mut exclude_edge = HashSet::<(usize, usize)>::new(); + if let Some(prev_assign) = prev_assign_opt { + let nb_nodes = self.nongateway_nodes().len(); + for (p, prev_assign_p) in prev_assign.iter().enumerate() { + for n in 0..nb_nodes { + exclude_edge.insert((p, n)); + } + for n in prev_assign_p.iter() { + exclude_edge.remove(&(p, *n)); + } + } + } + + // We compute the best flow using only the edges used in the previous assignment + let mut g = self.generate_flow_graph( + self.partition_size, + zone_to_id, + &exclude_edge, + zone_redundancy, + )?; + g.compute_maximal_flow()?; + + // We add the excluded edges and compute the maximal flow with the full graph. + // The algorithm is such that it will start with the flow that we just computed + // and find ameliorating paths from that. + for (p, n) in exclude_edge.iter() { + let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]; + g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?; + } + g.compute_maximal_flow()?; + Ok(g) + } + + /// This function updates the flow graph gflow to minimize the distance between + /// its corresponding assignment and the previous one + fn minimize_rebalance_load( + &self, + gflow: &mut Graph, + zone_to_id: &HashMap, + prev_assign: &[Vec], + ) -> Result<(), Error> { + // We define a cost function on the edges (pairs of vertices) corresponding + // to the distance between the two assignments. + let mut cost = CostFunction::new(); + for (p, assoc_p) in prev_assign.iter().enumerate() { + for n in assoc_p.iter() { + let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]; + cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1); + } + } + + // We compute the maximal length of a simple path in gflow. It is used in the + // Bellman-Ford algorithm in optimize_flow_with_cost to set the number + // of iterations. + let nb_nodes = self.nongateway_nodes().len(); + let path_length = 4 * nb_nodes; + gflow.optimize_flow_with_cost(&cost, path_length)?; + + Ok(()) + } + + /// This function updates the assignment ring from the flow graph. + fn update_ring_from_flow( + &mut self, + nb_zones: usize, + gflow: &Graph, + ) -> Result<(), Error> { + self.ring_assignment_data = Vec::::new(); + for p in 0..NB_PARTITIONS { + for z in 0..nb_zones { + let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; + for vertex in assoc_vertex.iter() { + if let Vertex::N(n) = vertex { + self.ring_assignment_data.push((*n).try_into().unwrap()); + } + } + } + } + + if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor { + return Err(Error::Message( + "Critical Error : the association ring we produced does not \ + have the right size." + .into(), + )); + } + Ok(()) + } + + /// This function returns a message summing up the partition repartition of the new + /// layout, and other statistics of the partition assignment computation. + fn output_stat( + &self, + gflow: &Graph, + prev_assign_opt: &Option>>, + zone_to_id: &HashMap, + id_to_zone: &[String], + ) -> Result { + let mut msg = Message::new(); + + let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64; + let total_cap = self.get_total_capacity()?; + let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32); + msg.push(format!( + "Usable capacity / total cluster capacity: {} / {} ({:.1} %)", + ByteSize::b(used_cap).to_string_as(false), + ByteSize::b(total_cap).to_string_as(false), + percent_cap + )); + msg.push(format!( + "Effective capacity (replication factor {}): {}", + self.replication_factor, + ByteSize::b(used_cap / self.replication_factor as u64).to_string_as(false) + )); + if percent_cap < 80. { + msg.push("".into()); + msg.push( + "If the percentage is too low, it might be that the \ + cluster topology and redundancy constraints are forcing the use of nodes/zones with small \ + storage capacities." + .into(), + ); + msg.push( + "You might want to move storage capacity between zones or relax the redundancy constraint." + .into(), + ); + msg.push( + "See the detailed statistics below and look for saturated nodes/zones.".into(), + ); + } + + // We define and fill in the following tables + let storing_nodes = self.nongateway_nodes(); + let mut new_partitions = vec![0; storing_nodes.len()]; + let mut stored_partitions = vec![0; storing_nodes.len()]; + + let mut new_partitions_zone = vec![0; id_to_zone.len()]; + let mut stored_partitions_zone = vec![0; id_to_zone.len()]; + + for p in 0..NB_PARTITIONS { + for z in 0..id_to_zone.len() { + let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; + if !pz_nodes.is_empty() { + stored_partitions_zone[z] += 1; + if let Some(prev_assign) = prev_assign_opt { + let mut old_zones_of_p = Vec::::new(); + for n in prev_assign[p].iter() { + old_zones_of_p + .push(zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]); + } + if !old_zones_of_p.contains(&z) { + new_partitions_zone[z] += 1; + } + } + } + for vert in pz_nodes.iter() { + if let Vertex::N(n) = *vert { + stored_partitions[n] += 1; + if let Some(prev_assign) = prev_assign_opt { + if !prev_assign[p].contains(&n) { + new_partitions[n] += 1; + } + } + } + } + } + } + + if prev_assign_opt.is_none() { + new_partitions = stored_partitions.clone(); + //new_partitions_zone = stored_partitions_zone.clone(); + } + + // We display the statistics + + msg.push("".into()); + if prev_assign_opt.is_some() { + let total_new_partitions: usize = new_partitions.iter().sum(); + msg.push(format!( + "A total of {} new copies of partitions need to be \ + transferred.", + total_new_partitions + )); + msg.push("".into()); + } + + let mut table = vec![]; + for z in 0..id_to_zone.len() { + let mut nodes_of_z = Vec::::new(); + for n in 0..storing_nodes.len() { + if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] { + nodes_of_z.push(n); + } + } + let replicated_partitions: usize = + nodes_of_z.iter().map(|n| stored_partitions[*n]).sum(); + table.push(format!( + "{}\tTags\tPartitions\tCapacity\tUsable capacity", + id_to_zone[z] + )); + + let available_cap_z: u64 = self.partition_size * replicated_partitions as u64; + let mut total_cap_z = 0; + for n in nodes_of_z.iter() { + total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; + } + let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); + + for n in nodes_of_z.iter() { + let available_cap_n = stored_partitions[*n] as u64 * self.partition_size; + let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; + let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or(""))?.tags_string(); + table.push(format!( + " {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)", + self.node_id_vec[*n], + tags_n, + stored_partitions[*n], + new_partitions[*n], + ByteSize::b(total_cap_n).to_string_as(false), + ByteSize::b(available_cap_n).to_string_as(false), + (available_cap_n as f32) / (total_cap_n as f32) * 100.0, + )); + } + + table.push(format!( + " TOTAL\t\t{} ({} unique)\t{}\t{} ({:.1}%)", + replicated_partitions, + stored_partitions_zone[z], + //new_partitions_zone[z], + ByteSize::b(total_cap_z).to_string_as(false), + ByteSize::b(available_cap_z).to_string_as(false), + percent_cap_z + )); + table.push("".into()); + } + msg.push(format_table::format_table_to_string(table)); + + Ok(msg) + } +} + +// ==================================================================================== + +#[cfg(test)] +mod tests { + use super::{Error, *}; + use std::cmp::min; + + // This function checks that the partition size S computed is at least better than the + // one given by a very naive algorithm. To do so, we try to run the naive algorithm + // assuming a partion size of S+1. If we succed, it means that the optimal assignment + // was not optimal. The naive algorithm is the following : + // - we compute the max number of partitions associated to every node, capped at the + // partition number. It gives the number of tokens of every node. + // - every zone has a number of tokens equal to the sum of the tokens of its nodes. + // - we cycle over the partitions and associate zone tokens while respecting the + // zone redundancy constraint. + // NOTE: the naive algorithm is not optimal. Counter example: + // take nb_partition = 3 ; replication_factor = 5; redundancy = 4; + // number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2) + // With these parameters, the naive algo fails, whereas there is a solution: + // (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E) + fn check_against_naive(cl: &LayoutVersion) -> Result { + let over_size = cl.partition_size + 1; + let mut zone_token = HashMap::::new(); + + let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?; + + if zones.is_empty() { + return Ok(false); + } + + for z in zones.iter() { + zone_token.insert(z.clone(), 0); + } + for uuid in cl.nongateway_nodes().iter() { + let z = cl.get_node_zone(uuid)?; + let c = cl.get_node_capacity(uuid)?; + zone_token.insert( + z.clone(), + zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize), + ); + } + + // For every partition, we count the number of zone already associated and + // the name of the last zone associated + + let mut id_zone_token = vec![0; zones.len()]; + for (z, t) in zone_token.iter() { + id_zone_token[zone_to_id[z]] = *t; + } + + let mut nb_token = vec![0; NB_PARTITIONS]; + let mut last_zone = vec![zones.len(); NB_PARTITIONS]; + + let mut curr_zone = 0; + + let redundancy = cl.effective_zone_redundancy(); + + for replic in 0..cl.replication_factor { + for p in 0..NB_PARTITIONS { + while id_zone_token[curr_zone] == 0 + || (last_zone[p] == curr_zone + && redundancy - nb_token[p] <= cl.replication_factor - replic) + { + curr_zone += 1; + if curr_zone >= zones.len() { + return Ok(true); + } + } + id_zone_token[curr_zone] -= 1; + if last_zone[p] != curr_zone { + nb_token[p] += 1; + last_zone[p] = curr_zone; + } + } + } + + return Ok(false); + } + + fn show_msg(msg: &Message) { + for s in msg.iter() { + println!("{}", s); + } + } + + fn update_layout( + cl: &mut LayoutVersion, + node_id_vec: &Vec, + node_capacity_vec: &Vec, + node_zone_vec: &Vec, + zone_redundancy: usize, + ) { + for i in 0..node_id_vec.len() { + if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) { + cl.node_id_vec.push(x); + } + + let update = cl.staging_roles.update_mutator( + cl.node_id_vec[i], + NodeRoleV(Some(NodeRole { + zone: (node_zone_vec[i].to_string()), + capacity: (Some(node_capacity_vec[i])), + tags: (vec![]), + })), + ); + cl.staging_roles.merge(&update); + } + cl.staging_parameters.update(LayoutParameters { + zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy), + }); + cl.staging_hash = cl.calculate_staging_hash(); + } + + #[test] + fn test_assignment() { + let mut node_id_vec = vec![1, 2, 3]; + let mut node_capacity_vec = vec![4000, 1000, 2000]; + let mut node_zone_vec = vec!["A", "B", "C"] + .into_iter() + .map(|x| x.to_string()) + .collect(); + + let mut cl = LayoutVersion::new(3); + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(matches!(check_against_naive(&cl), Ok(true))); + + node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; + node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000]; + node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"] + .into_iter() + .map(|x| x.to_string()) + .collect(); + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 2); + let v = cl.version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(matches!(check_against_naive(&cl), Ok(true))); + + node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(matches!(check_against_naive(&cl), Ok(true))); + + node_capacity_vec = vec![ + 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000, + ]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 1); + let v = cl.version; + let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(matches!(check_against_naive(&cl), Ok(true))); + } +} -- cgit v1.2.3 From 8dccee3ccfe7793c42203f28c1e91c6f989b6899 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 19:28:36 +0100 Subject: cluster layout: adapt all uses of ClusterLayout to LayoutHistory --- src/rpc/layout/history.rs | 17 +++++------------ src/rpc/layout/schema.rs | 5 ++--- 2 files changed, 7 insertions(+), 15 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b3019f58..e59c9e9c 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,4 @@ use std::cmp::Ordering; -use std::sync::Arc; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -64,24 +63,22 @@ impl LayoutHistory { } // Add any new versions to history - let mut versions = self.versions.to_vec(); for v2 in other.versions.iter() { - if let Some(v1) = versions.iter().find(|v| v.version == v2.version) { + if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) { if v1 != v2 { error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version); } - } else if versions.iter().all(|v| v.version != v2.version - 1) { + } else if self.versions.iter().all(|v| v.version != v2.version - 1) { error!( "Cannot receive new layout version {}, version {} is missing", v2.version, v2.version - 1 ); } else { - versions.push(v2.clone()); + self.versions.push(v2.clone()); changed = true; } } - self.versions = Arc::from(versions.into_boxed_slice()); // Merge trackers self.update_trackers.merge(&other.update_trackers); @@ -117,9 +114,7 @@ To know the correct value of the new layout version, invoke `garage layout show` let msg = new_version.calculate_partition_assignment()?; - let mut versions = self.versions.to_vec(); - versions.push(new_version); - self.versions = Arc::from(versions.into_boxed_slice()); + self.versions.push(new_version); Ok((self, msg)) } @@ -149,9 +144,7 @@ To know the correct value of the new layout version, invoke `garage layout show` let mut new_version = self.current().clone(); new_version.version += 1; - let mut versions = self.versions.to_vec(); - versions.push(new_version); - self.versions = Arc::from(versions.into_boxed_slice()); + self.versions.push(new_version); Ok(self) } diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index fa0822fa..14e797be 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -184,7 +184,6 @@ mod v010 { use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; - use std::sync::Arc; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; /// The layout of the cluster, i.e. the list of roles @@ -215,7 +214,7 @@ mod v010 { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LayoutHistory { /// The versions currently in use in the cluster - pub versions: Arc<[LayoutVersion]>, + pub versions: Vec, /// Update trackers pub update_trackers: UpdateTrackers, @@ -267,7 +266,7 @@ mod v010 { .collect::>(), ); let mut ret = Self { - versions: Arc::from(vec![version].into_boxed_slice()), + versions: vec![version], update_trackers: UpdateTrackers { ack_map: update_tracker.clone(), sync_map: update_tracker.clone(), -- cgit v1.2.3 From 523d2ecb9511f74e144cd116b942d6c1bf0f546d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 11:19:43 +0100 Subject: layout: use separate CRDT for staged layout changes --- src/rpc/layout/graph_algo.rs | 405 +++++++++++++++++++++++++++++++++++++++++++ src/rpc/layout/history.rs | 82 +++------ src/rpc/layout/mod.rs | 4 +- src/rpc/layout/schema.rs | 106 ++++++++++- src/rpc/layout/tracker.rs | 21 --- src/rpc/layout/version.rs | 54 +----- 6 files changed, 535 insertions(+), 137 deletions(-) create mode 100644 src/rpc/layout/graph_algo.rs delete mode 100644 src/rpc/layout/tracker.rs (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/graph_algo.rs b/src/rpc/layout/graph_algo.rs new file mode 100644 index 00000000..bd33e97f --- /dev/null +++ b/src/rpc/layout/graph_algo.rs @@ -0,0 +1,405 @@ +//! This module deals with graph algorithms. +//! It is used in layout.rs to build the partition to node assignment. + +use rand::prelude::{SeedableRng, SliceRandom}; +use std::cmp::{max, min}; +use std::collections::HashMap; +use std::collections::VecDeque; + +/// Vertex data structures used in all the graphs used in layout.rs. +/// usize parameters correspond to node/zone/partitions ids. +/// To understand the vertex roles below, please refer to the formal description +/// of the layout computation algorithm. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Vertex { + Source, + Pup(usize), // The vertex p+ of partition p + Pdown(usize), // The vertex p- of partition p + PZ(usize, usize), // The vertex corresponding to x_(partition p, zone z) + N(usize), // The vertex corresponding to node n + Sink, +} + +/// Edge data structure for the flow algorithm. +#[derive(Clone, Copy, Debug)] +pub struct FlowEdge { + cap: u64, // flow maximal capacity of the edge + flow: i64, // flow value on the edge + dest: usize, // destination vertex id + rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v +} + +/// Edge data structure for the detection of negative cycles. +#[derive(Clone, Copy, Debug)] +pub struct WeightedEdge { + w: i64, // weight of the edge + dest: usize, +} + +pub trait Edge: Clone + Copy {} +impl Edge for FlowEdge {} +impl Edge for WeightedEdge {} + +/// Struct for the graph structure. We do encapsulation here to be able to both +/// provide user friendly Vertex enum to address vertices, and to use internally usize +/// indices and Vec instead of HashMap in the graph algorithm to optimize execution speed. +pub struct Graph { + vertex_to_id: HashMap, + id_to_vertex: Vec, + + // The graph is stored as an adjacency list + graph: Vec>, +} + +pub type CostFunction = HashMap<(Vertex, Vertex), i64>; + +impl Graph { + pub fn new(vertices: &[Vertex]) -> Self { + let mut map = HashMap::::new(); + for (i, vert) in vertices.iter().enumerate() { + map.insert(*vert, i); + } + Graph:: { + vertex_to_id: map, + id_to_vertex: vertices.to_vec(), + graph: vec![Vec::::new(); vertices.len()], + } + } + + fn get_vertex_id(&self, v: &Vertex) -> Result { + self.vertex_to_id + .get(v) + .cloned() + .ok_or_else(|| format!("The graph does not contain vertex {:?}", v)) + } +} + +impl Graph { + /// This function adds a directed edge to the graph with capacity c, and the + /// corresponding reversed edge with capacity 0. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> { + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; + if idu == idv { + return Err("Cannot add edge from vertex to itself in flow graph".into()); + } + + let rev_u = self.graph[idu].len(); + let rev_v = self.graph[idv].len(); + self.graph[idu].push(FlowEdge { + cap: c, + dest: idv, + flow: 0, + rev: rev_v, + }); + self.graph[idv].push(FlowEdge { + cap: 0, + dest: idu, + flow: 0, + rev: rev_u, + }); + Ok(()) + } + + /// This function returns the list of vertices that receive a positive flow from + /// vertex v. + pub fn get_positive_flow_from(&self, v: Vertex) -> Result, String> { + let idv = self.get_vertex_id(&v)?; + let mut result = Vec::::new(); + for edge in self.graph[idv].iter() { + if edge.flow > 0 { + result.push(self.id_to_vertex[edge.dest]); + } + } + Ok(result) + } + + /// This function returns the value of the flow outgoing from v. + pub fn get_outflow(&self, v: Vertex) -> Result { + let idv = self.get_vertex_id(&v)?; + let mut result = 0; + for edge in self.graph[idv].iter() { + result += max(0, edge.flow); + } + Ok(result) + } + + /// This function computes the flow total value by computing the outgoing flow + /// from the source. + pub fn get_flow_value(&mut self) -> Result { + self.get_outflow(Vertex::Source) + } + + /// This function shuffles the order of the edge lists. It keeps the ids of the + /// reversed edges consistent. + fn shuffle_edges(&mut self) { + // We use deterministic randomness so that the layout calculation algorihtm + // will output the same thing every time it is run. This way, the results + // pre-calculated in `garage layout show` will match exactly those used + // in practice with `garage layout apply` + let mut rng = rand::rngs::StdRng::from_seed([0x12u8; 32]); + for i in 0..self.graph.len() { + self.graph[i].shuffle(&mut rng); + // We need to update the ids of the reverse edges. + for j in 0..self.graph[i].len() { + let target_v = self.graph[i][j].dest; + let target_rev = self.graph[i][j].rev; + self.graph[target_v][target_rev].rev = j; + } + } + } + + /// Computes an upper bound of the flow on the graph + pub fn flow_upper_bound(&self) -> Result { + let idsource = self.get_vertex_id(&Vertex::Source)?; + let mut flow_upper_bound = 0; + for edge in self.graph[idsource].iter() { + flow_upper_bound += edge.cap; + } + Ok(flow_upper_bound) + } + + /// This function computes the maximal flow using Dinic's algorithm. It starts with + /// the flow values already present in the graph. So it is possible to add some edge to + /// the graph, compute a flow, add other edges, update the flow. + pub fn compute_maximal_flow(&mut self) -> Result<(), String> { + let idsource = self.get_vertex_id(&Vertex::Source)?; + let idsink = self.get_vertex_id(&Vertex::Sink)?; + + let nb_vertices = self.graph.len(); + + let flow_upper_bound = self.flow_upper_bound()?; + + // To ensure the dispersion of the associations generated by the + // assignment, we shuffle the neighbours of the nodes. Hence, + // the vertices do not consider their neighbours in the same order. + self.shuffle_edges(); + + // We run Dinic's max flow algorithm + loop { + // We build the level array from Dinic's algorithm. + let mut level = vec![None; nb_vertices]; + + let mut fifo = VecDeque::new(); + fifo.push_back((idsource, 0)); + while let Some((id, lvl)) = fifo.pop_front() { + if level[id].is_none() { + // it means id has not yet been reached + level[id] = Some(lvl); + for edge in self.graph[id].iter() { + if edge.cap as i64 - edge.flow > 0 { + fifo.push_back((edge.dest, lvl + 1)); + } + } + } + } + if level[idsink].is_none() { + // There is no residual flow + break; + } + // Now we run DFS respecting the level array + let mut next_nbd = vec![0; nb_vertices]; + let mut lifo = Vec::new(); + + lifo.push((idsource, flow_upper_bound)); + + while let Some((id, f)) = lifo.last().cloned() { + if id == idsink { + // The DFS reached the sink, we can add a + // residual flow. + lifo.pop(); + while let Some((id, _)) = lifo.pop() { + let nbd = next_nbd[id]; + self.graph[id][nbd].flow += f as i64; + let id_rev = self.graph[id][nbd].dest; + let nbd_rev = self.graph[id][nbd].rev; + self.graph[id_rev][nbd_rev].flow -= f as i64; + } + lifo.push((idsource, flow_upper_bound)); + continue; + } + // else we did not reach the sink + let nbd = next_nbd[id]; + if nbd >= self.graph[id].len() { + // There is nothing to explore from id anymore + lifo.pop(); + if let Some((parent, _)) = lifo.last() { + next_nbd[*parent] += 1; + } + continue; + } + // else we can try to send flow from id to its nbd + let new_flow = min( + f as i64, + self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow, + ) as u64; + if new_flow == 0 { + next_nbd[id] += 1; + continue; + } + if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) { + if lvldest <= lvlid { + // We cannot send flow to nbd. + next_nbd[id] += 1; + continue; + } + } + // otherwise, we send flow to nbd. + lifo.push((self.graph[id][nbd].dest, new_flow)); + } + } + Ok(()) + } + + /// This function takes a flow, and a cost function on the edges, and tries to find an + /// equivalent flow with a better cost, by finding improving overflow cycles. It uses + /// as subroutine the Bellman Ford algorithm run up to path_length. + /// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and + /// only one needs to be present in the cost function. + pub fn optimize_flow_with_cost( + &mut self, + cost: &CostFunction, + path_length: usize, + ) -> Result<(), String> { + // We build the weighted graph g where we will look for negative cycle + let mut gf = self.build_cost_graph(cost)?; + let mut cycles = gf.list_negative_cycles(path_length); + while !cycles.is_empty() { + // we enumerate negative cycles + for c in cycles.iter() { + for i in 0..c.len() { + // We add one flow unit to the edge (u,v) of cycle c + let idu = self.vertex_to_id[&c[i]]; + let idv = self.vertex_to_id[&c[(i + 1) % c.len()]]; + for j in 0..self.graph[idu].len() { + // since idu appears at most once in the cycles, we enumerate every + // edge at most once. + let edge = self.graph[idu][j]; + if edge.dest == idv { + self.graph[idu][j].flow += 1; + self.graph[idv][edge.rev].flow -= 1; + break; + } + } + } + } + + gf = self.build_cost_graph(cost)?; + cycles = gf.list_negative_cycles(path_length); + } + Ok(()) + } + + /// Construct the weighted graph G_f from the flow and the cost function + fn build_cost_graph(&self, cost: &CostFunction) -> Result, String> { + let mut g = Graph::::new(&self.id_to_vertex); + let nb_vertices = self.id_to_vertex.len(); + for i in 0..nb_vertices { + for edge in self.graph[i].iter() { + if edge.cap as i64 - edge.flow > 0 { + // It is possible to send overflow through this edge + let u = self.id_to_vertex[i]; + let v = self.id_to_vertex[edge.dest]; + if cost.contains_key(&(u, v)) { + g.add_edge(u, v, cost[&(u, v)])?; + } else if cost.contains_key(&(v, u)) { + g.add_edge(u, v, -cost[&(v, u)])?; + } else { + g.add_edge(u, v, 0)?; + } + } + } + } + Ok(g) + } +} + +impl Graph { + /// This function adds a single directed weighted edge to the graph. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> { + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; + self.graph[idu].push(WeightedEdge { w, dest: idv }); + Ok(()) + } + + /// This function lists the negative cycles it manages to find after path_length + /// iterations of the main loop of the Bellman-Ford algorithm. For the classical + /// algorithm, path_length needs to be equal to the number of vertices. However, + /// for particular graph structures like in our case, the algorithm is still correct + /// when path_length is the length of the longest possible simple path. + /// See the formal description of the algorithm for more details. + fn list_negative_cycles(&self, path_length: usize) -> Vec> { + let nb_vertices = self.graph.len(); + + // We start with every vertex at distance 0 of some imaginary extra -1 vertex. + let mut distance = vec![0; nb_vertices]; + // The prev vector collects for every vertex from where does the shortest path come + let mut prev = vec![None; nb_vertices]; + + for _ in 0..path_length + 1 { + for id in 0..nb_vertices { + for e in self.graph[id].iter() { + if distance[id] + e.w < distance[e.dest] { + distance[e.dest] = distance[id] + e.w; + prev[e.dest] = Some(id); + } + } + } + } + + // If self.graph contains a negative cycle, then at this point the graph described + // by prev (which is a directed 1-forest/functional graph) + // must contain a cycle. We list the cycles of prev. + let cycles_prev = cycles_of_1_forest(&prev); + + // Remark that the cycle in prev is in the reverse order compared to the cycle + // in the graph. Thus the .rev(). + return cycles_prev + .iter() + .map(|cycle| { + cycle + .iter() + .rev() + .map(|id| self.id_to_vertex[*id]) + .collect() + }) + .collect(); + } +} + +/// This function returns the list of cycles of a directed 1 forest. It does not +/// check for the consistency of the input. +fn cycles_of_1_forest(forest: &[Option]) -> Vec> { + let mut cycles = Vec::>::new(); + let mut time_of_discovery = vec![None; forest.len()]; + + for t in 0..forest.len() { + let mut id = t; + // while we are on a valid undiscovered node + while time_of_discovery[id].is_none() { + time_of_discovery[id] = Some(t); + if let Some(i) = forest[id] { + id = i; + } else { + break; + } + } + if forest[id].is_some() && time_of_discovery[id] == Some(t) { + // We discovered an id that we explored at this iteration t. + // It means we are on a cycle + let mut cy = vec![id; 1]; + let mut id2 = id; + while let Some(id_next) = forest[id2] { + id2 = id_next; + if id2 != id { + cy.push(id2); + } else { + break; + } + } + cycles.push(cy); + } + } + cycles +} diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index e59c9e9c..9ae28887 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -12,14 +10,15 @@ impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); - let staging_parameters = Lww::::new(version.parameters); - let empty_lwwmap = LwwMap::new(); + let staging = LayoutStaging { + parameters: Lww::::new(version.parameters), + roles: LwwMap::new(), + }; let mut ret = LayoutHistory { versions: vec![version].into_boxed_slice().into(), update_trackers: Default::default(), - staging_parameters, - staging_roles: empty_lwwmap, + staging: Lww::raw(0, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -31,8 +30,7 @@ impl LayoutHistory { } pub(crate) fn calculate_staging_hash(&self) -> Hash { - let hashed_tuple = (&self.staging_roles, &self.staging_parameters); - blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } // ================== updates to layout, public interface =================== @@ -41,26 +39,10 @@ impl LayoutHistory { let mut changed = false; // Merge staged layout changes - match other.current().version.cmp(&self.current().version) { - Ordering::Greater => { - self.staging_parameters = other.staging_parameters.clone(); - self.staging_roles = other.staging_roles.clone(); - self.staging_hash = other.staging_hash; - changed = true; - } - Ordering::Equal => { - self.staging_parameters.merge(&other.staging_parameters); - self.staging_roles.merge(&other.staging_roles); - - let new_staging_hash = self.calculate_staging_hash(); - if new_staging_hash != self.staging_hash { - changed = true; - } - - self.staging_hash = new_staging_hash; - } - Ordering::Less => (), + if self.staging != other.staging { + changed = true; } + self.staging.merge(&other.staging); // Add any new versions to history for v2 in other.versions.iter() { @@ -102,50 +84,34 @@ To know the correct value of the new layout version, invoke `garage layout show` } } + // Compute new version and add it to history let mut new_version = self.current().clone(); new_version.version += 1; - new_version.roles.merge(&self.staging_roles); + new_version.roles.merge(&self.staging.get().roles); new_version.roles.retain(|(_, _, v)| v.0.is_some()); - new_version.parameters = *self.staging_parameters.get(); - - self.staging_roles.clear(); - self.staging_hash = self.calculate_staging_hash(); + new_version.parameters = *self.staging.get().parameters.get(); let msg = new_version.calculate_partition_assignment()?; - self.versions.push(new_version); + // Reset the staged layout changes + self.staging.update(LayoutStaging { + parameters: self.staging.get().parameters.clone(), + roles: LwwMap::new(), + }); + self.staging_hash = self.calculate_staging_hash(); + Ok((self, msg)) } - pub fn revert_staged_changes(mut self, version: Option) -> Result { - match version { - None => { - let error = r#" -Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. -To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. - "#; - return Err(Error::Message(error.into())); - } - Some(v) => { - if v != self.current().version + 1 { - return Err(Error::Message("Invalid new layout version".into())); - } - } - } - - self.staging_roles.clear(); - self.staging_parameters.update(self.current().parameters); + pub fn revert_staged_changes(mut self) -> Result { + self.staging.update(LayoutStaging { + parameters: Lww::new(self.current().parameters.clone()), + roles: LwwMap::new(), + }); self.staging_hash = self.calculate_staging_hash(); - // TODO this is stupid, we should have a separate version counter/LWW - // for the staging params - let mut new_version = self.current().clone(); - new_version.version += 1; - - self.versions.push(new_version); - Ok(self) } diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 122d4b65..7c15988a 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -1,8 +1,10 @@ +mod graph_algo; mod history; mod schema; -mod tracker; mod version; +// ---- re-exports ---- + pub use history::*; pub use schema::*; pub use version::*; diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 14e797be..c5b9b1d3 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -1,3 +1,9 @@ +use std::fmt; + +use bytesize::ByteSize; + +use garage_util::crdt::{AutoCrdt, Crdt}; + mod v08 { use crate::layout::CompactNodeType; use garage_util::crdt::LwwMap; @@ -210,6 +216,15 @@ mod v010 { pub ring_assignment_data: Vec, } + /// The staged changes for the next layout version + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutStaging { + /// Parameters to be used in the next partition assignment computation. + pub parameters: Lww, + /// Role changes which are staged for the next version of the layout + pub roles: LwwMap, + } + /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LayoutHistory { @@ -219,10 +234,8 @@ mod v010 { /// Update trackers pub update_trackers: UpdateTrackers, - /// Parameters to be used in the next partition assignment computation. - pub staging_parameters: Lww, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap, + /// Staged changes for the next version + pub staging: Lww, /// Hash of the serialized staging_parameters + staging_roles pub staging_hash: Hash, } @@ -265,6 +278,10 @@ mod v010 { .map(|x| (*x, version.version)) .collect::>(), ); + let staging = LayoutStaging { + parameters: previous.staging_parameters, + roles: previous.staging_roles, + }; let mut ret = Self { versions: vec![version], update_trackers: UpdateTrackers { @@ -272,8 +289,7 @@ mod v010 { sync_map: update_tracker.clone(), sync_ack_map: update_tracker.clone(), }, - staging_parameters: previous.staging_parameters, - staging_roles: previous.staging_roles, + staging: Lww::raw(previous.version, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -283,3 +299,81 @@ mod v010 { } pub use v010::*; + +// ---- utility functions ---- + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for LayoutStaging { + fn merge(&mut self, other: &LayoutStaging) { + self.parameters.merge(&other.parameters); + self.roles.merge(&other.roles); + } +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } + } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } +} + +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + +impl UpdateTracker { + fn merge(&mut self, other: &UpdateTracker) { + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { + *v_mut = std::cmp::max(*v_mut, *v); + } else { + self.0.insert(*k, *v); + } + } + } + + pub(crate) fn min(&self) -> u64 { + self.0.iter().map(|(_, v)| *v).min().unwrap_or(0) + } +} + +impl UpdateTrackers { + pub(crate) fn merge(&mut self, other: &UpdateTrackers) { + self.ack_map.merge(&other.ack_map); + self.sync_map.merge(&other.sync_map); + self.sync_ack_map.merge(&other.sync_ack_map); + } +} diff --git a/src/rpc/layout/tracker.rs b/src/rpc/layout/tracker.rs deleted file mode 100644 index 778121e4..00000000 --- a/src/rpc/layout/tracker.rs +++ /dev/null @@ -1,21 +0,0 @@ -use super::*; - -impl UpdateTracker { - fn merge(&mut self, other: &UpdateTracker) { - for (k, v) in other.0.iter() { - if let Some(v_mut) = self.0.get_mut(k) { - *v_mut = std::cmp::max(*v_mut, *v); - } else { - self.0.insert(*k, *v); - } - } - } -} - -impl UpdateTrackers { - pub(crate) fn merge(&mut self, other: &UpdateTrackers) { - self.ack_map.merge(&other.ack_map); - self.sync_map.merge(&other.sync_map); - self.sync_ack_map.merge(&other.sync_ack_map); - } -} diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 363bc204..6918fdf9 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -1,69 +1,21 @@ use std::collections::HashMap; use std::collections::HashSet; -use std::fmt; +use std::convert::TryInto; use bytesize::ByteSize; use itertools::Itertools; -use garage_util::crdt::{AutoCrdt, LwwMap}; +use garage_util::crdt::LwwMap; use garage_util::data::*; use garage_util::error::*; -use crate::graph_algo::*; - -use std::convert::TryInto; - +use super::graph_algo::*; use super::schema::*; use super::*; // The Message type will be used to collect information on the algorithm. pub type Message = Vec; -impl AutoCrdt for LayoutParameters { - const WARN_IF_DIFFERENT: bool = true; -} - -impl AutoCrdt for NodeRoleV { - const WARN_IF_DIFFERENT: bool = true; -} - -impl NodeRole { - pub fn capacity_string(&self) -> String { - match self.capacity { - Some(c) => ByteSize::b(c).to_string_as(false), - None => "gateway".to_string(), - } - } - - pub fn tags_string(&self) -> String { - self.tags.join(",") - } -} - -impl fmt::Display for ZoneRedundancy { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ZoneRedundancy::Maximum => write!(f, "maximum"), - ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), - } - } -} - -impl core::str::FromStr for ZoneRedundancy { - type Err = &'static str; - fn from_str(s: &str) -> Result { - match s { - "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), - x => { - let v = x - .parse::() - .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; - Ok(ZoneRedundancy::AtLeast(v)) - } - } - } -} - impl LayoutVersion { pub fn new(replication_factor: usize) -> Self { // We set the default zone redundancy to be Maximum, meaning that the maximum -- cgit v1.2.3 From 8a2b1dd422fb57abe611d8c1cf3cb0b55f487189 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 12:55:36 +0100 Subject: wip: split out layout management from System into separate LayoutManager --- src/rpc/layout/manager.rs | 177 ++++++++++++++++++++++++++++++++++++++++++++++ src/rpc/layout/mod.rs | 2 + 2 files changed, 179 insertions(+) create mode 100644 src/rpc/layout/manager.rs (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs new file mode 100644 index 00000000..a8a77139 --- /dev/null +++ b/src/rpc/layout/manager.rs @@ -0,0 +1,177 @@ +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::watch; +use tokio::sync::Mutex; + +use netapp::endpoint::Endpoint; +use netapp::peering::fullmesh::FullMeshPeeringStrategy; +use netapp::NodeID; + +use garage_util::config::Config; +use garage_util::data::*; +use garage_util::error::*; +use garage_util::persister::Persister; + +use super::*; +use crate::rpc_helper::*; +use crate::system::*; + +pub struct LayoutManager { + replication_factor: usize, + persist_cluster_layout: Persister, + + pub layout_watch: watch::Receiver>, + update_layout: Mutex>>, + + pub(crate) rpc_helper: RpcHelper, + system_endpoint: Arc>, +} + +impl LayoutManager { + pub fn new( + config: &Config, + node_id: NodeID, + system_endpoint: Arc>, + fullmesh: Arc, + replication_factor: usize, + ) -> Result { + let persist_cluster_layout: Persister = + Persister::new(&config.metadata_dir, "cluster_layout"); + + let cluster_layout = match persist_cluster_layout.load() { + Ok(x) => { + if x.current().replication_factor != replication_factor { + return Err(Error::Message(format!( + "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", + x.current().replication_factor, + replication_factor + ))); + } + x + } + Err(e) => { + info!( + "No valid previous cluster layout stored ({}), starting fresh.", + e + ); + LayoutHistory::new(replication_factor) + } + }; + + let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); + + let rpc_helper = RpcHelper::new( + node_id.into(), + fullmesh, + layout_watch.clone(), + config.rpc_timeout_msec.map(Duration::from_millis), + ); + + Ok(Self { + replication_factor, + persist_cluster_layout, + layout_watch, + update_layout: Mutex::new(update_layout), + system_endpoint, + rpc_helper, + }) + } + + // ---- PUBLIC INTERFACE ---- + + pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> { + self.handle_advertise_cluster_layout(layout).await?; + Ok(()) + } + + pub fn history(&self) -> watch::Ref> { + self.layout_watch.borrow() + } + + pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) { + let resp = self + .rpc_helper + .call( + &self.system_endpoint, + peer, + SystemRpc::PullClusterLayout, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await; + if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { + let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await; + } + } + + // ---- INTERNALS --- + + /// Save network configuration to disc + async fn save_cluster_layout(&self) -> Result<(), Error> { + let layout: Arc = self.layout_watch.borrow().clone(); + self.persist_cluster_layout + .save_async(&layout) + .await + .expect("Cannot save current cluster layout"); + Ok(()) + } + + // ---- RPC HANDLERS ---- + + pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { + let layout = self.layout_watch.borrow().as_ref().clone(); + SystemRpc::AdvertiseClusterLayout(layout) + } + + pub(crate) async fn handle_advertise_cluster_layout( + &self, + adv: &LayoutHistory, + ) -> Result { + if adv.current().replication_factor != self.replication_factor { + let msg = format!( + "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", + adv.current().replication_factor, + self.replication_factor + ); + error!("{}", msg); + return Err(Error::Message(msg)); + } + + let update_layout = self.update_layout.lock().await; + // TODO: don't clone each time an AdvertiseClusterLayout is received + let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); + + let prev_layout_check = layout.check().is_ok(); + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + error!("New cluster layout is invalid, discarding."); + return Err(Error::Message( + "New cluster layout is invalid, discarding.".into(), + )); + } + + update_layout.send(Arc::new(layout.clone()))?; + drop(update_layout); + + /* TODO + tokio::spawn(async move { + if let Err(e) = system + .rpc_helper() + .broadcast( + &system.system_endpoint, + SystemRpc::AdvertiseClusterLayout(layout), + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } + }); + */ + + self.save_cluster_layout().await?; + } + + Ok(SystemRpc::Ok) + } +} diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 7c15988a..cd3764bc 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -3,6 +3,8 @@ mod history; mod schema; mod version; +pub mod manager; + // ---- re-exports ---- pub use history::*; -- cgit v1.2.3 From 19ef1ec8e7fee3a6c670e6e35dfcc83f0801e604 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 13:34:14 +0100 Subject: layout: more refactoring --- src/rpc/layout/manager.rs | 116 ++++++++++++++++++++++++++++++---------------- src/rpc/layout/schema.rs | 6 +-- 2 files changed, 80 insertions(+), 42 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index a8a77139..351e0959 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,6 +1,8 @@ use std::sync::Arc; use std::time::Duration; +use serde::{Deserialize, Serialize}; + use tokio::sync::watch; use tokio::sync::Mutex; @@ -28,6 +30,16 @@ pub struct LayoutManager { system_endpoint: Arc>, } +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LayoutStatus { + /// Cluster layout version + pub cluster_layout_version: u64, + /// Hash of cluster layout update trackers + // (TODO) pub cluster_layout_trackers_hash: Hash, + /// Hash of cluster layout staging data + pub cluster_layout_staging_hash: Hash, +} + impl LayoutManager { pub fn new( config: &Config, @@ -35,7 +47,7 @@ impl LayoutManager { system_endpoint: Arc>, fullmesh: Arc, replication_factor: usize, - ) -> Result { + ) -> Result, Error> { let persist_cluster_layout: Persister = Persister::new(&config.metadata_dir, "cluster_layout"); @@ -68,28 +80,39 @@ impl LayoutManager { config.rpc_timeout_msec.map(Duration::from_millis), ); - Ok(Self { + Ok(Arc::new(Self { replication_factor, persist_cluster_layout, layout_watch, update_layout: Mutex::new(update_layout), system_endpoint, rpc_helper, - }) + })) } // ---- PUBLIC INTERFACE ---- - pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> { + pub fn status(&self) -> LayoutStatus { + let layout = self.layout(); + LayoutStatus { + cluster_layout_version: layout.current().version, + cluster_layout_staging_hash: layout.staging_hash, + } + } + + pub async fn update_cluster_layout( + self: &Arc, + layout: &LayoutHistory, + ) -> Result<(), Error> { self.handle_advertise_cluster_layout(layout).await?; Ok(()) } - pub fn history(&self) -> watch::Ref> { + pub fn layout(&self) -> watch::Ref> { self.layout_watch.borrow() } - pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) { + pub(crate) async fn pull_cluster_layout(self: &Arc, peer: Uuid) { let resp = self .rpc_helper .call( @@ -118,13 +141,25 @@ impl LayoutManager { // ---- RPC HANDLERS ---- + pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, status: &LayoutStatus) { + let local_status = self.status(); + if status.cluster_layout_version > local_status.cluster_layout_version + || status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash + { + tokio::spawn({ + let this = self.clone(); + async move { this.pull_cluster_layout(from).await } + }); + } + } + pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { - let layout = self.layout_watch.borrow().as_ref().clone(); + let layout = self.layout_watch.borrow().clone(); SystemRpc::AdvertiseClusterLayout(layout) } pub(crate) async fn handle_advertise_cluster_layout( - &self, + self: &Arc, adv: &LayoutHistory, ) -> Result { if adv.current().replication_factor != self.replication_factor { @@ -137,39 +172,42 @@ impl LayoutManager { return Err(Error::Message(msg)); } - let update_layout = self.update_layout.lock().await; - // TODO: don't clone each time an AdvertiseClusterLayout is received - let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); - - let prev_layout_check = layout.check().is_ok(); - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - error!("New cluster layout is invalid, discarding."); - return Err(Error::Message( - "New cluster layout is invalid, discarding.".into(), - )); - } - - update_layout.send(Arc::new(layout.clone()))?; - drop(update_layout); - - /* TODO - tokio::spawn(async move { - if let Err(e) = system - .rpc_helper() - .broadcast( - &system.system_endpoint, - SystemRpc::AdvertiseClusterLayout(layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); + if *adv != **self.layout_watch.borrow() { + let update_layout = self.update_layout.lock().await; + let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); + + let prev_layout_check = layout.check().is_ok(); + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + error!("New cluster layout is invalid, discarding."); + return Err(Error::Message( + "New cluster layout is invalid, discarding.".into(), + )); } - }); - */ - self.save_cluster_layout().await?; + let layout = Arc::new(layout); + update_layout.send(layout.clone())?; + drop(update_layout); // release mutex + + tokio::spawn({ + let this = self.clone(); + async move { + if let Err(e) = this + .rpc_helper + .broadcast( + &this.system_endpoint, + SystemRpc::AdvertiseClusterLayout(layout), + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } + } + }); + + self.save_cluster_layout().await?; + } } Ok(SystemRpc::Ok) diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index c5b9b1d3..d587a6cb 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -226,7 +226,7 @@ mod v010 { } /// The history of cluster layouts - #[derive(Clone, Debug, Serialize, Deserialize)] + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LayoutHistory { /// The versions currently in use in the cluster pub versions: Vec, @@ -241,7 +241,7 @@ mod v010 { } /// The tracker of acknowlegments and data syncs around the cluster - #[derive(Clone, Debug, Serialize, Deserialize, Default)] + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] pub struct UpdateTrackers { /// The highest layout version number each node has ack'ed pub ack_map: UpdateTracker, @@ -253,7 +253,7 @@ mod v010 { } /// The history of cluster layouts - #[derive(Clone, Debug, Serialize, Deserialize, Default)] + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] pub struct UpdateTracker(pub HashMap); impl garage_util::migrate::Migrate for LayoutHistory { -- cgit v1.2.3 From bfb1845fdc981a370539d641a5d80f438f184f07 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 14:12:05 +0100 Subject: layout: refactor to use a RwLock on LayoutHistory --- src/rpc/layout/manager.rs | 93 ++++++++++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 46 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 351e0959..c021039b 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,10 +1,9 @@ -use std::sync::Arc; +use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::Duration; use serde::{Deserialize, Serialize}; -use tokio::sync::watch; -use tokio::sync::Mutex; +use tokio::sync::Notify; use netapp::endpoint::Endpoint; use netapp::peering::fullmesh::FullMeshPeeringStrategy; @@ -23,8 +22,8 @@ pub struct LayoutManager { replication_factor: usize, persist_cluster_layout: Persister, - pub layout_watch: watch::Receiver>, - update_layout: Mutex>>, + layout: Arc>, + pub(crate) change_notify: Arc, pub(crate) rpc_helper: RpcHelper, system_endpoint: Arc>, @@ -71,20 +70,21 @@ impl LayoutManager { } }; - let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); + let layout = Arc::new(RwLock::new(cluster_layout)); + let change_notify = Arc::new(Notify::new()); let rpc_helper = RpcHelper::new( node_id.into(), fullmesh, - layout_watch.clone(), + layout.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ); Ok(Arc::new(Self { replication_factor, persist_cluster_layout, - layout_watch, - update_layout: Mutex::new(update_layout), + layout, + change_notify, system_endpoint, rpc_helper, })) @@ -108,8 +108,8 @@ impl LayoutManager { Ok(()) } - pub fn layout(&self) -> watch::Ref> { - self.layout_watch.borrow() + pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { + self.layout.read().unwrap() } pub(crate) async fn pull_cluster_layout(self: &Arc, peer: Uuid) { @@ -131,7 +131,7 @@ impl LayoutManager { /// Save network configuration to disc async fn save_cluster_layout(&self) -> Result<(), Error> { - let layout: Arc = self.layout_watch.borrow().clone(); + let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning self.persist_cluster_layout .save_async(&layout) .await @@ -139,6 +139,22 @@ impl LayoutManager { Ok(()) } + fn merge_layout(&self, adv: &LayoutHistory) -> Option { + let mut layout = self.layout.write().unwrap(); + let prev_layout_check = layout.check().is_ok(); + + if !prev_layout_check || adv.check().is_ok() { + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + panic!("Merged two correct layouts and got an incorrect layout."); + } + + return Some(layout.clone()); + } + } + None + } + // ---- RPC HANDLERS ---- pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, status: &LayoutStatus) { @@ -154,7 +170,7 @@ impl LayoutManager { } pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { - let layout = self.layout_watch.borrow().clone(); + let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning SystemRpc::AdvertiseClusterLayout(layout) } @@ -172,42 +188,27 @@ impl LayoutManager { return Err(Error::Message(msg)); } - if *adv != **self.layout_watch.borrow() { - let update_layout = self.update_layout.lock().await; - let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); + if let Some(new_layout) = self.merge_layout(adv) { + self.change_notify.notify_waiters(); - let prev_layout_check = layout.check().is_ok(); - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - error!("New cluster layout is invalid, discarding."); - return Err(Error::Message( - "New cluster layout is invalid, discarding.".into(), - )); - } - - let layout = Arc::new(layout); - update_layout.send(layout.clone())?; - drop(update_layout); // release mutex - - tokio::spawn({ - let this = self.clone(); - async move { - if let Err(e) = this - .rpc_helper - .broadcast( - &this.system_endpoint, - SystemRpc::AdvertiseClusterLayout(layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); - } + tokio::spawn({ + let this = self.clone(); + async move { + if let Err(e) = this + .rpc_helper + .broadcast( + &this.system_endpoint, + SystemRpc::AdvertiseClusterLayout(new_layout), + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); } - }); + } + }); - self.save_cluster_layout().await?; - } + self.save_cluster_layout().await?; } Ok(SystemRpc::Ok) -- cgit v1.2.3 From 94caf9c0c1342ce1d2ba3ac7af39fb133721ee83 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 14:53:34 +0100 Subject: layout: separate code path for synchronizing update trackers only --- src/rpc/layout/history.rs | 51 ++++++++++++----- src/rpc/layout/manager.rs | 140 +++++++++++++++++++++++++++++++++------------- src/rpc/layout/schema.rs | 23 ++++++-- 3 files changed, 154 insertions(+), 60 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 9ae28887..357b9d62 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -18,10 +18,11 @@ impl LayoutHistory { let mut ret = LayoutHistory { versions: vec![version].into_boxed_slice().into(), update_trackers: Default::default(), + trackers_hash: [0u8; 32].into(), staging: Lww::raw(0, staging), staging_hash: [0u8; 32].into(), }; - ret.staging_hash = ret.calculate_staging_hash(); + ret.update_hashes(); ret } @@ -29,6 +30,15 @@ impl LayoutHistory { self.versions.last().as_ref().unwrap() } + pub(crate) fn update_hashes(&mut self) { + self.trackers_hash = self.calculate_trackers_hash(); + self.staging_hash = self.calculate_staging_hash(); + } + + pub(crate) fn calculate_trackers_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) + } + pub(crate) fn calculate_staging_hash(&self) -> Hash { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } @@ -38,12 +48,6 @@ impl LayoutHistory { pub fn merge(&mut self, other: &LayoutHistory) -> bool { let mut changed = false; - // Merge staged layout changes - if self.staging != other.staging { - changed = true; - } - self.staging.merge(&other.staging); - // Add any new versions to history for v2 in other.versions.iter() { if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) { @@ -63,7 +67,21 @@ impl LayoutHistory { } // Merge trackers - self.update_trackers.merge(&other.update_trackers); + if self.update_trackers != other.update_trackers { + let c = self.update_trackers.merge(&other.update_trackers); + changed = changed || c; + } + + // Merge staged layout changes + if self.staging != other.staging { + self.staging.merge(&other.staging); + changed = true; + } + + // Update hashes if there are changes + if changed { + self.update_hashes(); + } changed } @@ -100,7 +118,7 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: self.staging.get().parameters.clone(), roles: LwwMap::new(), }); - self.staging_hash = self.calculate_staging_hash(); + self.update_hashes(); Ok((self, msg)) } @@ -110,20 +128,25 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: Lww::new(self.current().parameters.clone()), roles: LwwMap::new(), }); - self.staging_hash = self.calculate_staging_hash(); + self.update_hashes(); Ok(self) } pub fn check(&self) -> Result<(), String> { // Check that the hash of the staging data is correct - let staging_hash = self.calculate_staging_hash(); - if staging_hash != self.staging_hash { + if self.trackers_hash != self.calculate_trackers_hash() { + return Err("trackers_hash is incorrect".into()); + } + if self.staging_hash != self.calculate_staging_hash() { return Err("staging_hash is incorrect".into()); } - // TODO: anythign more ? + for version in self.versions.iter() { + version.check()?; + } - self.current().check() + // TODO: anythign more ? + Ok(()) } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index c021039b..a2502f58 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -19,6 +19,7 @@ use crate::rpc_helper::*; use crate::system::*; pub struct LayoutManager { + node_id: Uuid, replication_factor: usize, persist_cluster_layout: Persister, @@ -34,7 +35,7 @@ pub struct LayoutStatus { /// Cluster layout version pub cluster_layout_version: u64, /// Hash of cluster layout update trackers - // (TODO) pub cluster_layout_trackers_hash: Hash, + pub cluster_layout_trackers_hash: Hash, /// Hash of cluster layout staging data pub cluster_layout_staging_hash: Hash, } @@ -81,6 +82,7 @@ impl LayoutManager { ); Ok(Arc::new(Self { + node_id: node_id.into(), replication_factor, persist_cluster_layout, layout, @@ -92,10 +94,15 @@ impl LayoutManager { // ---- PUBLIC INTERFACE ---- + pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { + self.layout.read().unwrap() + } + pub fn status(&self) -> LayoutStatus { let layout = self.layout(); LayoutStatus { cluster_layout_version: layout.current().version, + cluster_layout_trackers_hash: layout.trackers_hash, cluster_layout_staging_hash: layout.staging_hash, } } @@ -108,11 +115,35 @@ impl LayoutManager { Ok(()) } - pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { - self.layout.read().unwrap() + // ---- INTERNALS --- + + fn merge_layout(&self, adv: &LayoutHistory) -> Option { + let mut layout = self.layout.write().unwrap(); + let prev_layout_check = layout.check().is_ok(); + + if !prev_layout_check || adv.check().is_ok() { + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + panic!("Merged two correct layouts and got an incorrect layout."); + } + + return Some(layout.clone()); + } + } + None } - pub(crate) async fn pull_cluster_layout(self: &Arc, peer: Uuid) { + fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option { + let mut layout = self.layout.write().unwrap(); + if layout.update_trackers != *adv { + if layout.update_trackers.merge(adv) { + return Some(layout.update_trackers.clone()); + } + } + None + } + + async fn pull_cluster_layout(self: &Arc, peer: Uuid) { let resp = self .rpc_helper .call( @@ -123,15 +154,35 @@ impl LayoutManager { ) .await; if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { - let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await; + if let Err(e) = self.handle_advertise_cluster_layout(&layout).await { + warn!("In pull_cluster_layout: {}", e); + } } } - // ---- INTERNALS --- + async fn pull_cluster_layout_trackers(self: &Arc, peer: Uuid) { + let resp = self + .rpc_helper + .call( + &self.system_endpoint, + peer, + SystemRpc::PullClusterLayoutTrackers, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await; + if let Ok(SystemRpc::AdvertiseClusterLayoutTrackers(trackers)) = resp { + if let Err(e) = self + .handle_advertise_cluster_layout_trackers(&trackers) + .await + { + warn!("In pull_cluster_layout_trackers: {}", e); + } + } + } - /// Save network configuration to disc + /// Save cluster layout data to disk async fn save_cluster_layout(&self) -> Result<(), Error> { - let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning + let layout = self.layout.read().unwrap().clone(); self.persist_cluster_layout .save_async(&layout) .await @@ -139,33 +190,41 @@ impl LayoutManager { Ok(()) } - fn merge_layout(&self, adv: &LayoutHistory) -> Option { - let mut layout = self.layout.write().unwrap(); - let prev_layout_check = layout.check().is_ok(); - - if !prev_layout_check || adv.check().is_ok() { - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - panic!("Merged two correct layouts and got an incorrect layout."); + fn broadcast_update(self: &Arc, rpc: SystemRpc) { + tokio::spawn({ + let this = self.clone(); + async move { + if let Err(e) = this + .rpc_helper + .broadcast( + &this.system_endpoint, + rpc, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); } - - return Some(layout.clone()); } - } - None + }); } // ---- RPC HANDLERS ---- - pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, status: &LayoutStatus) { - let local_status = self.status(); - if status.cluster_layout_version > local_status.cluster_layout_version - || status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash + pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, remote: &LayoutStatus) { + let local = self.status(); + if remote.cluster_layout_version > local.cluster_layout_version + || remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash { tokio::spawn({ let this = self.clone(); async move { this.pull_cluster_layout(from).await } }); + } else if remote.cluster_layout_trackers_hash != local.cluster_layout_trackers_hash { + tokio::spawn({ + let this = self.clone(); + async move { this.pull_cluster_layout_trackers(from).await } + }); } } @@ -174,6 +233,11 @@ impl LayoutManager { SystemRpc::AdvertiseClusterLayout(layout) } + pub(crate) fn handle_pull_cluster_layout_trackers(&self) -> SystemRpc { + let layout = self.layout.read().unwrap(); + SystemRpc::AdvertiseClusterLayoutTrackers(layout.update_trackers.clone()) + } + pub(crate) async fn handle_advertise_cluster_layout( self: &Arc, adv: &LayoutHistory, @@ -190,24 +254,20 @@ impl LayoutManager { if let Some(new_layout) = self.merge_layout(adv) { self.change_notify.notify_waiters(); + self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout)); + self.save_cluster_layout().await?; + } - tokio::spawn({ - let this = self.clone(); - async move { - if let Err(e) = this - .rpc_helper - .broadcast( - &this.system_endpoint, - SystemRpc::AdvertiseClusterLayout(new_layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); - } - } - }); + Ok(SystemRpc::Ok) + } + pub(crate) async fn handle_advertise_cluster_layout_trackers( + self: &Arc, + trackers: &UpdateTrackers, + ) -> Result { + if let Some(new_trackers) = self.merge_layout_trackers(trackers) { + self.change_notify.notify_waiters(); + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers)); self.save_cluster_layout().await?; } diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index d587a6cb..abae5bd8 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -233,6 +233,8 @@ mod v010 { /// Update trackers pub update_trackers: UpdateTrackers, + /// Hash of the update trackers + pub trackers_hash: Hash, /// Staged changes for the next version pub staging: Lww, @@ -289,10 +291,12 @@ mod v010 { sync_map: update_tracker.clone(), sync_ack_map: update_tracker.clone(), }, + trackers_hash: [0u8; 32].into(), staging: Lww::raw(previous.version, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); + ret.trackers_hash = ret.calculate_trackers_hash(); ret } } @@ -355,14 +359,20 @@ impl core::str::FromStr for ZoneRedundancy { } impl UpdateTracker { - fn merge(&mut self, other: &UpdateTracker) { + fn merge(&mut self, other: &UpdateTracker) -> bool { + let mut changed = false; for (k, v) in other.0.iter() { if let Some(v_mut) = self.0.get_mut(k) { - *v_mut = std::cmp::max(*v_mut, *v); + if *v > *v_mut { + *v_mut = *v; + changed = true; + } } else { self.0.insert(*k, *v); + changed = true; } } + changed } pub(crate) fn min(&self) -> u64 { @@ -371,9 +381,10 @@ impl UpdateTracker { } impl UpdateTrackers { - pub(crate) fn merge(&mut self, other: &UpdateTrackers) { - self.ack_map.merge(&other.ack_map); - self.sync_map.merge(&other.sync_map); - self.sync_ack_map.merge(&other.sync_ack_map); + pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool { + let c1 = self.ack_map.merge(&other.ack_map); + let c2 = self.sync_map.merge(&other.sync_map); + let c3 = self.sync_ack_map.merge(&other.sync_ack_map); + c1 || c2 || c3 } } -- cgit v1.2.3 From 03ebf18830dff1983f09abe6ecb8d8d26daeb446 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 15:31:59 +0100 Subject: layout: begin managing the update tracker values --- src/rpc/layout/history.rs | 74 +++++++++++++++++++++++++++++++++++++++++++---- src/rpc/layout/manager.rs | 7 +++-- src/rpc/layout/schema.rs | 15 +++++++--- src/rpc/layout/version.rs | 46 +++++++++++++++-------------- 4 files changed, 109 insertions(+), 33 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 357b9d62..347f03db 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -30,6 +32,14 @@ impl LayoutHistory { self.versions.last().as_ref().unwrap() } + pub fn all_storage_nodes(&self) -> HashSet { + self.versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>() + } + pub(crate) fn update_hashes(&mut self) { self.trackers_hash = self.calculate_trackers_hash(); self.staging_hash = self.calculate_staging_hash(); @@ -43,6 +53,65 @@ impl LayoutHistory { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } + // ------------------ update tracking --------------- + + pub(crate) fn update_trackers(&mut self, node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version in the history + self.ack_last(node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(node_id); + + // 4. Cleanup layout versions that are not needed anymore + self.cleanup_old_versions(); + + info!("ack_map: {:?}", self.update_trackers.ack_map); + info!("sync_map: {:?}", self.update_trackers.sync_map); + info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + + // Finally, update hashes + self.update_hashes(); + } + + pub(crate) fn ack_last(&mut self, node: Uuid) { + let last_version = self.current().version; + self.update_trackers.ack_map.set_max(node, last_version); + } + + pub(crate) fn sync_first(&mut self, node: Uuid) { + let first_version = self.versions.first().as_ref().unwrap().version; + self.update_trackers.sync_map.set_max(node, first_version); + } + + pub(crate) fn sync_ack(&mut self, node: Uuid) { + self.update_trackers.sync_ack_map.set_max( + node, + self.calculate_global_min(&self.update_trackers.sync_map), + ); + } + + pub(crate) fn cleanup_old_versions(&mut self) { + let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); + while self.versions.first().as_ref().unwrap().version < min_sync_ack { + self.versions.remove(0); + } + } + + pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { + let storage_nodes = self.all_storage_nodes(); + storage_nodes + .iter() + .map(|x| tracker.0.get(x).copied().unwrap_or(0)) + .min() + .unwrap_or(0) + } + // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -78,11 +147,6 @@ impl LayoutHistory { changed = true; } - // Update hashes if there are changes - if changed { - self.update_hashes(); - } - changed } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index a2502f58..ffcc938b 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -51,7 +51,7 @@ impl LayoutManager { let persist_cluster_layout: Persister = Persister::new(&config.metadata_dir, "cluster_layout"); - let cluster_layout = match persist_cluster_layout.load() { + let mut cluster_layout = match persist_cluster_layout.load() { Ok(x) => { if x.current().replication_factor != replication_factor { return Err(Error::Message(format!( @@ -71,6 +71,8 @@ impl LayoutManager { } }; + cluster_layout.update_trackers(node_id.into()); + let layout = Arc::new(RwLock::new(cluster_layout)); let change_notify = Arc::new(Notify::new()); @@ -126,7 +128,7 @@ impl LayoutManager { if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } - + layout.update_trackers(self.node_id); return Some(layout.clone()); } } @@ -137,6 +139,7 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update_trackers != *adv { if layout.update_trackers.merge(adv) { + layout.update_trackers(self.node_id); return Some(layout.update_trackers.clone()); } } diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index abae5bd8..9f5d6f62 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -3,6 +3,7 @@ use std::fmt; use bytesize::ByteSize; use garage_util::crdt::{AutoCrdt, Crdt}; +use garage_util::data::Uuid; mod v08 { use crate::layout::CompactNodeType; @@ -276,8 +277,7 @@ mod v010 { let update_tracker = UpdateTracker( version .nongateway_nodes() - .iter() - .map(|x| (*x, version.version)) + .map(|x| (x, version.version)) .collect::>(), ); let staging = LayoutStaging { @@ -375,8 +375,15 @@ impl UpdateTracker { changed } - pub(crate) fn min(&self) -> u64 { - self.0.iter().map(|(_, v)| *v).min().unwrap_or(0) + pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) { + match self.0.get_mut(&peer) { + Some(e) => { + *e = std::cmp::max(*e, value); + } + None => { + self.0.insert(peer, value); + } + } } } diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 6918fdf9..65c62f63 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -134,15 +134,14 @@ impl LayoutVersion { // ===================== internal information extractors ====================== /// Returns the uuids of the non_gateway nodes in self.node_id_vec. - pub(crate) fn nongateway_nodes(&self) -> Vec { - let mut result = Vec::::new(); - for uuid in self.node_id_vec.iter() { - match self.node_role(uuid) { - Some(role) if role.capacity.is_some() => result.push(*uuid), - _ => (), - } - } - result + pub(crate) fn nongateway_nodes(&self) -> impl Iterator + '_ { + self.node_id_vec + .iter() + .copied() + .filter(move |uuid| match self.node_role(uuid) { + Some(role) if role.capacity.is_some() => true, + _ => false, + }) } /// Given a node uuids, this function returns the label of its zone @@ -158,8 +157,8 @@ impl LayoutVersion { /// Returns the sum of capacities of non gateway nodes in the cluster fn get_total_capacity(&self) -> Result { let mut total_capacity = 0; - for uuid in self.nongateway_nodes().iter() { - total_capacity += self.get_node_capacity(uuid)?; + for uuid in self.nongateway_nodes() { + total_capacity += self.get_node_capacity(&uuid)?; } Ok(total_capacity) } @@ -320,7 +319,7 @@ impl LayoutVersion { // to use them as indices in the flow graphs. let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; - let nb_nongateway_nodes = self.nongateway_nodes().len(); + let nb_nongateway_nodes = self.nongateway_nodes().count(); if nb_nongateway_nodes < self.replication_factor { return Err(Error::Message(format!( "The number of nodes with positive \ @@ -479,7 +478,8 @@ impl LayoutVersion { let mut id_to_zone = Vec::::new(); let mut zone_to_id = HashMap::::new(); - for uuid in self.nongateway_nodes().iter() { + let nongateway_nodes = self.nongateway_nodes().collect::>(); + for uuid in nongateway_nodes.iter() { let r = self.node_role(uuid).unwrap(); if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() { zone_to_id.insert(r.zone.clone(), id_to_zone.len()); @@ -556,8 +556,10 @@ impl LayoutVersion { exclude_assoc: &HashSet<(usize, usize)>, zone_redundancy: usize, ) -> Result, Error> { - let vertices = - LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len()); + let vertices = LayoutVersion::generate_graph_vertices( + zone_to_id.len(), + self.nongateway_nodes().count(), + ); let mut g = Graph::::new(&vertices); let nb_zones = zone_to_id.len(); for p in 0..NB_PARTITIONS { @@ -576,7 +578,7 @@ impl LayoutVersion { )?; } } - for n in 0..self.nongateway_nodes().len() { + for n in 0..self.nongateway_nodes().count() { let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?]; g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; @@ -600,7 +602,7 @@ impl LayoutVersion { // previous assignment let mut exclude_edge = HashSet::<(usize, usize)>::new(); if let Some(prev_assign) = prev_assign_opt { - let nb_nodes = self.nongateway_nodes().len(); + let nb_nodes = self.nongateway_nodes().count(); for (p, prev_assign_p) in prev_assign.iter().enumerate() { for n in 0..nb_nodes { exclude_edge.insert((p, n)); @@ -652,7 +654,7 @@ impl LayoutVersion { // We compute the maximal length of a simple path in gflow. It is used in the // Bellman-Ford algorithm in optimize_flow_with_cost to set the number // of iterations. - let nb_nodes = self.nongateway_nodes().len(); + let nb_nodes = self.nongateway_nodes().count(); let path_length = 4 * nb_nodes; gflow.optimize_flow_with_cost(&cost, path_length)?; @@ -730,7 +732,7 @@ impl LayoutVersion { } // We define and fill in the following tables - let storing_nodes = self.nongateway_nodes(); + let storing_nodes = self.nongateway_nodes().collect::>(); let mut new_partitions = vec![0; storing_nodes.len()]; let mut stored_partitions = vec![0; storing_nodes.len()]; @@ -873,9 +875,9 @@ mod tests { for z in zones.iter() { zone_token.insert(z.clone(), 0); } - for uuid in cl.nongateway_nodes().iter() { - let z = cl.get_node_zone(uuid)?; - let c = cl.get_node_capacity(uuid)?; + for uuid in cl.nongateway_nodes() { + let z = cl.get_node_zone(&uuid)?; + let c = cl.get_node_capacity(&uuid)?; zone_token.insert( z.clone(), zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize), -- cgit v1.2.3 From bad7cc812ead88e9f334405c5c082d79c14c8898 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 15:42:10 +0100 Subject: layout admin: add missing calls to update_hash --- src/rpc/layout/history.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 347f03db..e17a1c77 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -40,7 +40,7 @@ impl LayoutHistory { .collect::>() } - pub(crate) fn update_hashes(&mut self) { + pub fn update_hashes(&mut self) { self.trackers_hash = self.calculate_trackers_hash(); self.staging_hash = self.calculate_staging_hash(); } -- cgit v1.2.3 From 9d95f6f7040c1899715ae4f984313427b1432758 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 15:52:45 +0100 Subject: layout: fix tracker bugs --- src/rpc/layout/manager.rs | 8 +++++++- src/rpc/layout/schema.rs | 6 +++--- 2 files changed, 10 insertions(+), 4 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index ffcc938b..c1417dac 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -125,10 +125,10 @@ impl LayoutManager { if !prev_layout_check || adv.check().is_ok() { if layout.merge(adv) { + layout.update_trackers(self.node_id); if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } - layout.update_trackers(self.node_id); return Some(layout.clone()); } } @@ -245,6 +245,8 @@ impl LayoutManager { self: &Arc, adv: &LayoutHistory, ) -> Result { + debug!("handle_advertise_cluster_layout: {:?}", adv); + if adv.current().replication_factor != self.replication_factor { let msg = format!( "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", @@ -256,6 +258,8 @@ impl LayoutManager { } if let Some(new_layout) = self.merge_layout(adv) { + debug!("handle_advertise_cluster_layout: some changes were added to the current stuff"); + self.change_notify.notify_waiters(); self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout)); self.save_cluster_layout().await?; @@ -268,6 +272,8 @@ impl LayoutManager { self: &Arc, trackers: &UpdateTrackers, ) -> Result { + debug!("handle_advertise_cluster_layout_trackers: {:?}", trackers); + if let Some(new_trackers) = self.merge_layout_trackers(trackers) { self.change_notify.notify_waiters(); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers)); diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 9f5d6f62..db60c806 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -190,7 +190,7 @@ mod v010 { use garage_util::crdt::{Lww, LwwMap}; use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; - use std::collections::HashMap; + use std::collections::BTreeMap; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; /// The layout of the cluster, i.e. the list of roles @@ -257,7 +257,7 @@ mod v010 { /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] - pub struct UpdateTracker(pub HashMap); + pub struct UpdateTracker(pub BTreeMap); impl garage_util::migrate::Migrate for LayoutHistory { const VERSION_MARKER: &'static [u8] = b"G010lh"; @@ -278,7 +278,7 @@ mod v010 { version .nongateway_nodes() .map(|x| (x, version.version)) - .collect::>(), + .collect::>(), ); let staging = LayoutStaging { parameters: previous.staging_parameters, -- cgit v1.2.3 From df36cf3099f6010c4fc62109b85d4d1e62f160cc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 16:32:31 +0100 Subject: layout: add helpers to LayoutHistory and prepare integration with Table --- src/rpc/layout/history.rs | 41 +++++++++++++++++++++++++++++++++-------- src/rpc/layout/manager.rs | 33 ++++++++++++++++++++++++++++++++- src/rpc/layout/schema.rs | 9 ++++++--- src/rpc/layout/version.rs | 2 +- 4 files changed, 72 insertions(+), 13 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index e17a1c77..dbb02269 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -32,14 +32,6 @@ impl LayoutHistory { self.versions.last().as_ref().unwrap() } - pub fn all_storage_nodes(&self) -> HashSet { - self.versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::>() - } - pub fn update_hashes(&mut self) { self.trackers_hash = self.calculate_trackers_hash(); self.staging_hash = self.calculate_staging_hash(); @@ -53,6 +45,39 @@ impl LayoutHistory { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } + // ------------------ who stores what now? --------------- + + pub fn max_ack(&self) -> u64 { + self.calculate_global_min(&self.update_trackers.ack_map) + } + + pub fn all_storage_nodes(&self) -> HashSet { + // TODO: cache this + self.versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>() + } + + pub fn read_nodes_of(&self, position: &Hash) -> Vec { + let sync_min = self.calculate_global_min(&self.update_trackers.sync_map); + let version = self + .versions + .iter() + .find(|x| x.version == sync_min) + .or(self.versions.last()) + .unwrap(); + version.nodes_of(position, version.replication_factor) + } + + pub fn write_sets_of(&self, position: &Hash) -> Vec> { + self.versions + .iter() + .map(|x| x.nodes_of(position, x.replication_factor)) + .collect::>() + } + // ------------------ update tracking --------------- pub(crate) fn update_trackers(&mut self, node_id: Uuid) { diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index c1417dac..b0302b12 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,4 +1,5 @@ -use std::sync::{Arc, RwLock, RwLockReadGuard}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -26,6 +27,8 @@ pub struct LayoutManager { layout: Arc>, pub(crate) change_notify: Arc, + table_sync_version: Mutex>, + pub(crate) rpc_helper: RpcHelper, system_endpoint: Arc>, } @@ -117,6 +120,34 @@ impl LayoutManager { Ok(()) } + pub fn add_table(&self, table_name: &'static str) { + let first_version = self.layout().versions.first().unwrap().version; + + self.table_sync_version + .lock() + .unwrap() + .insert(table_name.to_string(), first_version); + } + + pub fn sync_table_until(self: &Arc, table_name: &'static str, version: u64) { + let mut table_sync_version = self.table_sync_version.lock().unwrap(); + *table_sync_version.get_mut(table_name).unwrap() = version; + let sync_until = table_sync_version.iter().map(|(_, v)| *v).max().unwrap(); + drop(table_sync_version); + + let mut layout = self.layout.write().unwrap(); + if layout + .update_trackers + .sync_map + .set_max(self.node_id, sync_until) + { + layout.update_hashes(); + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( + layout.update_trackers.clone(), + )); + } + } + // ---- INTERNALS --- fn merge_layout(&self, adv: &LayoutHistory) -> Option { diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index db60c806..89f5c361 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -375,14 +375,17 @@ impl UpdateTracker { changed } - pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) { + pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool { match self.0.get_mut(&peer) { - Some(e) => { - *e = std::cmp::max(*e, value); + Some(e) if *e < value => { + *e = value; + true } None => { self.0.insert(peer, value); + true } + _ => false, } } } diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 65c62f63..8133672a 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -109,7 +109,7 @@ impl LayoutVersion { .collect::>() } - /// Walk the ring to find the n servers in which data should be replicated + /// Return the n servers in which data for this hash should be replicated pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec { assert_eq!(n, self.replication_factor); -- cgit v1.2.3 From ce89d1ddabe3b9e638b0173949726522ae9a0311 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Nov 2023 12:08:32 +0100 Subject: table sync: adapt to new layout history --- src/rpc/layout/history.rs | 21 +++++++++++++++------ src/rpc/layout/manager.rs | 1 + src/rpc/layout/version.rs | 16 +++++++--------- 3 files changed, 23 insertions(+), 15 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index dbb02269..185dbb27 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -47,11 +47,19 @@ impl LayoutHistory { // ------------------ who stores what now? --------------- - pub fn max_ack(&self) -> u64 { + pub fn all_ack(&self) -> u64 { self.calculate_global_min(&self.update_trackers.ack_map) } - pub fn all_storage_nodes(&self) -> HashSet { + pub fn min_stored(&self) -> u64 { + self.versions.first().as_ref().unwrap().version + } + + pub fn sync_versions(&self) -> (u64, u64, u64) { + (self.current().version, self.all_ack(), self.min_stored()) + } + + pub fn all_nongateway_nodes(&self) -> HashSet { // TODO: cache this self.versions .iter() @@ -71,11 +79,10 @@ impl LayoutHistory { version.nodes_of(position, version.replication_factor) } - pub fn write_sets_of(&self, position: &Hash) -> Vec> { + pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator> + 'a { self.versions .iter() - .map(|x| x.nodes_of(position, x.replication_factor)) - .collect::>() + .map(move |x| x.nodes_of(position, x.replication_factor)) } // ------------------ update tracking --------------- @@ -129,7 +136,9 @@ impl LayoutHistory { } pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { - let storage_nodes = self.all_storage_nodes(); + // TODO: for TableFullReplication, counting gateway nodes might be + // necessary? Think about this more. + let storage_nodes = self.all_nongateway_nodes(); storage_nodes .iter() .map(|x| tracker.0.get(x).copied().unwrap_or(0)) diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index b0302b12..7d60bae6 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -92,6 +92,7 @@ impl LayoutManager { persist_cluster_layout, layout, change_notify, + table_sync_version: Mutex::new(HashMap::new()), system_endpoint, rpc_helper, })) diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 8133672a..f45a3c35 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -98,15 +98,13 @@ impl LayoutVersion { } /// Get the list of partitions and the first hash of a partition key that would fall in it - pub fn partitions(&self) -> Vec<(Partition, Hash)> { - (0..(1 << PARTITION_BITS)) - .map(|i| { - let top = (i as u16) << (16 - PARTITION_BITS); - let mut location = [0u8; 32]; - location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); - (i as u16, Hash::from(location)) - }) - .collect::>() + pub fn partitions(&self) -> impl Iterator + '_ { + (0..(1 << PARTITION_BITS)).map(|i| { + let top = (i as u16) << (16 - PARTITION_BITS); + let mut location = [0u8; 32]; + location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); + (i as u16, Hash::from(location)) + }) } /// Return the n servers in which data for this hash should be replicated -- cgit v1.2.3 From df24bb806d64d5d5e748c35efe3f49ad3dda709e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Nov 2023 12:37:33 +0100 Subject: layout/sync: fix bugs and add tracing --- src/rpc/layout/history.rs | 3 ++- src/rpc/layout/manager.rs | 10 ++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 185dbb27..cef56647 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -131,7 +131,8 @@ impl LayoutHistory { pub(crate) fn cleanup_old_versions(&mut self) { let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); while self.versions.first().as_ref().unwrap().version < min_sync_ack { - self.versions.remove(0); + let removed = self.versions.remove(0); + info!("Layout history: pruning old version {}", removed.version); } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 7d60bae6..ce8b6f61 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -133,7 +133,7 @@ impl LayoutManager { pub fn sync_table_until(self: &Arc, table_name: &'static str, version: u64) { let mut table_sync_version = self.table_sync_version.lock().unwrap(); *table_sync_version.get_mut(table_name).unwrap() = version; - let sync_until = table_sync_version.iter().map(|(_, v)| *v).max().unwrap(); + let sync_until = table_sync_version.iter().map(|(_, v)| *v).min().unwrap(); drop(table_sync_version); let mut layout = self.layout.write().unwrap(); @@ -142,6 +142,7 @@ impl LayoutManager { .sync_map .set_max(self.node_id, sync_until) { + debug!("sync_until updated to {}", sync_until); layout.update_hashes(); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( layout.update_trackers.clone(), @@ -277,7 +278,12 @@ impl LayoutManager { self: &Arc, adv: &LayoutHistory, ) -> Result { - debug!("handle_advertise_cluster_layout: {:?}", adv); + debug!( + "handle_advertise_cluster_layout: {} versions, last={}, trackers={:?}", + adv.versions.len(), + adv.current().version, + adv.update_trackers + ); if adv.current().replication_factor != self.replication_factor { let msg = format!( -- cgit v1.2.3 From 9a491fa1372a23e91c793ee1d2b313607752826a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Nov 2023 13:10:59 +0100 Subject: layout: fix test --- src/rpc/layout/history.rs | 9 ++- src/rpc/layout/mod.rs | 3 + src/rpc/layout/test.rs | 159 ++++++++++++++++++++++++++++++++++++++++++ src/rpc/layout/version.rs | 172 ++-------------------------------------------- 4 files changed, 174 insertions(+), 169 deletions(-) create mode 100644 src/rpc/layout/test.rs (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index cef56647..050f5d0a 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -18,7 +18,7 @@ impl LayoutHistory { }; let mut ret = LayoutHistory { - versions: vec![version].into_boxed_slice().into(), + versions: vec![version], update_trackers: Default::default(), trackers_hash: [0u8; 32].into(), staging: Lww::raw(0, staging), @@ -211,6 +211,11 @@ To know the correct value of the new layout version, invoke `garage layout show` let msg = new_version.calculate_partition_assignment()?; self.versions.push(new_version); + if self.current().check().is_ok() { + while self.versions.first().unwrap().check().is_err() { + self.versions.remove(0); + } + } // Reset the staged layout changes self.staging.update(LayoutStaging { @@ -245,7 +250,7 @@ To know the correct value of the new layout version, invoke `garage layout show` version.check()?; } - // TODO: anythign more ? + // TODO: anything more ? Ok(()) } } diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index cd3764bc..577b32fb 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -3,6 +3,9 @@ mod history; mod schema; mod version; +#[cfg(test)] +mod test; + pub mod manager; // ---- re-exports ---- diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs new file mode 100644 index 00000000..0ce090d2 --- /dev/null +++ b/src/rpc/layout/test.rs @@ -0,0 +1,159 @@ +use std::cmp::min; +use std::collections::HashMap; + +use garage_util::crdt::Crdt; +use garage_util::error::*; + +use crate::layout::*; + +// This function checks that the partition size S computed is at least better than the +// one given by a very naive algorithm. To do so, we try to run the naive algorithm +// assuming a partion size of S+1. If we succed, it means that the optimal assignment +// was not optimal. The naive algorithm is the following : +// - we compute the max number of partitions associated to every node, capped at the +// partition number. It gives the number of tokens of every node. +// - every zone has a number of tokens equal to the sum of the tokens of its nodes. +// - we cycle over the partitions and associate zone tokens while respecting the +// zone redundancy constraint. +// NOTE: the naive algorithm is not optimal. Counter example: +// take nb_partition = 3 ; replication_factor = 5; redundancy = 4; +// number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2) +// With these parameters, the naive algo fails, whereas there is a solution: +// (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E) +fn check_against_naive(cl: &LayoutVersion) -> Result { + let over_size = cl.partition_size + 1; + let mut zone_token = HashMap::::new(); + + let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?; + + if zones.is_empty() { + return Ok(false); + } + + for z in zones.iter() { + zone_token.insert(z.clone(), 0); + } + for uuid in cl.nongateway_nodes() { + let z = cl.get_node_zone(&uuid)?; + let c = cl.get_node_capacity(&uuid)?; + zone_token.insert( + z.to_string(), + zone_token[z] + min(NB_PARTITIONS, (c / over_size) as usize), + ); + } + + // For every partition, we count the number of zone already associated and + // the name of the last zone associated + + let mut id_zone_token = vec![0; zones.len()]; + for (z, t) in zone_token.iter() { + id_zone_token[zone_to_id[z]] = *t; + } + + let mut nb_token = vec![0; NB_PARTITIONS]; + let mut last_zone = vec![zones.len(); NB_PARTITIONS]; + + let mut curr_zone = 0; + + let redundancy = cl.effective_zone_redundancy(); + + for replic in 0..cl.replication_factor { + for p in 0..NB_PARTITIONS { + while id_zone_token[curr_zone] == 0 + || (last_zone[p] == curr_zone + && redundancy - nb_token[p] <= cl.replication_factor - replic) + { + curr_zone += 1; + if curr_zone >= zones.len() { + return Ok(true); + } + } + id_zone_token[curr_zone] -= 1; + if last_zone[p] != curr_zone { + nb_token[p] += 1; + last_zone[p] = curr_zone; + } + } + } + + return Ok(false); +} + +fn show_msg(msg: &Message) { + for s in msg.iter() { + println!("{}", s); + } +} + +fn update_layout( + cl: &mut LayoutHistory, + node_capacity_vec: &[u64], + node_zone_vec: &[&'static str], + zone_redundancy: usize, +) { + let staging = cl.staging.get_mut(); + + for (i, (capacity, zone)) in node_capacity_vec + .iter() + .zip(node_zone_vec.iter()) + .enumerate() + { + let node_id = [i as u8; 32].into(); + + let update = staging.roles.update_mutator( + node_id, + NodeRoleV(Some(NodeRole { + zone: zone.to_string(), + capacity: Some(*capacity), + tags: (vec![]), + })), + ); + staging.roles.merge(&update); + } + staging.parameters.update(LayoutParameters { + zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy), + }); + + cl.update_hashes(); +} + +#[test] +fn test_assignment() { + let mut node_capacity_vec = vec![4000, 1000, 2000]; + let mut node_zone_vec = vec!["A", "B", "C"]; + + let mut cl = LayoutHistory::new(3); + update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.current().version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(check_against_naive(cl.current()).unwrap()); + + node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000]; + node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"]; + update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 2); + let v = cl.current().version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(check_against_naive(cl.current()).unwrap()); + + node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000]; + update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.current().version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(check_against_naive(cl.current()).unwrap()); + + node_capacity_vec = vec![ + 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000, + ]; + update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 1); + let v = cl.current().version; + let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(check_against_naive(cl.current()).unwrap()); +} diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index f45a3c35..ffbdf277 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -143,7 +143,7 @@ impl LayoutVersion { } /// Given a node uuids, this function returns the label of its zone - fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> { + pub(crate) fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> { match self.node_role(uuid) { Some(role) => Ok(&role.zone), _ => Err(Error::Message( @@ -162,7 +162,7 @@ impl LayoutVersion { } /// Returns the effective value of the zone_redundancy parameter - fn effective_zone_redundancy(&self) -> usize { + pub(crate) fn effective_zone_redundancy(&self) -> usize { match self.parameters.zone_redundancy { ZoneRedundancy::AtLeast(v) => v, ZoneRedundancy::Maximum => { @@ -472,7 +472,9 @@ impl LayoutVersion { /// This function generates ids for the zone of the nodes appearing in /// self.node_id_vec. - fn generate_nongateway_zone_ids(&self) -> Result<(Vec, HashMap), Error> { + pub(crate) fn generate_nongateway_zone_ids( + &self, + ) -> Result<(Vec, HashMap), Error> { let mut id_to_zone = Vec::::new(); let mut zone_to_id = HashMap::::new(); @@ -838,167 +840,3 @@ impl LayoutVersion { Ok(msg) } } - -// ==================================================================================== - -#[cfg(test)] -mod tests { - use super::{Error, *}; - use std::cmp::min; - - // This function checks that the partition size S computed is at least better than the - // one given by a very naive algorithm. To do so, we try to run the naive algorithm - // assuming a partion size of S+1. If we succed, it means that the optimal assignment - // was not optimal. The naive algorithm is the following : - // - we compute the max number of partitions associated to every node, capped at the - // partition number. It gives the number of tokens of every node. - // - every zone has a number of tokens equal to the sum of the tokens of its nodes. - // - we cycle over the partitions and associate zone tokens while respecting the - // zone redundancy constraint. - // NOTE: the naive algorithm is not optimal. Counter example: - // take nb_partition = 3 ; replication_factor = 5; redundancy = 4; - // number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2) - // With these parameters, the naive algo fails, whereas there is a solution: - // (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E) - fn check_against_naive(cl: &LayoutVersion) -> Result { - let over_size = cl.partition_size + 1; - let mut zone_token = HashMap::::new(); - - let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?; - - if zones.is_empty() { - return Ok(false); - } - - for z in zones.iter() { - zone_token.insert(z.clone(), 0); - } - for uuid in cl.nongateway_nodes() { - let z = cl.get_node_zone(&uuid)?; - let c = cl.get_node_capacity(&uuid)?; - zone_token.insert( - z.clone(), - zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize), - ); - } - - // For every partition, we count the number of zone already associated and - // the name of the last zone associated - - let mut id_zone_token = vec![0; zones.len()]; - for (z, t) in zone_token.iter() { - id_zone_token[zone_to_id[z]] = *t; - } - - let mut nb_token = vec![0; NB_PARTITIONS]; - let mut last_zone = vec![zones.len(); NB_PARTITIONS]; - - let mut curr_zone = 0; - - let redundancy = cl.effective_zone_redundancy(); - - for replic in 0..cl.replication_factor { - for p in 0..NB_PARTITIONS { - while id_zone_token[curr_zone] == 0 - || (last_zone[p] == curr_zone - && redundancy - nb_token[p] <= cl.replication_factor - replic) - { - curr_zone += 1; - if curr_zone >= zones.len() { - return Ok(true); - } - } - id_zone_token[curr_zone] -= 1; - if last_zone[p] != curr_zone { - nb_token[p] += 1; - last_zone[p] = curr_zone; - } - } - } - - return Ok(false); - } - - fn show_msg(msg: &Message) { - for s in msg.iter() { - println!("{}", s); - } - } - - fn update_layout( - cl: &mut LayoutVersion, - node_id_vec: &Vec, - node_capacity_vec: &Vec, - node_zone_vec: &Vec, - zone_redundancy: usize, - ) { - for i in 0..node_id_vec.len() { - if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) { - cl.node_id_vec.push(x); - } - - let update = cl.staging_roles.update_mutator( - cl.node_id_vec[i], - NodeRoleV(Some(NodeRole { - zone: (node_zone_vec[i].to_string()), - capacity: (Some(node_capacity_vec[i])), - tags: (vec![]), - })), - ); - cl.staging_roles.merge(&update); - } - cl.staging_parameters.update(LayoutParameters { - zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy), - }); - cl.staging_hash = cl.calculate_staging_hash(); - } - - #[test] - fn test_assignment() { - let mut node_id_vec = vec![1, 2, 3]; - let mut node_capacity_vec = vec![4000, 1000, 2000]; - let mut node_zone_vec = vec!["A", "B", "C"] - .into_iter() - .map(|x| x.to_string()) - .collect(); - - let mut cl = LayoutVersion::new(3); - update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3); - let v = cl.version; - let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); - show_msg(&msg); - assert_eq!(cl.check(), Ok(())); - assert!(matches!(check_against_naive(&cl), Ok(true))); - - node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; - node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000]; - node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"] - .into_iter() - .map(|x| x.to_string()) - .collect(); - update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 2); - let v = cl.version; - let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); - show_msg(&msg); - assert_eq!(cl.check(), Ok(())); - assert!(matches!(check_against_naive(&cl), Ok(true))); - - node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000]; - update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3); - let v = cl.version; - let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); - show_msg(&msg); - assert_eq!(cl.check(), Ok(())); - assert!(matches!(check_against_naive(&cl), Ok(true))); - - node_capacity_vec = vec![ - 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000, - ]; - update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 1); - let v = cl.version; - let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); - show_msg(&msg); - assert_eq!(cl.check(), Ok(())); - assert!(matches!(check_against_naive(&cl), Ok(true))); - } -} -- cgit v1.2.3 From 8e292e06b3fde1d3b5b019a26eabd4f0d9ac22c3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 12:48:38 +0100 Subject: layout: some refactoring of nongateway nodes --- src/rpc/layout/history.rs | 30 ++++++++------- src/rpc/layout/schema.rs | 17 ++++++++ src/rpc/layout/version.rs | 98 ++++++++++++++++++++++++++--------------------- 3 files changed, 88 insertions(+), 57 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 050f5d0a..877ad3a7 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::HashSet; use garage_util::crdt::{Crdt, Lww, LwwMap}; @@ -59,13 +60,19 @@ impl LayoutHistory { (self.current().version, self.all_ack(), self.min_stored()) } - pub fn all_nongateway_nodes(&self) -> HashSet { + pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { // TODO: cache this - self.versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::>() + if self.versions.len() == 1 { + self.versions[0].nongateway_nodes().into() + } else { + let set = self + .versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>().into() + } } pub fn read_nodes_of(&self, position: &Hash) -> Vec { @@ -202,14 +209,11 @@ To know the correct value of the new layout version, invoke `garage layout show` } // Compute new version and add it to history - let mut new_version = self.current().clone(); - new_version.version += 1; - - new_version.roles.merge(&self.staging.get().roles); - new_version.roles.retain(|(_, _, v)| v.0.is_some()); - new_version.parameters = *self.staging.get().parameters.get(); + let (new_version, msg) = self + .current() + .clone() + .calculate_next_version(&self.staging.get())?; - let msg = new_version.calculate_partition_assignment()?; self.versions.push(new_version); if self.current().check().is_ok() { while self.versions.first().unwrap().check().is_err() { diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 89f5c361..db298ee6 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -212,6 +212,8 @@ mod v010 { /// see comment in v08::ClusterLayout pub node_id_vec: Vec, + /// number of non-gateway nodes, which are the first ids in node_id_vec + pub nongateway_node_count: usize, /// see comment in v08::ClusterLayout #[serde(with = "serde_bytes")] pub ring_assignment_data: Vec, @@ -265,6 +267,18 @@ mod v010 { type Previous = v09::ClusterLayout; fn migrate(previous: Self::Previous) -> Self { + let nongateway_node_count = previous + .node_id_vec + .iter() + .enumerate() + .filter(|(_, uuid)| { + let role = previous.roles.get(uuid); + matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some()) + }) + .map(|(i, _)| i) + .max() + .unwrap_or(0); + let version = LayoutVersion { version: previous.version, replication_factor: previous.replication_factor, @@ -272,11 +286,14 @@ mod v010 { parameters: previous.parameters, roles: previous.roles, node_id_vec: previous.node_id_vec, + nongateway_node_count, ring_assignment_data: previous.ring_assignment_data, }; let update_tracker = UpdateTracker( version .nongateway_nodes() + .iter() + .copied() .map(|x| (x, version.version)) .collect::>(), ); diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index ffbdf277..a7f387b6 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -5,7 +5,7 @@ use std::convert::TryInto; use bytesize::ByteSize; use itertools::Itertools; -use garage_util::crdt::LwwMap; +use garage_util::crdt::{Crdt, LwwMap}; use garage_util::data::*; use garage_util::error::*; @@ -30,6 +30,7 @@ impl LayoutVersion { partition_size: 0, roles: LwwMap::new(), node_id_vec: Vec::new(), + nongateway_node_count: 0, ring_assignment_data: Vec::new(), parameters, } @@ -43,6 +44,11 @@ impl LayoutVersion { &self.node_id_vec[..] } + /// Returns the uuids of the non_gateway nodes in self.node_id_vec. + pub fn nongateway_nodes(&self) -> &[Uuid] { + &self.node_id_vec[..self.nongateway_node_count] + } + pub fn num_nodes(&self) -> usize { self.node_id_vec.len() } @@ -56,18 +62,14 @@ impl LayoutVersion { } /// Given a node uuids, this function returns its capacity or fails if it does not have any - pub fn get_node_capacity(&self, uuid: &Uuid) -> Result { + pub fn get_node_capacity(&self, uuid: &Uuid) -> Option { match self.node_role(uuid) { Some(NodeRole { capacity: Some(cap), zone: _, tags: _, - }) => Ok(*cap), - _ => Err(Error::Message( - "The Uuid does not correspond to a node present in the \ - cluster or this node does not have a positive capacity." - .into(), - )), + }) => Some(*cap), + _ => None, } } @@ -131,17 +133,6 @@ impl LayoutVersion { // ===================== internal information extractors ====================== - /// Returns the uuids of the non_gateway nodes in self.node_id_vec. - pub(crate) fn nongateway_nodes(&self) -> impl Iterator + '_ { - self.node_id_vec - .iter() - .copied() - .filter(move |uuid| match self.node_role(uuid) { - Some(role) if role.capacity.is_some() => true, - _ => false, - }) - } - /// Given a node uuids, this function returns the label of its zone pub(crate) fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> { match self.node_role(uuid) { @@ -152,11 +143,16 @@ impl LayoutVersion { } } + fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 { + self.get_node_capacity(&uuid) + .expect("non-gateway node with zero capacity") + } + /// Returns the sum of capacities of non gateway nodes in the cluster fn get_total_capacity(&self) -> Result { let mut total_capacity = 0; for uuid in self.nongateway_nodes() { - total_capacity += self.get_node_capacity(&uuid)?; + total_capacity += self.expect_get_node_capacity(&uuid); } Ok(total_capacity) } @@ -257,7 +253,7 @@ impl LayoutVersion { if *usage > 0 { let uuid = self.node_id_vec[n]; let partusage = usage * self.partition_size; - let nodecap = self.get_node_capacity(&uuid).unwrap(); + let nodecap = self.expect_get_node_capacity(&uuid); if partusage > nodecap { return Err(format!( "node usage ({}) is bigger than node capacity ({})", @@ -288,6 +284,21 @@ impl LayoutVersion { // ================== updates to layout, internals =================== + pub(crate) fn calculate_next_version( + mut self, + staging: &LayoutStaging, + ) -> Result<(Self, Message), Error> { + self.version += 1; + + self.roles.merge(&staging.roles); + self.roles.retain(|(_, _, v)| v.0.is_some()); + self.parameters = *staging.parameters.get(); + + let msg = self.calculate_partition_assignment()?; + + Ok((self, msg)) + } + /// This function calculates a new partition-to-node assignment. /// The computed assignment respects the node replication factor /// and the zone redundancy parameter It maximizes the capacity of a @@ -297,7 +308,7 @@ impl LayoutVersion { /// data to be moved. /// Staged role changes must be merged with nodes roles before calling this function, /// hence it must only be called from apply_staged_changes() and hence is not public. - pub(crate) fn calculate_partition_assignment(&mut self) -> Result { + fn calculate_partition_assignment(&mut self) -> Result { // We update the node ids, since the node role list might have changed with the // changes in the layout. We retrieve the old_assignment reframed with new ids let old_assignment_opt = self.update_node_id_vec()?; @@ -317,12 +328,12 @@ impl LayoutVersion { // to use them as indices in the flow graphs. let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; - let nb_nongateway_nodes = self.nongateway_nodes().count(); - if nb_nongateway_nodes < self.replication_factor { + if self.nongateway_nodes().len() < self.replication_factor { return Err(Error::Message(format!( "The number of nodes with positive \ capacity ({}) is smaller than the replication factor ({}).", - nb_nongateway_nodes, self.replication_factor + self.nongateway_nodes().len(), + self.replication_factor ))); } if id_to_zone.len() < zone_redundancy { @@ -420,12 +431,14 @@ impl LayoutVersion { .map(|(k, _, _)| *k) .collect(); - let mut new_node_id_vec = Vec::::new(); - new_node_id_vec.extend(new_non_gateway_nodes); - new_node_id_vec.extend(new_gateway_nodes); + let old_node_id_vec = std::mem::take(&mut self.node_id_vec); + + self.nongateway_node_count = new_non_gateway_nodes.len(); + self.node_id_vec.clear(); + self.node_id_vec.extend(new_non_gateway_nodes); + self.node_id_vec.extend(new_gateway_nodes); - let old_node_id_vec = self.node_id_vec.clone(); - self.node_id_vec = new_node_id_vec.clone(); + let new_node_id_vec = &self.node_id_vec; // (2) We retrieve the old association // We rewrite the old association with the new indices. We only consider partition @@ -464,7 +477,7 @@ impl LayoutVersion { } } - // We write the ring + // We clear the ring assignemnt data self.ring_assignment_data = Vec::::new(); Ok(Some(old_assignment)) @@ -478,8 +491,7 @@ impl LayoutVersion { let mut id_to_zone = Vec::::new(); let mut zone_to_id = HashMap::::new(); - let nongateway_nodes = self.nongateway_nodes().collect::>(); - for uuid in nongateway_nodes.iter() { + for uuid in self.nongateway_nodes().iter() { let r = self.node_role(uuid).unwrap(); if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() { zone_to_id.insert(r.zone.clone(), id_to_zone.len()); @@ -556,10 +568,8 @@ impl LayoutVersion { exclude_assoc: &HashSet<(usize, usize)>, zone_redundancy: usize, ) -> Result, Error> { - let vertices = LayoutVersion::generate_graph_vertices( - zone_to_id.len(), - self.nongateway_nodes().count(), - ); + let vertices = + LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len()); let mut g = Graph::::new(&vertices); let nb_zones = zone_to_id.len(); for p in 0..NB_PARTITIONS { @@ -578,8 +588,8 @@ impl LayoutVersion { )?; } } - for n in 0..self.nongateway_nodes().count() { - let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; + for n in 0..self.nongateway_nodes().len() { + let node_capacity = self.expect_get_node_capacity(&self.node_id_vec[n]); let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?]; g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; for p in 0..NB_PARTITIONS { @@ -602,7 +612,7 @@ impl LayoutVersion { // previous assignment let mut exclude_edge = HashSet::<(usize, usize)>::new(); if let Some(prev_assign) = prev_assign_opt { - let nb_nodes = self.nongateway_nodes().count(); + let nb_nodes = self.nongateway_nodes().len(); for (p, prev_assign_p) in prev_assign.iter().enumerate() { for n in 0..nb_nodes { exclude_edge.insert((p, n)); @@ -654,7 +664,7 @@ impl LayoutVersion { // We compute the maximal length of a simple path in gflow. It is used in the // Bellman-Ford algorithm in optimize_flow_with_cost to set the number // of iterations. - let nb_nodes = self.nongateway_nodes().count(); + let nb_nodes = self.nongateway_nodes().len(); let path_length = 4 * nb_nodes; gflow.optimize_flow_with_cost(&cost, path_length)?; @@ -732,7 +742,7 @@ impl LayoutVersion { } // We define and fill in the following tables - let storing_nodes = self.nongateway_nodes().collect::>(); + let storing_nodes = self.nongateway_nodes(); let mut new_partitions = vec![0; storing_nodes.len()]; let mut stored_partitions = vec![0; storing_nodes.len()]; @@ -804,13 +814,13 @@ impl LayoutVersion { let available_cap_z: u64 = self.partition_size * replicated_partitions as u64; let mut total_cap_z = 0; for n in nodes_of_z.iter() { - total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; + total_cap_z += self.expect_get_node_capacity(&self.node_id_vec[*n]); } let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); for n in nodes_of_z.iter() { let available_cap_n = stored_partitions[*n] as u64 * self.partition_size; - let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; + let total_cap_n = self.expect_get_node_capacity(&self.node_id_vec[*n]); let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or(""))?.tags_string(); table.push(format!( " {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)", -- cgit v1.2.3 From 1aab1f4e688ebc3f3adcb41c817c16c688a3291c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 13:06:16 +0100 Subject: layout: refactoring of all_nodes --- src/rpc/layout/history.rs | 15 +++++++++++++++ src/rpc/layout/version.rs | 17 +++++++---------- 2 files changed, 22 insertions(+), 10 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 877ad3a7..69348873 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -60,6 +60,21 @@ impl LayoutHistory { (self.current().version, self.all_ack(), self.min_stored()) } + pub fn all_nodes(&self) -> Cow<'_, [Uuid]> { + // TODO: cache this + if self.versions.len() == 1 { + self.versions[0].all_nodes().into() + } else { + let set = self + .versions + .iter() + .map(|x| x.all_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>().into() + } + } + pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { // TODO: cache this if self.versions.len() == 1 { diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index a7f387b6..2cbdcee2 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -38,22 +38,19 @@ impl LayoutVersion { // ===================== accessors ====================== - /// Returns a list of IDs of nodes that currently have - /// a role in the cluster - pub fn node_ids(&self) -> &[Uuid] { + /// Returns a list of IDs of nodes that have a role in this + /// version of the cluster layout, including gateway nodes + pub fn all_nodes(&self) -> &[Uuid] { &self.node_id_vec[..] } - /// Returns the uuids of the non_gateway nodes in self.node_id_vec. + /// Returns a list of IDs of nodes that have a storage capacity + /// assigned in this version of the cluster layout pub fn nongateway_nodes(&self) -> &[Uuid] { &self.node_id_vec[..self.nongateway_node_count] } - pub fn num_nodes(&self) -> usize { - self.node_id_vec.len() - } - - /// Returns the role of a node in the layout + /// Returns the role of a node in the layout, if it has one pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> { match self.roles.get(node) { Some(NodeRoleV(Some(v))) => Some(v), @@ -61,7 +58,7 @@ impl LayoutVersion { } } - /// Given a node uuids, this function returns its capacity or fails if it does not have any + /// Returns the capacity of a node in the layout, if it has one pub fn get_node_capacity(&self, uuid: &Uuid) -> Option { match self.node_role(uuid) { Some(NodeRole { -- cgit v1.2.3 From 83a11374ca45831a6f54928dfe726fac65493b00 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 13:29:26 +0100 Subject: layout: fixes in schema --- src/rpc/layout/schema.rs | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index db298ee6..79440a47 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -193,7 +193,25 @@ mod v010 { use std::collections::BTreeMap; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; - /// The layout of the cluster, i.e. the list of roles + /// The history of cluster layouts, with trackers to keep a record + /// of which nodes are up-to-date to current cluster data + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutHistory { + /// The versions currently in use in the cluster + pub versions: Vec, + + /// Update trackers + pub update_trackers: UpdateTrackers, + /// Hash of the update trackers + pub trackers_hash: Hash, + + /// Staged changes for the next version + pub staging: Lww, + /// Hash of the serialized staging_parameters + staging_roles + pub staging_hash: Hash, + } + + /// A version of the layout of the cluster, i.e. the list of roles /// which are assigned to each cluster node #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LayoutVersion { @@ -228,23 +246,6 @@ mod v010 { pub roles: LwwMap, } - /// The history of cluster layouts - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] - pub struct LayoutHistory { - /// The versions currently in use in the cluster - pub versions: Vec, - - /// Update trackers - pub update_trackers: UpdateTrackers, - /// Hash of the update trackers - pub trackers_hash: Hash, - - /// Staged changes for the next version - pub staging: Lww, - /// Hash of the serialized staging_parameters + staging_roles - pub staging_hash: Hash, - } - /// The tracker of acknowlegments and data syncs around the cluster #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] pub struct UpdateTrackers { @@ -275,7 +276,7 @@ mod v010 { let role = previous.roles.get(uuid); matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some()) }) - .map(|(i, _)| i) + .map(|(i, _)| i + 1) .max() .unwrap_or(0); @@ -312,8 +313,7 @@ mod v010 { staging: Lww::raw(previous.version, staging), staging_hash: [0u8; 32].into(), }; - ret.staging_hash = ret.calculate_staging_hash(); - ret.trackers_hash = ret.calculate_trackers_hash(); + ret.update_hashes(); ret } } -- cgit v1.2.3 From 3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 14:28:16 +0100 Subject: layout: prepare for write sets --- src/rpc/layout/history.rs | 19 ++++++++++++++++--- src/rpc/layout/version.rs | 21 ++++++++++----------- 2 files changed, 26 insertions(+), 14 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 69348873..dce492c9 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -98,13 +98,26 @@ impl LayoutHistory { .find(|x| x.version == sync_min) .or(self.versions.last()) .unwrap(); - version.nodes_of(position, version.replication_factor) + version + .nodes_of(position, version.replication_factor) + .collect() } - pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator> + 'a { + pub fn write_sets_of(&self, position: &Hash) -> Vec> { self.versions .iter() - .map(move |x| x.nodes_of(position, x.replication_factor)) + .map(|x| x.nodes_of(position, x.replication_factor).collect()) + .collect() + } + + pub fn storage_nodes_of(&self, position: &Hash) -> Vec { + let mut ret = vec![]; + for version in self.versions.iter() { + ret.extend(version.nodes_of(position, version.replication_factor)); + } + ret.sort(); + ret.dedup(); + ret } // ------------------ update tracking --------------- diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 2cbdcee2..912ee538 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -107,25 +107,24 @@ impl LayoutVersion { } /// Return the n servers in which data for this hash should be replicated - pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec { + pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator + '_ { assert_eq!(n, self.replication_factor); let data = &self.ring_assignment_data; - if data.len() != self.replication_factor * (1 << PARTITION_BITS) { + let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) { + let partition_idx = self.partition_of(position) as usize; + let partition_start = partition_idx * self.replication_factor; + let partition_end = (partition_idx + 1) * self.replication_factor; + &data[partition_start..partition_end] + } else { warn!("Ring not yet ready, read/writes will be lost!"); - return vec![]; - } - - let partition_idx = self.partition_of(position) as usize; - let partition_start = partition_idx * self.replication_factor; - let partition_end = (partition_idx + 1) * self.replication_factor; - let partition_nodes = &data[partition_start..partition_end]; + &[] + }; partition_nodes .iter() - .map(|i| self.node_id_vec[*i as usize]) - .collect::>() + .map(move |i| self.node_id_vec[*i as usize]) } // ===================== internal information extractors ====================== -- cgit v1.2.3 From 7ef2c231208073db5a0a0a8674e2dd2d2ecb2222 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 15:45:01 +0100 Subject: layout: fix test --- src/rpc/layout/test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs index 0ce090d2..e9639073 100644 --- a/src/rpc/layout/test.rs +++ b/src/rpc/layout/test.rs @@ -35,7 +35,7 @@ fn check_against_naive(cl: &LayoutVersion) -> Result { } for uuid in cl.nongateway_nodes() { let z = cl.get_node_zone(&uuid)?; - let c = cl.get_node_capacity(&uuid)?; + let c = cl.get_node_capacity(&uuid).unwrap(); zone_token.insert( z.to_string(), zone_token[z] + min(NB_PARTITIONS, (c / over_size) as usize), -- cgit v1.2.3 From b3e729f4b8ec3b06593f8d3b161c76b1263d9f13 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 12:15:58 +0100 Subject: layout history merge: rm invalid versions when valid versions are added --- src/rpc/layout/history.rs | 18 ++++++++++++++++++ src/rpc/layout/version.rs | 20 ++++++++++---------- 2 files changed, 28 insertions(+), 10 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index dce492c9..2346b14a 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -211,6 +211,24 @@ impl LayoutHistory { changed = changed || c; } + // If there are invalid versions before valid versions, remove them, + // and increment update trackers + if self.versions.len() > 1 && self.current().check().is_ok() { + while self.versions.first().unwrap().check().is_err() { + self.versions.remove(0); + changed = true; + } + if changed { + let min_v = self.versions.first().unwrap().version; + let nodes = self.all_nongateway_nodes().into_owned(); + for node in nodes { + self.update_trackers.ack_map.set_max(node, min_v); + self.update_trackers.sync_map.set_max(node, min_v); + self.update_trackers.sync_ack_map.set_max(node, min_v); + } + } + } + // Merge staged layout changes if self.staging != other.staging { self.staging.merge(&other.staging); diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 912ee538..947fab56 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -174,6 +174,16 @@ impl LayoutVersion { /// (assignment, roles, parameters, partition size) /// returns true if consistent, false if error pub fn check(&self) -> Result<(), String> { + // Check that the assignment data has the correct length + let expected_assignment_data_len = (1 << PARTITION_BITS) * self.replication_factor; + if self.ring_assignment_data.len() != expected_assignment_data_len { + return Err(format!( + "ring_assignment_data has incorrect length {} instead of {}", + self.ring_assignment_data.len(), + expected_assignment_data_len + )); + } + // Check that node_id_vec contains the correct list of nodes let mut expected_nodes = self .roles @@ -189,16 +199,6 @@ impl LayoutVersion { return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes)); } - // Check that the assignment data has the correct length - let expected_assignment_data_len = (1 << PARTITION_BITS) * self.replication_factor; - if self.ring_assignment_data.len() != expected_assignment_data_len { - return Err(format!( - "ring_assignment_data has incorrect length {} instead of {}", - self.ring_assignment_data.len(), - expected_assignment_data_len - )); - } - // Check that the assigned nodes are correct identifiers // of nodes that are assigned a role // and that role is not the role of a gateway nodes -- cgit v1.2.3 From 65066c70640371cc318faddfb4c05c96de18e86d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 13:28:30 +0100 Subject: layout: wip cache global mins --- src/rpc/layout/history.rs | 46 +++++++++++++++++++++++++++------------------- src/rpc/layout/manager.rs | 6 +++--- src/rpc/layout/schema.rs | 36 +++++++++++++++++++++++++++--------- 3 files changed, 57 insertions(+), 31 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 2346b14a..1684918e 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -9,6 +9,7 @@ use garage_util::error::*; use super::schema::*; use super::*; + impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); @@ -49,7 +50,7 @@ impl LayoutHistory { // ------------------ who stores what now? --------------- pub fn all_ack(&self) -> u64 { - self.calculate_global_min(&self.update_trackers.ack_map) + self.update_trackers.ack_map.current_min } pub fn min_stored(&self) -> u64 { @@ -91,7 +92,7 @@ impl LayoutHistory { } pub fn read_nodes_of(&self, position: &Hash) -> Vec { - let sync_min = self.calculate_global_min(&self.update_trackers.sync_map); + let sync_min = self.update_trackers.sync_map.current_min; let version = self .versions .iter() @@ -122,7 +123,7 @@ impl LayoutHistory { // ------------------ update tracking --------------- - pub(crate) fn update_trackers(&mut self, node_id: Uuid) { + pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { // Ensure trackers for this node's values are up-to-date // 1. Acknowledge the last layout version in the history @@ -138,6 +139,9 @@ impl LayoutHistory { // 4. Cleanup layout versions that are not needed anymore self.cleanup_old_versions(); + // 5. Recalculate global minima + self.update_trackers_min(); + info!("ack_map: {:?}", self.update_trackers.ack_map); info!("sync_map: {:?}", self.update_trackers.sync_map); info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); @@ -146,42 +150,41 @@ impl LayoutHistory { self.update_hashes(); } + fn update_trackers_min(&mut self) { + // TODO: for TableFullReplication, counting gateway nodes might be + // necessary? Think about this more. + let storage_nodes = self.all_nongateway_nodes().into_owned(); + let min_version = self.versions.first().unwrap().version; + self.update_trackers.update_min(&storage_nodes, min_version); + } + pub(crate) fn ack_last(&mut self, node: Uuid) { let last_version = self.current().version; self.update_trackers.ack_map.set_max(node, last_version); + self.update_trackers_min(); } pub(crate) fn sync_first(&mut self, node: Uuid) { let first_version = self.versions.first().as_ref().unwrap().version; self.update_trackers.sync_map.set_max(node, first_version); + self.update_trackers_min(); } pub(crate) fn sync_ack(&mut self, node: Uuid) { - self.update_trackers.sync_ack_map.set_max( - node, - self.calculate_global_min(&self.update_trackers.sync_map), - ); + self.update_trackers + .sync_ack_map + .set_max(node, self.update_trackers.sync_map.current_min); + self.update_trackers_min(); } pub(crate) fn cleanup_old_versions(&mut self) { - let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); + let min_sync_ack = self.update_trackers.sync_ack_map.current_min; while self.versions.first().as_ref().unwrap().version < min_sync_ack { let removed = self.versions.remove(0); info!("Layout history: pruning old version {}", removed.version); } } - pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { - // TODO: for TableFullReplication, counting gateway nodes might be - // necessary? Think about this more. - let storage_nodes = self.all_nongateway_nodes(); - storage_nodes - .iter() - .map(|x| tracker.0.get(x).copied().unwrap_or(0)) - .min() - .unwrap_or(0) - } - // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -229,6 +232,11 @@ impl LayoutHistory { } } + // Update the current_min value in trackers if anything changed + if changed { + self.update_trackers_min(); + } + // Merge staged layout changes if self.staging != other.staging { self.staging.merge(&other.staging); diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index ce8b6f61..21ec2d8d 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -74,7 +74,7 @@ impl LayoutManager { } }; - cluster_layout.update_trackers(node_id.into()); + cluster_layout.update_trackers_of(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); let change_notify = Arc::new(Notify::new()); @@ -158,7 +158,7 @@ impl LayoutManager { if !prev_layout_check || adv.check().is_ok() { if layout.merge(adv) { - layout.update_trackers(self.node_id); + layout.update_trackers_of(self.node_id); if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } @@ -172,7 +172,7 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update_trackers != *adv { if layout.update_trackers.merge(adv) { - layout.update_trackers(self.node_id); + layout.update_trackers_of(self.node_id); return Some(layout.update_trackers.clone()); } } diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 79440a47..969f5a0b 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -260,7 +260,10 @@ mod v010 { /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] - pub struct UpdateTracker(pub BTreeMap); + pub struct UpdateTracker { + pub values: BTreeMap, + pub current_min: u64, + } impl garage_util::migrate::Migrate for LayoutHistory { const VERSION_MARKER: &'static [u8] = b"G010lh"; @@ -290,14 +293,15 @@ mod v010 { nongateway_node_count, ring_assignment_data: previous.ring_assignment_data, }; - let update_tracker = UpdateTracker( - version + let update_tracker = UpdateTracker { + values: version .nongateway_nodes() .iter() .copied() .map(|x| (x, version.version)) .collect::>(), - ); + current_min: 0, + }; let staging = LayoutStaging { parameters: previous.staging_parameters, roles: previous.staging_roles, @@ -378,14 +382,14 @@ impl core::str::FromStr for ZoneRedundancy { impl UpdateTracker { fn merge(&mut self, other: &UpdateTracker) -> bool { let mut changed = false; - for (k, v) in other.0.iter() { - if let Some(v_mut) = self.0.get_mut(k) { + for (k, v) in other.values.iter() { + if let Some(v_mut) = self.values.get_mut(k) { if *v > *v_mut { *v_mut = *v; changed = true; } } else { - self.0.insert(*k, *v); + self.values.insert(*k, *v); changed = true; } } @@ -393,18 +397,26 @@ impl UpdateTracker { } pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool { - match self.0.get_mut(&peer) { + match self.values.get_mut(&peer) { Some(e) if *e < value => { *e = value; true } None => { - self.0.insert(peer, value); + self.values.insert(peer, value); true } _ => false, } } + + fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) { + self.current_min = storage_nodes + .iter() + .map(|x| self.values.get(x).copied().unwrap_or(min_version)) + .min() + .unwrap_or(min_version) + } } impl UpdateTrackers { @@ -414,4 +426,10 @@ impl UpdateTrackers { let c3 = self.sync_ack_map.merge(&other.sync_ack_map); c1 || c2 || c3 } + + pub(crate) fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) { + self.ack_map.update_min(&storage_nodes, min_version); + self.sync_map.update_min(&storage_nodes, min_version); + self.sync_ack_map.update_min(&storage_nodes, min_version); + } } -- cgit v1.2.3 From 393c4d4515e0cdadadc8de8ae2df12e4371cff88 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 14:20:50 +0100 Subject: layout: add helper for cached/external values to centralize recomputation --- src/rpc/layout/history.rs | 311 ++++++++++++++++++++++++++++------------------ src/rpc/layout/manager.rs | 22 ++-- src/rpc/layout/schema.rs | 48 +++---- 3 files changed, 212 insertions(+), 169 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 1684918e..b6f0e495 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,5 @@ -use std::borrow::Cow; use std::collections::HashSet; +use std::ops::Deref; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -9,95 +9,106 @@ use garage_util::error::*; use super::schema::*; use super::*; +pub struct LayoutHelper { + layout: Option, -impl LayoutHistory { - pub fn new(replication_factor: usize) -> Self { - let version = LayoutVersion::new(replication_factor); + // cached values + ack_map_min: u64, + sync_map_min: u64, - let staging = LayoutStaging { - parameters: Lww::::new(version.parameters), - roles: LwwMap::new(), - }; + all_nodes: Vec, + all_nongateway_nodes: Vec, - let mut ret = LayoutHistory { - versions: vec![version], - update_trackers: Default::default(), - trackers_hash: [0u8; 32].into(), - staging: Lww::raw(0, staging), - staging_hash: [0u8; 32].into(), - }; - ret.update_hashes(); - ret - } + trackers_hash: Hash, + staging_hash: Hash, +} - pub fn current(&self) -> &LayoutVersion { - self.versions.last().as_ref().unwrap() +impl Deref for LayoutHelper { + type Target = LayoutHistory; + fn deref(&self) -> &LayoutHistory { + self.layout() } +} - pub fn update_hashes(&mut self) { - self.trackers_hash = self.calculate_trackers_hash(); - self.staging_hash = self.calculate_staging_hash(); +impl LayoutHelper { + pub fn new(mut layout: LayoutHistory) -> Self { + layout.cleanup_old_versions(); + + let all_nongateway_nodes = layout.get_all_nongateway_nodes(); + layout.clamp_update_trackers(&all_nongateway_nodes); + + let min_version = layout.min_stored(); + let ack_map_min = layout + .update_trackers + .ack_map + .min(&all_nongateway_nodes, min_version); + let sync_map_min = layout + .update_trackers + .sync_map + .min(&all_nongateway_nodes, min_version); + + let all_nodes = layout.get_all_nodes(); + let trackers_hash = layout.calculate_trackers_hash(); + let staging_hash = layout.calculate_staging_hash(); + + LayoutHelper { + layout: Some(layout), + ack_map_min, + sync_map_min, + all_nodes, + all_nongateway_nodes, + trackers_hash, + staging_hash, + } } - pub(crate) fn calculate_trackers_hash(&self) -> Hash { - blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) - } + // ------------------ single updating function -------------- - pub(crate) fn calculate_staging_hash(&self) -> Hash { - blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) + fn layout(&self) -> &LayoutHistory { + self.layout.as_ref().unwrap() } - // ------------------ who stores what now? --------------- - - pub fn all_ack(&self) -> u64 { - self.update_trackers.ack_map.current_min + pub(crate) fn update(&mut self, f: F) -> bool + where + F: FnOnce(&mut LayoutHistory) -> bool, + { + let changed = f(&mut self.layout.as_mut().unwrap()); + if changed { + *self = Self::new(self.layout.take().unwrap()); + } + changed } - pub fn min_stored(&self) -> u64 { - self.versions.first().as_ref().unwrap().version + // ------------------ read helpers --------------- + + pub fn all_nodes(&self) -> &[Uuid] { + &self.all_nodes } - pub fn sync_versions(&self) -> (u64, u64, u64) { - (self.current().version, self.all_ack(), self.min_stored()) + pub fn all_nongateway_nodes(&self) -> &[Uuid] { + &self.all_nongateway_nodes } - pub fn all_nodes(&self) -> Cow<'_, [Uuid]> { - // TODO: cache this - if self.versions.len() == 1 { - self.versions[0].all_nodes().into() - } else { - let set = self - .versions - .iter() - .map(|x| x.all_nodes()) - .flatten() - .collect::>(); - set.into_iter().copied().collect::>().into() - } + pub fn all_ack(&self) -> u64 { + self.ack_map_min } - pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { - // TODO: cache this - if self.versions.len() == 1 { - self.versions[0].nongateway_nodes().into() - } else { - let set = self - .versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::>(); - set.into_iter().copied().collect::>().into() - } + pub fn sync_versions(&self) -> (u64, u64, u64) { + ( + self.layout().current().version, + self.all_ack(), + self.layout().min_stored(), + ) } pub fn read_nodes_of(&self, position: &Hash) -> Vec { - let sync_min = self.update_trackers.sync_map.current_min; + let sync_min = self.sync_map_min; let version = self + .layout() .versions .iter() .find(|x| x.version == sync_min) - .or(self.versions.last()) + .or(self.layout().versions.last()) .unwrap(); version .nodes_of(position, version.replication_factor) @@ -105,7 +116,8 @@ impl LayoutHistory { } pub fn write_sets_of(&self, position: &Hash) -> Vec> { - self.versions + self.layout() + .versions .iter() .map(|x| x.nodes_of(position, x.replication_factor).collect()) .collect() @@ -113,7 +125,7 @@ impl LayoutHistory { pub fn storage_nodes_of(&self, position: &Hash) -> Vec { let mut ret = vec![]; - for version in self.versions.iter() { + for version in self.layout().versions.iter() { ret.extend(version.nodes_of(position, version.replication_factor)); } ret.sort(); @@ -121,7 +133,35 @@ impl LayoutHistory { ret } - // ------------------ update tracking --------------- + pub fn trackers_hash(&self) -> Hash { + self.trackers_hash + } + + pub fn staging_hash(&self) -> Hash { + self.staging_hash + } + + // ------------------ helpers for update tracking --------------- + + pub(crate) fn sync_first(&mut self, node: Uuid) { + let first_version = self.versions.first().as_ref().unwrap().version; + self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version)); + } + + pub(crate) fn sync_ack(&mut self, node: Uuid) { + let sync_map_min = self.sync_map_min; + self.update(|layout| { + layout + .update_trackers + .sync_ack_map + .set_max(node, sync_map_min) + }); + } + + pub(crate) fn ack_last(&mut self, node: Uuid) { + let last_version = self.current().version; + self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version)); + } pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { // Ensure trackers for this node's values are up-to-date @@ -136,55 +176,104 @@ impl LayoutHistory { // 3. Acknowledge everyone has synced up to min(self.sync_map) self.sync_ack(node_id); - // 4. Cleanup layout versions that are not needed anymore - self.cleanup_old_versions(); - - // 5. Recalculate global minima - self.update_trackers_min(); - info!("ack_map: {:?}", self.update_trackers.ack_map); info!("sync_map: {:?}", self.update_trackers.sync_map); info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + } +} - // Finally, update hashes - self.update_hashes(); +// ---- + +impl LayoutHistory { + pub fn new(replication_factor: usize) -> Self { + let version = LayoutVersion::new(replication_factor); + + let staging = LayoutStaging { + parameters: Lww::::new(version.parameters), + roles: LwwMap::new(), + }; + + LayoutHistory { + versions: vec![version], + update_trackers: Default::default(), + staging: Lww::raw(0, staging), + } } - fn update_trackers_min(&mut self) { - // TODO: for TableFullReplication, counting gateway nodes might be - // necessary? Think about this more. - let storage_nodes = self.all_nongateway_nodes().into_owned(); - let min_version = self.versions.first().unwrap().version; - self.update_trackers.update_min(&storage_nodes, min_version); + // ------------------ who stores what now? --------------- + + pub fn current(&self) -> &LayoutVersion { + self.versions.last().as_ref().unwrap() } - pub(crate) fn ack_last(&mut self, node: Uuid) { - let last_version = self.current().version; - self.update_trackers.ack_map.set_max(node, last_version); - self.update_trackers_min(); + pub fn min_stored(&self) -> u64 { + self.versions.first().as_ref().unwrap().version } - pub(crate) fn sync_first(&mut self, node: Uuid) { - let first_version = self.versions.first().as_ref().unwrap().version; - self.update_trackers.sync_map.set_max(node, first_version); - self.update_trackers_min(); + pub fn get_all_nodes(&self) -> Vec { + if self.versions.len() == 1 { + self.versions[0].all_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .map(|x| x.all_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>() + } } - pub(crate) fn sync_ack(&mut self, node: Uuid) { - self.update_trackers - .sync_ack_map - .set_max(node, self.update_trackers.sync_map.current_min); - self.update_trackers_min(); + fn get_all_nongateway_nodes(&self) -> Vec { + if self.versions.len() == 1 { + self.versions[0].nongateway_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>() + } } - pub(crate) fn cleanup_old_versions(&mut self) { - let min_sync_ack = self.update_trackers.sync_ack_map.current_min; - while self.versions.first().as_ref().unwrap().version < min_sync_ack { - let removed = self.versions.remove(0); - info!("Layout history: pruning old version {}", removed.version); + // ---- housekeeping (all invoked by LayoutHelper) ---- + + fn cleanup_old_versions(&mut self) { + loop { + let all_nongateway_nodes = self.get_all_nongateway_nodes(); + let min_version = self.min_stored(); + let sync_ack_map_min = self + .update_trackers + .sync_ack_map + .min(&all_nongateway_nodes, min_version); + if self.min_stored() < sync_ack_map_min { + let removed = self.versions.remove(0); + info!("Layout history: pruning old version {}", removed.version); + } else { + break; + } } } + fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { + let min_v = self.min_stored(); + for node in nodes { + self.update_trackers.ack_map.set_max(*node, min_v); + self.update_trackers.sync_map.set_max(*node, min_v); + self.update_trackers.sync_ack_map.set_max(*node, min_v); + } + } + + fn calculate_trackers_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) + } + + fn calculate_staging_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) + } + // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -221,20 +310,6 @@ impl LayoutHistory { self.versions.remove(0); changed = true; } - if changed { - let min_v = self.versions.first().unwrap().version; - let nodes = self.all_nongateway_nodes().into_owned(); - for node in nodes { - self.update_trackers.ack_map.set_max(node, min_v); - self.update_trackers.sync_map.set_max(node, min_v); - self.update_trackers.sync_ack_map.set_max(node, min_v); - } - } - } - - // Update the current_min value in trackers if anything changed - if changed { - self.update_trackers_min(); } // Merge staged layout changes @@ -280,7 +355,6 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: self.staging.get().parameters.clone(), roles: LwwMap::new(), }); - self.update_hashes(); Ok((self, msg)) } @@ -290,20 +364,11 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: Lww::new(self.current().parameters.clone()), roles: LwwMap::new(), }); - self.update_hashes(); Ok(self) } pub fn check(&self) -> Result<(), String> { - // Check that the hash of the staging data is correct - if self.trackers_hash != self.calculate_trackers_hash() { - return Err("trackers_hash is incorrect".into()); - } - if self.staging_hash != self.calculate_staging_hash() { - return Err("staging_hash is incorrect".into()); - } - for version in self.versions.iter() { version.check()?; } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 21ec2d8d..e270ad21 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -24,7 +24,7 @@ pub struct LayoutManager { replication_factor: usize, persist_cluster_layout: Persister, - layout: Arc>, + layout: Arc>, pub(crate) change_notify: Arc, table_sync_version: Mutex>, @@ -54,7 +54,7 @@ impl LayoutManager { let persist_cluster_layout: Persister = Persister::new(&config.metadata_dir, "cluster_layout"); - let mut cluster_layout = match persist_cluster_layout.load() { + let cluster_layout = match persist_cluster_layout.load() { Ok(x) => { if x.current().replication_factor != replication_factor { return Err(Error::Message(format!( @@ -74,6 +74,7 @@ impl LayoutManager { } }; + let mut cluster_layout = LayoutHelper::new(cluster_layout); cluster_layout.update_trackers_of(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); @@ -100,7 +101,7 @@ impl LayoutManager { // ---- PUBLIC INTERFACE ---- - pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { + pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHelper> { self.layout.read().unwrap() } @@ -108,8 +109,8 @@ impl LayoutManager { let layout = self.layout(); LayoutStatus { cluster_layout_version: layout.current().version, - cluster_layout_trackers_hash: layout.trackers_hash, - cluster_layout_staging_hash: layout.staging_hash, + cluster_layout_trackers_hash: layout.trackers_hash(), + cluster_layout_staging_hash: layout.staging_hash(), } } @@ -137,13 +138,8 @@ impl LayoutManager { drop(table_sync_version); let mut layout = self.layout.write().unwrap(); - if layout - .update_trackers - .sync_map - .set_max(self.node_id, sync_until) - { + if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) { debug!("sync_until updated to {}", sync_until); - layout.update_hashes(); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( layout.update_trackers.clone(), )); @@ -157,7 +153,7 @@ impl LayoutManager { let prev_layout_check = layout.check().is_ok(); if !prev_layout_check || adv.check().is_ok() { - if layout.merge(adv) { + if layout.update(|l| l.merge(adv)) { layout.update_trackers_of(self.node_id); if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); @@ -171,7 +167,7 @@ impl LayoutManager { fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option { let mut layout = self.layout.write().unwrap(); if layout.update_trackers != *adv { - if layout.update_trackers.merge(adv) { + if layout.update(|l| l.update_trackers.merge(adv)) { layout.update_trackers_of(self.node_id); return Some(layout.update_trackers.clone()); } diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 969f5a0b..00a2c017 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -188,7 +188,7 @@ mod v010 { use super::v09; use crate::layout::CompactNodeType; use garage_util::crdt::{Lww, LwwMap}; - use garage_util::data::{Hash, Uuid}; + use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; @@ -202,13 +202,9 @@ mod v010 { /// Update trackers pub update_trackers: UpdateTrackers, - /// Hash of the update trackers - pub trackers_hash: Hash, /// Staged changes for the next version pub staging: Lww, - /// Hash of the serialized staging_parameters + staging_roles - pub staging_hash: Hash, } /// A version of the layout of the cluster, i.e. the list of roles @@ -260,10 +256,7 @@ mod v010 { /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] - pub struct UpdateTracker { - pub values: BTreeMap, - pub current_min: u64, - } + pub struct UpdateTracker(pub BTreeMap); impl garage_util::migrate::Migrate for LayoutHistory { const VERSION_MARKER: &'static [u8] = b"G010lh"; @@ -293,32 +286,27 @@ mod v010 { nongateway_node_count, ring_assignment_data: previous.ring_assignment_data, }; - let update_tracker = UpdateTracker { - values: version + let update_tracker = UpdateTracker( + version .nongateway_nodes() .iter() .copied() .map(|x| (x, version.version)) .collect::>(), - current_min: 0, - }; + ); let staging = LayoutStaging { parameters: previous.staging_parameters, roles: previous.staging_roles, }; - let mut ret = Self { + Self { versions: vec![version], update_trackers: UpdateTrackers { ack_map: update_tracker.clone(), sync_map: update_tracker.clone(), sync_ack_map: update_tracker.clone(), }, - trackers_hash: [0u8; 32].into(), staging: Lww::raw(previous.version, staging), - staging_hash: [0u8; 32].into(), - }; - ret.update_hashes(); - ret + } } } } @@ -382,14 +370,14 @@ impl core::str::FromStr for ZoneRedundancy { impl UpdateTracker { fn merge(&mut self, other: &UpdateTracker) -> bool { let mut changed = false; - for (k, v) in other.values.iter() { - if let Some(v_mut) = self.values.get_mut(k) { + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { if *v > *v_mut { *v_mut = *v; changed = true; } } else { - self.values.insert(*k, *v); + self.0.insert(*k, *v); changed = true; } } @@ -397,23 +385,23 @@ impl UpdateTracker { } pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool { - match self.values.get_mut(&peer) { + match self.0.get_mut(&peer) { Some(e) if *e < value => { *e = value; true } None => { - self.values.insert(peer, value); + self.0.insert(peer, value); true } _ => false, } } - fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) { - self.current_min = storage_nodes + pub(crate) fn min(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { + storage_nodes .iter() - .map(|x| self.values.get(x).copied().unwrap_or(min_version)) + .map(|x| self.0.get(x).copied().unwrap_or(min_version)) .min() .unwrap_or(min_version) } @@ -426,10 +414,4 @@ impl UpdateTrackers { let c3 = self.sync_ack_map.merge(&other.sync_ack_map); c1 || c2 || c3 } - - pub(crate) fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) { - self.ack_map.update_min(&storage_nodes, min_version); - self.sync_map.update_min(&storage_nodes, min_version); - self.sync_ack_map.update_min(&storage_nodes, min_version); - } } -- cgit v1.2.3 From 33c8a489b0a9c0e869282bfc19c548f5a3e02e8c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 15:40:44 +0100 Subject: layou: implement ack locking --- src/rpc/layout/history.rs | 98 ++++++++++++++++++++++++++++++++++------------- src/rpc/layout/manager.rs | 74 ++++++++++++++++++++++++++++++++--- src/rpc/layout/mod.rs | 1 + 3 files changed, 141 insertions(+), 32 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b6f0e495..dd38efa7 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,7 @@ +use std::collections::HashMap; use std::collections::HashSet; use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -21,6 +23,11 @@ pub struct LayoutHelper { trackers_hash: Hash, staging_hash: Hash, + + // ack lock: counts in-progress write operations for each + // layout version ; we don't increase the ack update tracker + // while this lock is nonzero + pub(crate) ack_lock: HashMap, } impl Deref for LayoutHelper { @@ -31,7 +38,7 @@ impl Deref for LayoutHelper { } impl LayoutHelper { - pub fn new(mut layout: LayoutHistory) -> Self { + pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap) -> Self { layout.cleanup_old_versions(); let all_nongateway_nodes = layout.get_all_nongateway_nodes(); @@ -51,6 +58,11 @@ impl LayoutHelper { let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); + ack_lock.retain(|_, cnt| *cnt.get_mut() > 0); + ack_lock + .entry(layout.current().version) + .or_insert(AtomicUsize::new(0)); + LayoutHelper { layout: Some(layout), ack_map_min, @@ -59,6 +71,7 @@ impl LayoutHelper { all_nongateway_nodes, trackers_hash, staging_hash, + ack_lock, } } @@ -74,7 +87,10 @@ impl LayoutHelper { { let changed = f(&mut self.layout.as_mut().unwrap()); if changed { - *self = Self::new(self.layout.take().unwrap()); + *self = Self::new( + self.layout.take().unwrap(), + std::mem::take(&mut self.ack_lock), + ); } changed } @@ -115,7 +131,7 @@ impl LayoutHelper { .collect() } - pub fn write_sets_of(&self, position: &Hash) -> Vec> { + pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec> { self.layout() .versions .iter() @@ -143,42 +159,72 @@ impl LayoutHelper { // ------------------ helpers for update tracking --------------- - pub(crate) fn sync_first(&mut self, node: Uuid) { + pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version which is not currently + // locked by an in-progress write operation + self.ack_max_free(local_node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(local_node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(local_node_id); + + info!("ack_map: {:?}", self.update_trackers.ack_map); + info!("sync_map: {:?}", self.update_trackers.sync_map); + info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + } + + fn sync_first(&mut self, local_node_id: Uuid) { let first_version = self.versions.first().as_ref().unwrap().version; - self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version)); + self.update(|layout| { + layout + .update_trackers + .sync_map + .set_max(local_node_id, first_version) + }); } - pub(crate) fn sync_ack(&mut self, node: Uuid) { + fn sync_ack(&mut self, local_node_id: Uuid) { let sync_map_min = self.sync_map_min; self.update(|layout| { layout .update_trackers .sync_ack_map - .set_max(node, sync_map_min) + .set_max(local_node_id, sync_map_min) }); } - pub(crate) fn ack_last(&mut self, node: Uuid) { - let last_version = self.current().version; - self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version)); + pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { + let max_ack = self.max_free_ack(); + let changed = self.update(|layout| { + layout + .update_trackers + .ack_map + .set_max(local_node_id, max_ack) + }); + if changed { + info!("ack_until updated to {}", max_ack); + } + changed } - pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { - // Ensure trackers for this node's values are up-to-date - - // 1. Acknowledge the last layout version in the history - self.ack_last(node_id); - - // 2. Assume the data on this node is sync'ed up at least to - // the first layout version in the history - self.sync_first(node_id); - - // 3. Acknowledge everyone has synced up to min(self.sync_map) - self.sync_ack(node_id); - - info!("ack_map: {:?}", self.update_trackers.ack_map); - info!("sync_map: {:?}", self.update_trackers.sync_map); - info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + pub(crate) fn max_free_ack(&self) -> u64 { + self.layout() + .versions + .iter() + .map(|x| x.version) + .take_while(|v| { + self.ack_lock + .get(v) + .map(|x| x.load(Ordering::Relaxed) == 0) + .unwrap_or(true) + }) + .max() + .unwrap_or(self.min_stored()) } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index e270ad21..4e073d1f 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; +use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard}; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -74,8 +74,8 @@ impl LayoutManager { } }; - let mut cluster_layout = LayoutHelper::new(cluster_layout); - cluster_layout.update_trackers_of(node_id.into()); + let mut cluster_layout = LayoutHelper::new(cluster_layout, Default::default()); + cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); let change_notify = Arc::new(Notify::new()); @@ -139,13 +139,36 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) { - debug!("sync_until updated to {}", sync_until); + info!("sync_until updated to {}", sync_until); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( layout.update_trackers.clone(), )); } } + fn ack_new_version(self: &Arc) { + let mut layout = self.layout.write().unwrap(); + if layout.ack_max_free(self.node_id) { + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( + layout.update_trackers.clone(), + )); + } + } + + // ---- ACK LOCKING ---- + + pub fn write_sets_of(self: &Arc, position: &Hash) -> WriteLock>> { + let layout = self.layout(); + let version = layout.current().version; + let nodes = layout.write_sets_of(position); + layout + .ack_lock + .get(&version) + .unwrap() + .fetch_add(1, Ordering::Relaxed); + WriteLock::new(version, self, nodes) + } + // ---- INTERNALS --- fn merge_layout(&self, adv: &LayoutHistory) -> Option { @@ -154,7 +177,7 @@ impl LayoutManager { if !prev_layout_check || adv.check().is_ok() { if layout.update(|l| l.merge(adv)) { - layout.update_trackers_of(self.node_id); + layout.update_trackers(self.node_id); if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } @@ -168,7 +191,7 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update_trackers != *adv { if layout.update(|l| l.update_trackers.merge(adv)) { - layout.update_trackers_of(self.node_id); + layout.update_trackers(self.node_id); return Some(layout.update_trackers.clone()); } } @@ -317,3 +340,42 @@ impl LayoutManager { Ok(SystemRpc::Ok) } } + +// ---- ack lock ---- + +pub struct WriteLock { + layout_version: u64, + layout_manager: Arc, + value: T, +} + +impl WriteLock { + fn new(version: u64, layout_manager: &Arc, value: T) -> Self { + Self { + layout_version: version, + layout_manager: layout_manager.clone(), + value, + } + } +} + +impl AsRef for WriteLock { + fn as_ref(&self) -> &T { + &self.value + } +} + +impl Drop for WriteLock { + fn drop(&mut self) { + let layout = self.layout_manager.layout(); // acquire read lock + if let Some(counter) = layout.ack_lock.get(&self.layout_version) { + let prev_lock = counter.fetch_sub(1, Ordering::Relaxed); + if prev_lock == 1 && layout.current().version > self.layout_version { + drop(layout); // release read lock, write lock will be acquired + self.layout_manager.ack_new_version(); + } + } else { + error!("Could not find ack lock counter for layout version {}. This probably indicates a bug in Garage.", self.layout_version); + } + } +} diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 577b32fb..859287c8 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -11,6 +11,7 @@ pub mod manager; // ---- re-exports ---- pub use history::*; +pub use manager::WriteLock; pub use schema::*; pub use version::*; -- cgit v1.2.3 From d4df03424f1c7f3cc1eaba9e16d2e1d049131b97 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 15:56:57 +0100 Subject: layout: fix test --- src/rpc/layout/test.rs | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs index e9639073..bb072c97 100644 --- a/src/rpc/layout/test.rs +++ b/src/rpc/layout/test.rs @@ -113,8 +113,6 @@ fn update_layout( staging.parameters.update(LayoutParameters { zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy), }); - - cl.update_hashes(); } #[test] -- cgit v1.2.3 From ad5c6f779f7fdfdc0569920c830c59197023515a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 16 Nov 2023 13:26:43 +0100 Subject: layout: split helper in separate file; more precise difference tracking --- src/rpc/layout/helper.rs | 224 +++++++++++++++++++++++++++++++++++++ src/rpc/layout/history.rs | 278 +++++----------------------------------------- src/rpc/layout/manager.rs | 5 +- src/rpc/layout/mod.rs | 3 +- 4 files changed, 256 insertions(+), 254 deletions(-) create mode 100644 src/rpc/layout/helper.rs (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs new file mode 100644 index 00000000..ed3da498 --- /dev/null +++ b/src/rpc/layout/helper.rs @@ -0,0 +1,224 @@ +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use garage_util::data::*; + +use super::schema::*; + +pub struct LayoutHelper { + layout: Option, + + // cached values + ack_map_min: u64, + sync_map_min: u64, + + all_nodes: Vec, + all_nongateway_nodes: Vec, + + pub(crate) trackers_hash: Hash, + pub(crate) staging_hash: Hash, + + // ack lock: counts in-progress write operations for each + // layout version ; we don't increase the ack update tracker + // while this lock is nonzero + pub(crate) ack_lock: HashMap, +} + +impl Deref for LayoutHelper { + type Target = LayoutHistory; + fn deref(&self) -> &LayoutHistory { + self.layout() + } +} + +impl LayoutHelper { + pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap) -> Self { + layout.cleanup_old_versions(); + + let all_nongateway_nodes = layout.get_all_nongateway_nodes(); + layout.clamp_update_trackers(&all_nongateway_nodes); + + let min_version = layout.min_stored(); + let ack_map_min = layout + .update_trackers + .ack_map + .min(&all_nongateway_nodes, min_version); + let sync_map_min = layout + .update_trackers + .sync_map + .min(&all_nongateway_nodes, min_version); + + let all_nodes = layout.get_all_nodes(); + let trackers_hash = layout.calculate_trackers_hash(); + let staging_hash = layout.calculate_staging_hash(); + + ack_lock.retain(|_, cnt| *cnt.get_mut() > 0); + ack_lock + .entry(layout.current().version) + .or_insert(AtomicUsize::new(0)); + + LayoutHelper { + layout: Some(layout), + ack_map_min, + sync_map_min, + all_nodes, + all_nongateway_nodes, + trackers_hash, + staging_hash, + ack_lock, + } + } + + // ------------------ single updating function -------------- + + fn layout(&self) -> &LayoutHistory { + self.layout.as_ref().unwrap() + } + + pub(crate) fn update(&mut self, f: F) -> bool + where + F: FnOnce(&mut LayoutHistory) -> bool, + { + let changed = f(&mut self.layout.as_mut().unwrap()); + if changed { + *self = Self::new( + self.layout.take().unwrap(), + std::mem::take(&mut self.ack_lock), + ); + } + changed + } + + // ------------------ read helpers --------------- + + pub fn all_nodes(&self) -> &[Uuid] { + &self.all_nodes + } + + pub fn all_nongateway_nodes(&self) -> &[Uuid] { + &self.all_nongateway_nodes + } + + pub fn all_ack(&self) -> u64 { + self.ack_map_min + } + + pub fn sync_versions(&self) -> (u64, u64, u64) { + ( + self.layout().current().version, + self.all_ack(), + self.layout().min_stored(), + ) + } + + pub fn read_nodes_of(&self, position: &Hash) -> Vec { + let sync_min = self.sync_map_min; + let version = self + .layout() + .versions + .iter() + .find(|x| x.version == sync_min) + .or(self.layout().versions.last()) + .unwrap(); + version + .nodes_of(position, version.replication_factor) + .collect() + } + + pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec> { + self.layout() + .versions + .iter() + .map(|x| x.nodes_of(position, x.replication_factor).collect()) + .collect() + } + + pub fn storage_nodes_of(&self, position: &Hash) -> Vec { + let mut ret = vec![]; + for version in self.layout().versions.iter() { + ret.extend(version.nodes_of(position, version.replication_factor)); + } + ret.sort(); + ret.dedup(); + ret + } + + pub fn trackers_hash(&self) -> Hash { + self.trackers_hash + } + + pub fn staging_hash(&self) -> Hash { + self.staging_hash + } + + // ------------------ helpers for update tracking --------------- + + pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version which is not currently + // locked by an in-progress write operation + self.ack_max_free(local_node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(local_node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(local_node_id); + + info!("ack_map: {:?}", self.update_trackers.ack_map); + info!("sync_map: {:?}", self.update_trackers.sync_map); + info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + } + + fn sync_first(&mut self, local_node_id: Uuid) { + let first_version = self.versions.first().as_ref().unwrap().version; + self.update(|layout| { + layout + .update_trackers + .sync_map + .set_max(local_node_id, first_version) + }); + } + + fn sync_ack(&mut self, local_node_id: Uuid) { + let sync_map_min = self.sync_map_min; + self.update(|layout| { + layout + .update_trackers + .sync_ack_map + .set_max(local_node_id, sync_map_min) + }); + } + + pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { + let max_ack = self.max_free_ack(); + let changed = self.update(|layout| { + layout + .update_trackers + .ack_map + .set_max(local_node_id, max_ack) + }); + if changed { + info!("ack_until updated to {}", max_ack); + } + changed + } + + pub(crate) fn max_free_ack(&self) -> u64 { + self.layout() + .versions + .iter() + .map(|x| x.version) + .take_while(|v| { + self.ack_lock + .get(v) + .map(|x| x.load(Ordering::Relaxed) == 0) + .unwrap_or(true) + }) + .max() + .unwrap_or(self.min_stored()) + } +} diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index dd38efa7..0a139549 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,7 +1,4 @@ -use std::collections::HashMap; use std::collections::HashSet; -use std::ops::Deref; -use std::sync::atomic::{AtomicUsize, Ordering}; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -11,225 +8,6 @@ use garage_util::error::*; use super::schema::*; use super::*; -pub struct LayoutHelper { - layout: Option, - - // cached values - ack_map_min: u64, - sync_map_min: u64, - - all_nodes: Vec, - all_nongateway_nodes: Vec, - - trackers_hash: Hash, - staging_hash: Hash, - - // ack lock: counts in-progress write operations for each - // layout version ; we don't increase the ack update tracker - // while this lock is nonzero - pub(crate) ack_lock: HashMap, -} - -impl Deref for LayoutHelper { - type Target = LayoutHistory; - fn deref(&self) -> &LayoutHistory { - self.layout() - } -} - -impl LayoutHelper { - pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap) -> Self { - layout.cleanup_old_versions(); - - let all_nongateway_nodes = layout.get_all_nongateway_nodes(); - layout.clamp_update_trackers(&all_nongateway_nodes); - - let min_version = layout.min_stored(); - let ack_map_min = layout - .update_trackers - .ack_map - .min(&all_nongateway_nodes, min_version); - let sync_map_min = layout - .update_trackers - .sync_map - .min(&all_nongateway_nodes, min_version); - - let all_nodes = layout.get_all_nodes(); - let trackers_hash = layout.calculate_trackers_hash(); - let staging_hash = layout.calculate_staging_hash(); - - ack_lock.retain(|_, cnt| *cnt.get_mut() > 0); - ack_lock - .entry(layout.current().version) - .or_insert(AtomicUsize::new(0)); - - LayoutHelper { - layout: Some(layout), - ack_map_min, - sync_map_min, - all_nodes, - all_nongateway_nodes, - trackers_hash, - staging_hash, - ack_lock, - } - } - - // ------------------ single updating function -------------- - - fn layout(&self) -> &LayoutHistory { - self.layout.as_ref().unwrap() - } - - pub(crate) fn update(&mut self, f: F) -> bool - where - F: FnOnce(&mut LayoutHistory) -> bool, - { - let changed = f(&mut self.layout.as_mut().unwrap()); - if changed { - *self = Self::new( - self.layout.take().unwrap(), - std::mem::take(&mut self.ack_lock), - ); - } - changed - } - - // ------------------ read helpers --------------- - - pub fn all_nodes(&self) -> &[Uuid] { - &self.all_nodes - } - - pub fn all_nongateway_nodes(&self) -> &[Uuid] { - &self.all_nongateway_nodes - } - - pub fn all_ack(&self) -> u64 { - self.ack_map_min - } - - pub fn sync_versions(&self) -> (u64, u64, u64) { - ( - self.layout().current().version, - self.all_ack(), - self.layout().min_stored(), - ) - } - - pub fn read_nodes_of(&self, position: &Hash) -> Vec { - let sync_min = self.sync_map_min; - let version = self - .layout() - .versions - .iter() - .find(|x| x.version == sync_min) - .or(self.layout().versions.last()) - .unwrap(); - version - .nodes_of(position, version.replication_factor) - .collect() - } - - pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec> { - self.layout() - .versions - .iter() - .map(|x| x.nodes_of(position, x.replication_factor).collect()) - .collect() - } - - pub fn storage_nodes_of(&self, position: &Hash) -> Vec { - let mut ret = vec![]; - for version in self.layout().versions.iter() { - ret.extend(version.nodes_of(position, version.replication_factor)); - } - ret.sort(); - ret.dedup(); - ret - } - - pub fn trackers_hash(&self) -> Hash { - self.trackers_hash - } - - pub fn staging_hash(&self) -> Hash { - self.staging_hash - } - - // ------------------ helpers for update tracking --------------- - - pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { - // Ensure trackers for this node's values are up-to-date - - // 1. Acknowledge the last layout version which is not currently - // locked by an in-progress write operation - self.ack_max_free(local_node_id); - - // 2. Assume the data on this node is sync'ed up at least to - // the first layout version in the history - self.sync_first(local_node_id); - - // 3. Acknowledge everyone has synced up to min(self.sync_map) - self.sync_ack(local_node_id); - - info!("ack_map: {:?}", self.update_trackers.ack_map); - info!("sync_map: {:?}", self.update_trackers.sync_map); - info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); - } - - fn sync_first(&mut self, local_node_id: Uuid) { - let first_version = self.versions.first().as_ref().unwrap().version; - self.update(|layout| { - layout - .update_trackers - .sync_map - .set_max(local_node_id, first_version) - }); - } - - fn sync_ack(&mut self, local_node_id: Uuid) { - let sync_map_min = self.sync_map_min; - self.update(|layout| { - layout - .update_trackers - .sync_ack_map - .set_max(local_node_id, sync_map_min) - }); - } - - pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { - let max_ack = self.max_free_ack(); - let changed = self.update(|layout| { - layout - .update_trackers - .ack_map - .set_max(local_node_id, max_ack) - }); - if changed { - info!("ack_until updated to {}", max_ack); - } - changed - } - - pub(crate) fn max_free_ack(&self) -> u64 { - self.layout() - .versions - .iter() - .map(|x| x.version) - .take_while(|v| { - self.ack_lock - .get(v) - .map(|x| x.load(Ordering::Relaxed) == 0) - .unwrap_or(true) - }) - .max() - .unwrap_or(self.min_stored()) - } -} - -// ---- - impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); @@ -270,7 +48,7 @@ impl LayoutHistory { } } - fn get_all_nongateway_nodes(&self) -> Vec { + pub(crate) fn get_all_nongateway_nodes(&self) -> Vec { if self.versions.len() == 1 { self.versions[0].nongateway_nodes().to_vec() } else { @@ -286,8 +64,21 @@ impl LayoutHistory { // ---- housekeeping (all invoked by LayoutHelper) ---- - fn cleanup_old_versions(&mut self) { - loop { + pub(crate) fn cleanup_old_versions(&mut self) { + // If there are invalid versions before valid versions, remove them + if self.versions.len() > 1 && self.current().check().is_ok() { + while self.versions.len() > 1 && self.versions.first().unwrap().check().is_err() { + let removed = self.versions.remove(0); + info!( + "Layout history: pruning old invalid version {}", + removed.version + ); + } + } + + // If there are old versions that no one is reading from anymore, + // remove them + while self.versions.len() > 1 { let all_nongateway_nodes = self.get_all_nongateway_nodes(); let min_version = self.min_stored(); let sync_ack_map_min = self @@ -303,7 +94,7 @@ impl LayoutHistory { } } - fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { + pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { let min_v = self.min_stored(); for node in nodes { self.update_trackers.ack_map.set_max(*node, min_v); @@ -312,11 +103,11 @@ impl LayoutHistory { } } - fn calculate_trackers_hash(&self) -> Hash { + pub(crate) fn calculate_trackers_hash(&self) -> Hash { blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) } - fn calculate_staging_hash(&self) -> Hash { + pub(crate) fn calculate_staging_hash(&self) -> Hash { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } @@ -328,6 +119,7 @@ impl LayoutHistory { // Add any new versions to history for v2 in other.versions.iter() { if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) { + // Version is already present, check consistency if v1 != v2 { error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version); } @@ -344,24 +136,14 @@ impl LayoutHistory { } // Merge trackers - if self.update_trackers != other.update_trackers { - let c = self.update_trackers.merge(&other.update_trackers); - changed = changed || c; - } - - // If there are invalid versions before valid versions, remove them, - // and increment update trackers - if self.versions.len() > 1 && self.current().check().is_ok() { - while self.versions.first().unwrap().check().is_err() { - self.versions.remove(0); - changed = true; - } - } + let c = self.update_trackers.merge(&other.update_trackers); + changed = changed || c; // Merge staged layout changes if self.staging != other.staging { + let prev_staging = self.staging.clone(); self.staging.merge(&other.staging); - changed = true; + changed = changed || self.staging != prev_staging; } changed @@ -390,11 +172,7 @@ To know the correct value of the new layout version, invoke `garage layout show` .calculate_next_version(&self.staging.get())?; self.versions.push(new_version); - if self.current().check().is_ok() { - while self.versions.first().unwrap().check().is_err() { - self.versions.remove(0); - } - } + self.cleanup_old_versions(); // Reset the staged layout changes self.staging.update(LayoutStaging { @@ -415,11 +193,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } pub fn check(&self) -> Result<(), String> { - for version in self.versions.iter() { - version.check()?; - } - // TODO: anything more ? - Ok(()) + self.current().check() } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 4e073d1f..85d94ffa 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -184,17 +184,20 @@ impl LayoutManager { return Some(layout.clone()); } } + None } fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option { let mut layout = self.layout.write().unwrap(); + if layout.update_trackers != *adv { if layout.update(|l| l.update_trackers.merge(adv)) { layout.update_trackers(self.node_id); return Some(layout.update_trackers.clone()); } } + None } @@ -284,7 +287,7 @@ impl LayoutManager { } pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { - let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning + let layout = self.layout.read().unwrap().clone(); SystemRpc::AdvertiseClusterLayout(layout) } diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 859287c8..91151ab4 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -1,4 +1,5 @@ mod graph_algo; +mod helper; mod history; mod schema; mod version; @@ -10,7 +11,7 @@ pub mod manager; // ---- re-exports ---- -pub use history::*; +pub use helper::LayoutHelper; pub use manager::WriteLock; pub use schema::*; pub use version::*; -- cgit v1.2.3 From 707442f5de416fdbed4681a33b739f0a787b7834 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 16 Nov 2023 13:51:40 +0100 Subject: layout: refactor digests and add "!=" assertions before epidemic bcast --- src/rpc/layout/helper.rs | 27 +++++++++++++++++++++++++-- src/rpc/layout/history.rs | 1 - src/rpc/layout/manager.rs | 36 ++++++++++-------------------------- src/rpc/layout/mod.rs | 2 +- 4 files changed, 36 insertions(+), 30 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index ed3da498..0d746ea3 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -2,10 +2,24 @@ use std::collections::HashMap; use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; +use serde::{Deserialize, Serialize}; + use garage_util::data::*; use super::schema::*; +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +pub struct LayoutDigest { + /// Cluster layout version + pub current_version: u64, + /// Number of active layout versions + pub active_versions: usize, + /// Hash of cluster layout update trackers + pub trackers_hash: Hash, + /// Hash of cluster layout staging data + pub staging_hash: Hash, +} + pub struct LayoutHelper { layout: Option, @@ -16,8 +30,8 @@ pub struct LayoutHelper { all_nodes: Vec, all_nongateway_nodes: Vec, - pub(crate) trackers_hash: Hash, - pub(crate) staging_hash: Hash, + trackers_hash: Hash, + staging_hash: Hash, // ack lock: counts in-progress write operations for each // layout version ; we don't increase the ack update tracker @@ -152,6 +166,15 @@ impl LayoutHelper { self.staging_hash } + pub fn digest(&self) -> LayoutDigest { + LayoutDigest { + current_version: self.current().version, + active_versions: self.versions.len(), + trackers_hash: self.trackers_hash, + staging_hash: self.staging_hash, + } + } + // ------------------ helpers for update tracking --------------- pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 0a139549..653d2a48 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -5,7 +5,6 @@ use garage_util::data::*; use garage_util::encode::nonversioned_encode; use garage_util::error::*; -use super::schema::*; use super::*; impl LayoutHistory { diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 85d94ffa..c65831a2 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -2,8 +2,6 @@ use std::collections::HashMap; use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard}; use std::time::Duration; -use serde::{Deserialize, Serialize}; - use tokio::sync::Notify; use netapp::endpoint::Endpoint; @@ -33,16 +31,6 @@ pub struct LayoutManager { system_endpoint: Arc>, } -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct LayoutStatus { - /// Cluster layout version - pub cluster_layout_version: u64, - /// Hash of cluster layout update trackers - pub cluster_layout_trackers_hash: Hash, - /// Hash of cluster layout staging data - pub cluster_layout_staging_hash: Hash, -} - impl LayoutManager { pub fn new( config: &Config, @@ -105,15 +93,6 @@ impl LayoutManager { self.layout.read().unwrap() } - pub fn status(&self) -> LayoutStatus { - let layout = self.layout(); - LayoutStatus { - cluster_layout_version: layout.current().version, - cluster_layout_trackers_hash: layout.trackers_hash(), - cluster_layout_staging_hash: layout.staging_hash(), - } - } - pub async fn update_cluster_layout( self: &Arc, layout: &LayoutHistory, @@ -173,6 +152,7 @@ impl LayoutManager { fn merge_layout(&self, adv: &LayoutHistory) -> Option { let mut layout = self.layout.write().unwrap(); + let prev_digest = layout.digest(); let prev_layout_check = layout.check().is_ok(); if !prev_layout_check || adv.check().is_ok() { @@ -181,6 +161,7 @@ impl LayoutManager { if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } + assert!(layout.digest() != prev_digest); return Some(layout.clone()); } } @@ -190,10 +171,12 @@ impl LayoutManager { fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option { let mut layout = self.layout.write().unwrap(); + let prev_digest = layout.digest(); if layout.update_trackers != *adv { if layout.update(|l| l.update_trackers.merge(adv)) { layout.update_trackers(self.node_id); + assert!(layout.digest() != prev_digest); return Some(layout.update_trackers.clone()); } } @@ -269,16 +252,17 @@ impl LayoutManager { // ---- RPC HANDLERS ---- - pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, remote: &LayoutStatus) { - let local = self.status(); - if remote.cluster_layout_version > local.cluster_layout_version - || remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash + pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, remote: &LayoutDigest) { + let local = self.layout().digest(); + if remote.current_version > local.current_version + || remote.active_versions != local.active_versions + || remote.staging_hash != local.staging_hash { tokio::spawn({ let this = self.clone(); async move { this.pull_cluster_layout(from).await } }); - } else if remote.cluster_layout_trackers_hash != local.cluster_layout_trackers_hash { + } else if remote.trackers_hash != local.trackers_hash { tokio::spawn({ let this = self.clone(); async move { this.pull_cluster_layout_trackers(from).await } diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 91151ab4..eb127fda 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -11,7 +11,7 @@ pub mod manager; // ---- re-exports ---- -pub use helper::LayoutHelper; +pub use helper::{LayoutDigest, LayoutHelper}; pub use manager::WriteLock; pub use schema::*; pub use version::*; -- cgit v1.2.3 From d6d239fc7909cbd017da6ea35cceb3d561a87cca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 27 Nov 2023 11:52:57 +0100 Subject: block manager: read_block using old layout versions if necessary --- src/rpc/layout/helper.rs | 23 +++++++++++++++++++++++ src/rpc/layout/history.rs | 12 +++++++++++- src/rpc/layout/schema.rs | 7 +++++++ 3 files changed, 41 insertions(+), 1 deletion(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 0d746ea3..5d159f3e 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use super::schema::*; +use crate::rpc_helper::RpcHelper; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] pub struct LayoutDigest { @@ -140,6 +141,28 @@ impl LayoutHelper { .collect() } + pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec { + let mut ret = Vec::with_capacity(12); + let ver_iter = self + .layout() + .versions + .iter() + .rev() + .chain(self.layout().old_versions.iter().rev()); + for ver in ver_iter { + if ver.version > self.sync_map_min { + continue; + } + let nodes = ver.nodes_of(position, ver.replication_factor); + for node in rpc_helper.request_order(nodes) { + if !ret.contains(&node) { + ret.push(node); + } + } + } + ret + } + pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec> { self.layout() .versions diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 653d2a48..7d4a1b48 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -18,6 +18,7 @@ impl LayoutHistory { LayoutHistory { versions: vec![version], + old_versions: vec![], update_trackers: Default::default(), staging: Lww::raw(0, staging), } @@ -86,11 +87,20 @@ impl LayoutHistory { .min(&all_nongateway_nodes, min_version); if self.min_stored() < sync_ack_map_min { let removed = self.versions.remove(0); - info!("Layout history: pruning old version {}", removed.version); + info!( + "Layout history: moving version {} to old_versions", + removed.version + ); + self.old_versions.push(removed); } else { break; } } + + while self.old_versions.len() > OLD_VERSION_COUNT { + let removed = self.old_versions.remove(0); + info!("Layout history: removing old_version {}", removed.version); + } } pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 00a2c017..08db44ca 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -193,12 +193,18 @@ mod v010 { use std::collections::BTreeMap; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; + pub const OLD_VERSION_COUNT: usize = 5; + /// The history of cluster layouts, with trackers to keep a record /// of which nodes are up-to-date to current cluster data #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LayoutHistory { /// The versions currently in use in the cluster pub versions: Vec, + /// At most 5 of the previous versions, not used by the garage_table + /// module, but usefull for the garage_block module to find data blocks + /// that have not yet been moved + pub old_versions: Vec, /// Update trackers pub update_trackers: UpdateTrackers, @@ -300,6 +306,7 @@ mod v010 { }; Self { versions: vec![version], + old_versions: vec![], update_trackers: UpdateTrackers { ack_map: update_tracker.clone(), sync_map: update_tracker.clone(), -- cgit v1.2.3 From 11e6fef93ce3ca56584fc99223b71da77d320dd7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 27 Nov 2023 16:17:41 +0100 Subject: cli: add layout history and layout assume-sync commands --- src/rpc/layout/schema.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 08db44ca..cb36297d 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -391,7 +391,10 @@ impl UpdateTracker { changed } - pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool { + /// This bumps the update tracker for a given node up to the specified value. + /// This has potential impacts on the correctness of Garage and should only + /// be used in very specific circumstances. + pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool { match self.0.get_mut(&peer) { Some(e) if *e < value => { *e = value; @@ -412,6 +415,10 @@ impl UpdateTracker { .min() .unwrap_or(min_version) } + + pub fn get(&self, node: &Uuid) -> u64 { + self.0.get(node).copied().unwrap_or(0) + } } impl UpdateTrackers { -- cgit v1.2.3 From c8356a91d9bf1d1488ec288099f2a55a1019918f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Dec 2023 10:30:26 +0100 Subject: layout updates: fix the set of nodes among which minima are calculated --- src/rpc/layout/helper.rs | 25 +++++++++++++++++++++---- src/rpc/layout/history.rs | 8 +++++--- src/rpc/layout/schema.rs | 2 +- 3 files changed, 27 insertions(+), 8 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 5d159f3e..881a039e 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -51,20 +51,37 @@ impl LayoutHelper { pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap) -> Self { layout.cleanup_old_versions(); + let all_nodes = layout.get_all_nodes(); let all_nongateway_nodes = layout.get_all_nongateway_nodes(); - layout.clamp_update_trackers(&all_nongateway_nodes); + + layout.clamp_update_trackers(&all_nodes); let min_version = layout.min_stored(); + + // ack_map_min is the minimum value of ack_map among all nodes + // in the cluster (gateway, non-gateway, current and previous layouts). + // It is the highest layout version which all of these nodes have + // acknowledged, indicating that they are aware of it and are no + // longer processing write operations that did not take it into account. let ack_map_min = layout .update_trackers .ack_map - .min(&all_nongateway_nodes, min_version); + .min_among(&all_nodes, min_version); + + // sync_map_min is the minimum value of sync_map among all storage nodes + // in the cluster (non-gateway nodes only, current and previous layouts). + // It is the highest layout version for which we know that all relevant + // storage nodes have fullfilled a sync, and therefore it is safe to + // use a read quorum within that layout to ensure consistency. + // Gateway nodes are excluded here because they hold no relevant data + // (they store the bucket and access key tables, but we don't have + // consistency on those). + // TODO: this value could take quorums into account instead. let sync_map_min = layout .update_trackers .sync_map - .min(&all_nongateway_nodes, min_version); + .min_among(&all_nongateway_nodes, min_version); - let all_nodes = layout.get_all_nodes(); let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 7d4a1b48..c448ac24 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -77,14 +77,16 @@ impl LayoutHistory { } // If there are old versions that no one is reading from anymore, - // remove them + // remove them (keep them in self.old_versions). + // ASSUMPTION: we only care about where nodes in the current layout version + // are reading from, as we assume older nodes are being discarded. while self.versions.len() > 1 { - let all_nongateway_nodes = self.get_all_nongateway_nodes(); + let current_nodes = &self.current().node_id_vec; let min_version = self.min_stored(); let sync_ack_map_min = self .update_trackers .sync_ack_map - .min(&all_nongateway_nodes, min_version); + .min_among(¤t_nodes, min_version); if self.min_stored() < sync_ack_map_min { let removed = self.versions.remove(0); info!( diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index cb36297d..49e84420 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -408,7 +408,7 @@ impl UpdateTracker { } } - pub(crate) fn min(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { + pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { storage_nodes .iter() .map(|x| self.0.get(x).copied().unwrap_or(min_version)) -- cgit v1.2.3 From 95eb13eb08d517d328e3c8aeb222440a27211ee9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Dec 2023 10:55:15 +0100 Subject: rpc: refactor result tracking for quorum sets --- src/rpc/layout/manager.rs | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index c65831a2..17465019 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -352,6 +352,12 @@ impl AsRef for WriteLock { } } +impl AsMut for WriteLock { + fn as_mut(&mut self) -> &mut T { + &mut self.value + } +} + impl Drop for WriteLock { fn drop(&mut self) { let layout = self.layout_manager.layout(); // acquire read lock -- cgit v1.2.3 From d90de365b3b30cb631b22fcd62c98bddb5a91549 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Dec 2023 11:16:10 +0100 Subject: table sync: use write quorums to report global success or failure of sync --- src/rpc/layout/helper.rs | 2 +- src/rpc/layout/manager.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 881a039e..0aa7c6aa 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -180,7 +180,7 @@ impl LayoutHelper { ret } - pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec> { + pub fn storage_sets_of(&self, position: &Hash) -> Vec> { self.layout() .versions .iter() diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 17465019..dc963ba0 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -139,7 +139,7 @@ impl LayoutManager { pub fn write_sets_of(self: &Arc, position: &Hash) -> WriteLock>> { let layout = self.layout(); let version = layout.current().version; - let nodes = layout.write_sets_of(position); + let nodes = layout.storage_sets_of(position); layout .ack_lock .get(&version) -- cgit v1.2.3 From 9cecea64d4509e95ac9793b29c947e2ecf9bb0b8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Dec 2023 14:27:53 +0100 Subject: layout: allow sync update tracker to progress with only quorums --- src/rpc/layout/helper.rs | 33 +++++++++++---- src/rpc/layout/history.rs | 101 ++++++++++++++++++++++++++++++++++++++++++++++ src/rpc/layout/manager.rs | 18 +++++---- src/rpc/layout/schema.rs | 6 +-- 4 files changed, 141 insertions(+), 17 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 0aa7c6aa..eeaf4ffa 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use super::schema::*; +use crate::replication_mode::ReplicationMode; use crate::rpc_helper::RpcHelper; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] @@ -22,6 +23,7 @@ pub struct LayoutDigest { } pub struct LayoutHelper { + replication_mode: ReplicationMode, layout: Option, // cached values @@ -48,7 +50,23 @@ impl Deref for LayoutHelper { } impl LayoutHelper { - pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap) -> Self { + pub fn new( + replication_mode: ReplicationMode, + mut layout: LayoutHistory, + mut ack_lock: HashMap, + ) -> Self { + // In the new() function of the helper, we do a bunch of cleanup + // and calculations on the layout history to make sure things are + // correct and we have rapid access to important values such as + // the layout versions to use when reading to ensure consistency. + + if !replication_mode.is_read_after_write_consistent() { + // Fast path for when no consistency is required. + // In this case we only need to keep the last version of the layout, + // we don't care about coordinating stuff in the cluster. + layout.keep_current_version_only(); + } + layout.cleanup_old_versions(); let all_nodes = layout.get_all_nodes(); @@ -68,7 +86,7 @@ impl LayoutHelper { .ack_map .min_among(&all_nodes, min_version); - // sync_map_min is the minimum value of sync_map among all storage nodes + // sync_map_min is the minimum value of sync_map among storage nodes // in the cluster (non-gateway nodes only, current and previous layouts). // It is the highest layout version for which we know that all relevant // storage nodes have fullfilled a sync, and therefore it is safe to @@ -76,11 +94,10 @@ impl LayoutHelper { // Gateway nodes are excluded here because they hold no relevant data // (they store the bucket and access key tables, but we don't have // consistency on those). - // TODO: this value could take quorums into account instead. - let sync_map_min = layout - .update_trackers - .sync_map - .min_among(&all_nongateway_nodes, min_version); + // This value is calculated using quorums to allow progress even + // if not all nodes have successfully completed a sync. + let sync_map_min = + layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes); let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); @@ -91,6 +108,7 @@ impl LayoutHelper { .or_insert(AtomicUsize::new(0)); LayoutHelper { + replication_mode, layout: Some(layout), ack_map_min, sync_map_min, @@ -115,6 +133,7 @@ impl LayoutHelper { let changed = f(&mut self.layout.as_mut().unwrap()); if changed { *self = Self::new( + self.replication_mode, self.layout.take().unwrap(), std::mem::take(&mut self.ack_lock), ); diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index c448ac24..a53256cc 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -6,6 +6,7 @@ use garage_util::encode::nonversioned_encode; use garage_util::error::*; use super::*; +use crate::replication_mode::ReplicationMode; impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { @@ -64,6 +65,13 @@ impl LayoutHistory { // ---- housekeeping (all invoked by LayoutHelper) ---- + pub(crate) fn keep_current_version_only(&mut self) { + while self.versions.len() > 1 { + let removed = self.versions.remove(0); + self.old_versions.push(removed); + } + } + pub(crate) fn cleanup_old_versions(&mut self) { // If there are invalid versions before valid versions, remove them if self.versions.len() > 1 && self.current().check().is_ok() { @@ -114,6 +122,99 @@ impl LayoutHistory { } } + pub(crate) fn calculate_sync_map_min_with_quorum( + &self, + replication_mode: ReplicationMode, + all_nongateway_nodes: &[Uuid], + ) -> u64 { + // This function calculates the minimum layout version from which + // it is safe to read if we want to maintain read-after-write consistency. + // In the general case the computation can be a bit expensive so + // we try to optimize it in several ways. + + // If there is only one layout version, we know that's the one + // we need to read from. + if self.versions.len() == 1 { + return self.current().version; + } + + let quorum = replication_mode.write_quorum(); + + let min_version = self.min_stored(); + let global_min = self + .update_trackers + .sync_map + .min_among(&all_nongateway_nodes, min_version); + + // If the write quorums are equal to the total number of nodes, + // i.e. no writes can succeed while they are not written to all nodes, + // then we must in all case wait for all nodes to complete a sync. + // This is represented by reading from the layout with version + // number global_min, the smallest layout version for which all nodes + // have completed a sync. + if quorum == self.current().replication_factor { + return global_min; + } + + // In the general case, we need to look at all write sets for all partitions, + // and find a safe layout version to read for that partition. We then + // take the minimum value among all partition as the safe layout version + // to read in all cases (the layout version to which all reads are directed). + let mut current_min = self.current().version; + let mut sets_done = HashSet::>::new(); + + for (_, p_hash) in self.current().partitions() { + for v in self.versions.iter() { + if v.version == self.current().version { + // We don't care about whether nodes in the latest layout version + // have completed a sync or not, as the sync is push-only + // and by definition nodes in the latest layout version do not + // hold data that must be pushed to nodes in the latest layout + // version, since that's the same version (any data that's + // already in the latest version is assumed to have been written + // by an operation that ensured a quorum of writes within + // that version). + continue; + } + + // Determine set of nodes for partition p in layout version v. + // Sort the node set to avoid duplicate computations. + let mut set = v + .nodes_of(&p_hash, v.replication_factor) + .collect::>(); + set.sort(); + + // If this set was already processed, skip it. + if sets_done.contains(&set) { + continue; + } + + // Find the value of the sync update trackers that is the + // highest possible minimum within a quorum of nodes. + let mut sync_values = set + .iter() + .map(|x| self.update_trackers.sync_map.get(x, min_version)) + .collect::>(); + sync_values.sort(); + let set_min = sync_values[sync_values.len() - quorum]; + if set_min < current_min { + current_min = set_min; + } + // defavorable case, we know we are at the smallest possible version, + // so we can stop early + assert!(current_min >= global_min); + if current_min == global_min { + return current_min; + } + + // Add set to already processed sets + sets_done.insert(set); + } + } + + current_min + } + pub(crate) fn calculate_trackers_hash(&self) -> Hash { blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index dc963ba0..ec8a2a15 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -14,12 +14,13 @@ use garage_util::error::*; use garage_util::persister::Persister; use super::*; +use crate::replication_mode::ReplicationMode; use crate::rpc_helper::*; use crate::system::*; pub struct LayoutManager { node_id: Uuid, - replication_factor: usize, + replication_mode: ReplicationMode, persist_cluster_layout: Persister, layout: Arc>, @@ -37,14 +38,16 @@ impl LayoutManager { node_id: NodeID, system_endpoint: Arc>, fullmesh: Arc, - replication_factor: usize, + replication_mode: ReplicationMode, ) -> Result, Error> { + let replication_factor = replication_mode.replication_factor(); + let persist_cluster_layout: Persister = Persister::new(&config.metadata_dir, "cluster_layout"); let cluster_layout = match persist_cluster_layout.load() { Ok(x) => { - if x.current().replication_factor != replication_factor { + if x.current().replication_factor != replication_mode.replication_factor() { return Err(Error::Message(format!( "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", x.current().replication_factor, @@ -62,7 +65,8 @@ impl LayoutManager { } }; - let mut cluster_layout = LayoutHelper::new(cluster_layout, Default::default()); + let mut cluster_layout = + LayoutHelper::new(replication_mode, cluster_layout, Default::default()); cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); @@ -77,7 +81,7 @@ impl LayoutManager { Ok(Arc::new(Self { node_id: node_id.into(), - replication_factor, + replication_mode, persist_cluster_layout, layout, change_notify, @@ -291,11 +295,11 @@ impl LayoutManager { adv.update_trackers ); - if adv.current().replication_factor != self.replication_factor { + if adv.current().replication_factor != self.replication_mode.replication_factor() { let msg = format!( "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", adv.current().replication_factor, - self.replication_factor + self.replication_mode.replication_factor() ); error!("{}", msg); return Err(Error::Message(msg)); diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 49e84420..df949906 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -411,13 +411,13 @@ impl UpdateTracker { pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { storage_nodes .iter() - .map(|x| self.0.get(x).copied().unwrap_or(min_version)) + .map(|x| self.get(x, min_version)) .min() .unwrap_or(min_version) } - pub fn get(&self, node: &Uuid) -> u64 { - self.0.get(node).copied().unwrap_or(0) + pub fn get(&self, node: &Uuid, min_version: u64) -> u64 { + self.0.get(node).copied().unwrap_or(min_version) } } -- cgit v1.2.3 From 063294dd569e10c6d85e29eb6507249eece00956 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 8 Dec 2023 11:50:58 +0100 Subject: layout version: refactor get_node_zone --- src/rpc/layout/test.rs | 4 ++-- src/rpc/layout/version.rs | 47 +++++++++++++++++++++++------------------------ 2 files changed, 25 insertions(+), 26 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs index bb072c97..88eb518e 100644 --- a/src/rpc/layout/test.rs +++ b/src/rpc/layout/test.rs @@ -34,8 +34,8 @@ fn check_against_naive(cl: &LayoutVersion) -> Result { zone_token.insert(z.clone(), 0); } for uuid in cl.nongateway_nodes() { - let z = cl.get_node_zone(&uuid)?; - let c = cl.get_node_capacity(&uuid).unwrap(); + let z = cl.expect_get_node_zone(&uuid); + let c = cl.expect_get_node_capacity(&uuid); zone_token.insert( z.to_string(), zone_token[z] + min(NB_PARTITIONS, (c / over_size) as usize), diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 947fab56..cbfbee94 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -70,6 +70,14 @@ impl LayoutVersion { } } + /// Given a node uuids, this function returns the label of its zone if it has one + pub fn get_node_zone(&self, uuid: &Uuid) -> Option<&str> { + match self.node_role(uuid) { + Some(role) => Some(&role.zone), + _ => None, + } + } + /// Returns the number of partitions associated to this node in the ring pub fn get_node_usage(&self, uuid: &Uuid) -> Result { for (i, id) in self.node_id_vec.iter().enumerate() { @@ -129,28 +137,22 @@ impl LayoutVersion { // ===================== internal information extractors ====================== - /// Given a node uuids, this function returns the label of its zone - pub(crate) fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> { - match self.node_role(uuid) { - Some(role) => Ok(&role.zone), - _ => Err(Error::Message( - "The Uuid does not correspond to a node present in the cluster.".into(), - )), - } - } - - fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 { + pub(crate) fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 { self.get_node_capacity(&uuid) .expect("non-gateway node with zero capacity") } + pub(crate) fn expect_get_node_zone(&self, uuid: &Uuid) -> &str { + self.get_node_zone(&uuid).expect("node without a zone") + } + /// Returns the sum of capacities of non gateway nodes in the cluster - fn get_total_capacity(&self) -> Result { + fn get_total_capacity(&self) -> u64 { let mut total_capacity = 0; for uuid in self.nongateway_nodes() { total_capacity += self.expect_get_node_capacity(&uuid); } - Ok(total_capacity) + total_capacity } /// Returns the effective value of the zone_redundancy parameter @@ -227,10 +229,7 @@ impl LayoutVersion { // Check that every partition is spread over at least zone_redundancy zones. let zones_of_p = nodes_of_p .iter() - .map(|n| { - self.get_node_zone(&self.node_id_vec[*n as usize]) - .expect("Zone not found.") - }) + .map(|n| self.expect_get_node_zone(&self.node_id_vec[*n as usize])) .collect::>(); if zones_of_p.iter().unique().count() < zone_redundancy { return Err(format!( @@ -516,7 +515,7 @@ impl LayoutVersion { } let mut s_down = 1; - let mut s_up = self.get_total_capacity()?; + let mut s_up = self.get_total_capacity(); while s_down + 1 < s_up { g = self.generate_flow_graph( (s_down + s_up) / 2, @@ -586,7 +585,7 @@ impl LayoutVersion { } for n in 0..self.nongateway_nodes().len() { let node_capacity = self.expect_get_node_capacity(&self.node_id_vec[n]); - let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?]; + let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[n])]; g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; for p in 0..NB_PARTITIONS { if !exclude_assoc.contains(&(p, n)) { @@ -632,7 +631,7 @@ impl LayoutVersion { // The algorithm is such that it will start with the flow that we just computed // and find ameliorating paths from that. for (p, n) in exclude_edge.iter() { - let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]; + let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])]; g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?; } g.compute_maximal_flow()?; @@ -652,7 +651,7 @@ impl LayoutVersion { let mut cost = CostFunction::new(); for (p, assoc_p) in prev_assign.iter().enumerate() { for n in assoc_p.iter() { - let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]; + let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])]; cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1); } } @@ -707,7 +706,7 @@ impl LayoutVersion { let mut msg = Message::new(); let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64; - let total_cap = self.get_total_capacity()?; + let total_cap = self.get_total_capacity(); let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32); msg.push(format!( "Usable capacity / total cluster capacity: {} / {} ({:.1} %)", @@ -754,7 +753,7 @@ impl LayoutVersion { let mut old_zones_of_p = Vec::::new(); for n in prev_assign[p].iter() { old_zones_of_p - .push(zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]); + .push(zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])]); } if !old_zones_of_p.contains(&z) { new_partitions_zone[z] += 1; @@ -796,7 +795,7 @@ impl LayoutVersion { for z in 0..id_to_zone.len() { let mut nodes_of_z = Vec::::new(); for n in 0..storing_nodes.len() { - if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] { + if self.expect_get_node_zone(&self.node_id_vec[n]) == id_to_zone[z] { nodes_of_z.push(n); } } -- cgit v1.2.3 From 5dd200c015aed786173f0e11541b0505f95dd6d1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 8 Dec 2023 12:02:24 +0100 Subject: layout: move block_read_nodes_of to rpc_helper to avoid double-locking (in theory, this could have caused a deadlock) --- src/rpc/layout/helper.rs | 27 ++++----------------------- 1 file changed, 4 insertions(+), 23 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index eeaf4ffa..147c8b4f 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -8,7 +8,6 @@ use garage_util::data::*; use super::schema::*; use crate::replication_mode::ReplicationMode; -use crate::rpc_helper::RpcHelper; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] pub struct LayoutDigest { @@ -155,6 +154,10 @@ impl LayoutHelper { self.ack_map_min } + pub fn all_sync(&self) -> u64 { + self.sync_map_min + } + pub fn sync_versions(&self) -> (u64, u64, u64) { ( self.layout().current().version, @@ -177,28 +180,6 @@ impl LayoutHelper { .collect() } - pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec { - let mut ret = Vec::with_capacity(12); - let ver_iter = self - .layout() - .versions - .iter() - .rev() - .chain(self.layout().old_versions.iter().rev()); - for ver in ver_iter { - if ver.version > self.sync_map_min { - continue; - } - let nodes = ver.nodes_of(position, ver.replication_factor); - for node in rpc_helper.request_order(nodes) { - if !ret.contains(&node) { - ret.push(node); - } - } - } - ret - } - pub fn storage_sets_of(&self, position: &Hash) -> Vec> { self.layout() .versions -- cgit v1.2.3 From 4dbf254512327ef4e7abbd5525b89bfa5b7ecb6f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 8 Dec 2023 14:15:52 +0100 Subject: layout: refactoring, merge two files --- src/rpc/layout/helper.rs | 2 +- src/rpc/layout/mod.rs | 441 +++++++++++++++++++++++++++++++++++++++++++++- src/rpc/layout/schema.rs | 431 -------------------------------------------- src/rpc/layout/version.rs | 1 - 4 files changed, 440 insertions(+), 435 deletions(-) delete mode 100644 src/rpc/layout/schema.rs (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 147c8b4f..2ba010b8 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; -use super::schema::*; +use super::*; use crate::replication_mode::ReplicationMode; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index eb127fda..facdb2ce 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -1,7 +1,13 @@ +use std::fmt; + +use bytesize::ByteSize; + +use garage_util::crdt::{AutoCrdt, Crdt}; +use garage_util::data::Uuid; + mod graph_algo; mod helper; mod history; -mod schema; mod version; #[cfg(test)] @@ -13,7 +19,6 @@ pub mod manager; pub use helper::{LayoutDigest, LayoutHelper}; pub use manager::WriteLock; -pub use schema::*; pub use version::*; // ---- defines: partitions ---- @@ -39,3 +44,435 @@ const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; // Change this to u16 the day we want to have more than 256 nodes in a cluster pub type CompactNodeType = u8; pub const MAX_NODE_NUMBER: usize = 256; + +// ======== actual data structures for the layout data ======== +// ======== that is persisted to disk ======== +// some small utility impls are at the end of this file, +// but most of the code that actually computes stuff is in +// version.rs, history.rs and helper.rs + +mod v08 { + use crate::layout::CompactNodeType; + use garage_util::crdt::LwwMap; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + pub roles: LwwMap, + + // see comments in v010::ClusterLayout + pub node_id_vec: Vec, + #[serde(with = "serde_bytes")] + pub ring_assignation_data: Vec, + + /// Role changes which are staged for the next version of the layout + pub staging: LwwMap, + pub staging_hash: Hash, + } + + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRoleV(pub Option); + + /// The user-assigned roles of cluster nodes + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRole { + /// Datacenter at which this entry belong. This information is used to + /// perform a better geodistribution + pub zone: String, + /// The capacity of the node + /// If this is set to None, the node does not participate in storing data for the system + /// and is only active as an API gateway to other nodes + pub capacity: Option, + /// A set of tags to recognize the node + pub tags: Vec, + } + + impl garage_util::migrate::InitialFormat for ClusterLayout {} +} + +mod v09 { + use super::v08; + use crate::layout::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + pub use v08::{NodeRole, NodeRoleV}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + pub roles: LwwMap, + + // see comments in v010::ClusterLayout + pub node_id_vec: Vec, + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec, + + /// Parameters to be used in the next partition assignment computation. + pub staging_parameters: Lww, + /// Role changes which are staged for the next version of the layout + pub staging_roles: LwwMap, + pub staging_hash: Hash, + } + + /// This struct is used to set the parameters to be used in the assignment computation + /// algorithm. It is stored as a Crdt. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct LayoutParameters { + pub zone_redundancy: ZoneRedundancy, + } + + /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies + /// of each partition on at least that number of different zones. + /// Otherwise, copies will be stored on the maximum possible number of zones. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub enum ZoneRedundancy { + AtLeast(usize), + Maximum, + } + + impl garage_util::migrate::Migrate for ClusterLayout { + const VERSION_MARKER: &'static [u8] = b"G09layout"; + + type Previous = v08::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + use itertools::Itertools; + + // In the old layout, capacities are in an arbitrary unit, + // but in the new layout they are in bytes. + // Here we arbitrarily multiply everything by 1G, + // such that 1 old capacity unit = 1GB in the new units. + // This is totally arbitrary and won't work for most users. + let cap_mul = 1024 * 1024 * 1024; + let roles = multiply_all_capacities(previous.roles, cap_mul); + let staging_roles = multiply_all_capacities(previous.staging, cap_mul); + let node_id_vec = previous.node_id_vec; + + // Determine partition size + let mut tmp = previous.ring_assignation_data.clone(); + tmp.sort(); + let partition_size = tmp + .into_iter() + .dedup_with_count() + .map(|(npart, node)| { + roles + .get(&node_id_vec[node as usize]) + .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) + .unwrap_or(0) / npart as u64 + }) + .min() + .unwrap_or(0); + + // By default, zone_redundancy is maximum possible value + let parameters = LayoutParameters { + zone_redundancy: ZoneRedundancy::Maximum, + }; + + Self { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size, + parameters, + roles, + node_id_vec, + ring_assignment_data: previous.ring_assignation_data, + staging_parameters: Lww::new(parameters), + staging_roles, + staging_hash: [0u8; 32].into(), // will be set in the next migration + } + } + } + + fn multiply_all_capacities( + old_roles: LwwMap, + mul: u64, + ) -> LwwMap { + let mut new_roles = LwwMap::new(); + for (node, ts, role) in old_roles.items() { + let mut role = role.clone(); + if let NodeRoleV(Some(NodeRole { + capacity: Some(ref mut cap), + .. + })) = role + { + *cap *= mul; + } + new_roles.merge_raw(node, *ts, &role); + } + new_roles + } +} + +mod v010 { + use super::v09; + use crate::layout::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; + + /// Number of old (non-live) versions to keep, see LayoutHistory::old_versions + pub const OLD_VERSION_COUNT: usize = 5; + + /// The history of cluster layouts, with trackers to keep a record + /// of which nodes are up-to-date to current cluster data + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutHistory { + /// The versions currently in use in the cluster + pub versions: Vec, + /// At most 5 of the previous versions, not used by the garage_table + /// module, but usefull for the garage_block module to find data blocks + /// that have not yet been moved + pub old_versions: Vec, + + /// Update trackers + pub update_trackers: UpdateTrackers, + + /// Staged changes for the next version + pub staging: Lww, + } + + /// A version of the layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutVersion { + /// The number of this version + pub version: u64, + + /// Roles assigned to nodes in this version + pub roles: LwwMap, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + /// The number of replicas for each data partition + pub replication_factor: usize, + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + + /// node_id_vec: a vector of node IDs with a role assigned + /// in the system (this includes gateway nodes). + /// The order here is different than the vec stored by `roles`, because: + /// 1. non-gateway nodes are first so that they have lower numbers + /// 2. nodes that don't have a role are excluded (but they need to + /// stay in the CRDT as tombstones) + pub node_id_vec: Vec, + /// number of non-gateway nodes, which are the first ids in node_id_vec + pub nongateway_node_count: usize, + /// The assignation of data partitions to nodes, the values + /// are indices in node_id_vec + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec, + } + + /// The staged changes for the next layout version + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutStaging { + /// Parameters to be used in the next partition assignment computation. + pub parameters: Lww, + /// Role changes which are staged for the next version of the layout + pub roles: LwwMap, + } + + /// The tracker of acknowlegments and data syncs around the cluster + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] + pub struct UpdateTrackers { + /// The highest layout version number each node has ack'ed + pub ack_map: UpdateTracker, + /// The highest layout version number each node has synced data for + pub sync_map: UpdateTracker, + /// The highest layout version number each node has + /// ack'ed that all other nodes have synced data for + pub sync_ack_map: UpdateTracker, + } + + /// Generic update tracker struct + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] + pub struct UpdateTracker(pub BTreeMap); + + impl garage_util::migrate::Migrate for LayoutHistory { + const VERSION_MARKER: &'static [u8] = b"G010lh"; + + type Previous = v09::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + let nongateway_node_count = previous + .node_id_vec + .iter() + .enumerate() + .filter(|(_, uuid)| { + let role = previous.roles.get(uuid); + matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some()) + }) + .map(|(i, _)| i + 1) + .max() + .unwrap_or(0); + + let version = LayoutVersion { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size: previous.partition_size, + parameters: previous.parameters, + roles: previous.roles, + node_id_vec: previous.node_id_vec, + nongateway_node_count, + ring_assignment_data: previous.ring_assignment_data, + }; + let update_tracker = UpdateTracker( + version + .nongateway_nodes() + .iter() + .copied() + .map(|x| (x, version.version)) + .collect::>(), + ); + let staging = LayoutStaging { + parameters: previous.staging_parameters, + roles: previous.staging_roles, + }; + Self { + versions: vec![version], + old_versions: vec![], + update_trackers: UpdateTrackers { + ack_map: update_tracker.clone(), + sync_map: update_tracker.clone(), + sync_ack_map: update_tracker.clone(), + }, + staging: Lww::raw(previous.version, staging), + } + } + } +} + +pub use v010::*; + +// ---- utility functions ---- + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for LayoutStaging { + fn merge(&mut self, other: &LayoutStaging) { + self.parameters.merge(&other.parameters); + self.roles.merge(&other.roles); + } +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } + } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } +} + +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + +impl UpdateTracker { + fn merge(&mut self, other: &UpdateTracker) -> bool { + let mut changed = false; + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { + if *v > *v_mut { + *v_mut = *v; + changed = true; + } + } else { + self.0.insert(*k, *v); + changed = true; + } + } + changed + } + + /// This bumps the update tracker for a given node up to the specified value. + /// This has potential impacts on the correctness of Garage and should only + /// be used in very specific circumstances. + pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool { + match self.0.get_mut(&peer) { + Some(e) if *e < value => { + *e = value; + true + } + None => { + self.0.insert(peer, value); + true + } + _ => false, + } + } + + pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { + storage_nodes + .iter() + .map(|x| self.get(x, min_version)) + .min() + .unwrap_or(min_version) + } + + pub fn get(&self, node: &Uuid, min_version: u64) -> u64 { + self.0.get(node).copied().unwrap_or(min_version) + } +} + +impl UpdateTrackers { + pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool { + let c1 = self.ack_map.merge(&other.ack_map); + let c2 = self.sync_map.merge(&other.sync_map); + let c3 = self.sync_ack_map.merge(&other.sync_ack_map); + c1 || c2 || c3 + } +} diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs deleted file mode 100644 index df949906..00000000 --- a/src/rpc/layout/schema.rs +++ /dev/null @@ -1,431 +0,0 @@ -use std::fmt; - -use bytesize::ByteSize; - -use garage_util::crdt::{AutoCrdt, Crdt}; -use garage_util::data::Uuid; - -mod v08 { - use crate::layout::CompactNodeType; - use garage_util::crdt::LwwMap; - use garage_util::data::{Hash, Uuid}; - use serde::{Deserialize, Serialize}; - - /// The layout of the cluster, i.e. the list of roles - /// which are assigned to each cluster node - #[derive(Clone, Debug, Serialize, Deserialize)] - pub struct ClusterLayout { - pub version: u64, - - pub replication_factor: usize, - pub roles: LwwMap, - - /// node_id_vec: a vector of node IDs with a role assigned - /// in the system (this includes gateway nodes). - /// The order here is different than the vec stored by `roles`, because: - /// 1. non-gateway nodes are first so that they have lower numbers - /// 2. nodes that don't have a role are excluded (but they need to - /// stay in the CRDT as tombstones) - pub node_id_vec: Vec, - /// the assignation of data partitions to node, the values - /// are indices in node_id_vec - #[serde(with = "serde_bytes")] - pub ring_assignation_data: Vec, - - /// Role changes which are staged for the next version of the layout - pub staging: LwwMap, - pub staging_hash: Hash, - } - - #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] - pub struct NodeRoleV(pub Option); - - /// The user-assigned roles of cluster nodes - #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] - pub struct NodeRole { - /// Datacenter at which this entry belong. This information is used to - /// perform a better geodistribution - pub zone: String, - /// The capacity of the node - /// If this is set to None, the node does not participate in storing data for the system - /// and is only active as an API gateway to other nodes - pub capacity: Option, - /// A set of tags to recognize the node - pub tags: Vec, - } - - impl garage_util::migrate::InitialFormat for ClusterLayout {} -} - -mod v09 { - use super::v08; - use crate::layout::CompactNodeType; - use garage_util::crdt::{Lww, LwwMap}; - use garage_util::data::{Hash, Uuid}; - use serde::{Deserialize, Serialize}; - pub use v08::{NodeRole, NodeRoleV}; - - /// The layout of the cluster, i.e. the list of roles - /// which are assigned to each cluster node - #[derive(Clone, Debug, Serialize, Deserialize)] - pub struct ClusterLayout { - pub version: u64, - - pub replication_factor: usize, - - /// This attribute is only used to retain the previously computed partition size, - /// to know to what extent does it change with the layout update. - pub partition_size: u64, - /// Parameters used to compute the assignment currently given by - /// ring_assignment_data - pub parameters: LayoutParameters, - - pub roles: LwwMap, - - /// see comment in v08::ClusterLayout - pub node_id_vec: Vec, - /// see comment in v08::ClusterLayout - #[serde(with = "serde_bytes")] - pub ring_assignment_data: Vec, - - /// Parameters to be used in the next partition assignment computation. - pub staging_parameters: Lww, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap, - pub staging_hash: Hash, - } - - /// This struct is used to set the parameters to be used in the assignment computation - /// algorithm. It is stored as a Crdt. - #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] - pub struct LayoutParameters { - pub zone_redundancy: ZoneRedundancy, - } - - /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies - /// of each partition on at least that number of different zones. - /// Otherwise, copies will be stored on the maximum possible number of zones. - #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] - pub enum ZoneRedundancy { - AtLeast(usize), - Maximum, - } - - impl garage_util::migrate::Migrate for ClusterLayout { - const VERSION_MARKER: &'static [u8] = b"G09layout"; - - type Previous = v08::ClusterLayout; - - fn migrate(previous: Self::Previous) -> Self { - use itertools::Itertools; - - // In the old layout, capacities are in an arbitrary unit, - // but in the new layout they are in bytes. - // Here we arbitrarily multiply everything by 1G, - // such that 1 old capacity unit = 1GB in the new units. - // This is totally arbitrary and won't work for most users. - let cap_mul = 1024 * 1024 * 1024; - let roles = multiply_all_capacities(previous.roles, cap_mul); - let staging_roles = multiply_all_capacities(previous.staging, cap_mul); - let node_id_vec = previous.node_id_vec; - - // Determine partition size - let mut tmp = previous.ring_assignation_data.clone(); - tmp.sort(); - let partition_size = tmp - .into_iter() - .dedup_with_count() - .map(|(npart, node)| { - roles - .get(&node_id_vec[node as usize]) - .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) - .unwrap_or(0) / npart as u64 - }) - .min() - .unwrap_or(0); - - // By default, zone_redundancy is maximum possible value - let parameters = LayoutParameters { - zone_redundancy: ZoneRedundancy::Maximum, - }; - - Self { - version: previous.version, - replication_factor: previous.replication_factor, - partition_size, - parameters, - roles, - node_id_vec, - ring_assignment_data: previous.ring_assignation_data, - staging_parameters: Lww::new(parameters), - staging_roles, - staging_hash: [0u8; 32].into(), // will be set in the next migration - } - } - } - - fn multiply_all_capacities( - old_roles: LwwMap, - mul: u64, - ) -> LwwMap { - let mut new_roles = LwwMap::new(); - for (node, ts, role) in old_roles.items() { - let mut role = role.clone(); - if let NodeRoleV(Some(NodeRole { - capacity: Some(ref mut cap), - .. - })) = role - { - *cap *= mul; - } - new_roles.merge_raw(node, *ts, &role); - } - new_roles - } -} - -mod v010 { - use super::v09; - use crate::layout::CompactNodeType; - use garage_util::crdt::{Lww, LwwMap}; - use garage_util::data::Uuid; - use serde::{Deserialize, Serialize}; - use std::collections::BTreeMap; - pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; - - pub const OLD_VERSION_COUNT: usize = 5; - - /// The history of cluster layouts, with trackers to keep a record - /// of which nodes are up-to-date to current cluster data - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] - pub struct LayoutHistory { - /// The versions currently in use in the cluster - pub versions: Vec, - /// At most 5 of the previous versions, not used by the garage_table - /// module, but usefull for the garage_block module to find data blocks - /// that have not yet been moved - pub old_versions: Vec, - - /// Update trackers - pub update_trackers: UpdateTrackers, - - /// Staged changes for the next version - pub staging: Lww, - } - - /// A version of the layout of the cluster, i.e. the list of roles - /// which are assigned to each cluster node - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] - pub struct LayoutVersion { - pub version: u64, - - pub replication_factor: usize, - - /// This attribute is only used to retain the previously computed partition size, - /// to know to what extent does it change with the layout update. - pub partition_size: u64, - /// Parameters used to compute the assignment currently given by - /// ring_assignment_data - pub parameters: LayoutParameters, - - pub roles: LwwMap, - - /// see comment in v08::ClusterLayout - pub node_id_vec: Vec, - /// number of non-gateway nodes, which are the first ids in node_id_vec - pub nongateway_node_count: usize, - /// see comment in v08::ClusterLayout - #[serde(with = "serde_bytes")] - pub ring_assignment_data: Vec, - } - - /// The staged changes for the next layout version - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] - pub struct LayoutStaging { - /// Parameters to be used in the next partition assignment computation. - pub parameters: Lww, - /// Role changes which are staged for the next version of the layout - pub roles: LwwMap, - } - - /// The tracker of acknowlegments and data syncs around the cluster - #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] - pub struct UpdateTrackers { - /// The highest layout version number each node has ack'ed - pub ack_map: UpdateTracker, - /// The highest layout version number each node has synced data for - pub sync_map: UpdateTracker, - /// The highest layout version number each node has - /// ack'ed that all other nodes have synced data for - pub sync_ack_map: UpdateTracker, - } - - /// The history of cluster layouts - #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] - pub struct UpdateTracker(pub BTreeMap); - - impl garage_util::migrate::Migrate for LayoutHistory { - const VERSION_MARKER: &'static [u8] = b"G010lh"; - - type Previous = v09::ClusterLayout; - - fn migrate(previous: Self::Previous) -> Self { - let nongateway_node_count = previous - .node_id_vec - .iter() - .enumerate() - .filter(|(_, uuid)| { - let role = previous.roles.get(uuid); - matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some()) - }) - .map(|(i, _)| i + 1) - .max() - .unwrap_or(0); - - let version = LayoutVersion { - version: previous.version, - replication_factor: previous.replication_factor, - partition_size: previous.partition_size, - parameters: previous.parameters, - roles: previous.roles, - node_id_vec: previous.node_id_vec, - nongateway_node_count, - ring_assignment_data: previous.ring_assignment_data, - }; - let update_tracker = UpdateTracker( - version - .nongateway_nodes() - .iter() - .copied() - .map(|x| (x, version.version)) - .collect::>(), - ); - let staging = LayoutStaging { - parameters: previous.staging_parameters, - roles: previous.staging_roles, - }; - Self { - versions: vec![version], - old_versions: vec![], - update_trackers: UpdateTrackers { - ack_map: update_tracker.clone(), - sync_map: update_tracker.clone(), - sync_ack_map: update_tracker.clone(), - }, - staging: Lww::raw(previous.version, staging), - } - } - } -} - -pub use v010::*; - -// ---- utility functions ---- - -impl AutoCrdt for LayoutParameters { - const WARN_IF_DIFFERENT: bool = true; -} - -impl AutoCrdt for NodeRoleV { - const WARN_IF_DIFFERENT: bool = true; -} - -impl Crdt for LayoutStaging { - fn merge(&mut self, other: &LayoutStaging) { - self.parameters.merge(&other.parameters); - self.roles.merge(&other.roles); - } -} - -impl NodeRole { - pub fn capacity_string(&self) -> String { - match self.capacity { - Some(c) => ByteSize::b(c).to_string_as(false), - None => "gateway".to_string(), - } - } - - pub fn tags_string(&self) -> String { - self.tags.join(",") - } -} - -impl fmt::Display for ZoneRedundancy { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ZoneRedundancy::Maximum => write!(f, "maximum"), - ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), - } - } -} - -impl core::str::FromStr for ZoneRedundancy { - type Err = &'static str; - fn from_str(s: &str) -> Result { - match s { - "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), - x => { - let v = x - .parse::() - .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; - Ok(ZoneRedundancy::AtLeast(v)) - } - } - } -} - -impl UpdateTracker { - fn merge(&mut self, other: &UpdateTracker) -> bool { - let mut changed = false; - for (k, v) in other.0.iter() { - if let Some(v_mut) = self.0.get_mut(k) { - if *v > *v_mut { - *v_mut = *v; - changed = true; - } - } else { - self.0.insert(*k, *v); - changed = true; - } - } - changed - } - - /// This bumps the update tracker for a given node up to the specified value. - /// This has potential impacts on the correctness of Garage and should only - /// be used in very specific circumstances. - pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool { - match self.0.get_mut(&peer) { - Some(e) if *e < value => { - *e = value; - true - } - None => { - self.0.insert(peer, value); - true - } - _ => false, - } - } - - pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { - storage_nodes - .iter() - .map(|x| self.get(x, min_version)) - .min() - .unwrap_or(min_version) - } - - pub fn get(&self, node: &Uuid, min_version: u64) -> u64 { - self.0.get(node).copied().unwrap_or(min_version) - } -} - -impl UpdateTrackers { - pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool { - let c1 = self.ack_map.merge(&other.ack_map); - let c2 = self.sync_map.merge(&other.sync_map); - let c3 = self.sync_ack_map.merge(&other.sync_ack_map); - c1 || c2 || c3 - } -} diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index cbfbee94..5b307156 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -10,7 +10,6 @@ use garage_util::data::*; use garage_util::error::*; use super::graph_algo::*; -use super::schema::*; use super::*; // The Message type will be used to collect information on the algorithm. -- cgit v1.2.3 From 85b5a6bcd11c0a7651e4c589569e1935a3d18e46 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Dec 2023 15:31:47 +0100 Subject: fix some clippy lints --- src/rpc/layout/helper.rs | 2 +- src/rpc/layout/history.rs | 14 ++++++-------- src/rpc/layout/mod.rs | 2 +- src/rpc/layout/version.rs | 6 +++--- 4 files changed, 11 insertions(+), 13 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 2ba010b8..7e5d37e9 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -129,7 +129,7 @@ impl LayoutHelper { where F: FnOnce(&mut LayoutHistory) -> bool, { - let changed = f(&mut self.layout.as_mut().unwrap()); + let changed = f(self.layout.as_mut().unwrap()); if changed { *self = Self::new( self.replication_mode, diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index a53256cc..23196aee 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -42,8 +42,7 @@ impl LayoutHistory { let set = self .versions .iter() - .map(|x| x.all_nodes()) - .flatten() + .flat_map(|x| x.all_nodes()) .collect::>(); set.into_iter().copied().collect::>() } @@ -56,8 +55,7 @@ impl LayoutHistory { let set = self .versions .iter() - .map(|x| x.nongateway_nodes()) - .flatten() + .flat_map(|x| x.nongateway_nodes()) .collect::>(); set.into_iter().copied().collect::>() } @@ -94,7 +92,7 @@ impl LayoutHistory { let sync_ack_map_min = self .update_trackers .sync_ack_map - .min_among(¤t_nodes, min_version); + .min_among(current_nodes, min_version); if self.min_stored() < sync_ack_map_min { let removed = self.versions.remove(0); info!( @@ -144,7 +142,7 @@ impl LayoutHistory { let global_min = self .update_trackers .sync_map - .min_among(&all_nongateway_nodes, min_version); + .min_among(all_nongateway_nodes, min_version); // If the write quorums are equal to the total number of nodes, // i.e. no writes can succeed while they are not written to all nodes, @@ -281,7 +279,7 @@ To know the correct value of the new layout version, invoke `garage layout show` let (new_version, msg) = self .current() .clone() - .calculate_next_version(&self.staging.get())?; + .calculate_next_version(self.staging.get())?; self.versions.push(new_version); self.cleanup_old_versions(); @@ -297,7 +295,7 @@ To know the correct value of the new layout version, invoke `garage layout show` pub fn revert_staged_changes(mut self) -> Result { self.staging.update(LayoutStaging { - parameters: Lww::new(self.current().parameters.clone()), + parameters: Lww::new(self.current().parameters), roles: LwwMap::new(), }); diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index facdb2ce..162e3c6e 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -357,7 +357,7 @@ mod v010 { update_trackers: UpdateTrackers { ack_map: update_tracker.clone(), sync_map: update_tracker.clone(), - sync_ack_map: update_tracker.clone(), + sync_ack_map: update_tracker, }, staging: Lww::raw(previous.version, staging), } diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 5b307156..ee4b2821 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -137,19 +137,19 @@ impl LayoutVersion { // ===================== internal information extractors ====================== pub(crate) fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 { - self.get_node_capacity(&uuid) + self.get_node_capacity(uuid) .expect("non-gateway node with zero capacity") } pub(crate) fn expect_get_node_zone(&self, uuid: &Uuid) -> &str { - self.get_node_zone(&uuid).expect("node without a zone") + self.get_node_zone(uuid).expect("node without a zone") } /// Returns the sum of capacities of non gateway nodes in the cluster fn get_total_capacity(&self) -> u64 { let mut total_capacity = 0; for uuid in self.nongateway_nodes() { - total_capacity += self.expect_get_node_capacity(&uuid); + total_capacity += self.expect_get_node_capacity(uuid); } total_capacity } -- cgit v1.2.3 From adccce1145d5d82581e4a5da707be35badb2d5a6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Dec 2023 15:45:14 +0100 Subject: layout: refactor/fix bad while loop --- src/rpc/layout/history.rs | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 23196aee..b8cc27da 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -86,23 +86,20 @@ impl LayoutHistory { // remove them (keep them in self.old_versions). // ASSUMPTION: we only care about where nodes in the current layout version // are reading from, as we assume older nodes are being discarded. - while self.versions.len() > 1 { - let current_nodes = &self.current().node_id_vec; - let min_version = self.min_stored(); - let sync_ack_map_min = self - .update_trackers - .sync_ack_map - .min_among(current_nodes, min_version); - if self.min_stored() < sync_ack_map_min { - let removed = self.versions.remove(0); - info!( - "Layout history: moving version {} to old_versions", - removed.version - ); - self.old_versions.push(removed); - } else { - break; - } + let current_nodes = &self.current().node_id_vec; + let min_version = self.min_stored(); + let sync_ack_map_min = self + .update_trackers + .sync_ack_map + .min_among(current_nodes, min_version); + while self.min_stored() < sync_ack_map_min { + assert!(self.versions.len() > 1); + let removed = self.versions.remove(0); + info!( + "Layout history: moving version {} to old_versions", + removed.version + ); + self.old_versions.push(removed); } while self.old_versions.len() > OLD_VERSION_COUNT { -- cgit v1.2.3 From 0041b013a473e3ae72f50209d8f79db75a72848b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Dec 2023 16:09:22 +0100 Subject: layout: refactoring and fix in layout helper --- src/rpc/layout/helper.rs | 43 +++++++++++++++++++++++++------------------ src/rpc/layout/manager.rs | 2 +- src/rpc/layout/mod.rs | 2 +- 3 files changed, 27 insertions(+), 20 deletions(-) (limited to 'src/rpc/layout') diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 7e5d37e9..9fb738ea 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -10,7 +10,7 @@ use super::*; use crate::replication_mode::ReplicationMode; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] -pub struct LayoutDigest { +pub struct RpcLayoutDigest { /// Cluster layout version pub current_version: u64, /// Number of active layout versions @@ -21,6 +21,13 @@ pub struct LayoutDigest { pub staging_hash: Hash, } +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct SyncLayoutDigest { + current: u64, + ack_map_min: u64, + min_stored: u64, +} + pub struct LayoutHelper { replication_mode: ReplicationMode, layout: Option, @@ -150,20 +157,20 @@ impl LayoutHelper { &self.all_nongateway_nodes } - pub fn all_ack(&self) -> u64 { + pub fn ack_map_min(&self) -> u64 { self.ack_map_min } - pub fn all_sync(&self) -> u64 { + pub fn sync_map_min(&self) -> u64 { self.sync_map_min } - pub fn sync_versions(&self) -> (u64, u64, u64) { - ( - self.layout().current().version, - self.all_ack(), - self.layout().min_stored(), - ) + pub fn sync_digest(&self) -> SyncLayoutDigest { + SyncLayoutDigest { + current: self.layout().current().version, + ack_map_min: self.ack_map_min(), + min_stored: self.layout().min_stored(), + } } pub fn read_nodes_of(&self, position: &Hash) -> Vec { @@ -206,8 +213,8 @@ impl LayoutHelper { self.staging_hash } - pub fn digest(&self) -> LayoutDigest { - LayoutDigest { + pub fn digest(&self) -> RpcLayoutDigest { + RpcLayoutDigest { current_version: self.current().version, active_versions: self.versions.len(), trackers_hash: self.trackers_hash, @@ -231,13 +238,13 @@ impl LayoutHelper { // 3. Acknowledge everyone has synced up to min(self.sync_map) self.sync_ack(local_node_id); - info!("ack_map: {:?}", self.update_trackers.ack_map); - info!("sync_map: {:?}", self.update_trackers.sync_map); - info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + debug!("ack_map: {:?}", self.update_trackers.ack_map); + debug!("sync_map: {:?}", self.update_trackers.sync_map); + debug!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); } fn sync_first(&mut self, local_node_id: Uuid) { - let first_version = self.versions.first().as_ref().unwrap().version; + let first_version = self.min_stored(); self.update(|layout| { layout .update_trackers @@ -275,13 +282,13 @@ impl LayoutHelper { .versions .iter() .map(|x| x.version) - .take_while(|v| { + .skip_while(|v| { self.ack_lock .get(v) .map(|x| x.load(Ordering::Relaxed) == 0) .unwrap_or(true) }) - .max() - .unwrap_or(self.min_stored()) + .next() + .unwrap_or(self.current().version) } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index ec8a2a15..6747b79d 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -256,7 +256,7 @@ impl LayoutManager { // ---- RPC HANDLERS ---- - pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, remote: &LayoutDigest) { + pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, remote: &RpcLayoutDigest) { let local = self.layout().digest(); if remote.current_version > local.current_version || remote.active_versions != local.active_versions diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 162e3c6e..33676c37 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -17,7 +17,7 @@ pub mod manager; // ---- re-exports ---- -pub use helper::{LayoutDigest, LayoutHelper}; +pub use helper::{LayoutHelper, RpcLayoutDigest, SyncLayoutDigest}; pub use manager::WriteLock; pub use version::*; -- cgit v1.2.3