aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/k2v/index.rs7
-rw-r--r--src/model/index_counter.rs4
-rw-r--r--src/rpc/layout/history.rs30
-rw-r--r--src/rpc/layout/schema.rs17
-rw-r--r--src/rpc/layout/version.rs98
5 files changed, 95 insertions, 61 deletions
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 3c2f51a9..c189232a 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -25,8 +25,11 @@ pub async fn handle_read_index(
) -> Result<Response<Body>, Error> {
let reverse = reverse.unwrap_or(false);
- // TODO: not only current
- let node_id_vec = garage.system.cluster_layout().current().node_ids().to_vec();
+ let node_id_vec = garage
+ .system
+ .cluster_layout()
+ .all_nongateway_nodes()
+ .into_owned();
let (partition_keys, more, next_start) = read_range(
&garage.k2v.counter_table.table,
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 9637cc4c..2d968733 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -84,8 +84,8 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
impl<T: CountedItem> CounterEntry<T> {
pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap<String, i64> {
- let nodes = &layout.current().node_id_vec[..];
- self.filtered_values_with_nodes(nodes)
+ let nodes = layout.all_nongateway_nodes();
+ self.filtered_values_with_nodes(&nodes)
}
pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 050f5d0a..877ad3a7 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
use std::collections::HashSet;
use garage_util::crdt::{Crdt, Lww, LwwMap};
@@ -59,13 +60,19 @@ impl LayoutHistory {
(self.current().version, self.all_ack(), self.min_stored())
}
- pub fn all_nongateway_nodes(&self) -> HashSet<Uuid> {
+ pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> {
// TODO: cache this
- self.versions
- .iter()
- .map(|x| x.nongateway_nodes())
- .flatten()
- .collect::<HashSet<_>>()
+ if self.versions.len() == 1 {
+ self.versions[0].nongateway_nodes().into()
+ } else {
+ let set = self
+ .versions
+ .iter()
+ .map(|x| x.nongateway_nodes())
+ .flatten()
+ .collect::<HashSet<_>>();
+ set.into_iter().copied().collect::<Vec<_>>().into()
+ }
}
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
@@ -202,14 +209,11 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
// Compute new version and add it to history
- let mut new_version = self.current().clone();
- new_version.version += 1;
-
- new_version.roles.merge(&self.staging.get().roles);
- new_version.roles.retain(|(_, _, v)| v.0.is_some());
- new_version.parameters = *self.staging.get().parameters.get();
+ let (new_version, msg) = self
+ .current()
+ .clone()
+ .calculate_next_version(&self.staging.get())?;
- let msg = new_version.calculate_partition_assignment()?;
self.versions.push(new_version);
if self.current().check().is_ok() {
while self.versions.first().unwrap().check().is_err() {
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
index 89f5c361..db298ee6 100644
--- a/src/rpc/layout/schema.rs
+++ b/src/rpc/layout/schema.rs
@@ -212,6 +212,8 @@ mod v010 {
/// see comment in v08::ClusterLayout
pub node_id_vec: Vec<Uuid>,
+ /// number of non-gateway nodes, which are the first ids in node_id_vec
+ pub nongateway_node_count: usize,
/// see comment in v08::ClusterLayout
#[serde(with = "serde_bytes")]
pub ring_assignment_data: Vec<CompactNodeType>,
@@ -265,6 +267,18 @@ mod v010 {
type Previous = v09::ClusterLayout;
fn migrate(previous: Self::Previous) -> Self {
+ let nongateway_node_count = previous
+ .node_id_vec
+ .iter()
+ .enumerate()
+ .filter(|(_, uuid)| {
+ let role = previous.roles.get(uuid);
+ matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
+ })
+ .map(|(i, _)| i)
+ .max()
+ .unwrap_or(0);
+
let version = LayoutVersion {
version: previous.version,
replication_factor: previous.replication_factor,
@@ -272,11 +286,14 @@ mod v010 {
parameters: previous.parameters,
roles: previous.roles,
node_id_vec: previous.node_id_vec,
+ nongateway_node_count,
ring_assignment_data: previous.ring_assignment_data,
};
let update_tracker = UpdateTracker(
version
.nongateway_nodes()
+ .iter()
+ .copied()
.map(|x| (x, version.version))
.collect::<BTreeMap<Uuid, u64>>(),
);
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index ffbdf277..a7f387b6 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -5,7 +5,7 @@ use std::convert::TryInto;
use bytesize::ByteSize;
use itertools::Itertools;
-use garage_util::crdt::LwwMap;
+use garage_util::crdt::{Crdt, LwwMap};
use garage_util::data::*;
use garage_util::error::*;
@@ -30,6 +30,7 @@ impl LayoutVersion {
partition_size: 0,
roles: LwwMap::new(),
node_id_vec: Vec::new(),
+ nongateway_node_count: 0,
ring_assignment_data: Vec::new(),
parameters,
}
@@ -43,6 +44,11 @@ impl LayoutVersion {
&self.node_id_vec[..]
}
+ /// Returns the uuids of the non_gateway nodes in self.node_id_vec.
+ pub fn nongateway_nodes(&self) -> &[Uuid] {
+ &self.node_id_vec[..self.nongateway_node_count]
+ }
+
pub fn num_nodes(&self) -> usize {
self.node_id_vec.len()
}
@@ -56,18 +62,14 @@ impl LayoutVersion {
}
/// Given a node uuids, this function returns its capacity or fails if it does not have any
- pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> {
+ pub fn get_node_capacity(&self, uuid: &Uuid) -> Option<u64> {
match self.node_role(uuid) {
Some(NodeRole {
capacity: Some(cap),
zone: _,
tags: _,
- }) => Ok(*cap),
- _ => Err(Error::Message(
- "The Uuid does not correspond to a node present in the \
- cluster or this node does not have a positive capacity."
- .into(),
- )),
+ }) => Some(*cap),
+ _ => None,
}
}
@@ -131,17 +133,6 @@ impl LayoutVersion {
// ===================== internal information extractors ======================
- /// Returns the uuids of the non_gateway nodes in self.node_id_vec.
- pub(crate) fn nongateway_nodes(&self) -> impl Iterator<Item = Uuid> + '_ {
- self.node_id_vec
- .iter()
- .copied()
- .filter(move |uuid| match self.node_role(uuid) {
- Some(role) if role.capacity.is_some() => true,
- _ => false,
- })
- }
-
/// Given a node uuids, this function returns the label of its zone
pub(crate) fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> {
match self.node_role(uuid) {
@@ -152,11 +143,16 @@ impl LayoutVersion {
}
}
+ fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 {
+ self.get_node_capacity(&uuid)
+ .expect("non-gateway node with zero capacity")
+ }
+
/// Returns the sum of capacities of non gateway nodes in the cluster
fn get_total_capacity(&self) -> Result<u64, Error> {
let mut total_capacity = 0;
for uuid in self.nongateway_nodes() {
- total_capacity += self.get_node_capacity(&uuid)?;
+ total_capacity += self.expect_get_node_capacity(&uuid);
}
Ok(total_capacity)
}
@@ -257,7 +253,7 @@ impl LayoutVersion {
if *usage > 0 {
let uuid = self.node_id_vec[n];
let partusage = usage * self.partition_size;
- let nodecap = self.get_node_capacity(&uuid).unwrap();
+ let nodecap = self.expect_get_node_capacity(&uuid);
if partusage > nodecap {
return Err(format!(
"node usage ({}) is bigger than node capacity ({})",
@@ -288,6 +284,21 @@ impl LayoutVersion {
// ================== updates to layout, internals ===================
+ pub(crate) fn calculate_next_version(
+ mut self,
+ staging: &LayoutStaging,
+ ) -> Result<(Self, Message), Error> {
+ self.version += 1;
+
+ self.roles.merge(&staging.roles);
+ self.roles.retain(|(_, _, v)| v.0.is_some());
+ self.parameters = *staging.parameters.get();
+
+ let msg = self.calculate_partition_assignment()?;
+
+ Ok((self, msg))
+ }
+
/// This function calculates a new partition-to-node assignment.
/// The computed assignment respects the node replication factor
/// and the zone redundancy parameter It maximizes the capacity of a
@@ -297,7 +308,7 @@ impl LayoutVersion {
/// data to be moved.
/// Staged role changes must be merged with nodes roles before calling this function,
/// hence it must only be called from apply_staged_changes() and hence is not public.
- pub(crate) fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
+ fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
// We update the node ids, since the node role list might have changed with the
// changes in the layout. We retrieve the old_assignment reframed with new ids
let old_assignment_opt = self.update_node_id_vec()?;
@@ -317,12 +328,12 @@ impl LayoutVersion {
// to use them as indices in the flow graphs.
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
- let nb_nongateway_nodes = self.nongateway_nodes().count();
- if nb_nongateway_nodes < self.replication_factor {
+ if self.nongateway_nodes().len() < self.replication_factor {
return Err(Error::Message(format!(
"The number of nodes with positive \
capacity ({}) is smaller than the replication factor ({}).",
- nb_nongateway_nodes, self.replication_factor
+ self.nongateway_nodes().len(),
+ self.replication_factor
)));
}
if id_to_zone.len() < zone_redundancy {
@@ -420,12 +431,14 @@ impl LayoutVersion {
.map(|(k, _, _)| *k)
.collect();
- let mut new_node_id_vec = Vec::<Uuid>::new();
- new_node_id_vec.extend(new_non_gateway_nodes);
- new_node_id_vec.extend(new_gateway_nodes);
+ let old_node_id_vec = std::mem::take(&mut self.node_id_vec);
+
+ self.nongateway_node_count = new_non_gateway_nodes.len();
+ self.node_id_vec.clear();
+ self.node_id_vec.extend(new_non_gateway_nodes);
+ self.node_id_vec.extend(new_gateway_nodes);
- let old_node_id_vec = self.node_id_vec.clone();
- self.node_id_vec = new_node_id_vec.clone();
+ let new_node_id_vec = &self.node_id_vec;
// (2) We retrieve the old association
// We rewrite the old association with the new indices. We only consider partition
@@ -464,7 +477,7 @@ impl LayoutVersion {
}
}
- // We write the ring
+ // We clear the ring assignemnt data
self.ring_assignment_data = Vec::<CompactNodeType>::new();
Ok(Some(old_assignment))
@@ -478,8 +491,7 @@ impl LayoutVersion {
let mut id_to_zone = Vec::<String>::new();
let mut zone_to_id = HashMap::<String, usize>::new();
- let nongateway_nodes = self.nongateway_nodes().collect::<Vec<_>>();
- for uuid in nongateway_nodes.iter() {
+ for uuid in self.nongateway_nodes().iter() {
let r = self.node_role(uuid).unwrap();
if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() {
zone_to_id.insert(r.zone.clone(), id_to_zone.len());
@@ -556,10 +568,8 @@ impl LayoutVersion {
exclude_assoc: &HashSet<(usize, usize)>,
zone_redundancy: usize,
) -> Result<Graph<FlowEdge>, Error> {
- let vertices = LayoutVersion::generate_graph_vertices(
- zone_to_id.len(),
- self.nongateway_nodes().count(),
- );
+ let vertices =
+ LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
let mut g = Graph::<FlowEdge>::new(&vertices);
let nb_zones = zone_to_id.len();
for p in 0..NB_PARTITIONS {
@@ -578,8 +588,8 @@ impl LayoutVersion {
)?;
}
}
- for n in 0..self.nongateway_nodes().count() {
- let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
+ for n in 0..self.nongateway_nodes().len() {
+ let node_capacity = self.expect_get_node_capacity(&self.node_id_vec[n]);
let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?];
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
for p in 0..NB_PARTITIONS {
@@ -602,7 +612,7 @@ impl LayoutVersion {
// previous assignment
let mut exclude_edge = HashSet::<(usize, usize)>::new();
if let Some(prev_assign) = prev_assign_opt {
- let nb_nodes = self.nongateway_nodes().count();
+ let nb_nodes = self.nongateway_nodes().len();
for (p, prev_assign_p) in prev_assign.iter().enumerate() {
for n in 0..nb_nodes {
exclude_edge.insert((p, n));
@@ -654,7 +664,7 @@ impl LayoutVersion {
// We compute the maximal length of a simple path in gflow. It is used in the
// Bellman-Ford algorithm in optimize_flow_with_cost to set the number
// of iterations.
- let nb_nodes = self.nongateway_nodes().count();
+ let nb_nodes = self.nongateway_nodes().len();
let path_length = 4 * nb_nodes;
gflow.optimize_flow_with_cost(&cost, path_length)?;
@@ -732,7 +742,7 @@ impl LayoutVersion {
}
// We define and fill in the following tables
- let storing_nodes = self.nongateway_nodes().collect::<Vec<_>>();
+ let storing_nodes = self.nongateway_nodes();
let mut new_partitions = vec![0; storing_nodes.len()];
let mut stored_partitions = vec![0; storing_nodes.len()];
@@ -804,13 +814,13 @@ impl LayoutVersion {
let available_cap_z: u64 = self.partition_size * replicated_partitions as u64;
let mut total_cap_z = 0;
for n in nodes_of_z.iter() {
- total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?;
+ total_cap_z += self.expect_get_node_capacity(&self.node_id_vec[*n]);
}
let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32);
for n in nodes_of_z.iter() {
let available_cap_n = stored_partitions[*n] as u64 * self.partition_size;
- let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?;
+ let total_cap_n = self.expect_get_node_capacity(&self.node_id_vec[*n]);
let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or("<??>"))?.tags_string();
table.push(format!(
" {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)",